# Machine Learning for Embedded Systems
## Home assignment 2 - training NN model built in Keras
### Ondřej Schejbal
* Student code: 214308IV
* UNI-ID: onsche

In [1]:
import tensorflow as tf
import pathlib
from matplotlib import pyplot
import pickle
import os
import numpy as np
import tensorflow_model_optimization as tfmot
from tensorflow_model_optimization.python.core.quantization.keras import quantize

### Load MNIST dataset, normalize values and show few images from the dataset

In [2]:
# Load and prepare MNIST dataset
def loadMnistDataset():
    mnist = tf.keras.datasets.mnist
    (x_train, y_train) , (x_test, y_test) = mnist.load_data()
    # Normalize
    x_train = x_train / 255.0
    x_test = x_test / 255.0
    return x_train, y_train, x_test, y_test

In [3]:
x_train, y_train, x_test, y_test = loadMnistDataset()

In [4]:
# print('Train: X=%s, y=%s' % (x_train.shape, y_train.shape))
# print('Test: X=%s, y=%s' % (x_test.shape, y_test.shape))
# # plot first few images from the dataset
# for i in range(9):
#     pyplot.subplot(330 + 1 + i)
#     pyplot.imshow(x_train[i], cmap=pyplot.get_cmap('gray'))
# pyplot.show()

## First we construct and run the initial NN model

In [5]:
def prepareInitialModel():
    model = tf.keras.models.Sequential()
    # (dimensionality of the output space, activation)
    model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
    model.add(tf.keras.layers.Dense(80, activation='elu'))
    model.add(tf.keras.layers.Dense(60, activation='elu'))
    # Dropout layer randomly sets input units to 0 with a frequency of rate
    #    at each step during training time, which helps prevent overfitting
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(10)) # empty activation == no activation fction
    return model

# modified model (for testing continuous steps)
# def prepareInitialModel():
#     model = tf.keras.models.Sequential()
#     model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
#     # model.add(tf.keras.layers.Dense(80, activation='elu'))
#     # model.add(tf.keras.layers.Dense(60, activation='elu'))
#     model.add(tf.keras.layers.Dense(50, activation='relu'))
#     model.add(tf.keras.layers.Dropout(0.2))
#     model.add(tf.keras.layers.Dense(10))
#     return model

In [6]:
initialModel = prepareInitialModel()
initialModel.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten (Flatten)            (None, 784)               0         
_________________________________________________________________
dense (Dense)                (None, 80)                62800     
_________________________________________________________________
dense_1 (Dense)              (None, 60)                4860      
_________________________________________________________________
dropout (Dropout)            (None, 60)                0         
_________________________________________________________________
dense_2 (Dense)              (None, 10)                610       
Total params: 68,270
Trainable params: 68,270
Non-trainable params: 0
_________________________________________________________________


In [7]:
def getTotalParamsCountFromModelSummary(model, print_it=True):
    paramsDetails = []
    model.summary(print_fn=lambda x: paramsDetails.append(x))
    paramsDetails = paramsDetails[-4].replace(",", "").split()
    paramCount = [int(s) for s in paramsDetails if s.isdigit()]
    if len(paramCount) > 1:
        raise "Unexpected length of paramCount"
    if print_it:
        print('Total params:', paramCount[0])
    return paramCount[0]

In [8]:
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metrics_arr = ['accuracy']
learning_rate = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [9]:
# loss_initial = loss_fn(y_train[:1], predictions).numpy()
# print('Untrained model inital loss: ' + str(loss_initial))

#### Train model

In [10]:
def fitModel(x_train, y_train, model, optimizer, loss_fn, metrics_arr, epochs = 5, callbacks = None):
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics_arr)
    model.fit(x_train, y_train, epochs=epochs, callbacks=callbacks)
    return model

In [11]:
initialModel = fitModel(x_train, y_train, initialModel, optimizer, loss_fn, metrics_arr)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [12]:
def roundLossAndAccuracy(loss, accuracy, ndigits=5):
    return round(loss, ndigits), round(accuracy, ndigits)

