# *IT00CH92 Embedded AI - Spring 2024*



# Pruning and Quantization

## Importing libraries

In [26]:
import os
import tensorflow as tf
import tensorflow.keras as keras
import numpy as np
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
tf.random.set_seed(
    seed=81
)

import tempfile
from tensorflow_model_optimization.python.core.keras.compat import keras
import tensorflow_model_optimization as tfmot
# from tensorflow.keras.models import load_model

## Loading Tensorflow Datasets API to load MNIST

TFDS datasets often come with data already split into different sets. For MNIST, it has splits for train and test. We use the [Slicing API](https://www.tensorflow.org/datasets/splits#slicing_api) for TFDS to create a validation split.

Next, we aim to understand how the dataset is formatted and utilize visualizations. Finally, the dataset is preprocessed before being passed to the model. For preprocessing, we simply normalize the image values as float32 within the range [0, 1] for all three splits.

In [27]:
dataset, info = tfds.load('mnist', with_info=True, as_supervised=True)

In [28]:
(ds_train, ds_val, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train[:90%]', 'train[90%:]', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

### Parameters

In [31]:
batch_size = 128
n_epochs = 4

### Preprocessing

In [32]:
def normalize_img(image:tf.uint8, label:tf.int64):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

def normalize_splits(ds, split_name: str, batch_size: int):
  """Applies preprocessing to train, val and test sets"""
  ds = ds.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE
  )
  ds = ds.cache() # Caching makes it faster for consecutive runs
  if split_name != 'test':
    # Shuffling is not done for the test set
    ds = ds.shuffle(ds_info.splits[split_name].num_examples)
  ds = ds.batch(batch_size)
  ds = ds.prefetch(tf.data.AUTOTUNE)
  return ds

In [33]:
ds_train = normalize_splits(ds_train, split_name='train[:90%]', batch_size=batch_size)
ds_val = normalize_splits(ds_val, split_name='train[90%:]', batch_size=batch_size)
ds_test = normalize_splits(ds_test, split_name='test', batch_size=batch_size)

# Loading presaved model

In [None]:
keras_file = 'saved_model/Full_Precision_MNIST_TF.h5'
model = keras.models.load_model(keras_file)

### Saving full model as TensorFlow Lite

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Save the converted model to a file
tflite_model_file = 'saved_model/Full_Precision_MNIST_TF.tflite'
with open(tflite_model_file, 'wb') as f:
    f.write(tflite_model)

print(f"TensorFlow Lite model saved as {tflite_model_file}")

# Pruning 

## Fine-tune pre-trained model with pruning

In [36]:
prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

# Compute end step to finish pruning after 2 epochs.
batch_size = 128
epochs = 2
validation_split = 0.1 # 10% of training set will be used for validation set. 

num_images = len(ds_train)
end_step = np.ceil(num_images / batch_size).astype(np.int32) * epochs

# Define model for pruning.
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                               final_sparsity=0.80,
                                                               begin_step=0,
                                                               end_step=end_step)
}


model_for_pruning = prune_low_magnitude(model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
model_for_pruning.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model_for_pruning.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 prune_low_magnitude_conv2d  (None, 28, 28, 6)         308       
  (PruneLowMagnitude)                                            
                                                                 
 prune_low_magnitude_max_po  (None, 14, 14, 6)         1         
 oling2d (PruneLowMagnitude                                      
 )                                                               
                                                                 
 prune_low_magnitude_conv2d  (None, 14, 14, 16)        4818      
 _1 (PruneLowMagnitude)                                          
                                                                 
 prune_low_magnitude_max_po  (None, 7, 7, 16)          1         
 oling2d_1 (PruneLowMagnitu                                      
 de)                                                    

## Train and evaluate the model against baseline

In [37]:
logdir = tempfile.mkdtemp()

callbacks = [
  tfmot.sparsity.keras.UpdatePruningStep(),
  tfmot.sparsity.keras.PruningSummaries(log_dir=logdir),
]

model_for_pruning.fit(ds_train,
                  batch_size=batch_size, epochs=epochs, validation_data=ds_val,
                  callbacks=callbacks)

Epoch 1/2


  output, from_logits = _get_logits(






Epoch 2/2


<tf_keras.src.callbacks.History at 0x301bfc220>

In [38]:
_, model_for_pruning_accuracy = model_for_pruning.evaluate(
   ds_test, verbose=0)

print('Baseline test accuracy:', 0.9829999804496765) 
print('Pruned test accuracy:', model_for_pruning_accuracy)

Baseline test accuracy: 0.9829999804496765
Pruned test accuracy: 0.9887999892234802


## Create 3x smaller models from pruning

In [39]:

#Do the pruning
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

# _, pruned_keras_file = tempfile.mkstemp('.h5')

pruned_keras_file = "saved_model/pruned_model_1.h5"
keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)
print('Saved pruned Keras model to:', pruned_keras_file)



  keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)


