##### Copyright 2020 The TensorFlow Authors.

## Setup

In [1]:
! pip install -q tensorflow-model-optimization

ERROR: Could not install packages due to an OSError: [WinError 206] Der Dateiname oder die Erweiterung ist zu lang: 'C:\\Users\\Sever\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python310\\site-packages\\tensorflow_model_optimization\\python\\core\\api\\quantization\\keras\\experimental\\default_n_bit\\default_n_bit_transforms'


[notice] A new release of pip is available: 23.3.1 -> 24.0
[notice] To update, run: C:\Users\Sever\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [1]:
import tempfile
import os

import tensorflow as tf
import numpy as np
print(tf.__version__)
from tensorflow_model_optimization.python.core.keras.compat import keras

%load_ext tensorboard

2.9.0


## Baseline implementation from TF


In [18]:
# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0

# Define the model architecture.
model = keras.Sequential([
  keras.layers.InputLayer(input_shape=(28, 28)),
  keras.layers.Reshape(target_shape=(28, 28, 1)),
  keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
  keras.layers.MaxPooling2D(pool_size=(2, 2)),
  keras.layers.Flatten(),
  keras.layers.Dense(10)
])

# Train the digit classification model
model.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model.fit(
  train_images,
  train_labels,
  epochs=4,
  validation_split=0.1,
)

import tensorflow_model_optimization as tfmot

prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

# Compute end step to finish pruning after 2 epochs.
batch_size = 128
epochs = 2
validation_split = 0.1 # 10% of training set will be used for validation set. 

num_images = train_images.shape[0] * (1 - validation_split)
end_step = np.ceil(num_images / batch_size).astype(np.int32) * epochs

# Define model for pruning.
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                               final_sparsity=0.80,
                                                               begin_step=0,
                                                               end_step=end_step)
}

