# Fase 1.4: Optimización del modelo en HAR (Hailo Archive)

In [None]:
# Imports

import json
import os
import gc

import math
import numpy as np
import tensorflow as tf
from IPython.display import SVG
from matplotlib import patches
from matplotlib import pyplot as plt
from PIL import Image
from tensorflow.python.eager.context import eager_mode
import torch
import onnx
import onnxruntime as ort
from sklearn.metrics import confusion_matrix, accuracy_score, precision_recall_fscore_support
from itertools import islice

# Funciones y parámetros de la CNN base
import sys
sys.path.append('../src/')
import resbagan_networks
import resbagan_datasets

# import the hailo sdk client relevant classes
from hailo_sdk_client import ClientRunner, InferenceContext

## 1. Optimización detallada

### 1.1. Carga de datos

In [None]:
# Input configuration shared by the cells of this notebook

batch_size = 100          # number of patches per batch
B = 5                     # size of the channel axis of each input patch
sizex, sizey = 32, 32     # spatial dimensions of each input patch

In [None]:
# Define parameters and load the data

DATASET = '../data/imagenes_rios/oitaven_river.raw'
GT = '../data/imagenes_rios/oitaven_river.pgm'
MODEL = "../results/models/model_ResBaGAN.pt"

# Sampling ratios forwarded to HyperDataset through `ratios`
# (presumably the train and validation fractions - TODO confirm against HyperDataset)
SAMPLES = [0.02, 0.01]

# Dataset used to feed the discriminator during inference
dataset = resbagan_datasets.HyperDataset(
    "oitaven_river", segmented=False, patch_size=32, ratios=(SAMPLES[0], SAMPLES[1]))

# Keep the image dimensions at hand
H = dataset.height
V = dataset.width

# Samples of the test split as exposed by the dataset
samples = dataset.test_set['samples']

# Flattened ground-truth array
truth = dataset.gt.flatten()
# Indices (into the flattened ground truth) that belong to the test split
test = dataset.test_index_list

# Number of classes, and number of classes that actually appear in the test split.
# The ground-truth labels of the test indices are collected in a single pass
# instead of rescanning the whole index list once per class.
nclases = dataset.classes_count
clases_en_test = {truth[idx] for idx in test}
nclases_no_vacias = sum(
    1 for clase_actual in range(1, nclases + 1) if clase_actual in clases_en_test
)

print(dataset)

# The custom HyperDataset object contains all the train, validation and test data
#   --> It is wrapped into a PyTorch data feeder for convenience.
# The batch size comes from the configuration cell rather than a duplicated literal.
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
    pin_memory=False,
)

In [None]:
# Load both versions of the discriminator: the ONNX export and the Hailo archive (HAR)

# Common path prefix of the converted models and the HAR file derived from it
model_name = "../results/models/model_ResBaGAN_discriminator"
hailo_model_har_name = f"{model_name}_hailo_model.har"

# ONNX Runtime session used as the reference implementation
ort_session = ort.InferenceSession("../results/models/model_ResBaGAN_discriminator.onnx")

# Hailo client runner built from the HAR file, targeting the hailo8l architecture
assert os.path.isfile(hailo_model_har_name), "Please provide valid path for HAR file"
runner = ClientRunner(har=hailo_model_har_name, hw_arch="hailo8l")

### 1.2. Evaluar el modelo en HAR sin optimización

In [None]:
# Quick sanity check of the numerical gap between the ONNX and HAR models

# Random test input with values in [-1, 1), laid out as (1, B, sizex, sizey)
input_tensor = (torch.rand(1, B, sizex, sizey) * 2 - 1).to("cpu")
input_np = input_tensor.cpu().numpy()
# Same data with the channel axis moved last, which is the layout fed to the HAR model
input_har = input_np.transpose(0, 2, 3, 1)

# Reference inference with ONNX Runtime
output_onnx = ort_session.run(None, {'input': input_np})[0]