Saved pruned Keras model to: saved_model/pruned_model_1.h5


In [40]:
# Converting to TensorFlowLite
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
pruned_tflite_model = converter.convert()

# _, pruned_tflite_file = tempfile.mkstemp('.tflite')
pruned_tflite_file = "saved_model/pruned_model_tflite_1.tflite"

with open(pruned_tflite_file, 'wb') as f:
  f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', pruned_tflite_file)

INFO:tensorflow:Assets written to: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpmme71_4b/assets


INFO:tensorflow:Assets written to: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpmme71_4b/assets


Saved pruned TFLite model to: saved_model/pruned_model_tflite_1.tflite


W0000 00:00:1717498141.837855 13438389 tf_tfl_flatbuffer_helpers.cc:390] Ignored output_format.
W0000 00:00:1717498141.837864 13438389 tf_tfl_flatbuffer_helpers.cc:393] Ignored drop_control_dependency.
2024-06-04 13:49:01.838002: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpmme71_4b
2024-06-04 13:49:01.838622: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-06-04 13:49:01.838628: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpmme71_4b
2024-06-04 13:49:01.844105: I tensorflow/cc/saved_model/loader.cc:234] Restoring SavedModel bundle.
2024-06-04 13:49:01.857470: I tensorflow/cc/saved_model/loader.cc:218] Running initialization op on SavedModel bundle at path: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpmme71_4b
2024-06-04 13:49:01.863397: I tensorflow/cc/saved_model/loader.c

In [41]:
def get_gzipped_model_size(file):
  # Returns size of gzipped model, in bytes.
  import os
  import zipfile

  # _, zipped_file = tempfile.mkstemp('.zip')
  zipped_file = "saved_model/pruned_model_zipped_1.zip"
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)

In [42]:
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned Keras model: %.2f bytes" % (get_gzipped_model_size(pruned_keras_file)))
print("Size of gzipped pruned TFlite model: %.2f bytes" % (get_gzipped_model_size(pruned_tflite_file)))

Size of gzipped baseline Keras model: 1200921.00 bytes
Size of gzipped pruned Keras model: 402662.00 bytes
Size of gzipped pruned TFlite model: 400753.00 bytes


## Create a 10x smaller model from combining pruning and quantization

Full integer quantization

In [43]:
# pruned_model = keras.models.load_model("saved_model/pruned_model_1.h5")

In [44]:
def representative_data_gen():
  for image, label  in ds_train.take(100) :
    yield [image]

In [45]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set the input and output tensors to uint8 
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

tflite_model_quant = converter.convert()

INFO:tensorflow:Assets written to: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpabbyaf4m/assets


INFO:tensorflow:Assets written to: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpabbyaf4m/assets
W0000 00:00:1717498142.996554 13438389 tf_tfl_flatbuffer_helpers.cc:390] Ignored output_format.
W0000 00:00:1717498142.996563 13438389 tf_tfl_flatbuffer_helpers.cc:393] Ignored drop_control_dependency.
2024-06-04 13:49:02.996687: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpabbyaf4m
2024-06-04 13:49:02.997481: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-06-04 13:49:02.997487: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /var/folders/s7/3gb_v_vn6ns01664c_jtg6hh0000gn/T/tmpabbyaf4m
2024-06-04 13:49:03.002596: I tensorflow/cc/saved_model/loader.cc:234] Restoring SavedModel bundle.
2024-06-04 13:49:03.016193: I tensorflow/cc/saved_model/loader.cc:218] Running initialization op on SavedModel bundle at path: /var/folders/s7/3gb_v_