model_for_pruning = prune_low_magnitude(model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
model_for_pruning.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model_for_pruning.summary()

_, model_for_pruning_accuracy = model_for_pruning.evaluate(
   test_images, test_labels, verbose=0)

print('Pruned test accuracy:', model_for_pruning_accuracy)
print('Non-zero weights in the model:', non_zero_weights_summary(model))

Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4
Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 prune_low_magnitude_reshape  (None, 28, 28, 1)        1         
 _3 (PruneLowMagnitude)                                          
                                                                 
 prune_low_magnitude_conv2d_  (None, 26, 26, 12)       230       
 3 (PruneLowMagnitude)                                           
                                                                 
 prune_low_magnitude_max_poo  (None, 13, 13, 12)       1         
 ling2d_3 (PruneLowMagnitude                                     
 )                                                               
                                                                 
 prune_low_magnitude_flatten  (None, 2028)             1         
 _3 (PruneLowMagnitude)                                          
              

## Implementation of first simple pruning techniques


In [2]:
import numpy as np
import tensorflow as tf

def non_zero_weights_summary(model):
    """
    Prints a summary of the model showing only non-zero weights for each layer.

    Args:
    model (tf.keras.Model): The model to summarize.
    """
    print("Layer Name, Layer Type, Non-zero Weights")
    total_non_zero = 0
    for layer in model.layers:
        if hasattr(layer, 'weights') and len(layer.weights) > 0:
            non_zero_count = 0
            for weight in layer.get_weights():
                non_zero_count += np.count_nonzero(weight)
            print(f"{layer.name}, {type(layer).__name__}, {non_zero_count}")
            total_non_zero += non_zero_count
    print(f"Total non-zero weights in the model: {total_non_zero}")

In [3]:
import tensorflow as tf

def prune_weights_magnitude(original_model, threshold=0.01):
    """
    Creates a new model by cloning the architecture of the original model, 
    copying its weights, and then pruning the weights by setting weights with an 
    absolute value less than the threshold to zero.

    Args:
    original_model (tf.keras.Model): The trained Keras model to be pruned.
    threshold (float): The magnitude threshold below which weights will be set to zero.

    Returns:
    tf.keras.Model: A new model with pruned weights.
    """
    # Clone the model architecture
    new_model = tf.keras.models.clone_model(original_model)
    
    # Compile the new model with dummy parameters (these can be set as needed later)
    new_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
    # Copy the weights from the original model
    new_model.set_weights(original_model.get_weights())
    
    # Prune the weights of the new model
    for layer in new_model.layers:
        if hasattr(layer, 'weights'):  # Check if the layer has weights
            new_weights = []
            for w in layer.get_weights():
                # Apply the pruning condition
                pruned_weights = tf.where(tf.abs(w) < threshold, tf.zeros_like(w), w)
                new_weights.append(pruned_weights)
            layer.set_weights(new_weights)
    
    return new_model

In [4]:
import tensorflow as tf

def prune_model_based_on_gradients(original_model, accumulators, threshold=0.0001):
    """
    Creates a new model by cloning the original model, computes gradients based on provided
    data and labels, and prunes the weights based on these gradients where the absolute 
    gradient value is below the specified threshold.

    Args:
    original_model (tf.keras.Model): The trained model to be cloned and pruned.
    data (np.array): Input data used to compute gradients.
    labels (np.array): Corresponding labels for the data.
    threshold (float): Gradient magnitude threshold below which weights will be set to zero.

    Returns:
    tf.keras.Model: A new model with pruned weights based on gradient magnitudes.
    """
    # Clone the model architecture
    new_model = tf.keras.models.clone_model(original_model)
    
    # Compile the new model with dummy parameters (these can be set as needed later)
    new_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
    # Copy the weights from the original model
    new_model.set_weights(original_model.get_weights())
    # flat_accumulators = tf.concat([tf.reshape(acc, [-1]) for acc in accumulators], axis=0)
    # gradients = tf.nn.softmax(flat_accumulators).numpy() 
    gradients = accumulators
    
    # Prune the weights based on gradients
    idx = 0  # This should index the gradients list
    for layer in new_model.layers:
        if layer.trainable_weights:
            new_weights = []
            for w, g in zip(layer.weights, gradients[idx:idx + len(layer.weights)]):
                if g is not None:
                    mask = tf.abs(g) > threshold
                    new_weight = w * tf.cast(mask, dtype=w.dtype)
                    new_weights.append(new_weight.numpy())
                else:
                    new_weights.append(w.numpy())
            layer.set_weights(new_weights)
            idx += len(layer.weights)

    return new_model

In [5]:
import tensorflow as tf
from tensorflow import keras

def train_with_gradient_accumulation(model, train_data, epochs):
    # Define the loss function
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    # Define the optimizer
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    
    # Initialize gradient accumulators as tf.Variables
    accumulators = [tf.Variable(tf.zeros_like(w), trainable=False) for w in model.trainable_weights]
    
    for epoch in range(epochs):
        print("Epoch {}/{}".format(epoch + 1, epochs))
        for step, (x_batch_train, y_batch_train) in enumerate(train_data):
            with tf.GradientTape() as tape:
                logits = model(x_batch_train, training=True)  # Logits for this minibatch
                loss_value = loss_fn(y_batch_train, logits)
            
            grads = tape.gradient(loss_value, model.trainable_weights)
            
            # Debugging: Check max gradient
            max_grad = max([tf.reduce_max(tf.abs(g)).numpy() for g in grads if g is not None])
            print(f"Max gradient at step {step}: {max_grad}")
            
            # Update the gradient accumulators
            for acc, g in zip(accumulators, grads):
                acc.assign_add(tf.abs(g))
                
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            
            if step % 100 == 0:
                print("Training loss (for one batch) at step {}: {:.4f}".format(step, float(loss_value)))

    return accumulators

In [6]:
#Normal training with SGD optimizer
import tensorflow as tf
from tensorflow import keras

def train_model(model, train_data, epochs, learning_rate=0.01):
    # Define the loss function
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    # Define the optimizer
    optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)

    # Training loop
    for epoch in range(epochs):
        print("Epoch {}/{}".format(epoch + 1, epochs))
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            with tf.GradientTape() as tape:
                logits = model(x_batch_train, training=True)  # Logits for this minibatch
                loss_value = loss_fn(y_batch_train, logits)
            
            grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            
            if step % 100 == 0:
                print("Training loss (for one batch) at step {}: {:.4f}".format(step, float(loss_value)))


In [7]:
#Training with SGD + pruning in each epoch
import tensorflow as tf
from tensorflow import keras

def train_model_pruning(model, train_data, epochs, learning_rate=0.01, threshold = 0.05):
    # Define the loss function
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    # Define the optimizer
    optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)

    # Training loop
    for epoch in range(epochs):
        if epoch != 0:
            model = prune_weights_magnitude(model, threshold) 
            pruned_weights_mask = [tf.not_equal(w, 0.0) for w in model.get_weights()]
            
        print("Epoch {}/{}".format(epoch + 1, epochs))
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            with tf.GradientTape() as tape:
                logits = model(x_batch_train, training=True)  # Logits for this minibatch
                loss_value = loss_fn(y_batch_train, logits)
            
            grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
               
            if step % 100 == 0:
                print("Training loss (for one batch) at step {}: {:.4f}".format(step, float(loss_value)))
                
        if epoch != 0:
            # Apply mask to keep pruned weights at zero
            for w, mask in zip(model.trainable_weights, pruned_weights_mask):
                w.assign(w * tf.cast(mask, tf.float32))



