<h1>Quantization and benchmarking of deep learning models using ONNX Runtime and STM32Cube.AI Developer Cloud</h1>
    
    


<p>
Quantization converts the original floating-point parameters and intermediate activations of a model into lower-precision integer representations. This reduction in precision can significantly decrease the memory footprint and computational cost of the model, making it more efficient to deploy on an STM32 board using STM32Cube.AI, or on any other resource-constrained device.

ONNX Runtime Quantization is a feature of ONNX Runtime that provides tools to quantize ONNX models, including methods for quantizing both weights and activations, and allows efficient execution of the quantized models.


**This notebook demonstrates static post-training quantization of deep learning models using ONNX Runtime. It covers quantizing the model with a calibration dataset or with fake data, evaluating the full-precision model and the quantized model, and then using the STM32Cube.AI Developer Cloud to benchmark the models and generate the model C code to be deployed on your STM32 board.**
</p>
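
As a rough illustration of the memory saving mentioned above, the sketch below estimates the weight storage of a model in float32 and in int8. The parameter count is a made-up value chosen only for illustration, not a measurement of any model used in this notebook, and the estimate ignores the small overhead of the quantization parameters.

```python
def weights_size_kib(num_params: int, bytes_per_param: int) -> float:
    """Return the storage needed for `num_params` weights, in KiB."""
    return num_params * bytes_per_param / 1024


# Hypothetical parameter count, chosen only for illustration.
num_params = 1_000_000

float32_kib = weights_size_kib(num_params, 4)  # float32: 4 bytes per weight
int8_kib = weights_size_kib(num_params, 1)     # int8: 1 byte per weight

print(f"float32 weights: {float32_kib:.1f} KiB")
print(f"int8 weights:    {int8_kib:.1f} KiB")
print(f"reduction:       {float32_kib / int8_kib:.0f}x")
```

The actual footprint on the target also depends on the activation buffers, which the benchmark in section 4 reports separately.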

## License of the Jupyter Notebook

This software component is licensed by ST under BSD-3-Clause license,
the "License"; 

You may not use this file except in compliance with the
License. 

You may obtain a copy of the License at: https://opensource.org/licenses/BSD-3-Clause

Copyright (c) 2023 STMicroelectronics. All rights reserved

<div style="border-bottom: 3px solid #273B5F">
<h2>Table of contents</h2>
<ul style="list-style-type: none">
  <li><a href="#settings">1. Settings</a>
  <ul style="list-style-type: none">
    <li><a href="#install">1.1 Install and import necessary packages</a></li>
    <li><a href="#select">1.2 Select input model filename and dataset folder</a></li>
  </ul>
</li>
  <li><a href="#quantization">2. Quantization</a>
  <ul style="list-style-type: none">
    <li><a href="#opset">2.1 Opset conversion</a></li>
    <li><a href="#dataset">2.2 Creating the calibration dataset</a></li>
    <li><a href="#quantize">2.3 Quantize the model using QDQ quantization to int8 weights and activations</a></li>
  </ul>
  </li>
  <li><a href="#validation">3. Model validation</a></li>
  <li><a href="#benchmark">4. Benchmarking the Models on the STM32Cube.AI Developer Cloud</a>
  <ul style="list-style-type: none">
    <li><a href="#proxy">4.1 Proxy setting and connection to the STM32Cube.AI Developer Cloud</a></li>
    <li><a href="#benchmark_both">4.2 Benchmark the models on a STM32 target</a></li>
    <li><a href="#generate">4.3 Generate the model optimized C code for STM32</a></li>
  </ul>
  </li>
</ul>
</div>




<div id="settings">
    <h2>1. Settings</h2>
</div>


<div id="install">
    <h3>1.1 Install and import necessary packages </h3>
</div>

In [None]:
import sys
!{sys.executable} -m pip install numpy==1.23.5
!{sys.executable} -m pip install onnxruntime==1.13.1
!{sys.executable} -m pip install onnx==1.12.0
!{sys.executable} -m pip install Pillow==9.4.0
!{sys.executable} -m pip install tensorflow==2.8.3 
!{sys.executable} -m pip install scikit-learn
!{sys.executable} -m pip install tqdm
!{sys.executable} -m pip install matplotlib

# for the cloud service
!{sys.executable} -m pip install gitdir

In [None]:
import glob
import os
import random
import shutil

import numpy as np 
import tensorflow as tf
from datetime import datetime
from tqdm import tqdm
from typing import Tuple, Optional, List, Dict

import onnx
import onnxruntime
from onnx import version_converter
from onnxruntime import quantization
from onnxruntime.quantization import (CalibrationDataReader, CalibrationMethod,
                                      QuantFormat, QuantType, quantize_static)


<div id="select">
    <h3>1.2 Select input model filename and dataset folder</h3>
</div>

The code section below sets the paths of the model and the dataset used in the rest of the notebook. The model is expected to be in Open Neural Network Exchange (ONNX) format. In this experiment, we use the mobilenet_v2_0.35_128 model as an example, with a modified version of the COCO2014 dataset. For more details, please visit this [link](https://pjreddie.com/projects/coco-mirror/).

The quantization set is a directory containing one sub-directory per class. For instance:

```bash
 quantization_set/
 ..class_a:person/
 ....a_image_1.jpg
 ....a_image_2.jpg
 ..class_b:not_person/
 ....b_image_1.jpg
 ....b_image_2.jpg

```

To ensure proper quantization, ``quantization_dataset_path`` must point to the quantization set or to the training set, from which the calibration dataset is created later.

For fake quantization, set ``quantization_dataset_path`` to ``None``.
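
Before quantizing, it can help to check that the directory really follows the one-sub-directory-per-class layout described above. The following is a minimal sketch using only the standard library; the helper name `count_images_per_class` and the list of accepted extensions are assumptions made for this example, not part of the notebook.

```python
import os
from typing import Dict

# Extensions treated as images in this sketch (an assumption).
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


def count_images_per_class(dataset_path: str) -> Dict[str, int]:
    """Map each class sub-directory name to the number of images it contains."""
    counts = {}
    for entry in sorted(os.listdir(dataset_path)):
        class_dir = os.path.join(dataset_path, entry)
        if not os.path.isdir(class_dir):
            continue  # ignore stray files at the top level
        counts[entry] = sum(
            1 for name in os.listdir(class_dir)
            if name.lower().endswith(IMAGE_EXTENSIONS)
        )
    return counts
```

Calling `count_images_per_class(quantization_dataset_path)` returns a dictionary such as `{"class_a": ..., "class_b": ...}`; an empty dictionary or a class with zero images suggests that the path or the layout is wrong.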

In [None]:
input_model ="models/mobilenet_v2_128_0.5.onnx"
quantization_dataset_path=os.path.join("path/to/quantization_set")
#quantization_dataset_path=None



<div id="quantization">
    <h2>2. Quantization</h2>
</div>

<div id="opset">
    <h3>2.1. Opset conversion  </h3>
</div>

In this section, we upgrade the model's opset to version 15 to take advantage of advanced optimizations such as batch normalization folding and to ensure compatibility with the latest versions of ONNX and ONNX Runtime. To do this, we run the code below.

To ensure compatibility between the ONNX Runtime version and the opset number, please refer to [the official documentation of ONNX Runtime](https://onnxruntime.ai/docs/reference/compatibility.html).

In [None]:
def change_opset(input_model: str, new_opset: int) -> str:
    """
    Converts the opset version of an ONNX model to a new opset version.

    Args:
        input_model (str): The path to the input ONNX model.
        new_opset (int): The new opset version to convert the model to.

    Returns:
        str: The path to the converted ONNX model.
    """
    if not input_model.endswith('.onnx'):
        raise Exception("Error! The model must be in onnx format")    
    model = onnx.load(input_model)
    # Check the current opset version
    current_opset = model.opset_import[0].version
    if current_opset == new_opset:
        print(f"The model is already using opset {new_opset}")
        return input_model

    # Modify the opset version in the model
    converted_model = version_converter.convert_version(model, new_opset)
    temp_model_path = input_model+ '.temp'
    onnx.save(converted_model, temp_model_path)

    # Load the converted model with ONNX Runtime to check that it is valid.
    # The session is created inside the try block so that loading errors are caught too.
    try:
        session = onnxruntime.InferenceSession(temp_model_path)
        session.get_inputs()
    except Exception as e:
        print(f"An error occurred while loading the modified model: {e}")
        return

    # Replace the original model file with the modified model
    os.replace(temp_model_path, input_model)
    print(f"The model has been converted to opset {new_opset} and saved at the same location.")
    return input_model

change_opset(input_model, new_opset=15)

<div id="dataset">
    <h3> 2.2 Creating the calibration dataset </h3>
</div>

During ONNX Runtime quantization, the model is run on the calibration data to collect statistics about the dynamic range of each input and output tensor. These statistics are then used to determine the main quantization parameters: the scale factor and the zero-point (offset) that map floating-point values to integers.
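
To make the role of the scale factor and the zero-point concrete, here is a minimal NumPy sketch of a generic min/max affine quantization scheme. It is an illustration under simple assumptions and not necessarily the exact computation performed inside ONNX Runtime; the function names and the sample values are invented for this example.

```python
import numpy as np


def compute_qparams(x_min: float, x_max: float, qmin: int = -128, qmax: int = 127):
    """Derive an asymmetric scale and zero-point from an observed value range."""
    # Extend the range to contain 0.0 so that zero is exactly representable.
    x_min, x_max = min(x_min, 0.0), max(x_max, 0.0)
    scale = (x_max - x_min) / (qmax - qmin)
    if scale == 0.0:
        scale = 1.0  # degenerate case: every observed value was 0
    zero_point = int(round(qmin - x_min / scale))
    return scale, zero_point


def quantize(x: np.ndarray, scale: float, zero_point: int,
             qmin: int = -128, qmax: int = 127) -> np.ndarray:
    """Map float values to int8 using q = round(x / scale) + zero_point."""
    q = np.round(x / scale) + zero_point
    return np.clip(q, qmin, qmax).astype(np.int8)


def dequantize(q: np.ndarray, scale: float, zero_point: int) -> np.ndarray:
    """Map int8 values back to floats using x = (q - zero_point) * scale."""
    return (q.astype(np.float32) - zero_point) * scale


# Made-up activation values standing in for calibration statistics.
activations = np.array([-1.0, -0.25, 0.0, 0.5, 3.0], dtype=np.float32)
scale, zero_point = compute_qparams(float(activations.min()), float(activations.max()))
q = quantize(activations, scale, zero_point)
restored = dequantize(q, scale, zero_point)

print("scale:", scale, "zero_point:", zero_point)
print("quantized:", q)
print("max round-trip error:", float(np.max(np.abs(restored - activations))))
```

The round-trip error stays within about one quantization step, which is why a calibration range that matches the real data matters: a range that is too wide wastes resolution, and one that is too narrow clips values.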

The next three code sections contain:

* The `create_calibration_dataset` function, which creates the calibration set from the original directory by taking a given number of samples from each class, and the `preprocess_image_batch` function, which loads a batch of images and preprocesses it.
* The `preprocess_random_images` function, which generates random images for fake quantization.
* The `ImageNetDataReader` class, which inherits from the ONNX Runtime `CalibrationDataReader` class and implements the `get_next` method to provide input data dictionaries to the calibration process.

**Note:** Using a different normalization during quantization than during training changes the scale of the data and can reduce the accuracy of the quantized model. For example, if the training data was scaled by dividing each pixel value by 255.0, the calibration data must be scaled the same way. Similarly, if the training used Torch-style normalization, where the mean is subtracted and the result is divided by the standard deviation, the calibration data must use it as well. In the code of this section, `norm = 'tf'` maps pixel values to [-1, 1] (`x / 127.5 - 1`), while `norm = 'torch'` scales them to [0, 1] and then applies the mean and standard deviation.

Using the same normalization for training and quantization helps the quantized model retain the accuracy achieved during training.

To align the preprocessing of the quantization dataset in the section below with the preprocessing of the trained model, adjust the arguments `color_mode`, `interpolation`, and `norm` for normalization.
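
The effect of the `norm` argument can be seen on a tiny example. The sketch below re-implements the two normalization branches used by the preprocessing functions of this notebook (the constants are copied from them) and applies both to a made-up two-pixel image; the `normalize` helper itself is an assumption introduced for illustration.

```python
import numpy as np

# Constants copied from the preprocessing functions of this notebook.
TORCH_MEANS = np.array([0.485, 0.456, 0.406], dtype=np.float32)
TORCH_STD = np.array([0.224, 0.224, 0.224], dtype=np.float32)


def normalize(pixels: np.ndarray, norm: str) -> np.ndarray:
    """Normalize an HWC array of 0..255 pixel values with the given scheme."""
    pixels = pixels.astype(np.float32)
    if norm == "tf":
        return pixels / 127.5 - 1.0  # maps 0..255 to [-1, 1]
    if norm == "torch":
        return (pixels / 255.0 - TORCH_MEANS) / TORCH_STD
    raise ValueError(f"Unknown normalization: {norm}")


# A made-up 1x2 RGB image: one black pixel and one white pixel.
image = np.array([[[0, 0, 0], [255, 255, 255]]], dtype=np.uint8)
for norm in ("tf", "torch"):
    out = normalize(image, norm)
    print(f"{norm}: min={float(out.min()):.3f}, max={float(out.max()):.3f}")
```

The two schemes produce clearly different value ranges for the same image, so calibrating with one while the model was trained with the other would yield quantization ranges that do not match the data seen at inference time.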

In [None]:
def create_calibration_dataset(dataset_path: str, samples_per_class: Optional[int] = 100) -> str:
    """
    Creates a calibration dataset for use in quantizing a machine learning model.

    Args:
        dataset_path (str): The path to the original dataset.
        samples_per_class (int, optional): The number of images to include per class in the calibration dataset. Defaults to 100.

    Returns:
        str: The path to the calibration dataset.
    """
    # The calibration dataset is created in the same parent directory as the dataset
    calibration_dataset_path = os.path.join(os.path.dirname(dataset_path), 'calibration_' + os.path.basename(dataset_path))
    # List directories
    dir_list = next(os.walk(dataset_path))[1]

    # Create the target directory if it doesn't exist
    if not os.path.exists(calibration_dataset_path):
        os.makedirs(calibration_dataset_path)

    # For each class directory, copy a random subset of its images into the target directory
    for dir_i in tqdm(dir_list):
        img_list = glob.glob(os.path.join(dataset_path, dir_i, '*.jpg')) + \
                   glob.glob(os.path.join(dataset_path, dir_i, '*.png')) + \
                   glob.glob(os.path.join(dataset_path, dir_i, '*.jpeg'))

        # Shuffle the data
        random.shuffle(img_list)

        # Copy a subset of images to the target directory
        for j in range(min(samples_per_class, len(img_list))):
            shutil.copy2(img_list[j], calibration_dataset_path)
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print(current_time + ' - ' + f'Done creating calibration dataset.')
    return calibration_dataset_path


def preprocess_image_batch(images_folder: str, height: int, width: int, size_limit: int = 0) -> np.ndarray:
    """
    Loads a batch of images and preprocess them
    :param images_folder: path to folder storing images
    :param height: image height in pixels
    :param width: image width in pixels
    :param size_limit: number of images to load. Default is 0 which means all images are picked.
    :return: list of matrices characterizing multiple images
    """
    TORCH_MEANS = [0.485, 0.456, 0.406]
    TORCH_STD = [0.224, 0.224, 0.224]

    interpolation = 'nearest'
    color_mode = 'rgb'
    norm = 'tf'

    image_names = os.listdir(images_folder)
    if size_limit > 0 and len(image_names) >= size_limit:
        batch_filenames = [image_names[i] for i in range(size_limit)]
    else:
        batch_filenames = image_names
    unconcatenated_batch_data = []

    for image_name in batch_filenames:
        image_filepath = os.path.join(images_folder, image_name)
        # Keras expects target_size as (height, width)
        img = tf.keras.utils.load_img(image_filepath, grayscale=False, color_mode=color_mode, target_size=(height, width), interpolation=interpolation)
        img_array = np.array([tf.keras.utils.img_to_array(img)])
        if norm.lower() == 'tf':
            img_array = -1 + img_array / 127.5
        elif norm.lower() == 'torch':
            img_array = img_array / 255.0
            img_array = img_array - TORCH_MEANS
            img_array = img_array / TORCH_STD
        # Transpose the data (NHWC to NCHW) to conform to the expected input data representation
        img_array = img_array.transpose((0, 3, 1, 2))
        unconcatenated_batch_data.append(img_array)
    batch_data = np.stack(unconcatenated_batch_data, axis=0)
    return batch_data

In [None]:
def preprocess_random_images(height: int, width: int, channel: int, size_limit: int = 400) -> np.ndarray:
    """
    Loads a batch of random images and preprocess them
    :param height: Image height in pixels.
    :param width: Image width in pixels.
    :param channel: Number of channels in the image.
    :param size_limit: Number of images to generate. Default is 400.
    :return: List of matrices characterizing multiple images.
    """
    unconcatenated_batch_data = []
    for _ in range(size_limit):
        random_vals = np.random.uniform(0, 1, channel*height*width).astype('float32')
        random_image = random_vals.reshape(1, channel, height, width)
        unconcatenated_batch_data.append(random_image)
    # Stack once, after the loop, into an array of shape (size_limit, 1, channel, height, width)
    batch_data = np.stack(unconcatenated_batch_data, axis=0)
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print(current_time + ' - ' + 'Random dataset with {} random images.'.format(size_limit))
    return batch_data

In [None]:
class ImageNetDataReader(CalibrationDataReader):
    """
    A class used to read calibration data for a given model.

    Attributes
    ----------
    calibration_image_folder : str
        The path to the folder containing calibration images
    model_path : str
        The path to the ONNX model file

    Methods
    -------
    get_next() -> Optional[Dict[str, np.ndarray]]
        Returns the next item from the enumerator, or None when the data is exhausted
    rewind() -> None
        Resets the enumeration of calibration data
    """

    def __init__(self, calibration_image_folder: str, model_path: str) -> None:
        """
        Initializes the ImageNetDataReader class.

        Parameters
        ----------
        calibration_image_folder : str
            The path to the folder containing calibration images
        model_path : str
            The path to the ONNX model file
        """

        # Use inference session to get input shape
        session = onnxruntime.InferenceSession(model_path, None)
        (_, channel, height, width) = session.get_inputs()[0].shape

        # Convert image to input data
        # Set input normalization based on training normalization 
        if calibration_image_folder:
            self.nhwc_data_list = preprocess_image_batch(
                calibration_image_folder, height, width, size_limit=0
            )
        else:
            self.nhwc_data_list = preprocess_random_images(
                height, width, channel
            )

        self.input_name = session.get_inputs()[0].name
        self.datasize = len(self.nhwc_data_list)

        self.enum_data = None  # Enumerator for calibration data

    def get_next(self) -> Optional[Dict[str, np.ndarray]]:
        """
        Returns the next item from the enumerator.

        Returns
        -------
        Optional[Dict[str, np.ndarray]]
            A dictionary containing the input name and corresponding data,
            or None when all calibration samples have been consumed
        """

        if self.enum_data is None:
            # Create an iterator that generates input dictionaries
            # with input name and corresponding data
            self.enum_data = iter(
                [{self.input_name: nhwc_data} for nhwc_data in self.nhwc_data_list]
            )
        
        return next(self.enum_data, None)  # Return next item from enumerator

    def rewind(self) -> None:
        """
        Resets the enumeration of calibration data.
        """

        self.enum_data = None  # Reset the enumeration of calibration data

<div id="quantize">
    <h3> 2.3 Quantize the model using QDQ quantization to int8 weights and activations </h3>
</div>

The following section first preprocesses the float32 ONNX model to prepare it for quantization, then quantizes it to an int8 ONNX model with the ``quantize_static`` function. We recommend using this function with calibration data and with the supported argument settings listed below.


<table>
<tr>
<th style="text-align: left">Argument</th>
<th style="text-align: left">Description / STM32Cube.AI recommendation</th>
</tr>
<tr>
<td style="text-align: left">quant_format</td>
<td style="text-align: left"><p>QuantFormat.QDQ: <strong>recommended</strong>, quantizes the model by inserting QuantizeLinear/DequantizeLinear nodes on the tensors. QuantFormat.QOperator: <strong>not recommended</strong>, quantizes the model with quantized operators directly.</p></td>
</tr>
<tr>
<td style="text-align: left">activation_type</td>
<td style="text-align: left"><p>QuantType.QInt8: <strong>recommended</strong>, quantizes the activations to int8. QuantType.QUInt8: <strong>not recommended</strong>, quantizes the activations to uint8.</p></td>
</tr>
<tr>
<td style="text-align: left">weight_type</td>
<td style="text-align: left"><p>QuantType.QInt8: <strong>recommended</strong>, quantizes the weights to int8. QuantType.QUInt8: <strong>not recommended</strong>, quantizes the weights to uint8.</p></td>
</tr>
<tr>
<td style="text-align: left">per_channel</td>
<td style="text-align: left"><p>True: <strong>recommended</strong>, the quantization parameters are computed separately for each channel, based on the data within that channel. False: supported and <strong>not recommended</strong>, the quantization parameters are computed once for each tensor.</p></td>
</tr>
<tr>
<td style="text-align: left">ActivationSymmetric (in extra_options)</td>
<td style="text-align: left"><p>False: <strong>recommended</strong>, the activations are mapped to the range [-128, +127]. True: supported, the activations are mapped to the range [-128, +127] with zero_point = 0.</p></td>
</tr>
<tr>
<td style="text-align: left">WeightSymmetric (in extra_options)</td>
<td style="text-align: left"><p>True: <strong>highly recommended</strong>, the weights are mapped to the range [-127, +127] with zero_point = 0. False: supported and <strong>not recommended</strong>, the weights are mapped to the range [-128, +127].</p></td>
</tr>
<tr>
<td style="text-align: left">reduce_range</td>
<td style="text-align: left"><p>True: <strong>highly recommended</strong>, quantizes the weights with 7 bits. It may improve the accuracy for some models, especially in per-channel mode.</p></td>
</tr>
</table>
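
The difference between per-tensor and per-channel quantization can be illustrated with a small NumPy sketch. It uses a generic symmetric int8 scheme (zero-point 0, range [-127, +127]) and made-up weights whose two output channels have very different magnitudes; the helper names are assumptions for this example, and the code is not the ONNX Runtime implementation.

```python
import numpy as np


def symmetric_scales(weights: np.ndarray, per_channel: bool, qmax: int = 127) -> np.ndarray:
    """Return symmetric scale(s) for weights laid out as (out_channels, ...)."""
    if per_channel:
        # One scale per output channel: reduce over all remaining axes.
        max_abs = np.abs(weights).reshape(weights.shape[0], -1).max(axis=1)
    else:
        # A single scale shared by the whole tensor.
        max_abs = np.array([np.abs(weights).max()])
    return np.maximum(max_abs, 1e-12) / qmax


def quantize_dequantize(weights: np.ndarray, scales: np.ndarray, qmax: int = 127) -> np.ndarray:
    """Quantize with zero-point 0, then map back to floats to expose the rounding error."""
    # Reshape the scale(s) so that they broadcast over every axis except the first.
    s = scales.reshape(-1, *([1] * (weights.ndim - 1)))
    q = np.clip(np.round(weights / s), -qmax, qmax)
    return q * s


# Made-up weights: channel 0 is about 100 times smaller than channel 1.
weights = np.array([[0.010, -0.004, 0.007],
                    [1.000, -0.600, 0.250]], dtype=np.float32)

for per_channel in (False, True):
    scales = symmetric_scales(weights, per_channel)
    error = np.abs(quantize_dequantize(weights, scales) - weights).max(axis=1)
    print(f"per_channel={per_channel}: max error per channel = {error}")
```

With a single per-tensor scale, the small-magnitude channel is represented by only one or two integer levels, whereas a per-channel scale gives each channel the full integer range. This is the intuition behind the `per_channel=True` recommendation in the table.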

In [None]:
if quantization_dataset_path is not None:
    calibration_dataset_path = create_calibration_dataset(quantization_dataset_path, samples_per_class=200)
else:
    calibration_dataset_path = None

# Set the data reader pointing to the representative dataset
print('Prepare the data reader for the representative dataset...')
dr = ImageNetDataReader(calibration_dataset_path, input_model) 
print('The data reader is ready.')

# Preprocess the model to infer shapes of each tensor
infer_model = os.path.splitext(input_model)[0] + '_infer' + os.path.splitext(input_model)[1]
print('Infer for the model: {}...'.format(os.path.basename(input_model)))
quantization.quant_pre_process(input_model_path=input_model, output_model_path=infer_model, skip_optimization=False)

# Prepare quantized ONNX model filename
if calibration_dataset_path is not None:
    quant_model = os.path.splitext(input_model)[0] + '_QDQ_quant' + os.path.splitext(input_model)[1]
else:
    quant_model = os.path.splitext(input_model)[0] + '_QDQ_fakequant' + os.path.splitext(input_model)[1]
print('Quantize the model {}, please wait...'.format(os.path.basename(input_model)))

quantize_static(
        infer_model,
        quant_model,
        dr,
        calibrate_method=CalibrationMethod.MinMax, 
        quant_format=QuantFormat.QDQ,
        per_channel=True,
        weight_type=QuantType.QInt8, 
        activation_type=QuantType.QInt8, 
        optimize_model=False,
        reduce_range=True,
        extra_options={'WeightSymmetric': True, 'ActivationSymmetric': False})

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print(current_time + ' - ' + '{} model has been created.'.format(os.path.basename(quant_model)))
quantized_session = onnxruntime.InferenceSession(quant_model)


<div id="validation">
        <h2> 3. Model validation </h2>
</div>

The following code section includes functions to evaluate the models on the validation dataset. It's important to note that the preprocessing of the evaluation dataset should match the preprocessing of the data during training and quantization. Therefore, make sure to adjust the arguments ``color_mode``, ``interpolation``, and ``norm`` to correspond to your preprocessing during the training scenario.

In [None]:
from onnx import ModelProto
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt

def get_preprocessed_image(image_path: str, height: int, width: int, grayscale: bool, color_mode: str, interpolation: str, norm: str) -> np.ndarray:
    """
    Preprocesses an image for input to a neural network.

    Args:
        image_path (str): The path to the image file.
        height (int): The desired height of the image.
        width (int): The desired width of the image.
        grayscale (bool): Whether to convert the image to grayscale.
        color_mode (str): The color mode of the image ('rgb' or 'rgba').
        interpolation (str): The interpolation method to use when resizing the image.
        norm (str): The normalization method to use ('tf' or 'torch').

    Returns:
        np.ndarray: The preprocessed image as a numpy array.
    """
    TORCH_MEANS = [0.485,0.456,0.406]
    TORCH_STD = [0.224, 0.224, 0.224]

    # Keras expects target_size as (height, width)
    img = tf.keras.utils.load_img(image_path, grayscale=grayscale, color_mode=color_mode,
                                  target_size=(height, width), interpolation=interpolation)
    img_array = np.array([tf.keras.utils.img_to_array(img)])
    if norm.lower() == 'tf':
        img_array = -1 + img_array / 127.5
    elif norm.lower() == 'torch':
        img_array = img_array / 255.0
        img_array = img_array - TORCH_MEANS
        img_array= img_array/ TORCH_STD
    img_array = img_array.transpose((0,3,1,2))
    return img_array

def predict_onnx(sess: onnxruntime.InferenceSession, data: np.ndarray) -> np.ndarray:
    """
    Runs inference on an ONNX model.

    Args:
        sess (onnxruntime.InferenceSession): The ONNX Runtime inference session of the model.
        data (np.ndarray): The input data for the model.

    Returns:
        np.ndarray: The model's predictions.
    """
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    onx_pred = sess.run([label_name], {input_name: data.astype(np.float32)})[0]
    return onx_pred

def plot_confusion_matrix(cm: np.ndarray, class_labels: List[str], model_name: str, val_accuracy: float = None) -> None:
    """
    Plots a confusion matrix.

    Args:
        cm (np.ndarray): The confusion matrix.
        class_labels (List[str]): The labels for the classes.
        model_name (str): The name of the model.
        val_accuracy (float, optional): The validation accuracy of the model. Defaults to None.
    """
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    fig, ax = plt.subplots(figsize=(6, 6))
    im = ax.imshow(cm_normalized, interpolation='nearest', cmap=plt.cm.Blues)
    cbar = ax.figure.colorbar(im, ax=ax, pad=0.1)

    # Show all ticks
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=class_labels, yticklabels=class_labels,
           title=f'Model Accuracy: {val_accuracy} %',
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    fmt = '.2f'
    thresh = cm_normalized.max() / 2.
    for i in range(cm_normalized.shape[0]):
        for j in range(cm_normalized.shape[1]):
            ax.text(j, i, format(cm_normalized[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm_normalized[i, j] > thresh else "black")

    fig.tight_layout()
    plt.savefig(f'outputs/{model_name}_confusion-matrix.png')
    plt.show()

In [None]:
def evaluate_onnx_model(onnx_model_path: str, val_dir: str, model_name: str, interpolation: str = 'bilinear') -> Tuple[float, np.ndarray]:
    """
    Evaluates an ONNX model on a validation dataset.

    Args:
        onnx_model_path (str): The path to the ONNX model.
        val_dir (str): The path to the validation dataset.
        model_name (str): The name of the model.
        interpolation (str, optional): The interpolation method to use when resizing images. Defaults to 'bilinear'.

    Returns:
        Tuple[float, np.ndarray]: The validation accuracy and confusion matrix.
    """
    # Creating the inference session also checks that the model file can be loaded
    sess = onnxruntime.InferenceSession(onnx_model_path)
    (_, _, img_height, img_width) = sess.get_inputs()[0].shape
    gt_labels = []
    prd_labels = np.empty((0))
    class_labels = sorted(os.listdir(val_dir))
    for i in range(len(class_labels)):
        class_label = class_labels[i]
        
        for file in os.listdir(os.path.join(val_dir, class_label)):
            gt_labels.append(i)
            image_path = os.path.join(val_dir, class_label, file)
            # don't forget to adapt the preprocessing schema
            img = get_preprocessed_image(image_path, width=img_width, height=img_height, 
                                          grayscale=False, color_mode='rgb',
                                          interpolation=interpolation, norm='tf')
            # predicting the results on the batch
            pred = predict_onnx(sess, img).argmax(axis=1)
            prd_labels = np.concatenate((prd_labels, pred))

    val_acc = round(accuracy_score(gt_labels, prd_labels) * 100, 2)
    print(f'Evaluation Top 1 accuracy: {val_acc} %')
    if not os.path.exists("outputs"):
        os.makedirs("outputs")
    log_file_name = "outputs/" + model_name + ".log"
    with open(log_file_name, 'a') as f:
        f.write("Evaluation Top 1 accuracy: {} %\n".format(val_acc))
    val_cm = confusion_matrix(gt_labels, prd_labels)
    plot_confusion_matrix(val_cm, class_labels, model_name, val_accuracy=val_acc)
    
    return val_acc, val_cm

**Float model validation:**

We evaluate the full precision model to provide a baseline measure of the model's accuracy in its original form when weights, activations, and computations are represented as 32-bit floating-point numbers without any quantization applied.

To evaluate the float model, set the `val_set` variable to the path of the evaluation dataset, `input_model` to the path of the float model, and `model_name` to the name of the model. For example:

In [None]:
val_set = "path/to/quantization_set"
input_model = "models/mobilenet_v2_128_0.5.onnx"
model_name = 'mobilenet_v2_128_0.5'
evaluate_onnx_model(input_model, val_set, model_name)

**Quantized model validation:**

To evaluate the quantized model, set `input_model` to the path of the quantized model and `model_name` to its name.


In [None]:
input_model = "models/mobilenet_v2_128_0.5_QDQ_quant.onnx"
model_name = 'mobilenet_v2_128_0.5_QDQ_quant'
evaluate_onnx_model(input_model, val_set, model_name)
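
Besides comparing the two top-1 accuracies, it can be informative to check how often the quantized model predicts the same class as the float model. The sketch below shows both computations on made-up label lists; the helper names and all values are assumptions for illustration and are not results from any model.

```python
from typing import Sequence


def top1_accuracy(labels: Sequence[int], predictions: Sequence[int]) -> float:
    """Return the percentage of predictions that match the ground-truth labels."""
    correct = sum(int(l == p) for l, p in zip(labels, predictions))
    return 100.0 * correct / len(labels)


def agreement(pred_a: Sequence[int], pred_b: Sequence[int]) -> float:
    """Return the percentage of samples on which two models predict the same class."""
    same = sum(int(a == b) for a, b in zip(pred_a, pred_b))
    return 100.0 * same / len(pred_a)


# Made-up labels and predictions, for illustration only.
labels      = [0, 0, 1, 1, 1, 0, 1, 0]
float_preds = [0, 0, 1, 1, 0, 0, 1, 0]
quant_preds = [0, 1, 1, 1, 0, 0, 1, 0]

float_acc = top1_accuracy(labels, float_preds)
quant_acc = top1_accuracy(labels, quant_preds)
print(f"float accuracy:     {float_acc:.1f} %")
print(f"quantized accuracy: {quant_acc:.1f} %")
print(f"accuracy drop:      {float_acc - quant_acc:.1f} points")
print(f"agreement:          {agreement(float_preds, quant_preds):.1f} %")
```

A large accuracy drop or a low agreement rate usually points to a calibration problem, such as a preprocessing mismatch or an unrepresentative calibration set.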

<div id="benchmark">
        <h2> 4. Benchmarking the Models on the STM32Cube.AI Developer Cloud</h2>
</div>

In this section, we use the [STM32Cube.AI Developer Cloud](https://stedgeai-dc.st.com/home) to optimize and benchmark a quantized neural network on an **STM32** target and generate its code for deployment.

<div id="proxy">
        <h3> 4.1 Proxy Settings and Connection to the STM32Cube.AI Developer Cloud</h3>
</div>

If you are behind a proxy, you can uncomment and fill in the following proxy settings.

**Note:** If the password contains special characters such as `@` or `:`, they need to be percent-encoded (URL-encoded), for example `%40` for `@`.
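
The encoding can be done with the standard library. The sketch below builds a proxy URL with percent-encoded credentials; the `build_proxy_url` helper, the host, the port and the credentials are all placeholders invented for this example.

```python
from urllib.parse import quote


def build_proxy_url(user: str, password: str, host: str, port: int, scheme: str = "http") -> str:
    """Build a proxy URL, percent-encoding the user name and the password."""
    # safe="" makes quote() encode every reserved character, including "/".
    return f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"


# Placeholder values, for illustration only.
print(build_proxy_url("user", "p@ss:word", "10.0.0.1", 8080))
# prints: http://user:p%40ss%3Aword@10.0.0.1:8080
```

The resulting string can then be assigned to the `http_proxy` and `https_proxy` environment variables shown in the next cell.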

In [None]:
# import os
# os.environ['http_proxy'] = "http://user:passwd@ip_address:port"
# os.environ['https_proxy'] = "https://user:passwd@ip_address:port"
# If necessary, also disable SSL verification
# os.environ['NO_SSL_VERIFY'] = "1"


To connect to the [STM32Cube.AI Developer Cloud](https://stedgeai-dc.st.com/home), you first need to download the [`STM32AI Python interface`](https://github.com/STMicroelectronics/stm32ai-modelzoo_services/tree/main/common/stm32ai_dc) with `gitdir`.

In [None]:
# Get STM32Cube.AI Developer Cloud
!gitdir https://github.com/STMicroelectronics/stm32ai-modelzoo_services/tree/main/common/stm32ai_dc

# Reorganize local folders
if os.path.exists('./stm32ai_dc'):
    shutil.rmtree('./stm32ai_dc')
shutil.move('./common/stm32ai_dc', './stm32ai_dc')
shutil.rmtree('./common')

In [None]:
import sys 
sys.path.append(os.path.abspath('stm32ai'))
os.environ['STATS_TYPE'] = 'jupyter_devcloud'

from stm32ai_dc import (CliLibraryIde, CliLibrarySerie, CliParameters,
                        CloudBackend, Stm32Ai)
from stm32ai_dc.errors import BenchmarkServerError

Create an account on **myST**, then sign in to the [STM32Cube.AI Developer Cloud](https://stedgeai-dc.st.com/home) to get access to the service. Then set the environment variables below with your credentials: set `username` to your email address as a string; a prompt will appear for the password.

In [None]:
import getpass

username ='xxx.yyy@st.com'
os.environ['stmai_username'] = username
print('Enter your password')
password = getpass.getpass()
os.environ['stmai_password'] = password
os.environ['NO_SSL_VERIFY'] = "1"

In [None]:
# Log in to the STM32Cube.AI Developer Cloud
try:
    stmai = Stm32Ai(CloudBackend(str(username), str(password)))
    print("Successfully Connected!")
except Exception as e:
    print("ERROR: ", e)

<div id="benchmark_both">
        <h3> 4.2 Benchmark the models on a STM32 target</h3>
</div>

Run the code section below to define the helper function used later to report the benchmark results.

In [None]:
def analyze_footprints_and_inference_time(report: object, model_name: str, board_name: str) -> None:
    """
    Analyzes the memory footprints and inference time of a STM32Cube.AI model, prints them and appends them to a log file.

    Args:
        report (object): The report object containing the inference time information.
        model_name (str): The name of the model being analyzed.
        board_name (str): The name of the board on which the model is being analyzed.

    Returns:
        None
    """
    activations_ram = report.ram_size / 1024
    weights_rom = report.rom_size / 1024
    macc = report.macc / 1e6
    cycles = report.cycles
    inference_time = report.duration_ms
    fps = 1000.0/inference_time

    print("[INFO] : Benchmarking the model on the {} board\n".format(board_name))
    print("[INFO] : MACCs : {} (M)".format(macc))
    print("[INFO] : Flash Weights  : {0:.1f} (KiB)".format(weights_rom))
    print("[INFO] : RAM Activations : {0:.1f} (KiB)".format(activations_ram))
    print("[INFO] : Number of cycles : {} ".format(cycles))
    print("[INFO] : Inference Time : {0:.1f} (ms)".format(inference_time))
    print("[INFO] : FPS : {0:.1f}".format(fps))

    # Writing to log file
    model_name_without_extension = model_name.replace(".onnx", "")
    log_file_name = "outputs/" + model_name_without_extension + ".log"
    with open(log_file_name, 'a') as f:
        f.write("[INFO] : Benchmarking the model on the {} board\n".format(board_name))
        f.write("[INFO] : Model Name : {}\n".format(model_name))
        f.write("[INFO] : MACCs : {} (M)\n".format(macc))
        f.write("[INFO] : Flash Weights  : {0:.1f} (KiB)\n".format(weights_rom))
        f.write("[INFO] : RAM Activations : {0:.1f} (KiB)\n".format(activations_ram))
        f.write("[INFO] : Number of cycles : {}\n".format(cycles))
        f.write("[INFO] : Inference Time : {0:.1f} (ms)\n".format(inference_time))
        f.write("[INFO] : FPS : {0:.1f}\n".format(fps))
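
The unit conversions performed by the helper above can be checked in isolation. The following is a minimal sketch, not part of the STM32Cube.AI API: `summarize_report` is a hypothetical function, and `fake_report` is a stand-in object with made-up values that only mimics the attribute names read by the helper (`ram_size`, `rom_size`, `macc`, `duration_ms`).

```python
from types import SimpleNamespace


def summarize_report(report) -> dict:
    """Convert raw report fields into the units printed by the helper above."""
    return {
        "ram_kib": report.ram_size / 1024,    # size -> KiB, same division as the helper
        "flash_kib": report.rom_size / 1024,  # size -> KiB, same division as the helper
        "macc_m": report.macc / 1e6,          # MACC count -> millions
        "fps": 1000.0 / report.duration_ms,   # ms per inference -> inferences per second
    }


# Stand-in report with made-up values, used only to illustrate the arithmetic.
fake_report = SimpleNamespace(ram_size=262144, rom_size=524288,
                              macc=25_000_000, duration_ms=50.0)

summary = summarize_report(fake_report)
print(summary)
```

With these illustrative values, an inference time of 50 ms corresponds to 20 inferences per second.
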

**Benchmark the float model:** 

The next step is to upload the model to STM32Cube.AI Developer Cloud and benchmark it on a specific STM32 board by running the code below. The `model_path` variable specifies the path to the ONNX model file, and the `board_name` variable specifies the name of the STM32 board on which the model will be benchmarked.

The benchmark generates a report of the inference time and memory footprints. The following table lists the options available in version **8.1.0** of STM32Cube.AI for benchmarking on STM32 boards, with their descriptions:
<table>
<tr>
<th style="text-align: left">Option</th>
<th style="text-align: left">Description /  CUBE.AI recommendation</th>

</tr>
<tr>
    
    
<td style="text-align: left">model</td>
<td style="text-align: left">model name corresponding to the file name uploaded</td>
</tr>
    
<tr>
<td style="text-align: left">optimization</td>
<td style="text-align: left">optimization setting "balanced", "time" or "ram"</td>
</tr>
    
<tr>
<td style="text-align: left">allocateInputs</td>
<td style="text-align: left"><strong>recommended</strong>, the activations buffer is also used to handle the input buffers. True by default</td>
</tr>
 
<tr>
<td style="text-align: left">allocateOutputs</td>
<td style="text-align: left"><strong>recommended</strong>, the activations buffer is also used to handle the output buffers. True by default</td>
</tr>

<tr>
<td style="text-align: left">relocatable</td>
<td style="text-align: left"><strong>recommended</strong>, generates a relocatable binary model. The '--binary' option can be used to produce a separate binary file containing only the data of the weight/bias tensors. True by default</td>
</tr>

<tr>
<td style="text-align: left">noOnnxOptimizer</td>
<td style="text-align: left"><strong>not recommended</strong>, disables the ONNX optimizer pass. "False" by default. Applies only to ONNX files; ignored otherwise</td>
</tr>

<tr>
<td style="text-align: left">noOnnxIoTranspose</td>
<td style="text-align: left"> <strong>recommended only if</strong> the ONNX model already has IO transpose layers that make it expect channel-last data; avoids adding a specific transpose layer during the import of an ONNX model. "False" by default. Applies only to ONNX files; ignored otherwise</td>
</tr>
    
</table>

In [None]:
model_path = "models/mobilenet_v2_128_0.5.onnx"
model_name = os.path.basename(model_path)
try:
    stmai.upload_model(model_path)
    print(f'Model {model_name} is uploaded !\n')
except Exception as e:
    print("ERROR: ", e)
    
board_name = 'STM32H747I-DISCO'
result = stmai.benchmark(CliParameters(model=model_name,
                                       optimization='balanced',
                                       allocateInputs=True,
                                       allocateOutputs=True,
                                       noOnnxIoTranspose=False,
                                       fromModel=model_name),
                                       board_name=board_name)

analyze_footprints_and_inference_time(report=result, model_name=model_name, board_name=board_name)
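
The options listed in the table can be collected in one place before a benchmark is launched. The sketch below is an illustration only: `make_benchmark_options` and `VALID_OPTIMIZATIONS` are hypothetical names, not part of the `stm32ai_dc` package. The keys mirror the keyword arguments passed to `CliParameters` in the cell above, and the defaults follow the table.

```python
# Values accepted for the "optimization" option, as listed in the table.
VALID_OPTIMIZATIONS = ("balanced", "time", "ram")


def make_benchmark_options(model_name: str, optimization: str = "balanced", **overrides) -> dict:
    """Build a keyword dictionary mirroring the benchmark options (hypothetical helper)."""
    if optimization not in VALID_OPTIMIZATIONS:
        raise ValueError(
            f"optimization must be one of {VALID_OPTIMIZATIONS}, got {optimization!r}"
        )
    options = {
        "model": model_name,
        "optimization": optimization,
        "allocateInputs": True,      # recommended in the table
        "allocateOutputs": True,     # recommended in the table
        "noOnnxIoTranspose": False,  # default in the table
        "fromModel": model_name,
    }
    options.update(overrides)        # explicit overrides win over the defaults
    return options


options = make_benchmark_options("mobilenet_v2_128_0.5.onnx", optimization="time")
print(options)
```

In the notebook, such a dictionary could then be unpacked as `CliParameters(**options)`, assuming the keyword names used in the cell above.
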


**Benchmark the int8 model:**

Upload the quantized model to STM32Cube.AI Developer Cloud and benchmark it by running the code below.


In [None]:
model_path = "models/mobilenet_v2_128_0.5_QDQ_quant.onnx"
model_name = os.path.basename(model_path)
try:
    stmai.upload_model(model_path)
    print(f'Model {model_name} is uploaded !\n')
except Exception as e:
    print("ERROR: ", e)
    
board_name = 'STM32H747I-DISCO'
result = stmai.benchmark(CliParameters(model=model_name,
                                       optimization='balanced',
                                       allocateInputs=True,
                                       allocateOutputs=True,
                                       noOnnxIoTranspose=False,
                                       fromModel=model_name),
                                       board_name=board_name)

analyze_footprints_and_inference_time(report=result, model_name=model_name, board_name=board_name)

Run the next two code cells to compare the float model and the int8 model. The code plots a figure that compares the two models on accuracy, RAM, flash, and inference time.

In [None]:
import re

import matplotlib.pyplot as plt


def compare_models(log_file_float: str, log_file_int8: str) -> None:
    """
    Generates a comparison graph of two models on various metrics.

    Args:
        log_file_float: The path to the log file of the first model.
        log_file_int8: The path to the log file of the second model.

    Returns:
        None
    """

    # Read the log files into strings
    with open(log_file_float, 'r') as f:
        log_float = f.read()
    with open(log_file_int8, 'r') as f:
        log_int8 = f.read()

    # Get the metrics of interest
    accuracy_float = float(re.search(r'Evaluation Top 1 accuracy: ([\d.]+) %', log_float).group(1))
    accuracy_int8 = float(re.search(r'Evaluation Top 1 accuracy: ([\d.]+) %', log_int8).group(1))
    flash_float = float(re.search(r'Flash\s*Weights\s*:\s*([\d.]+)\s*\(\s*KiB\s*\)', log_float).group(1))
    flash_int8 = float(re.search(r'Flash\s*Weights\s*:\s*([\d.]+)\s*\(\s*KiB\s*\)', log_int8).group(1))
    ram_float = float(re.search(r'RAM\s*Activations\s*:\s*([\d.]+)\s*\(\s*KiB\s*\)', log_float).group(1))
    ram_int8 = float(re.search(r'RAM\s*Activations\s*:\s*([\d.]+)\s*\(\s*KiB\s*\)', log_int8).group(1))
    inference_time_float = float(re.search(r'Inference\s*Time\s*:\s*([\d.]+)\s*\(\s*ms\s*\)', log_float).group(1))
    inference_time_int8 = float(re.search(r'Inference\s*Time\s*:\s*([\d.]+)\s*\(\s*ms\s*\)', log_int8).group(1))

    # Set the figure size and spacing between subplots
    fig, axs = plt.subplots(2, 2, figsize=(10, 8), gridspec_kw={'wspace': 0.3, 'hspace': 0.4})

    # Graph 1: Accuracy Comparison
    axs[0, 0].bar(['Float model', 'Int8 model'], [accuracy_float, accuracy_int8], color='#03234B')
    axs[0, 0].set_title('Accuracy')
    axs[0, 0].set_xlabel('Model')
    axs[0, 0].set_ylabel('Accuracy (%)')
    axs[0, 0].set_ylim([0, 100])
    axs[0, 0].text(0, accuracy_float+1, str(round(accuracy_float, 2))+'%')
    axs[0, 0].text(1, accuracy_int8+1, str(round(accuracy_int8, 2))+'%')

    # Graph 2: RAM Activation Comparison
    axs[0, 1].bar(['Float model', 'Int8 model'], [ram_float, ram_int8], color='#03234B')
    axs[0, 1].set_title('RAM activation')
    axs[0, 1].set_xlabel('Model')
    axs[0, 1].set_ylabel('RAM activation (KiB)')
    axs[0, 1].set_ylim([0, 1500])
    axs[0, 1].text(0, ram_float+20, str(round(ram_float, 2))+' KiB')
    axs[0, 1].text(1, ram_int8+20, str(round(ram_int8, 2))+' KiB')

    # Graph 3: Flash Weights Comparison
    axs[1, 0].bar(['Float model', 'Int8 model'], [flash_float, flash_int8], color='#03234B')
    axs[1, 0].set_title('Flash Weights')
    axs[1, 0].set_xlabel('Model')
    axs[1, 0].set_ylabel('Flash Weights (KiB)')
    axs[1, 0].set_ylim([0, 1000])
    axs[1, 0].text(0, flash_float+20, str(round(flash_float, 2))+' KiB')
    axs[1, 0].text(1, flash_int8+20, str(round(flash_int8, 2))+' KiB')

    # Graph 4: Inference Time Comparison
    axs[1, 1].bar(['Float model', 'Int8 model'], [inference_time_float, inference_time_int8], color='#03234B')
    axs[1, 1].set_title('Inference Time')
    axs[1, 1].set_xlabel('Model')
    axs[1, 1].set_ylabel('Inference Time (ms)')
    axs[1, 1].set_ylim([0, 1000])
    axs[1, 1].text(0, inference_time_float+20, str(round(inference_time_float, 2))+' ms')
    axs[1, 1].text(1, inference_time_int8+20, str(round(inference_time_int8, 2))+' ms')

    # Set the global title
    fig.suptitle('Comparison of Two Models on Various Metrics', fontsize=14)

    plt.tight_layout()

    # Save the figure to a file
    plt.savefig('comparison.png')

    plt.show()
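
The regular expressions used in `compare_models` can be tried on a sample string before running them on real log files. The following is a minimal sketch: `extract_metric` is a hypothetical helper and `sample_log` contains made-up values in the format written by `analyze_footprints_and_inference_time`. Unlike the direct `.group(1)` calls above, it returns `None` when a metric is absent instead of raising an `AttributeError`.

```python
import re
from typing import Optional


def extract_metric(log_text: str, pattern: str) -> Optional[float]:
    """Return the first captured number for `pattern`, or None if it is not found."""
    match = re.search(pattern, log_text)
    return float(match.group(1)) if match else None


# Made-up log content, in the format produced by the benchmark helper.
sample_log = (
    "[INFO] : Flash Weights  : 512.3 (KiB)\n"
    "[INFO] : RAM Activations : 256.0 (KiB)\n"
    "[INFO] : Inference Time : 48.7 (ms)\n"
)

flash = extract_metric(sample_log, r'Flash\s*Weights\s*:\s*([\d.]+)\s*\(\s*KiB\s*\)')
accuracy = extract_metric(sample_log, r'Evaluation Top 1 accuracy: ([\d.]+) %')
print(flash, accuracy)
```

Because the log files are opened in append mode and `re.search` returns the first match, a log that contains several benchmark runs yields the values of the earliest run.
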

In [None]:
log_file_int8 = 'outputs/mobilenet_v2_128_0.5_QDQ_quant.log'
log_file_float = 'outputs/mobilenet_v2_128_0.5.log'

compare_models(log_file_float, log_file_int8)
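
In addition to the plot, the gains from quantization can be expressed as ratios. The sketch below is illustrative: `relative_gains` is a hypothetical helper, and both dictionaries hold made-up numbers rather than measured results.

```python
def relative_gains(float_metrics: dict, int8_metrics: dict) -> dict:
    """Compare two metric dictionaries; ratios above 1 mean the int8 model is smaller or faster."""
    return {
        "flash_reduction_x": float_metrics["flash_kib"] / int8_metrics["flash_kib"],
        "ram_reduction_x": float_metrics["ram_kib"] / int8_metrics["ram_kib"],
        "speedup_x": float_metrics["inference_ms"] / int8_metrics["inference_ms"],
        # Accuracy difference in percentage points (positive means the int8 model lost accuracy).
        "accuracy_drop_pts": float_metrics["accuracy"] - int8_metrics["accuracy"],
    }


# Made-up values for illustration only, not benchmark results.
float_metrics = {"flash_kib": 800.0, "ram_kib": 400.0, "inference_ms": 100.0, "accuracy": 90.0}
int8_metrics = {"flash_kib": 200.0, "ram_kib": 100.0, "inference_ms": 25.0, "accuracy": 89.0}

gains = relative_gains(float_metrics, int8_metrics)
print(gains)
```

The same function can be fed with the values parsed from the two log files once both benchmarks have run.
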

<div id="generate">
        <h3> 4.3 Generate the model optimized C code for STM32 </h3>
</div>

This step generates the specialized network and data C files so that the model is ready to be integrated into an **STM32** application.

In [None]:
import os

code_folder = os.path.join('outputs', 'code_outputs')
os.makedirs(code_folder, exist_ok=True)

board_name = 'STM32H7'
IDE = 'gcc'
print(f'{model_name}\ngenerating code for {board_name}')

# Generate model .c/.h code + Lib/Inc on STM32Cube.AI Developer Cloud
result = stmai.generate(CliParameters(model=model_name,
                                      output=code_folder,
                                      optimization='balanced',
                                      allocateInputs=True,
                                      allocateOutputs=True,
                                      noOnnxIoTranspose=False,
                                      includeLibraryForSerie=CliLibrarySerie(board_name),
                                      includeLibraryForIde=CliLibraryIde(IDE),
                                      fromModel=model_name))

print(os.listdir(code_folder))

# Print the first 20 lines of the report (fewer if the report is shorter)
with open(os.path.join(code_folder, 'network_generate_report.txt'), 'r') as f:
    for _, line in zip(range(20), f):
        print(line, end='')