print(output_onnx)

# Inference with the HAR model in the native emulator
with runner.infer_context(InferenceContext.SDK_NATIVE) as ctx:
    output_har = runner.infer(ctx, input_har)[0]

    print(output_har)

# Mean absolute difference between both outputs
error = np.mean(np.abs(output_onnx - output_har))
print(f'Error medio entre ONNX y HAR: {error}')

In [None]:
# Evaluate the non-optimized HAR model (native emulator) against the ONNX reference

# Switch the dataset to its test split
dataset.to_test()
data_iter = iter(data_loader)

total_error = 0
total_matches = 0
total_elements = 0

# Upper bound on the number of batches to evaluate
num_iters = 100
# Number of batches actually evaluated (the loader may hold fewer than num_iters)
iters_done = 0

# islice stops cleanly when the loader is exhausted, instead of letting
# next() raise StopIteration in the middle of the evaluation.
for i, (inputs, labels, targets_pixel_level) in enumerate(islice(data_iter, num_iters)):

    inputs_np = inputs.numpy()

    # ONNX inference
    outputs = ort_session.run(None, {'input': inputs_np})
    outputs_discriminator = outputs[0]
    outputs_discriminator[:, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
    preds_onnx = np.argmax(outputs_discriminator, axis=1)

    # HAR inference
    with runner.infer_context(InferenceContext.SDK_NATIVE) as ctx:
        # Channels-first -> channels-last layout
        inputs_har = np.transpose(inputs_np, (0, 2, 3, 1))

        native_res = runner.infer(ctx, inputs_har)[0]
        native_res[:, 0, 0, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
        # Align with the ONNX predictions explicitly, one prediction per sample
        preds_har = np.argmax(native_res, axis=-1).reshape(preds_onnx.shape)

    # Mean absolute difference between the predicted class indices of both models
    error = np.abs(preds_onnx - preds_har).mean()
    total_error += error

    # Exact agreement between both models
    matches = np.sum(preds_onnx == preds_har)
    total_matches += matches
    total_elements += preds_onnx.size
    iters_done += 1

    print(f"[Iteración {i+1}] Error medio: {error:.4f} ; Coincidencias: {matches}/{preds_onnx.size}")

if iters_done == 0:
    raise RuntimeError("El data loader no devolvió ningún batch de test")

# Global results, averaged over the batches that were actually evaluated
mean_error = total_error / iters_done
accuracy = total_matches / total_elements

print(f"Resultados globales para {total_elements} predicciones ({iters_done} iteraciones):")
print(f"Error medio absoluto (promedio): {mean_error:.6f}")
print(f"Precisión global: {accuracy*100:.2f}%")

### 1.3 Aplicar modificaciones de optimización al modelo y evaluarlo

In [None]:
# Build the model script used by the full-precision optimization step

# No command is active here: every entry below is a commented-out example,
# so the resulting script is an empty string.
model_script_lines = [
    # Example: add a normalization layer with mean [123.675, 116.28, 103.53] and std [58.395, 57.12, 57.375]
    # "normalization1 = normalization([123.675, 116.28, 103.53], [58.395, 57.12, 57.375])\n",
    # Example for multiple input nodes:
    # {normalization_layer_name_1} = normalization([list of means per channel], [list of stds per channel], {input_layer_name_1_from_hn})\n',
    # {normalization_layer_name_2} = normalization([list of means per channel], [list of stds per channel], {input_layer_name_2_from_hn})\n',
    # ...
]

# Hand the script to the ClientRunner so it is taken into account, then run
# the full-precision optimization
model_script = "".join(model_script_lines)
runner.load_model_script(model_script)
runner.optimize_full_precision()

In [None]:
# Evaluate the full-precision optimized HAR model against the ONNX reference

# Switch the dataset to its test split
dataset.to_test()
data_iter = iter(data_loader)

total_error = 0
total_matches = 0
total_elements = 0

# Upper bound on the number of batches to evaluate
num_iters = 100
# Number of batches actually evaluated (the loader may hold fewer than num_iters)
iters_done = 0

# islice stops cleanly when the loader is exhausted, instead of letting
# next() raise StopIteration in the middle of the evaluation.
for i, (inputs, labels, targets_pixel_level) in enumerate(islice(data_iter, num_iters)):

    inputs_np = inputs.numpy()

    # ONNX inference
    outputs = ort_session.run(None, {'input': inputs_np})
    outputs_discriminator = outputs[0]
    outputs_discriminator[:, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
    preds_onnx = np.argmax(outputs_discriminator, axis=1)

    # HAR inference in the full-precision optimized emulator
    with runner.infer_context(InferenceContext.SDK_FP_OPTIMIZED) as ctx:
        # Channels-first -> channels-last layout
        inputs_har = np.transpose(inputs_np, (0, 2, 3, 1))

        native_res = runner.infer(ctx, inputs_har)[0]
        native_res[:, 0, 0, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
        # Align with the ONNX predictions explicitly, one prediction per sample
        preds_har = np.argmax(native_res, axis=-1).reshape(preds_onnx.shape)

    # Mean absolute difference between the predicted class indices of both models
    error = np.abs(preds_onnx - preds_har).mean()
    total_error += error

    # Exact agreement between both models
    matches = np.sum(preds_onnx == preds_har)
    total_matches += matches
    total_elements += preds_onnx.size
    iters_done += 1

    print(f"[Iteración {i+1}] Error medio: {error:.4f} ; Coincidencias: {matches}/{preds_onnx.size}")

if iters_done == 0:
    raise RuntimeError("El data loader no devolvió ningún batch de test")

# Global results, averaged over the batches that were actually evaluated
mean_error = total_error / iters_done
accuracy = total_matches / total_elements

print(f"Resultados globales para {total_elements} predicciones ({iters_done} iteraciones):")
print(f"Error medio absoluto (promedio): {mean_error:.6f}")
print(f"Precisión global: {accuracy*100:.2f}%")

### 1.4 Cuantizar el modelo y evaluarlo

In [None]:
# Build the calibration dataset
# The original images are used, just as for the input of the SDK_FP_OPTIMIZED emulator
total_images = 1050

# Calibration samples are drawn from the training split
dataset.to_train()
calib_dataset = np.zeros((total_images, sizex, sizey, B), dtype=np.float32)

# Number of rows of calib_dataset that have been filled so far
count = 0

# Copy whole batches from the data loader until total_images patches are stored
for (inputs, labels, targets_pixel_level) in data_loader:
    # Batches arrive as (batch, B, sizex, sizey); move the channel axis last
    # to obtain (batch, sizex, sizey, B), the layout of calib_dataset
    batch_har = np.transpose(inputs.numpy(), (0, 2, 3, 1))

    # Only take as many patches as still fit in the calibration array
    n_take = min(len(batch_har), total_images - count)
    calib_dataset[count:count + n_take] = batch_har[:n_take]
    count += n_take

    if count >= total_images:
        break

if count == 0:
    raise RuntimeError("El data loader no devolvió ninguna imagen de calibración")

if count < total_images:
    # Drop the rows that were never filled: all-zero patches are not real data
    # and must not take part in the calibration.
    print(f"Aviso: solo se obtuvieron {count} de {total_images} imágenes de calibración; se recorta el dataset")
    calib_dataset = calib_dataset[:count]

In [None]:
# Quantize the model using the calibration dataset

# runner.optimize(calib_dataset) would be the short form. The dictionary form
# used here also works for models with several input nodes:
# {input_layer_name_1_from_hn: layer_1_calib_dataset, input_layer_name_2_from_hn: layer_2_calib_dataset}
hn_layers = runner.get_hn_dict()["layers"]
print("Input layers are: ")
# Show the available input layer names
input_layer_names = [name for name, spec in hn_layers.items() if spec["type"] == "input_layer"]
print(input_layer_names)
# This model has a single input layer
calib_dataset_dict = {"model_ResBaGAN_discriminator/input_layer1": calib_dataset}

optimization_level = 4
compression_level = 3
# Share of 4-bit weights associated with each compression level
compression_ratios = {0: 0.0, 1: 0.2, 2: 0.4, 3: 0.6, 4: 0.8, 5: 1.0}
# Unknown compression levels fall back to a ratio of 0.0
auto_4bit_ratio = compression_ratios.get(compression_level, 0.0)

# Optimization flavor; batch_size=8 is passed explicitly
# (a commented-out normalization example from the original is not used here)
flavor_command = f"model_optimization_flavor(optimization_level={optimization_level}, compression_level={compression_level}, batch_size=8)\n"
# Per the original note: this is a really small model, so the compression level is
# otherwise always reverted back to 0; this command forces the 4-bit weight ratio.
# Its application could be seen by the [info] messages: "Assigning 4bit weight to layer .."
compression_command = f"model_optimization_config(compression_params, auto_4bit_weights_ratio={auto_4bit_ratio})\n"

alls_lines = [flavor_command, compression_command]

runner.load_model_script("".join(alls_lines))

runner.optimize(calib_dataset_dict)

In [None]:
# Evaluate the quantized HAR model against the ONNX reference

# Switch the dataset to its test split
dataset.to_test()
data_iter = iter(data_loader)

total_error = 0
total_matches = 0
total_elements = 0

# Upper bound on the number of batches to evaluate
num_iters = 100
# Number of batches actually evaluated (the loader may hold fewer than num_iters)
iters_done = 0

# islice stops cleanly when the loader is exhausted, instead of letting
# next() raise StopIteration in the middle of the evaluation.
for i, (inputs, labels, targets_pixel_level) in enumerate(islice(data_iter, num_iters)):

    inputs_np = inputs.numpy()

    # ONNX inference
    outputs = ort_session.run(None, {'input': inputs_np})
    outputs_discriminator = outputs[0]
    outputs_discriminator[:, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
    preds_onnx = np.argmax(outputs_discriminator, axis=1)

    # HAR inference in the quantized emulator
    with runner.infer_context(InferenceContext.SDK_QUANTIZED) as ctx:
        # Channels-first -> channels-last layout
        inputs_har = np.transpose(inputs_np, (0, 2, 3, 1))

        native_res = runner.infer(ctx, inputs_har)[0]
        native_res[:, 0, 0, dataset.classes_count] = -math.inf  # the fake class is disabled for testing
        # Align with the ONNX predictions explicitly, one prediction per sample
        preds_har = np.argmax(native_res, axis=-1).reshape(preds_onnx.shape)

    # Mean absolute difference between the predicted class indices of both models
    error = np.abs(preds_onnx - preds_har).mean()
    total_error += error

    # Exact agreement between both models
    matches = np.sum(preds_onnx == preds_har)
    total_matches += matches
    total_elements += preds_onnx.size
    iters_done += 1

    print(f"[Iteración {i+1}] Error medio: {error:.4f} ; Coincidencias: {matches}/{preds_onnx.size}")

if iters_done == 0:
    raise RuntimeError("El data loader no devolvió ningún batch de test")

# Global results, averaged over the batches that were actually evaluated
mean_error = total_error / iters_done
accuracy = total_matches / total_elements

print(f"Resultados globales para {total_elements} predicciones ({iters_done} iteraciones):")
print(f"Error medio absoluto (promedio): {mean_error:.6f}")
print(f"Precisión global: {accuracy*100:.2f}%")

In [None]:
# Persist the quantized model: the runner's state is written to a HAR file
# whose name encodes the optimization and compression levels that were used
quantized_model_har_path = f"{model_name}_quantized_model_o{optimization_level}_c{compression_level}.har"
runner.save_har(quantized_model_har_path)