In [8]:
def retrain_model(model, train_data, pruned_weights_mask, epochs, learning_rate=0.001):
    """
    Retrains a given model using the specified training data and parameters,
    while applying a mask to keep pruned weights at zero.

    Args:
    model (tf.keras.Model): The model to be retrained.
    train_data (tf.data.Dataset): Dataset to use for training.
    pruned_weights_mask (list of np.array): Mask where True indicates weight should be kept at zero.
    epochs (int): Number of epochs to train for.
    learning_rate (float): Learning rate for the optimizer.
    """
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])
    
    for epoch in range(epochs):
        print("Epoch {}/{}".format(epoch + 1, epochs))
        for x_batch_train, y_batch_train in train_data:
            with tf.GradientTape() as tape:
                logits = model(x_batch_train, training=True)  # Logits for this minibatch
                loss_value = loss_fn(y_batch_train, logits)
            
            grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            
            # Apply mask to keep pruned weights at zero
            for w, mask in zip(model.trainable_weights, pruned_weights_mask):
                w.assign(w * tf.cast(mask, tf.float32))
    
    return model


In [None]:
# Train a network as usual

# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)  # Example batch size

# Define the model architecture.
model_test = keras.Sequential([
    keras.layers.InputLayer(input_shape=(28, 28)),
    keras.layers.Reshape(target_shape=(28, 28, 1)),
    keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(10)
])

epochs = 4
train_model(model_test, train_dataset, epochs)

In [10]:
model_test.compile(optimizer='sgd',  # Use the same optimizer as used in training
                loss='sparse_categorical_crossentropy',  # Use the appropriate loss function
                metrics=['accuracy']) 
_, baseline_model_test_accuracy = model_test.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_test_accuracy)
print('Non-zero weights in the model:', non_zero_weights_summary(model_test))

pruned_keras_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\base.h5'
keras.models.save_model(model_test, pruned_keras_file, include_optimizer=False)

Baseline test accuracy: 0.9330999851226807
Layer Name, Layer Type, Non-zero Weights
conv2d, Conv2D, 120
dense, Dense, 20290
Total non-zero weights in the model: 20410
Non-zero weights in the model: None


In [None]:
# Train a network with Pruning

# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)  # Example batch size

# Define the model architecture.
model_test_ = keras.Sequential([
    keras.layers.InputLayer(input_shape=(28, 28)),
    keras.layers.Reshape(target_shape=(28, 28, 1)),
    keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(10)
])

epochs = 4
train_model_pruning(model_test_, train_dataset, epochs)

In [12]:
model_test_.compile(optimizer='sgd',  # Use the same optimizer as used in training
                loss='sparse_categorical_crossentropy',  # Use the appropriate loss function
                metrics=['accuracy']) 
_, baseline_model_test_accuracy = model_test_.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_test_accuracy)
print('Non-zero weights in the model:', non_zero_weights_summary(model_test))

pruned_keras_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\base.h5'
keras.models.save_model(model_test, pruned_keras_file, include_optimizer=False)

Baseline test accuracy: 0.9106000065803528
Layer Name, Layer Type, Non-zero Weights
conv2d, Conv2D, 120
dense, Dense, 20290
Total non-zero weights in the model: 20410
Non-zero weights in the model: None


In [14]:
#Prune based on magnitude
model_mag = prune_weights_magnitude(model_test, threshold=0.08)

In [15]:
pruned_weights_mask = [tf.not_equal(w, 0.0) for w in model_mag.get_weights()]
retrain_model(model_mag, train_dataset, pruned_weights_mask, epochs=2, learning_rate=0.001)

Epoch 1/2
Epoch 2/2


<keras.engine.sequential.Sequential at 0x219a0afb820>

In [25]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_mag)
tflite_mag = converter.convert()

# Load the TFLite model and allocate tensors
interpreter = tf.lite.Interpreter(model_content=tflite_mag)
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

test_images = test_images.astype("float32")

import numpy as np