In [13]:
# Evaluate model performance
initialModel_loss, initialModel_accuracy = initialModel.evaluate(x_test, y_test)
initialModel_loss, initialModel_accuracy = roundLossAndAccuracy(initialModel_loss, initialModel_accuracy)



### Save model as tf Lite model

In [14]:
def saveModelAsTFL(model, fileName):
    # Convert the model
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    tflite_model = converter.convert()

    with open(fileName, 'wb') as f:
        f.write(tflite_model)

In [15]:
initialModelPath = 'initialModel.tflite'

In [16]:
saveModelAsTFL(initialModel, initialModelPath);

INFO:tensorflow:Assets written to: C:\Users\schejond\AppData\Local\Temp\tmpi54ao81d\assets


## I have decided to focus on optimizing the memory size of the prepared model

The target is to minimize the size of the model while also keeping the accuracy of the initial model as high as possible.

The model memory size is the **total number of parameters of the model + it's memory allocation size in KB when saved as TensorFlow Lite model**.

In [17]:
def prepareMyModel():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
    
    # # code for pruning only 1 specific layer
    # mdl = tf.keras.layers.Dense(50, activation='relu')
    # toAdd = tfmot.sparsity.keras.prune_low_magnitude(mdl)
    # model.add(toAdd)
    
    model.add(tf.keras.layers.Dense(50, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(10))
    return model

In [18]:
myModel = prepareMyModel()
myModel.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_1 (Flatten)          (None, 784)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 50)                39250     
_________________________________________________________________
dropout_1 (Dropout)          (None, 50)                0         
_________________________________________________________________
dense_4 (Dense)              (None, 10)                510       
Total params: 39,760
Trainable params: 39,760
Non-trainable params: 0
_________________________________________________________________


### Pruning

In [19]:
prune_model = False

def apply_pruning_to_dense(layer):
    # other layers for pruning can be also filtered here
    if isinstance(layer, tf.keras.layers.Dense):
        return tfmot.sparsity.keras.prune_low_magnitude(layer)
    return layer

if prune_model:
    myModel = tf.keras.models.clone_model(myModel,
                                          clone_function=apply_pruning_to_dense)

In [20]:
pruning_schedule = tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.5,
                                                        final_sparsity=0.8,
                                                        begin_step=0,
                                                        end_step=np.ceil(np.int32(len(x_train / 32)) * 5))
# pruning_schedule = tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.0, final_sparsity=0.5,
#                                                         begin_step=2000, end_step=4000)

# logdir = tempfile.mkdtemp() 
pruningCallbacks = [
  tfmot.sparsity.keras.UpdatePruningStep(),
  # tfmot.sparsity.keras.PruningSummaries(log_dir=logdir)
]

if prune_model:
    myModel = tfmot.sparsity.keras.prune_low_magnitude(myModel, pruning_schedule=pruning_schedule)

#### QUANTIZATION

In [22]:
quantize_model = True
def applyQuantizationToSomeLayers(layer):
    # here I experimented with only adding some type of layers
    # if isinstance(layer, tf.keras.layers.Dense):
    #     return tfmot.quantization.keras.quantize_annotate_layer(layer)
    # return layer
    return tfmot.quantization.keras.quantize_annotate_layer(layer)

def quantizeModel(model, print_summary = True):
    annotated_model = tf.keras.models.clone_model(model, clone_function=applyQuantizationToSomeLayers)
    quantized_model = tfmot.quantization.keras.quantize_apply(annotated_model)
    if print_summary:
        quantized_model.summary()
    return quantized_model
    
if quantize_model:
    quantizeModel(myModel, False)

In [23]:
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# loss_fn = tf.keras.losses.BinaryCrossentropy()
metrics_arr = ['accuracy']
learning_rate = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [24]:
myModel = fitModel(x_train, y_train, myModel, optimizer, loss_fn, metrics_arr, 5, pruningCallbacks)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [25]:
# evaluate model performance
model_loss, model_accuracy = myModel.evaluate(x_test, y_test)
model_loss, model_accuracy = roundLossAndAccuracy(model_loss, model_accuracy)



