# Training an 8-bit Quantized CNN for the MNIST dataset

## Overview

The model is trained using Keras and is quantized to a TFLite model. Then the necessary parameters are extracted from the TFLite model to be used in HLS code. 

## Setup

In [None]:
! pip uninstall -y tensorflow
! pip install -q tf-nightly
! pip install -q tensorflow-model-optimization
! pip install h5py


Uninstalling tensorflow-2.4.1:
  Successfully uninstalled tensorflow-2.4.1
[K     |████████████████████████████████| 408.3MB 43kB/s 
[K     |████████████████████████████████| 4.0MB 54.8MB/s 
[K     |████████████████████████████████| 5.9MB 25.1MB/s 
[K     |████████████████████████████████| 471kB 44.6MB/s 
[K     |████████████████████████████████| 4.0MB 44.9MB/s 
[K     |████████████████████████████████| 3.8MB 44.9MB/s 
[31mERROR: fancyimpute 0.4.3 requires tensorflow, which is not installed.[0m
[K     |████████████████████████████████| 174kB 7.7MB/s 


In [None]:
#Step 2: Mount your google drive (OPTIONAL)
###########################################

#Mount the google drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
%cd gdrive/My Drive/Test2/

/content/gdrive/My Drive/Test2


In [None]:
import tempfile
import os

import tensorflow as tf

from tensorflow import keras

import numpy as np

import h5py

In [None]:
#os.chdir('/content/gdrive/My Drive/Test')
#os.environ['PATH'] = '/content/gdrive/My Drive/Test' + ';' + os.environ['PATH']
#print(os.getcwd())

/content/gdrive/My Drive/Test


## Train a model for MNIST without quantization aware training

In [None]:
# Load MNIST dataset
mnist = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Add an explicit single-channel axis: (N, 28, 28) -> (N, 28, 28, 1).
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)

# Zero-pad 2 pixels on every side of the height and width axes so the images
# become 32x32, matching the input_shape=(32, 32, 1) of the first Conv2D layer.
test_images = np.pad(test_images, ((0,0),(2,2),(2,2),(0,0)), 'constant')
train_images = np.pad(train_images, ((0,0),(2,2),(2,2),(0,0)), 'constant')

# Normalization to the 0..1 range is currently disabled (the two lines below
# are commented out), so the model is trained and evaluated on the
# un-normalized pixel values.
#train_images = train_images / 255.0
#test_images = test_images / 255.0

# Define the model architecture: two Conv2D + MaxPooling2D stages followed by
# three Dense layers. The last Dense layer has no activation, i.e. it outputs
# raw logits (the loss below is configured with from_logits=True).
model = keras.Sequential([
  #keras.layers.InputLayer(input_shape=(32, 32)),
  #keras.layers.Reshape(target_shape=(32, 32, 1)),
  keras.layers.Conv2D(filters=8, kernel_size=(4, 4), input_shape=(32, 32, 1), activation='relu'),
  keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2),
  keras.layers.Conv2D(filters=16, kernel_size=(2, 2), activation='relu'),
  keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2),
  keras.layers.Flatten(),
  keras.layers.Dense(120, activation='relu'),
  keras.layers.Dense(84, activation='relu'),
  keras.layers.Dense(10, activation=None)
])

# Train the digit classification model
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

# A single epoch, holding out 10% of the training data for validation.
model.fit(
  train_images,
  train_labels,
  epochs=1,
  validation_split=0.1,
)
model.summary()

# Dump the trained float weights as text for inspection.
# Requires the ./output directory to exist.
# NOTE(review): print() of large NumPy arrays is presumably abbreviated with
# "..." under default print options - confirm this file is only meant for
# eyeballing and not as a complete weight export.
weights1=model.get_weights()
with open('./output/normal.txt', 'w') as f1:
  print(weights1, file=f1)

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_2 (Conv2D)            (None, 29, 29, 8)         136       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 14, 14, 8)         0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 13, 13, 16)        528       
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 6, 6, 16)          0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 576)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 120)               69240     
_________________________________________________________________
dense_4 (Dense)              (None, 84)               

In [None]:
# Serialize the model architecture (no weights) to JSON on Google Drive.
model_json = model.to_json()
with open("/content/gdrive/MyDrive/Test2/model.json", "w") as json_file:
    json_file.write(model_json)
# Serialize the trained weights to a separate HDF5 file next to the JSON file.
model.save_weights("/content/gdrive/MyDrive/Test2/model.h5")
print("Saved model to disk")

Saved model to disk


## Clone and fine-tune pre-trained model with quantization aware training


### Define the model

You will apply quantization aware training to the whole model and see this in the model summary. All layers are now prefixed by "quant".

Note that the resulting model is quantization aware but not quantized (e.g. the weights are float32 instead of int8). The sections after show how to create a quantized model from the quantization aware one.

In [None]:
import tensorflow_model_optimization as tfmot

quantize_model = tfmot.quantization.keras.quantize_model

# q_aware stands for quantization aware: wrap the trained baseline model so
# that it can be fine-tuned with quantization aware training.
q_aware_model = quantize_model(model)

# `quantize_model` requires a recompile. The optimizer, loss and metrics are
# the same as for the baseline model.
q_aware_model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

q_aware_model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
quantize_layer (QuantizeLaye (None, 32, 32, 1)         3         
_________________________________________________________________
quant_conv2d_2 (QuantizeWrap (None, 29, 29, 8)         155       
_________________________________________________________________
quant_max_pooling2d_2 (Quant (None, 14, 14, 8)         1         
_________________________________________________________________
quant_conv2d_3 (QuantizeWrap (None, 13, 13, 16)        563       
_________________________________________________________________
quant_max_pooling2d_3 (Quant (None, 6, 6, 16)          1         
_________________________________________________________________
quant_flatten_1 (QuantizeWra (None, 576)               1         
_________________________________________________________________
quant_dense_3 (QuantizeWrapp (None, 120)              

### Train and evaluate the model against baseline

To demonstrate fine tuning after training the model for just an epoch, fine tune with quantization aware training on a subset of the training data.

In [None]:
# Fine-tune on the first 1000 training samples only.
train_images_subset = train_images[0:1000] # out of 60000
train_labels_subset = train_labels[0:1000]

# One epoch of quantization aware fine-tuning; 10% of the subset is held out
# for validation.
q_aware_model.fit(train_images_subset, train_labels_subset,
                  batch_size=500, epochs=1, validation_split=0.1)

# Dump the quantization-aware model's weights as text for inspection.
# Requires the ./output directory to exist.
weights2=q_aware_model.get_weights()
with open('./output/quantize_aware.txt', 'w') as f2:
  print(weights2, file=f2)



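Compare the test accuracy of the quantization-aware model against the baseline. In the run recorded below, the quantization-aware model scores about five percentage points lower than the baseline (0.918 vs. 0.968); it was fine-tuned for only one epoch on 1,000 images, so further fine-tuning would likely narrow this gap.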

In [None]:
# evaluate() returns (loss, accuracy) here because the models were compiled
# with metrics=['accuracy']; only the accuracy is kept.
_, baseline_model_accuracy = model.evaluate(
    test_images, test_labels, verbose=0)

_, q_aware_model_accuracy = q_aware_model.evaluate(
   test_images, test_labels, verbose=0)

# q_aware_model_accuracy is reused later for the comparison against the
# TFLite interpreter's accuracy.
print('Baseline test accuracy:', baseline_model_accuracy)
print('Quant test accuracy:', q_aware_model_accuracy)

Baseline test accuracy: 0.9679999947547913
Quant test accuracy: 0.9175999760627747


## Create quantized model for TFLite backend

After this, you have an actually quantized model with int8 weights and uint8 activations.

In [None]:
# Convert the quantization-aware Keras model to TFLite with the default
# optimizations enabled.
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# The converted model is later written to disk with a binary file write and
# passed to tf.lite.Interpreter(model_content=...); it is not a Keras model,
# which is why the summary() call below stays disabled.
quantized_tflite_model = converter.convert()
#quantized_tflite_model.summary()



INFO:tensorflow:Assets written to: /tmp/tmpmhh0kkp7/assets


INFO:tensorflow:Assets written to: /tmp/tmpmhh0kkp7/assets


## See persistence of accuracy from TF to TFLite

Define a helper function to evaluate the TF Lite model on the test dataset.

In [None]:
import numpy as np

def evaluate_model(interpreter, images=None, labels=None):
  """Run a TFLite interpreter over a labelled image set and return its accuracy.

  Args:
    interpreter: An interpreter whose tensors are already allocated. Only
      get_input_details(), get_output_details(), set_tensor(), invoke() and
      get_tensor() are used.
    images: Iterable of single images without a batch dimension. Defaults to
      the module-level `test_images`.
    labels: Ground-truth class indices, one per image. Defaults to the
      module-level `test_labels`.

  Returns:
    The fraction of images whose arg-max output equals the label.

  Raises:
    ValueError: If the number of labels differs from the number of images.
  """
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  # Fall back to the notebook's test set so evaluate_model(interpreter) keeps
  # working exactly as before.
  if images is None:
    images = test_images
  if labels is None:
    labels = test_labels

  # Run predictions on every image.
  prediction_digits = []
  for i, image in enumerate(images):
    if i % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=i))
    # Pre-processing: add batch dimension and convert to float32 to match with
    # the model's input data format.
    batch = np.expand_dims(image, axis=0).astype(np.float32)
    interpreter.set_tensor(input_index, batch)

    # Run inference.
    interpreter.invoke()

    # Post-processing: remove batch dimension and find the digit with highest
    # score. get_tensor() is used so that no array referring to the
    # interpreter's own buffers is kept alive across the next invoke().
    scores = interpreter.get_tensor(output_index)[0]
    prediction_digits.append(np.argmax(scores))

  print('\n')
  # Compare prediction results with ground truth labels to calculate accuracy.
  prediction_digits = np.array(prediction_digits)
  labels = np.asarray(labels)
  if len(labels) != len(prediction_digits):
    raise ValueError(
        'Got {p} predictions but {l} labels.'.format(
            p=len(prediction_digits), l=len(labels)))
  accuracy = (prediction_digits == labels).mean()
  return accuracy

You evaluate the quantized model and see that the accuracy from TensorFlow persists to the TFLite backend.

In [None]:
# Build an interpreter directly from the in-memory quantized TFLite model and
# allocate its tensors before any inference is run.
interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
interpreter.allocate_tensors()

# Accuracy of the TFLite model over the whole test set.
test_accuracy = evaluate_model(interpreter)

# Print it next to the accuracy measured earlier on the Keras
# quantization-aware model.
print('Quant TFLite test_accuracy:', test_accuracy)
print('Quant TF test accuracy:', q_aware_model_accuracy)

Evaluated on 0 results so far.
Evaluated on 1000 results so far.
Evaluated on 2000 results so far.
Evaluated on 3000 results so far.
Evaluated on 4000 results so far.
Evaluated on 5000 results so far.
Evaluated on 6000 results so far.
Evaluated on 7000 results so far.
Evaluated on 8000 results so far.
Evaluated on 9000 results so far.


Quant TFLite test_accuracy: 0.9172
Quant TF test accuracy: 0.9175999760627747


## See 4x smaller model from quantization

You create a float TFLite model and then see that the quantized TFLite model
is 4x smaller.

In [None]:
path_name = '/content/gdrive/MyDrive/Test2/'
# Create float TFLite model.
float_converter = tf.lite.TFLiteConverter.from_keras_model(model)
float_tflite_model = float_converter.convert()

# Measure sizes of models.
_, float_file = tempfile.mkstemp('.tflite')
_, quant_file = tempfile.mkstemp('.tflite')

with open('/content/gdrive/MyDrive/Test2/quant_file', 'wb') as f:
  f.write(quantized_tflite_model)

with open('/content/gdrive/MyDrive/Test2/float_file', 'wb') as f:
  f.write(float_tflite_model)

print("Float model in Mb:", os.path.getsize(float_file) / float(2**20))
print("Quantized model in Mb:", os.path.getsize(quant_file) / float(2**20))
print(os.path.dirname(quant_file))
%pwd

INFO:tensorflow:Assets written to: /tmp/tmp4wkttv9z/assets


INFO:tensorflow:Assets written to: /tmp/tmp4wkttv9z/assets


Float model in Mb: 0.0
Quantized model in Mb: 0.0
/tmp


'/content/gdrive/MyDrive/Test2'

In [None]:
# Get the details (index, name, shape, quantization, ...) of every tensor in
# the interpreter. `all_layers_details` is reused by later cells.
all_layers_details = interpreter.get_tensor_details()

# Export every tensor to an HDF5 file: one group per tensor index, with the
# tensor's metadata as group attributes and its values in a "weights" dataset.
# The `with` block guarantees the file is closed even if reading a tensor
# raises part-way through the loop.
with h5py.File("mobilenet_v3_weights_infos.hdf5", "w") as f:
    for layer in all_layers_details:
        # One group per tensor, keyed by the tensor index.
        grp = f.create_group(str(layer['index']))

        # Store the tensor's metadata in the group's attributes.
        grp.attrs["name"] = layer['name']
        grp.attrs["shape"] = layer['shape']
        grp.attrs["quantization"] = layer['quantization']

        # Store the tensor values in a dataset.
        grp.create_dataset("weights", data=interpreter.get_tensor(layer['index']))

In [None]:
def save_W1(W1, name, out_dir='./output'):
    """Flatten a tensor and save it as text, one value per line.

    Args:
        W1: Array-like tensor of any rank. It is converted to float and
            flattened in row-major order.
        name: File name without extension; the file written is
            ``<out_dir>/<name>.out``.
        out_dir: Directory to write into. It is created if it does not exist.
            Defaults to ``./output``.
    """
    # A single flatten yields the same element order as appending the
    # sub-arrays one by one, without re-copying the buffer on every step.
    flat = np.asarray(W1, dtype=float).reshape(-1)
    os.makedirs(out_dir, exist_ok=True)
    np.savetxt(os.path.join(out_dir, name + '.out'), flat, delimiter=',')

# Write a human-readable report of every tensor in the TFLite model to
# ./output/quantized.txt: index and name, shape, quantization parameters and
# the tensor values. Rank-2 and rank-4 tensors are printed as nested,
# brace-delimited lists (C array-initializer style); all other ranks are
# printed as a plain Python list.
with open('./output/quantized.txt', 'w') as f3:
  for layer in all_layers_details:
      # Separator line and "<index> - <name>" header for this tensor.
      print('----------------------------------------------------------------------------------------------------------------', file=f3)
      print(str(layer['index'])+' - '+str(layer['name']), file=f3)
      # dim is the tensor's rank: the number of entries in its shape array.
      dim=int(layer['shape'].size)
      if (dim):
        print('Dimensions = '+str(layer['shape']), file=f3)
        # Also save the flattened values to ./output/<index>.out.
        save_W1(interpreter.get_tensor(layer['index']),str(layer['index']))
      print ('\n Quantizations:', file=f3)
      print (layer['quantization'], file=f3)
      print ('\n Scales:', file=f3)
      print (layer['quantization_parameters']['scales'], file=f3)
      print ('\n Zero points:', file=f3)
      print (layer['quantization_parameters']['zero_points'], file=f3)
      print ('\n Quantized dimensions:', file=f3)
      print (layer['quantization_parameters']['quantized_dimension'], file=f3)
      print ('\n Tensors:', file=f3)
      # Split the tensor along its first axis; a is the size of that axis.
      # NOTE(review): list() over a rank-0 tensor would presumably fail -
      # confirm the model contains no scalar tensors.
      temp=list(interpreter.get_tensor(layer['index']))
      a=len(temp)
      if (dim==2):
        # Rank 2: one "{v,v,...}" row per line, wrapped in "{" ... "};".
        print('{', file=f3)
        for i in range(a):
          print('{',end='', file=f3)
          print(','.join(str(x) for x in temp[i]),end='', file=f3)
          # The last row is closed without a trailing comma or newline.
          if i == a-1:
            print('}', file=f3, end='')
          else:
            print('},', file=f3)
        print('};', file=f3)
      elif (dim==4):
        # Rank 4: three nested brace levels per first-axis entry. There is no
        # single outer brace pair around all entries (those prints are
        # disabled); the last entry is terminated with "};" instead.
        #print('{', file=f3)
        for i in range(a):
          b=len(temp[i])
          print('{', end='',file=f3)
          for j in range(b):
            print('{', end='',file=f3)
            c=len(temp[i][j])
            for k in range(c):
              # The two branches print the same values; they differ only in
              # the separator after a non-final innermost group: it stays on
              # the same line when the last shape entry is 1, otherwise a
              # newline follows the comma.
              if (list(layer['shape'])[3]==1):
                print('{', end='',file=f3)
                print(','.join(str(x) for x in temp[i][j][k]),end='', file=f3)
                if k==c-1:
                  print('}',end='', file=f3)
                else:
                  print('},',end='', file=f3)
              else:
                print('{', end='',file=f3)
                print(','.join(str(x) for x in temp[i][j][k]),end='', file=f3)
                if k==c-1:
                  print('}',end='', file=f3)
                else:
                  print('},', file=f3)
            # Close the second nesting level.
            if j==b-1:
              print('}', file=f3)
            else:
              print('},', file=f3)
          # Close the first nesting level; the final entry ends with "};".
          if i==a-1:
            print('};', file=f3)
          else:
            print('},', file=f3)
        #print('};', file=f3)
      else:
        # Any other rank: print the list of first-axis slices as-is.
        print(temp, file=f3)
      

In [None]:
import numpy as np

def reference_io(interpreter, images=None, output_path='./output/refernce.txt', count=10):
  """Write reference inputs, raw outputs and predictions to a text file.

  For each of the first `count` images the file receives the image's pixel
  values (first channel, one image row per line), the interpreter's output
  vector and the arg-max prediction, followed by a separator.

  Args:
    interpreter: An interpreter whose tensors are already allocated. Only
      get_input_details(), get_output_details(), set_tensor(), invoke() and
      get_tensor() are used.
    images: Images indexed as [row][column][channel], without a batch
      dimension. Defaults to the module-level `test_images`.
    output_path: File to write. Its directory is created if missing. The
      default keeps the historical file name (including its spelling) so that
      existing consumers of the file keep working.
    count: Number of leading images to process; None processes all of them.
  """
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  # Fall back to the notebook's test set so reference_io(interpreter) keeps
  # working exactly as before.
  if images is None:
    images = test_images

  out_dir = os.path.dirname(output_path)
  if out_dir:
    os.makedirs(out_dir, exist_ok=True)

  with open(output_path, 'w') as f4:
    for i, test_image in enumerate(images[:count]):
      print("Image %i \n \n" % i, file=f4)
      # Log the input exactly as stored, before the float32 conversion.
      print('Input before float: \n', file=f4)
      for line in test_image:
        print(','.join(str(x[0]) for x in line), file=f4)

      # Pre-processing: add batch dimension and convert to float32 to match
      # with the model's input data format.
      batch = np.expand_dims(test_image, axis=0).astype(np.float32)
      interpreter.set_tensor(input_index, batch)

      # Run inference.
      interpreter.invoke()

      # Post-processing: remove batch dimension and find the digit with the
      # highest score. get_tensor() is used so that no array referring to the
      # interpreter's own buffers is kept alive across the next invoke().
      scores = interpreter.get_tensor(output_index)[0]
      print('\n Output: \n', file=f4)
      print(scores, file=f4)
      print('\n Prediction: \n', file=f4)
      print(np.argmax(scores), file=f4)
      print('\n \n \n -------------------------------------------------------------------------------------------------- \n \n \n', file=f4)

    print('\n', file=f4)

# Write the reference input/output dump using the current interpreter, then
# signal completion.
reference_io(interpreter)
print('done')

done


## Final Note

The code can be further improved to create a file that can be directly used in HLS.