# Function to evaluate the TFLite model on all test data
def evaluate_model(interpreter, test_images, test_labels):
    input_index = input_details[0]['index']
    output_index = output_details[0]['index']
    
    # Run predictions on every image in the "test" dataset.
    prediction_digits = []
    for test_image in test_images:
        # Preprocessing: add batch dimension and convert to float32 to match with
        # the model's input data format.
        test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
        interpreter.set_tensor(input_index, test_image)
        
        # Run inference
        interpreter.invoke()
        
        # Postprocessing: remove batch dimension and find the predicted digit
        output = interpreter.tensor(output_index)
        digit = np.argmax(output()[0])
        prediction_digits.append(digit)

    # Compare prediction results with ground truth labels to calculate accuracy
    accurate_count = 0
    for index in range(len(prediction_digits)):
        if prediction_digits[index] == test_labels[index]:
            accurate_count += 1
    accuracy = accurate_count * 100 / len(prediction_digits)
    return accuracy

# Evaluate the model
accuracy = evaluate_model(interpreter, test_images, test_labels)
print('TensorFlow Lite model accuracy:', accuracy)

with open('models/mag.tflite', 'wb') as f:
    f.write(tflite_mag)





INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpf9w8y6wo\assets


INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpf9w8y6wo\assets


TensorFlow Lite model accuracy: 94.61


In [26]:
model_mag.compile(optimizer='sgd',  # Use the same optimizer as used in training
                loss='sparse_categorical_crossentropy',  # Use the appropriate loss function
                metrics=['accuracy']) 
_, baseline_model_mag_accuracy = model_mag.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_mag_accuracy)
print('Non-zero weights in the model:', non_zero_weights_summary(model_mag))
pruned_keras_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\mag.h5'
keras.models.save_model(model_mag, pruned_keras_file, include_optimizer=False)

Baseline test accuracy: 0.9460999965667725
Layer Name, Layer Type, Non-zero Weights
conv2d, Conv2D, 88
dense, Dense, 812
Total non-zero weights in the model: 900
Non-zero weights in the model: None


In [28]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

    # Declare C variable
    c_str += 'unsigned char ' + var_name + '[] = {'
    hex_array = []
    for i, val in enumerate(hex_data) :

        # Construct string from hex
        hex_str = format(val, '#04x')

        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ','
        if (i + 1) % 12 == 0:
            hex_str += '\n '
        hex_array.append(hex_str)

    # Add closing brace
    c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [27]:
c_model_name = 'mag'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_mag, c_model_name))

In [16]:
# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)  # Example batch size

# Define the model architecture.
model_test_2 = keras.Sequential([
    keras.layers.InputLayer(input_shape=(28, 28)),
    keras.layers.Reshape(target_shape=(28, 28, 1)),
    keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(10)
])

epochs = 4
accumulators = train_with_gradient_accumulation(model_test_2
                                                , train_dataset, epochs)

Epoch 1/4
Max gradient at step 0: 0.10940287262201309
Training loss (for one batch) at step 0: 2.2857
Max gradient at step 1: 0.14899222552776337
Max gradient at step 2: 0.14264975488185883
Max gradient at step 3: 0.1266109198331833
Max gradient at step 4: 0.1634834259748459
Max gradient at step 5: 0.10963931679725647
Max gradient at step 6: 0.1511652171611786
Max gradient at step 7: 0.09271976351737976
Max gradient at step 8: 0.11512503772974014
Max gradient at step 9: 0.12011048197746277
Max gradient at step 10: 0.11539633572101593
Max gradient at step 11: 0.138559028506279
Max gradient at step 12: 0.07779813557863235
Max gradient at step 13: 0.1136104092001915
Max gradient at step 14: 0.08423705399036407
Max gradient at step 15: 0.07488560676574707
Max gradient at step 16: 0.18862676620483398
Max gradient at step 17: 0.0919169932603836
Max gradient at step 18: 0.07644332200288773
Max gradient at step 19: 0.14812125265598297
Max gradient at step 20: 0.07693130522966385
Max gradient a

In [107]:
model_acc = prune_model_based_on_gradients(model_test_2, accumulators, threshold=100)

In [111]:
pruned_weights_mask = [tf.not_equal(w, 0.0) for w in model_acc.get_weights()]
retrain_model(model_acc, train_dataset, pruned_weights_mask, epochs=2, learning_rate=0.001)

Epoch 1/2
Epoch 2/2


<keras.engine.sequential.Sequential at 0x1d4461a73a0>