In [46]:
#Saving model
_, quantized_and_pruned_tflite_file = tempfile.mkstemp('.tflite')

quantized_and_pruned_tflite_file = "saved_model/pruned_model_tflite_quantized_1.tflite"

with open(quantized_and_pruned_tflite_file, 'wb') as f:
  f.write(tflite_model_quant)

print('Saved quantized and pruned TFLite model to:', quantized_and_pruned_tflite_file)

print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned and quantized TFlite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_pruned_tflite_file)))

Saved quantized and pruned TFLite model to: saved_model/pruned_model_tflite_quantized_1.tflite
Size of gzipped baseline Keras model: 1200921.00 bytes
Size of gzipped pruned and quantized TFlite model: 97102.00 bytes


### Check persistence of accuracy from TF to TFLite

In [47]:
# Fuction th
def evaluate_model(interpreter):
    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]
 
    prediction_digits = []
    test_labels = []
    

    # Iterate over each batch in the test dataset
    for k, (batch, labels) in enumerate(ds_test):

        print('Evaluated on {n} results so far.'.format(n=k))
        # Scale and converts to integer 
        batch = tf.cast(batch * 255.0, tf.uint8)

        # Go through each batch
        for i in range(batch.shape[0]):
            test_image = batch[i:i+1]  
            labels_set = labels[i:i+1]
            test_labels.append(labels_set.numpy()[0]) 
    
            # Ensure the input is in the correct format
            interpreter.set_tensor(input_index, test_image.numpy())
            # Run inference
            interpreter.invoke()

            # Get the output and find the predicted digit
            output = interpreter.get_tensor(output_index)

            digit = np.argmax(output, axis=1)
            
            prediction_digits.append(digit[0])
    
    # Calculate the accuracy by comparing predicted and true labels
    prediction_digits = np.array(prediction_digits)
    test_labels = np.array(test_labels)
    accuracy = (prediction_digits == test_labels).mean()
    return accuracy

In [50]:
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant)
interpreter.allocate_tensors()

test_accuracy = evaluate_model(interpreter)

print('Pruned and quantized TFLite test_accuracy:', test_accuracy)
print('Pruned TF test accuracy:', model_for_pruning_accuracy)

Evaluated on 0 results so far.
Evaluated on 1 results so far.
Evaluated on 2 results so far.
Evaluated on 3 results so far.
Evaluated on 4 results so far.
Evaluated on 5 results so far.
Evaluated on 6 results so far.
Evaluated on 7 results so far.
Evaluated on 8 results so far.
Evaluated on 9 results so far.
Evaluated on 10 results so far.
Evaluated on 11 results so far.
Evaluated on 12 results so far.
Evaluated on 13 results so far.
Evaluated on 14 results so far.
Evaluated on 15 results so far.
Evaluated on 16 results so far.
Evaluated on 17 results so far.
Evaluated on 18 results so far.
Evaluated on 19 results so far.
Evaluated on 20 results so far.
Evaluated on 21 results so far.
Evaluated on 22 results so far.
Evaluated on 23 results so far.
Evaluated on 24 results so far.
Evaluated on 25 results so far.
Evaluated on 26 results so far.
Evaluated on 27 results so far.
Evaluated on 28 results so far.
Evaluated on 29 results so far.
Evaluated on 30 results so far.
Evaluated on 31 re

2024-06-04 13:49:55.231208: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


# Convert the model to .h

In [None]:
!echo "const unsigned char model[] = {" > classifying_imu/content/model.h
!cat saved_model/pruned_model_tflite_quantized_1.tflite | xxd -i      >> classifying_imu/content/model.h
!echo "};"                              >> classifying_imu/content/model.h

import os
model_h_size = os.path.getsize("classifying_imu/content/model.h")
print(f"Header file, model.h, is {model_h_size:,} bytes.")
print("\nOpen the side panel (refresh if needed). Double click model.h to download the file.")

Header file, model.h, is 698,052 bytes.

Open the side panel (refresh if needed). Double click model.h to download the file.
