# Quantization Aware Training Tutorial

This tutorial is intended for advanced users. If the accuracy results from the previous tutorials were satisfactory, it can be skipped.

This section describes the steps for performing Quantization Aware Training (QAT) using Hailo's quantized model. It is assumed that the user already has a background in training deep neural networks.

Quantization-aware training (QAT) refers to a set of algorithms that train the full network in the quantized domain. The technique uses the straight-through estimator (STE) to allow backpropagation through non-differentiable operations, such as rounding and clipping, during training. In the deep learning literature, QAT typically refers to an extended training procedure using the full dataset, labels, and multiple GPUs, similar to the original training process. However, it can also be applied in other scenarios.
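As a rough illustration of the STE idea (not Hailo's implementation), the pure-Python sketch below fits a single weight while the forward pass uses its rounded and clipped value. The helper names `fake_quantize` and `ste_grad`, the 4-bit grid, and the scale of 0.25 are assumptions chosen for this example. The true derivative of rounding is zero almost everywhere, so the backward pass treats rounding as the identity and only blocks the gradient where the weight is clipped.

```python
def fake_quantize(w, scale=0.25, qmin=-8, qmax=7):
    """Round w to a signed 4-bit grid and clip it to the representable range."""
    q = round(w / scale)
    q = max(qmin, min(qmax, q))
    return q * scale


def ste_grad(w, upstream_grad, scale=0.25, qmin=-8, qmax=7):
    """Straight-through estimator for fake_quantize.

    Rounding is treated as the identity in the backward pass, so the
    upstream gradient passes through unchanged while w lies inside the
    clipping range and is zeroed once w is clipped.
    """
    inside = qmin * scale <= w <= qmax * scale
    return upstream_grad if inside else 0.0


# Fit y = w * x to a target slope of 0.75, using the quantized weight in the forward pass.
data = [(x, 0.75 * x) for x in (-2.0, -1.0, 1.0, 2.0)]
w = 0.1  # latent full-precision weight that the optimizer updates
lr = 0.05
for _ in range(200):
    grad = 0.0
    for x, y in data:
        w_q = fake_quantize(w)  # forward pass sees the quantized weight
        err = w_q * x - y
        grad += ste_grad(w, 2 * err * x) / len(data)  # backward pass uses the STE
    w -= lr * grad

print(f"latent weight: {w:.4f}, quantized weight: {fake_quantize(w)}")
```

The latent weight keeps its full-precision value between updates; only its quantized version is used in the forward pass, which is the pattern QAT applies to every quantized layer.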

The main differences between the quantization-aware training method and the optimization method shown in previous tutorials are:

* QAT enables training using labeled data, whereas the FineTune algorithm ([Model Optimization Tutorial](./DFC_2_Model_Optimization_Tutorial.ipynb)) is limited to training using knowledge distillation from the full precision model.
* QAT supports running on multiple GPUs for faster training.
* QAT allows for the use of a pipeline of networks or the integration of post-processing functions into the training procedure.

In summary, QAT is a useful tool for training quantized models with labeled data and supports multi-GPU training and integration of post-processing functions. Currently, Hailo QAT only supports Keras.

The remainder of this tutorial will cover the following steps:

* Input definitions: In this step, we will prepare the dataset and model for training and testing.
* Full precision training: A short training procedure will be run to initialize the model's weights.
   * In real scenarios, a complete full precision training procedure should take place here. In this notebook, the full precision training has been shortened to simplify the tutorial.
* Translation of the model: The model will be exported to TFlite, parsed, optimized, and evaluated using the Hailo toolchain.
* Running QAT: Finally, quantization-aware training will be performed on the quantized model to optimize its accuracy.

**Requirements:**

* Run this code in a Jupyter notebook; see the Introduction tutorial for more details.

In [None]:
import keras
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from hailo_sdk_client import ClientRunner, InferenceContext

### Input Definitions
The input definitions step of this tutorial involves using the [MNIST dataset](https://www.tensorflow.org/datasets/catalog/mnist) and a simple Convolutional Neural Network (CNN). The code provided will download the dataset and prepare it for training and evaluation.

In [None]:
# Model parameters
num_classes = 10
input_shape = (28, 28, 1)

# Load the data and split it between train and test sets
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Prepare the dataset
x_train = x_train.astype(np.float32) / 255
x_test = x_test.astype(np.float32) / 255
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
print(f"Total number of training samples: {x_train.shape[0]}")
print(f"Total number of testing samples: {x_test.shape[0]}")

In [None]:
# Define the model
model = keras.Sequential(
    [
        keras.Input(shape=input_shape),
        keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Flatten(),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation="softmax"),
    ],
)
model.summary()

### Full Precision Training
In this step, a short training procedure will be run to initialize the model's weights. Only 5,000 images from the full training dataset will be used. The accuracy of the model will be measured on the test dataset.

In [None]:
# Run short training (using only 5k images)
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(x_train[:5000], y_train[:5000], batch_size=128, epochs=1)

# Evaluate the results
score = model.evaluate(x_train, y_train)
print(f"Train accuracy: {100 * score[1]:.3f} (Top-1)")
score = model.evaluate(x_test, y_test)
print(f"Test accuracy: {100 * score[1]:.3f} (Top-1)")

### Translation of the Model
In this step, the trained model is exported to TFlite format to prepare it for use in the Hailo toolchain. Once exported, the model can be parsed, optimized, and run for inference using the Hailo DFC. The results of the full precision model will be compared to those of the quantized model. Note that the results of the full precision model should be identical to those obtained from the Keras evaluation, while the quantized model may experience some degradation due to quantization noise.
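To give a feel for why fewer bits lead to more quantization noise, the sketch below quantizes a list of random values with a simple symmetric uniform quantizer at 8 and 4 bits and compares the mean squared error. It is an illustration only: `quantize_uniform` is a hypothetical helper, the random values stand in for a layer's weights, and the scheme is not necessarily the one the Hailo toolchain applies.

```python
import random


def quantize_uniform(values, bits):
    """Symmetric uniform quantization of a list of floats to a signed integer grid."""
    qmax = 2 ** (bits - 1) - 1
    scale = max(abs(v) for v in values) / qmax
    return [max(-qmax, min(qmax, round(v / scale))) * scale for v in values]


def mean_squared_error(a, b):
    """Mean squared difference between two equally sized lists."""
    return sum((x - y) ** 2 for x, y in zip(a, b)) / len(a)


random.seed(0)
weights = [random.gauss(0.0, 1.0) for _ in range(1000)]  # stand-in for a layer's weights

mse_8bit = mean_squared_error(weights, quantize_uniform(weights, bits=8))
mse_4bit = mean_squared_error(weights, quantize_uniform(weights, bits=4))
print(f"8-bit MSE: {mse_8bit:.6f}")
print(f"4-bit MSE: {mse_4bit:.6f}")
```

The 4-bit grid is much coarser than the 8-bit one, so its error is larger; this is the kind of noise that QAT tries to compensate for.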

In [None]:
# Export the model to TFlite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
tflite_model_path = "model.tflite"
with tf.io.gfile.GFile(tflite_model_path, "wb") as f:
    f.write(tflite_model)

In [None]:
# Parse the TFlite model
runner = ClientRunner(hw_arch="hailo8")
runner.translate_tf_model(tflite_model_path)

# Optimize the model: enforce 60% 4-bit weights without optimization
model_script_commands = [
    "model_optimization_config(compression_params, auto_4bit_weights_ratio=0.6)\n"
    "model_optimization_flavor(optimization_level=0)\n",
]

runner.load_model_script("".join(model_script_commands))
runner.optimize(x_train[:1024])

In [None]:
# Evaluate the results
with runner.infer_context(InferenceContext.SDK_QUANTIZED) as q_ctx:
    with runner.infer_context(InferenceContext.SDK_FP_OPTIMIZED) as fp_ctx:
        y_infer_fp = runner.infer(fp_ctx, x_test)
        y_infer_q = runner.infer(q_ctx, x_test)

# The Hailo Keras model is exported with rank-4 layers; expand the dimensions of y_test to match the model output shape
y_test = np.expand_dims(y_test, axis=[1, 2])
full_precision_result = np.count_nonzero(np.argmax(y_infer_fp, axis=-1) == np.argmax(y_test, axis=-1)) / len(y_test)
quantize_result = np.count_nonzero(np.argmax(y_infer_q, axis=-1) == np.argmax(y_test, axis=-1)) / len(y_test)
print(f"Test accuracy (floating point): {100 * full_precision_result:.3f} (Top-1)")
print(f"Test accuracy (quantized): {100 * quantize_result:.3f} (Top-1)")
print(f"Degradation: {100 * (full_precision_result - quantize_result):.3f}")

### Running QAT
In this final step, the quantized model will be trained to improve its accuracy. The `runner.get_keras_model` API will be used to obtain a Keras model initialized with the quantized weights. The model can then be trained using the straight-through estimator (STE) method.

* The `tf.distribute.MirroredStrategy` API is being used to enable synchronous training across multiple GPUs on the same machine.
* The `runner.get_keras_model` API must be used with `trainable=True` to allow training (usage of `fit`).
* Additional layers, post-processing, or other models can be added to the Keras model. For example, a new `keras.layers.Softmax` layer is added here.
* For training, use the `fit` API provided by Keras. Training can be done with customized loss functions and different optimizers.
* After training is complete, update the `ClientRunner` weights from the trained model using the `runner.set_keras_model` API. Weight changes are the only changes to the Keras model that are allowed. Once the weights are updated, compile the model with the new weights using the `runner.compile` API.
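Synchronous multi-GPU training, as mentioned in the first bullet, is a form of data parallelism: each replica computes gradients on its own shard of the batch, and the gradients are averaged before a single shared weight update. The pure-Python sketch below illustrates only that idea; it does not use TensorFlow, and `grad_mse` and the two-shard split are hypothetical names chosen for this example.

```python
def grad_mse(w, batch):
    """Gradient of the mean squared error of y = w * x with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
w = 0.5

# Single-device gradient computed on the full batch.
full_grad = grad_mse(w, batch)

# Two equal-size shards, one per hypothetical replica; the per-replica
# gradients are averaged, which is what an all-reduce step does.
shards = [batch[:2], batch[2:]]
replica_grads = [grad_mse(w, shard) for shard in shards]
synced_grad = sum(replica_grads) / len(replica_grads)

print(full_grad, synced_grad)
```

With equal-size shards, the averaged gradient matches the full-batch gradient, so every replica applies the same update and the weights stay in sync.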

In [None]:
with tf.distribute.MirroredStrategy().scope():
    with runner.infer_context(InferenceContext.SDK_QUANTIZED) as ctx:
        # The Hailo Keras model is exported with rank-4 layers; expand the dimensions of y_train to match the model output shape
        y_train = np.expand_dims(y_train, axis=[1, 2])

        # move numpy data to tf.data.Dataset to be used by multiple GPUs
        train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
        train_data = train_data.batch(128)
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
        train_data = train_data.with_options(options)

        # get the Hailo Keras model for training
        model = runner.get_keras_model(ctx, trainable=True)
        model.build(train_data)
        inputs = keras.Input(input_shape)
        x = model(inputs)
        # apply softmax over the class axis (the last axis), not the batch axis
        outputs = keras.layers.Softmax()(x)
        new_model = keras.Model(inputs, outputs)

        # adding external loss.
        # note that this compile API only compiles the Keras model but doesn't compile the model to the Hailo HW.
        new_model.build(train_data)
        new_model.compile(
            loss=keras.losses.CategoricalCrossentropy(),
            optimizer=keras.optimizers.Adam(learning_rate=1e-6),
            metrics=["accuracy"],
        )

        # start QAT (train_data is already batched, so batch_size is not passed to fit)
        log = new_model.fit(train_data, epochs=10)

        # set the Keras model after training. The model is already optimized, so do not run optimize() again.
        runner.set_keras_model(model)

# plot training curve
plt.plot(log.history["accuracy"])
plt.title("Model Accuracy")
plt.ylabel("Top-1")
plt.xlabel("Epoch")
plt.grid()
plt.show()

In [None]:
# Evaluate the results
with runner.infer_context(InferenceContext.SDK_QUANTIZED) as q_ctx:
    y_infer_qat = runner.infer(q_ctx, x_test)

qat_result = np.count_nonzero(np.argmax(y_infer_qat, axis=-1) == np.argmax(y_test, axis=-1)) / len(y_test)
print(f"Test accuracy (quantized) before QAT: {100 * quantize_result:.3f} (Top-1)")
print(f"Test accuracy (quantized) after QAT: {100 * qat_result:.3f} (Top-1)")
print(f"Accuracy improvement: {100 * (qat_result - quantize_result):.3f}")

### Knowledge Distillation and QAT
QAT can gain additional accuracy by using a teacher (the full precision model) to train the student (the quantized model), a technique known as [knowledge distillation](https://arxiv.org/abs/1503.02531). To obtain the full precision model, call the `runner.get_keras_model` API with a different context and change the loss accordingly. The following code defines a new class, `Distiller`, which combines distillation from the full precision model with supervision from the labels.

* Note that Hailo's FineTune algorithm works in the same way (more information can be found in the DFC user guide).
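The combined objective can be sketched in pure Python, following the formulation in the linked knowledge distillation paper: a supervised cross-entropy term on the labels plus a temperature-softened KL divergence between teacher and student, mixed by `alpha`. The helper names and example logits below are assumptions for illustration, and the sketch operates on logits, so it is not a line-by-line match for the `Distiller` class in this tutorial.

```python
import math


def softmax(logits, temperature=1.0):
    """Temperature-scaled softmax over a list of logits."""
    scaled = [z / temperature for z in logits]
    m = max(scaled)  # subtract the maximum for numerical stability
    exps = [math.exp(z - m) for z in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(one_hot, probs):
    """Cross-entropy between a one-hot label and predicted probabilities."""
    return -sum(t * math.log(p) for t, p in zip(one_hot, probs) if t > 0)


def kl_divergence(p, q):
    """KL(p || q) for two discrete distributions."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)


def distillation_loss(label, student_logits, teacher_logits, alpha=0.1, temperature=3.0):
    """alpha * supervised loss + (1 - alpha) * T^2 * KL(teacher || student)."""
    student_loss = cross_entropy(label, softmax(student_logits))
    soft_teacher = softmax(teacher_logits, temperature)
    soft_student = softmax(student_logits, temperature)
    kd_loss = kl_divergence(soft_teacher, soft_student) * temperature**2
    return alpha * student_loss + (1 - alpha) * kd_loss


label = [0.0, 1.0, 0.0]
teacher_logits = [1.0, 4.0, 0.5]
close_student = [0.9, 3.8, 0.6]  # hypothetical student that tracks the teacher
far_student = [3.0, 1.0, 2.0]  # hypothetical student that disagrees with the teacher

loss_close = distillation_loss(label, close_student, teacher_logits)
loss_far = distillation_loss(label, far_student, teacher_logits)
print(f"close student: {loss_close:.4f}, far student: {loss_far:.4f}")
```

A student whose outputs track the teacher gets a lower loss than one that disagrees with it. The `temperature**2` factor keeps the gradient magnitude of the softened term comparable as the temperature changes.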

In [None]:
class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super().__init__()
        self._teacher = teacher
        self._student = student

    def compile(self, optimizer, metrics, student_loss_fn, distillation_loss_fn, alpha=0.1, temperature=3):
        self._teacher.model.compile()
        self._student.model.compile()
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self._student_loss_fn = student_loss_fn
        self._distillation_loss_fn = distillation_loss_fn
        self._alpha = alpha
        self._temperature = temperature

    def build(self, input_shape):
        if not self._teacher.model.built:
            self._teacher.model.build(input_shape)
        if not self._student.model.built:
            self._student.model.build(input_shape)

    def train_step(self, data):
        # unpack data (image, label)
        x, y = data

        # forward pass of teacher
        teacher_predictions = self._teacher.model(x, training=False)
        trainable_vars = [v._value for v in self._student.trainable_variables]

        with tf.GradientTape() as tape:
            tape.watch(trainable_vars)
            # forward pass of student
            student_predictions = self._student.model(x, training=True)

            # compute supervised loss
            student_loss = self._student_loss_fn(y, student_predictions / self._temperature) * self._temperature

            # compute distillation loss
            distillation_loss = (
                self._distillation_loss_fn(
                    teacher_predictions / self._temperature,
                    student_predictions / self._temperature,
                )
                * self._temperature**2
            )

            total_loss = self._alpha * student_loss + (1 - self._alpha) * distillation_loss

        # compute gradients
        gradients = tape.gradient(total_loss, trainable_vars)

        # update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        for metric in self.metrics:
            metric.update_state(y, student_predictions)
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"total_loss": total_loss, "student_loss": student_loss, "distillation_loss": distillation_loss},
        )
        return results

In [None]:
# Parse the TFlite model
runner = ClientRunner(hw_arch="hailo8")
runner.translate_tf_model(tflite_model_path)

# Optimize the model: enforce 60% 4-bit weights without optimization
model_script_commands = [
    "model_optimization_config(compression_params, auto_4bit_weights_ratio=0.6)\n"
    "model_optimization_flavor(optimization_level=0)\n",
]

runner.load_model_script("".join(model_script_commands))
runner.optimize(x_train[:1024])

with runner.infer_context(InferenceContext.SDK_QUANTIZED) as ctx_q:
    with runner.infer_context(InferenceContext.SDK_FP_OPTIMIZED) as ctx_fp:
        # get the Hailo Keras model for training
        student = runner.get_keras_model(ctx_q, trainable=True)

        # get the full precision model for knowledge distillation (KD)
        teacher = runner.get_keras_model(ctx_fp, trainable=False)

        # create the kd model
        distiller = Distiller(student=student, teacher=teacher)
        distiller_input_shapes = (1, *student.model.get_input_shapes()[0])
        distiller.build(distiller_input_shapes)
        distiller.compile(
            optimizer=keras.optimizers.Adam(learning_rate=1e-6),
            metrics=[keras.metrics.CategoricalAccuracy()],
            student_loss_fn=keras.losses.CategoricalCrossentropy(),
            distillation_loss_fn=keras.losses.KLDivergence(),
            alpha=0.5,
            temperature=10,
        )

        # start QAT
        log = distiller.fit(x_train, y_train, batch_size=128, epochs=10)

        # set the Keras model after training
        runner.set_keras_model(student)

In [None]:
# Evaluate the results
with runner.infer_context(InferenceContext.SDK_QUANTIZED) as q_ctx:
    y_infer_qat = runner.infer(q_ctx, x_test)

qat_with_kd_result = np.count_nonzero(np.argmax(y_infer_qat, axis=-1) == np.argmax(y_test, axis=-1)) / len(y_test)
print(f"Test accuracy (quantized) with QAT: {100 * qat_result:.3f} (Top-1)")
print(f"Test accuracy (quantized) with QAT and KD: {100 * qat_with_kd_result:.3f} (Top-1)")
print(f"Accuracy improvement: {100 * (qat_with_kd_result - qat_result):.3f}")