In [112]:
model_acc.compile(optimizer='sgd',  # Use the same optimizer as used in training
                loss='sparse_categorical_crossentropy',  # Use the appropriate loss function
                metrics=['accuracy']) 
_, baseline_model_acc_accuracy = model_acc.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_acc_accuracy)
print('Non-zero weights in the model:', non_zero_weights_summary(model_acc))
pruned_keras_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\grad.h5'
keras.models.save_model(model_acc, pruned_keras_file, include_optimizer=False)

Baseline test accuracy: 0.9319999814033508
Layer Name, Layer Type, Non-zero Weights
conv2d_1, Conv2D, 53
dense_1, Dense, 2885
Total non-zero weights in the model: 2938
Non-zero weights in the model: None


## Testing Network for the MCU

In [155]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Filter the dataset to keep only digits '0' and '1'
train_filter = np.where((train_labels == 0) | (train_labels == 1))
test_filter = np.where((test_labels == 0) | (test_labels == 1))

train_images, train_labels = train_images[train_filter], train_labels[train_filter]
test_images, test_labels = test_images[test_filter], test_labels[test_filter]

# Normalize the input image so that each pixel value is between 0 and 1
# Cast to float32 to ensure data type consistency
train_images = (train_images / 255.0).astype(np.float32)
test_images = (test_images / 255.0).astype(np.float32)

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.map(lambda x, y: (tf.expand_dims(x, -1), y))  # Reshape to include the channel dimension
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)


# Define the model
model = keras.Sequential([
    keras.layers.Conv2D(filters=8, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(1, activation='sigmoid')  # Binary output
])

# Compile the model with a binary classification loss function
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Train the model
model.fit(train_dataset, epochs=4)

# Representative dataset to enable quantization
def representative_dataset():
    for data in train_dataset.take(100):  # 100 batches of the dataset
        yield [data[0].numpy()]

# Convert the Keras model to a TensorFlow Lite model with quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
converter.representative_dataset = representative_dataset

# Convert the model
tflite_model = converter.convert()

# Save the converted model to a file
tflite_model_name = 'mnist_binary_classifier'
with open(tflite_model_name + '.tflite', 'wb') as f:
    f.write(tflite_model)


Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4




INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpp2g7xfck\assets


INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpp2g7xfck\assets


In [156]:
model.summary()
score = model.evaluate(test_images, test_labels)

Model: "sequential_20"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_20 (Conv2D)          (None, 26, 26, 8)         80        
                                                                 
 max_pooling2d_20 (MaxPoolin  (None, 13, 13, 8)        0         
 g2D)                                                            
                                                                 
 flatten_20 (Flatten)        (None, 1352)              0         
                                                                 
 dense_20 (Dense)            (None, 1)                 1353      
                                                                 
Total params: 1,433
Trainable params: 1,433
Non-trainable params: 0
_________________________________________________________________


In [157]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

    # Declare C variable
    c_str += 'unsigned char ' + var_name + '[] = {'
    hex_array = []
    for i, val in enumerate(hex_data) :

        # Construct string from hex
        hex_str = format(val, '#04x')

        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ','
        if (i + 1) % 12 == 0:
            hex_str += '\n '
        hex_array.append(hex_str)

    # Add closing brace
    c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [158]:
c_model_name = 'test'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model, c_model_name))

In [159]:
tflite_interpreter = tf.lite.Interpreter(model_path=tflite_model_name + '.tflite')
tflite_interpreter.allocate_tensors()
input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()

print("== Input details ==")
print("name:", input_details[0]['name'])
print("shape:", input_details[0]['shape'])
print("type:", input_details[0]['dtype'])

print("\n== Output details ==")
print("name:", output_details[0]['name'])
print("shape:", output_details[0]['shape'])
print("type:", output_details[0]['dtype'])

== Input details ==
name: serving_default_conv2d_20_input:0
shape: [ 1 28 28  1]
type: <class 'numpy.int8'>

== Output details ==
name: StatefulPartitionedCall:0
shape: [1 1]
type: <class 'numpy.int8'>


In [160]:
import numpy as np
import tensorflow as tf

# Assuming 'tflite_interpreter' is correctly loaded and initialized

input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()

# Allocate tensors once outside the loop
tflite_interpreter.allocate_tensors()