In [26]:
# we need to strip_pruning before saving
if prune_model:
    myModel = tfmot.sparsity.keras.strip_pruning(myModel)

In [27]:
myModelPath = 'myModel.tflite'
saveModelAsTFL(myModel, myModelPath);

INFO:tensorflow:Assets written to: C:\Users\schejond\AppData\Local\Temp\tmpb4jzcx4m\assets


INFO:tensorflow:Assets written to: C:\Users\schejond\AppData\Local\Temp\tmpb4jzcx4m\assets


### Compare file size of initial model and the final model

In [28]:
def printModelMemorySize(model, modelPath):
    print(getTotalParamsCountFromModelSummary(model, False), '+', os.stat(modelPath).st_size/1024, 'KB');
    print('Total model size:', getTotalParamsCountFromModelSummary(model, False) + os.stat(modelPath).st_size/1024, '\n')

In [29]:
print('---------------------------------------------------------')
print('Initial model stats:\n')
printModelMemorySize(initialModel, initialModelPath)
print('Model loss:', initialModel_loss, 'Model prediction accuracy:', initialModel_accuracy)
print('---------------------------------------------------------')
print('My model stats:\n')
printModelMemorySize(myModel, myModelPath)
print('Model loss:', model_loss, 'Model prediction accuracy:', model_accuracy)
print('---------------------------------------------------------')
print('Model with better accuracy:',
      'myModel' if model_accuracy > initialModel_accuracy else 'initialModel', round(abs(model_accuracy - initialModel_accuracy),4))
myModelWeight = getTotalParamsCountFromModelSummary(myModel, False) + os.stat(myModelPath).st_size/1024
initialModelWeight = getTotalParamsCountFromModelSummary(initialModel, False) + os.stat(initialModelPath).st_size/1024
print('Model with less weight:',
      'myModel' if myModelWeight < initialModelWeight else 'initialModel', round(abs(myModelWeight - initialModelWeight), 4))

---------------------------------------------------------
Initial model stats:

68270 + 71.125 KB
Total model size: 68341.125 

Model loss: 0.08969 Model prediction accuracy: 0.9728
---------------------------------------------------------
My model stats:

39760 + 42.078125 KB
Total model size: 39802.078125 

Model loss: 0.10195 Model prediction accuracy: 0.9696
---------------------------------------------------------
Model with better accuracy: initialModel 0.0032
Model with less weight: myModel 28539.0469


### Accuracy on my own data

I have prepared my own handwritten numbers 0-9

In the cells below I have evaluated them on my final model and shown the prediction accuracy.

In [30]:
from PIL import Image, ImageOps

In [31]:
# load images to features and labels
def load_images_to_data(x_data, y_data):
    list_of_files = os.listdir("MyNumbers")
    for file in list_of_files:
        image_file_name = os.path.join("MyNumbers", file)
        if ".png" in image_file_name:
            img = Image.open(image_file_name).convert("L")
            img = np.resize(img, (28,28))
            im2arr = np.array(img)
            im2arr = im2arr.reshape(1,28,28)
            
            if len(x_data) == 0:
                x_data = im2arr
            else:
                x_data = np.append(x_data, im2arr, axis=0)
            
            if len(y_data) == 0:
                y_data = [np.uint8(file[0])]
            else:
                y_data = np.append(y_data, [np.uint8(file[0])], axis=0)
    return x_data, y_data

In [32]:
x_test_my = []
y_test_my = []

x_test_my, y_test_my = load_images_to_data(x_test_my, y_test_my)

x_test_my = x_test_my / 255.0

In [33]:
# evaluate my final model's performance on my handwritten numbers
myModel.evaluate(x_test_my, y_test_my);



In [34]:
initialModel.evaluate(x_test_my, y_test_my);