predictions = []
for i in range(len(test_images)):
    # Get the test image and preprocess it
    test_image = test_images[i]
    test_image = np.expand_dims(test_image, axis=-1)  # Make sure it's [28, 28, 1]
    test_image = test_image.astype(np.float32)  # Ensure the type matches the expected float32

    # Normalize and quantize the image
    input_scale, input_zero_point = input_details[0]['quantization']
    # test_image = test_image #/ 255.0  # Normalize the image from [0, 255] to [0, 1]
    # test_image = test_image / input_scale + input_zero_point
    test_image = np.expand_dims(test_image, axis=0).astype(input_details[0]['dtype'])  # Reshape to [1, 28, 28, 1]

    # Set the tensor to the input of the model
    tflite_interpreter.set_tensor(input_details[0]['index'], test_image)

    # Run the model on the input data
    tflite_interpreter.invoke()

    # Retrieve the output of the model
    output_data = tflite_interpreter.get_tensor(output_details[0]['index'])
    predictions.append(output_data[0])

# Handle predictions as needed (e.g., post-processing, thresholding)


In [162]:
sum = 0
for i in range(len(predictions)):
    if (predictions[i] == test_labels[i]):
        sum = sum + 1
accuracy_score = sum / 100
print("Accuracy of quantized to int8 model is {}%".format(accuracy_score*100))
print("Compared to float32 accuracy of {}%".format(score[1]*100))
print("We have a change of {}%".format((accuracy_score-score[1])*100))

Accuracy of quantized to int8 model is 0.0%
Compared to float32 accuracy of 99.90543723106384%
We have a change of -99.90543723106384%


In [163]:
# Select an image from the test set
index = 0  # Change this index to test different images
image_to_test = test_images[index]

# Preprocess the image
image_to_test = image_to_test / 255.0  # Normalize the image
image_to_test = np.expand_dims(image_to_test, axis=-1)  # Add channel dimension
image_to_test = np.expand_dims(image_to_test, axis=0)  # Add batch dimension (model expects a batch)

# Perform inference
predictions = model.predict(image_to_test)
predicted_label = (predictions > 0.5).astype(int)

# Print the prediction result
print(f"Predicted label: {predicted_label[0][0]} (1 for '1' and 0 for '0')")
print(f"Actual label: {test_labels[index]}")






Predicted label: 1 (1 for '1' and 0 for '0')
Actual label: 1


In [164]:
import tensorflow as tf
import numpy as np

# Load MNIST dataset
(train_images, train_labels), _ = tf.keras.datasets.mnist.load_data()

# Normalize the images to [0, 1]
train_images = train_images / 255.0

# Select a random image from the training set
random_idx = np.random.randint(0, len(train_images))
random_image = train_images[random_idx]

# Since the image needs to be a flat array of floats for TFLite Micro,
# we flatten the 28x28 image into a 1D array of 784 elements
flat_image = random_image.flatten()

# Optionally, print the image shape and data type to confirm
print("Shape:", flat_image.shape)
print("Data Type:", flat_image.dtype)

# Convert image data to float32 if not already
flat_image = flat_image.astype('float32')


def save_image_to_c_array(flat_image, filename="mnist_image.h"):
    with open(filename, "w") as file:
        file.write("static const float mnist_image[784] = {\n")
        for i, pixel in enumerate(flat_image):
            file.write(f"{pixel:.6f}")
            if i < len(flat_image) - 1:
                file.write(", ")
            if (i + 1) % 10 == 0:
                file.write("\n")
        file.write("\n};\n")

# Call the function to save the image
save_image_to_c_array(flat_image)


Shape: (784,)
Data Type: float64


In [147]:
c_model_name = 'test'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_mag, c_model_name))

In [148]:
import numpy as np

int_image = np.round(flat_image * 255)

# Now convert from [0, 255] to [-128, 127]
int_image = (int_image - 128).astype(np.int8)

# Function to save the array to a C header file
def save_image_to_c_array(int_image, filename="mnist_image.h"):
    with open(filename, "w") as file:
        file.write("signed char mnist_image[784] = {\n")
        for i in range(0, len(int_image), 12):  # Process in chunks of 12 for formatting
            line = ", ".join(f"{int(x)}" for x in int_image[i:i+12])
            if i + 12 < len(int_image):
                file.write("    " + line + ",\n")
            else:
                file.write("    " + line + "\n")
        file.write("};\n")

save_image_to_c_array(int_image)  # Call the function


## Apply techniques to standard networks VGGNet and AlexNet

In [24]:
from tensorflow.keras.applications import MobileNetV2

model_mobile = MobileNetV2(weights='imagenet', include_top=True)

model_mobile.summary()

Model: "mobilenetv2_1.00_224"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_8 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_8[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                              

In [None]:
epochs = 4
train_model(model_mobile, train_dataset, epochs)

## Fine-tune pre-trained model with pruning


### Define the model

You will apply pruning to the whole model and see this in the model summary.

In this example, you start the model with 50% sparsity (50% zeros in weights)
and end with 80% sparsity.

In the [comprehensive guide](https://www.tensorflow.org/model_optimization/guide/pruning/comprehensive_guide.md), you can see how to prune some layers for model accuracy improvements.

In [30]:
import tensorflow_model_optimization as tfmot

prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

# Train a network as usual

# Load MNIST dataset
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)  # Example batch size

# Define the model architecture.
model = keras.Sequential([
    keras.layers.InputLayer(input_shape=(28, 28)),
    keras.layers.Reshape(target_shape=(28, 28, 1)),
    keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(10)
])

# Compute end step to finish pruning after 2 epochs.
batch_size = 128
epochs = 2
validation_split = 0.1 # 10% of training set will be used for validation set. 

num_images = train_images.shape[0] * (1 - validation_split)
end_step = np.ceil(num_images / batch_size).astype(np.int32) * epochs

# Define model for pruning.
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                               final_sparsity=0.80,
                                                               begin_step=0,
                                                               end_step=end_step)
}

model_for_pruning = prune_low_magnitude(model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
model_for_pruning.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model_for_pruning.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 prune_low_magnitude_reshape  (None, 28, 28, 1)        1         
 _3 (PruneLowMagnitude)                                          
                                                                 
 prune_low_magnitude_conv2d_  (None, 26, 26, 12)       230       
 3 (PruneLowMagnitude)                                           
                                                                 
 prune_low_magnitude_max_poo  (None, 13, 13, 12)       1         
 ling2d_3 (PruneLowMagnitude                                     
 )                                                               
                                                                 
 prune_low_magnitude_flatten  (None, 2028)             1         
 _3 (PruneLowMagnitude)                                          
                                                      

### Train and evaluate the model against baseline

Fine tune with pruning for two epochs.

`tfmot.sparsity.keras.UpdatePruningStep` is required during training, and `tfmot.sparsity.keras.PruningSummaries` provides logs for tracking progress and debugging.

In [31]:
logdir = tempfile.mkdtemp()

callbacks = [
  tfmot.sparsity.keras.UpdatePruningStep(),
  tfmot.sparsity.keras.PruningSummaries(log_dir=logdir),
]
  
model_for_pruning.fit(train_images, train_labels,
                  batch_size=batch_size, epochs=epochs, validation_split=validation_split,
                  callbacks=callbacks)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x219a0c4cfa0>

For this example, there is minimal loss in test accuracy after pruning, compared to the baseline.

In [32]:
_, model_for_pruning_accuracy = model_for_pruning.evaluate(
   test_images, test_labels, verbose=0)

# print('Baseline test accuracy:', baseline_model_accuracy) 
print('Pruned test accuracy:', model_for_pruning_accuracy)

Pruned test accuracy: 0.9276000261306763


The logs show the progression of sparsity on a per-layer basis.

In [33]:
#docs_infra: no_execute
%tensorboard --logdir={logdir}

ERROR: Could not find `tensorboard`. Please ensure that your PATH
contains an executable `tensorboard` program, or explicitly specify
the path to a TensorBoard binary by setting the `TENSORBOARD_BINARY`
environment variable.

For non-Colab users, you can see [the results of a previous run](https://tensorboard.dev/experiment/sRQnrycaTMWQOaswXzClYA/#scalars&_smoothingWeight=0) of this code block on [TensorBoard.dev](https://tensorboard.dev/).

## Create 3x smaller models from pruning

Both `tfmot.sparsity.keras.strip_pruning` and applying a standard compression algorithm (e.g. via gzip) are necessary to see the compression
benefits of pruning.

*   `strip_pruning` is necessary since it removes every tf.Variable that pruning only needs during training, which would otherwise add to model size during inference
*   Applying a standard compression algorithm is necessary since the serialized weight matrices are the same size as they were before pruning. However, pruning makes most of the weights zeros, which is
added redundancy that algorithms can utilize to further compress the model.

First, create a compressible model for TensorFlow.

In [34]:
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

pruned_keras_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\default.h5'
keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)





Then, create a compressible model for TFLite.

In [36]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
pruned_tflite_model = converter.convert()

pruned_tflite_file = r'C:\Users\Sever\ML_on_MCU\Pruning\models\MFCC.tflite'

with open(pruned_tflite_file, 'wb') as f:
  f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', pruned_tflite_file)



INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpmvxstglm\assets


INFO:tensorflow:Assets written to: C:\Users\Sever\AppData\Local\Temp\tmpmvxstglm\assets


Saved pruned TFLite model to: C:\Users\Sever\ML_on_MCU\Pruning\models\MFCC.tflite


Define a helper function to actually compress the models via gzip and measure the zipped size.

In [38]:
c_model_name = 'MFCC'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_mag, c_model_name))

In [11]:
def get_gzipped_model_size(file):
  # Returns size of gzipped model, in bytes.
  import os
  import zipfile

  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)

Compare and see that the models are 3x smaller from pruning.

In [12]:
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned Keras model: %.2f bytes" % (get_gzipped_model_size(pruned_keras_file)))
print("Size of gzipped pruned TFlite model: %.2f bytes" % (get_gzipped_model_size(pruned_tflite_file)))

Size of gzipped baseline Keras model: 78239.00 bytes
Size of gzipped pruned Keras model: 25908.00 bytes
Size of gzipped pruned TFlite model: 24848.00 bytes


## Create a 10x smaller model from combining pruning and quantization

You can apply post-training quantization to the pruned model for additional benefits.

In [13]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_and_pruned_tflite_model = converter.convert()

_, quantized_and_pruned_tflite_file = tempfile.mkstemp('.tflite')

with open(quantized_and_pruned_tflite_file, 'wb') as f:
  f.write(quantized_and_pruned_tflite_model)

print('Saved quantized and pruned TFLite model to:', quantized_and_pruned_tflite_file)

print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned and quantized TFlite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_pruned_tflite_file)))

INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp35zwmyql/assets


INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp35zwmyql/assets


W0000 00:00:1709986618.942100   10506 tf_tfl_flatbuffer_helpers.cc:390] Ignored output_format.
W0000 00:00:1709986618.942130   10506 tf_tfl_flatbuffer_helpers.cc:393] Ignored drop_control_dependency.


Saved quantized and pruned TFLite model to: /tmpfs/tmp/tmp3v6lm0h4.tflite
Size of gzipped baseline Keras model: 78239.00 bytes
Size of gzipped pruned and quantized TFlite model: 8064.00 bytes


## See persistence of accuracy from TF to TFLite

Define a helper function to evaluate the TF Lite model on the test dataset.

In [14]:
import numpy as np

def evaluate_model(interpreter):
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  # Run predictions on ever y image in the "test" dataset.
  prediction_digits = []
  for i, test_image in enumerate(test_images):
    if i % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=i))
    # Pre-processing: add batch dimension and convert to float32 to match with
    # the model's input data format.
    test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
    interpreter.set_tensor(input_index, test_image)

    # Run inference.
    interpreter.invoke()

    # Post-processing: remove batch dimension and find the digit with highest
    # probability.
    output = interpreter.tensor(output_index)
    digit = np.argmax(output()[0])
    prediction_digits.append(digit)

  print('\n')
  # Compare prediction results with ground truth labels to calculate accuracy.
  prediction_digits = np.array(prediction_digits)
  accuracy = (prediction_digits == test_labels).mean()
  return accuracy

You evaluate the pruned and quantized model and see that the accuracy from TensorFlow persists to the TFLite backend.

In [15]:
interpreter = tf.lite.Interpreter(model_content=quantized_and_pruned_tflite_model)
interpreter.allocate_tensors()

test_accuracy = evaluate_model(interpreter)

print('Pruned and quantized TFLite test_accuracy:', test_accuracy)
print('Pruned TF test accuracy:', model_for_pruning_accuracy)

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


Evaluated on 0 results so far.
Evaluated on 1000 results so far.


Evaluated on 2000 results so far.
Evaluated on 3000 results so far.


Evaluated on 4000 results so far.
Evaluated on 5000 results so far.


Evaluated on 6000 results so far.
Evaluated on 7000 results so far.


Evaluated on 8000 results so far.
Evaluated on 9000 results so far.




Pruned and quantized TFLite test_accuracy: 0.9691
Pruned TF test accuracy: 0.9686999917030334


## Conclusion

In this tutorial, you saw how to create sparse models with the TensorFlow Model Optimization Toolkit API for both TensorFlow and TFLite. You 
then combined pruning with post-training quantization for additional benefits.

You created a 10x smaller model for MNIST, with minimal accuracy difference.

We encourage you to try this new capability, which can be particularly important for deployment in resource-constrained environments.
