# TyBox incremental learning with CNAS baseline model


This notebook shows the use of TyBox (https://github.com/pavmassimo/TyBox) for the generation of a quantized incremental solution able to address an incremental class learning scenario.

The CIFAR-10 dataset was used to prove the capabilities of the toolbox. Two classes in the dataset have been omitted from the initial training to simulate an incremental use case.

A full description of the experiment is provided in the paper TyBox.


In [78]:
!git clone https://github.com/pavmassimo/TyBox/tree/feature-extractor-quantization.git

In [79]:
# Install matplotlib, which this notebook imports as `pyplot` to plot the accuracy curves.
!pip install matplotlib

In [80]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
from keras import layers
# from keras.utils import np_utils
from keras import backend as K 
K.set_image_data_format('channels_last')

from sklearn.preprocessing import OneHotEncoder

import sys
# Append the local TyBox checkout to the module search path so that the
# `from TyBox import ...` statements below can be resolved.
sys.path.append('.//TyBox')

# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Dense

from TyBox import TyBox
from TyBox import profiler

from matplotlib import pyplot as plt

Download and preparation of the dataset:

In [81]:
from keras.datasets import cifar10
# Load CIFAR-10 as (examples, labels) pairs for the train and the test split.
(train_examples, train_labels), (test_examples, test_labels) = cifar10.load_data()

In [82]:
from keras.preprocessing.image import ImageDataGenerator

# Cast both splits to float32 and divide the pixel values by 255.
train_examples = train_examples.astype('float32') / 255.0
test_examples = test_examples.astype('float32') / 255.0

# Labels are NOT one-hot encoded here; the former np_utils.to_categorical
# calls on train_labels / test_labels are disabled.

# Augmentation generator: horizontal flips plus 10% horizontal/vertical shifts.
# Rotation (20), shear (0.2), zoom (0.2) and fill_mode='nearest' are disabled.
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    width_shift_range=0.1,
    height_shift_range=0.1,
)

# The generator is neither fitted nor used to produce batches in this cell;
# the former calls are kept here for reference only:
# train_datagen.fit(train_examples)
# train_generator = train_datagen.flow(train_examples, test_dataset, batch_size=96)

### Model summary

In [128]:
# Load the saved Keras model stored at the relative path 'input_model'.
model = keras.models.load_model('input_model')

In [None]:
# Print the layer-by-layer summary of the loaded model.
model.summary()

### Model compile

In [130]:
# Configure the model for training: Adam optimizer with a fixed learning rate,
# categorical cross-entropy loss (labels are one-hot encoded later on) and
# accuracy as the reported metric.
lrate = 0.00125
adam = tf.keras.optimizers.Adam(learning_rate=lrate)

model.compile(
    optimizer=adam,
    loss=keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
# Snapshot the parameters of output units 8 and 9 of the last layer so they
# can be written back later.
layer_w = model.layers[-1].get_weights()[0]

# Kernel columns 8 and 9: one row per input feature, two columns.
weights = np.array(layer_w[:, 8:10])

# Keep only the bias entries of units 8 and 9 as well, so that `biases[0]` and
# `biases[1]` line up with `weights[:, 0]` and `weights[:, 1]`. Previously the
# full bias vector was stored, which made index 0/1 refer to units 0 and 1.
biases = np.array(model.layers[-1].get_weights()[1][8:10])

print(weights)

## Network Profiler
It is now possible to initialize the profiler with the defined model.
The profiler provides the network designer with useful information on the memory occupation of weights and activations, both in total and at per-layer granularity.

In [None]:
# Import the profiler module under a private alias: this cell binds the name
# `profiler` to a Profiler *instance* (used by the following cells), which
# hides the module imported at the top of the notebook. Going through the alias
# keeps the cell re-runnable.
from TyBox import profiler as _profiler_module

# Positional alternative: _profiler_module.Profiler("model", model)
# NOTE(review): `precisions` presumably holds bit widths (32-bit here) - confirm
# the meaning of the two pairs against the Profiler documentation.
profiler = _profiler_module.Profiler(network_name="model", model=model, precisions=[[32, 32], [32, 32]])

In [None]:
# Print the occupation figures computed by the profiler for the model
# (presumably the total memory of weights and activations - see the TyBox docs).
profiler.print_occupations()

In [None]:
# Print the profiler report at per-layer granularity.
profiler.print_per_layer()

## Train the base network

We use the complete train dataset to train the base neural network on the task of recognizing the first 8 CIFAR-10 classes (labels 0-7).

In [None]:
# One-hot encode the integer training labels into an (n_samples, 10) float array.
# The number of samples is taken from the data instead of being hard-coded.
n_train = len(train_labels)
categorical_cifar_labels = np.zeros((n_train, 10))

# reshape(-1) flattens the label array so that each row i receives a single 1
# in the column given by its class id.
categorical_cifar_labels[np.arange(n_train), np.asarray(train_labels).reshape(-1)] = 1

train_labels = categorical_cifar_labels
train_labels.shape

In [None]:
# One-hot encode the integer test labels into an (n_samples, 10) float array.
# The number of samples is taken from the data instead of being hard-coded.
n_test = len(test_labels)
categorical_test_labels = np.zeros(shape=(n_test, 10))

# reshape(-1) flattens the label array so that each row i receives a single 1
# in the column given by its class id.
categorical_test_labels[np.arange(n_test), np.asarray(test_labels).reshape(-1)] = 1

test_labels = categorical_test_labels
test_labels.shape

In [137]:
def _split_start_incremental(examples, one_hot_labels, max_per_split, first_incremental_class=8):
    """Split a dataset into the "start" and the "incremental" class groups.

    Args:
        examples: array of samples, indexed along its first axis.
        one_hot_labels: 2-D array with one one-hot label row per sample.
        max_per_split: maximum number of samples kept in each group; the
            first matching samples in dataset order are kept.
        first_incremental_class: smallest class index that belongs to the
            incremental group (8 keeps classes 0-7 in the start group).

    Returns:
        Tuple (start_examples, start_labels, incremental_examples,
        incremental_labels). Examples are float64 and labels uint8, as in
        the buffers this cell used to pre-allocate. Each array holds only
        real samples: when fewer than `max_per_split` samples match, the
        array is shorter instead of being padded with uninitialised rows.
    """
    class_ids = np.argmax(one_hot_labels, axis=1)
    is_incremental = class_ids >= first_incremental_class

    # Select row indices first so that only the kept rows are copied.
    start_idx = np.flatnonzero(~is_incremental)[:max_per_split]
    incremental_idx = np.flatnonzero(is_incremental)[:max_per_split]

    return (
        examples[start_idx].astype(np.float64),
        one_hot_labels[start_idx].astype(np.uint8),
        examples[incremental_idx].astype(np.float64),
        one_hot_labels[incremental_idx].astype(np.uint8),
    )


# Ship (8) and truck (9) are temporarily excluded from the start dataset.
# Training split: at most 10000 samples per group.
(start_examples, start_labels,
 incremental_examples, incremental_labels) = _split_start_incremental(
    train_examples, train_labels, 10000)

# Test split: at most 2000 samples per group.
(start_examples_test, start_labels_test,
 incremental_examples_test, incremental_labels_test) = _split_start_incremental(
    test_examples, test_labels, 2000)

In [None]:
# Write the saved parameters back into output units 8 and 9 of the last layer.
layer_2 = model.layers[-1].get_weights()
layer_2_w = layer_2[0]

# Column 0 / 1 of `weights` go to kernel columns 8 / 9, for every kernel row.
layer_2_w[:, 8:10] = weights[:, 0:2]
layer_2[0] = layer_2_w

# NOTE(review): this reads biases[0] and biases[1]; confirm that `biases`
# stores the entries of units 8 and 9 in that order, not the full bias vector.
layer_2[1][8], layer_2[1][9] = biases[0], biases[1]

model.layers[-1].set_weights(layer_2)
print(model.layers[-1].get_weights())

In [None]:
# Load the trained model weights from a local checkpoint.
# model.load_weights('/kaggle/input/bestcheckpoint/model_trained')
# Raw string: the Windows path contains backslashes followed by letters
# (\D, \I, \m), which are invalid escape sequences in a normal string literal.
# The resulting path is unchanged.
model.load_weights(r'D:\Download\IncLearn_resultReplication_modelWeights\model_trained')

In [None]:
# Show the kernel and bias of the last layer as currently held by the model.
print(model.layers[-1].get_weights())

In [None]:
# model.load_weights('model_trained')
# Evaluate the Keras model on three sets: the start-class test subset, the
# incremental-class test subset, and the complete test set.
start_examples_scores = model.evaluate(start_examples_test, start_labels_test)
incremental_examples_scores = model.evaluate(incremental_examples_test, incremental_labels_test)
test_scores = model.evaluate(test_examples, test_labels)
# Each score is whatever model.evaluate returns for the compiled loss and metrics.
print(start_examples_scores, incremental_examples_scores, test_scores)

## Python Incremental solution

To generate the Python version of the incremental solution, it is sufficient to call

*TyBox.create_python_learning_solution(tf_model, mem_available, precision)*

where mem_available is the amount of memory that can be dedicated to on-device machine learning (in bytes), and precision is the data precision of the model and activations in bits (currently TyBox supports only 32-bit floating-point precision).


In [None]:
# Build the incremental solution from the Keras model: 9479965 is the memory
# budget passed as `mem_available` and 32 the precision in bits.
# `Mf_lite` is later written to disk and loaded with the TFLite interpreter;
# `Mc_python` is the second object returned by the call.
Mf_lite, Mc_python = TyBox.create_python_learning_solution(model, 9479965, 32)

# Disabled alternative: 8-bit call that additionally passes a representative
# dataset generator (first 500 training samples, batch size 1).
# # Create a compressible model for TFLite using integer-only quantization
# def representative_data_gen():
#     for input_value in tf.data.Dataset.from_tensor_slices(train_examples).batch(1).take(500):
#         yield [input_value]
# yield_representative_dataset = representative_data_gen
# Mf_lite, Mc_python = TyBox.create_python_learning_solution(model, 2416132, 8, yield_representative_dataset)

In [None]:
# Write the converted feature extractor to disk so the interpreter can load it.
with open("mf.tflite", "wb") as file:
    file.write(Mf_lite)

# TF-Lite model preparation
interpreter = tf.lite.Interpreter('mf.tflite')
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()[0]
print(input_details)
output_details = interpreter.get_output_details()[0]
print(output_details)

# Feature extraction with the TF-Lite model over the whole training set.
extracted_features = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(train_labels)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = train_examples[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_features.append(output)

# One row of extracted features per sample.
extracted_features = np.array(extracted_features)

In [144]:
# Feature extraction with the TF-Lite model over the whole test set.
extracted_features_test = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(test_labels)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = test_examples[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_features_test.append(output)

# One row of extracted features per sample.
extracted_features_test = np.array(extracted_features_test)

In [145]:
# Feature extraction with the TF-Lite model over the start-class test subset.
extracted_start_features_test = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(start_labels_test)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = start_examples_test[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_start_features_test.append(output)

# One row of extracted features per sample.
extracted_start_features_test = np.array(extracted_start_features_test)

In [146]:
# Feature extraction with the TF-Lite model over the start-class training subset.
extracted_start_features = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(start_labels)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = start_examples[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_start_features.append(output)

# One row of extracted features per sample.
extracted_start_features = np.array(extracted_start_features)

In [147]:
# Feature extraction with the TF-Lite model over the incremental-class training subset.
extracted_incremental_features = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(incremental_labels)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = incremental_examples[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_incremental_features.append(output)

# One row of extracted features per sample.
extracted_incremental_features = np.array(extracted_incremental_features)

In [148]:
# Feature extraction with the TF-Lite model over the incremental-class test subset.
extracted_incremental_features_test = []

# The quantization parameters do not change between samples: read them once.
quantize_input = input_details['dtype'] == np.uint8
if quantize_input:
    input_scale, input_zero_point = input_details["quantization"]

for i in range(len(incremental_labels_test)):
    # Reshape the sample into a batch of one: (1, 32, 32, 3).
    input_data = incremental_examples_test[i].reshape((1, 32, 32, 3))

    if quantize_input:
        # Map the real-valued input to the quantized input domain.
        input_data = input_data / input_scale + input_zero_point

    input_data = input_data.astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], input_data)
    interpreter.invoke()

    # Read the output tensor once (it used to be fetched twice per sample)
    # and keep its first row.
    output = interpreter.get_tensor(output_details["index"])[0]
    extracted_incremental_features_test.append(output)

# One row of extracted features per sample.
extracted_incremental_features_test = np.array(extracted_incremental_features_test)

### TyBox: python model usage

In [None]:
import random

# Number of independent repetitions of the experiment.
n_repetitions = 5

# Number of samples streamed to the model in each repetition.
exp_l = 700

# Step from which samples are drawn from the full training set, i.e. the
# moment the new classes can appear.
new_subject_time = 50

# One (old-class accuracies, new-class accuracies) pair per repetition.
accuracies_tb = []

for repetition in range(n_repetitions):
    old_class_curve = []
    new_class_curve = []

    # Convert the original model again so that every repetition starts from
    # the same state.
    # model.load_weights('model_trained')
    fe_model, python_model = TyBox.create_python_learning_solution(model, 9479965, 32)
    # Disabled 8-bit alternative:
    # def representative_data_gen():
    #     for input_value in tf.data.Dataset.from_tensor_slices(train_examples).batch(1).take(500):
    #         yield [input_value]
    # yield_representative_dataset = representative_data_gen
    # fe_model, python_model = TyBox.create_python_learning_solution(model, 2416132, 8, yield_representative_dataset)
    python_model.set_lr(0.00025)

    # Seed per repetition for reproducibility; the two draws below must stay
    # in this order to reproduce the same sample sequences.
    random.seed(395 + repetition*52)
    indices = random.sample(range(len(extracted_features)), exp_l)
    indices_2 = random.sample(range(len(extracted_start_features)), exp_l)

    for step, i in enumerate(indices):
        if step >= new_subject_time:
            # Full training set: any class may be drawn.
            sample = i
            sample_data = extracted_features[sample]
            sample_label = train_labels[sample]
        else:
            # Warm-up phase: start-class samples only.
            sample = indices_2[step]
            sample_data = extracted_start_features[sample]
            sample_label = start_labels[sample]

        # Push the datum into the buffer and train on the whole buffer.
        python_model.push_and_train(sample_data, sample_label)

        # Evaluate on the first 200 samples of each test subset.
        accuracy_old = python_model.evaluate(extracted_start_features_test[:200], start_labels_test[:200], output_details["quantization"])
        accuracy_new = python_model.evaluate(extracted_incremental_features_test[:200], incremental_labels_test[:200], output_details["quantization"])

        old_class_curve.append(accuracy_old)
        new_class_curve.append(accuracy_new)

        print(step, accuracy_new, accuracy_old)

    accuracies_tb.append((old_class_curve, new_class_curve))

In [150]:
# Per-step mean of the old-class accuracy across all repetitions.
avg_accuracies_old_tb = []
n_runs = len(accuracies_tb)
n_steps = len(accuracies_tb[0][0])
for step_idx in range(n_steps):
    total = 0
    # Element 0 of each run holds the old-class accuracy curve.
    for run in accuracies_tb:
        total += run[0][step_idx]
    avg_accuracies_old_tb.append(total / n_runs)

In [151]:
import math 

# Per-step standard deviation of the old-class accuracy across repetitions
# (population form: the squared deviations are divided by the number of runs).
std_dev_accuracies_old_tb = []
n_runs = len(accuracies_tb)
for step_idx, mean in enumerate(avg_accuracies_old_tb):
    squared_error = 0
    for run in accuracies_tb:
        squared_error += (mean - run[0][step_idx]) ** 2
    std_dev_accuracies_old_tb.append(math.sqrt(squared_error / n_runs))

In [152]:
# Per-step mean of the new-class accuracy across all repetitions.
avg_accuracies_new_tb = []
n_runs = len(accuracies_tb)
n_steps = len(accuracies_tb[0][1])
for step_idx in range(n_steps):
    total = 0
    # Element 1 of each run holds the new-class accuracy curve.
    for run in accuracies_tb:
        total += run[1][step_idx]
    avg_accuracies_new_tb.append(total / n_runs)

In [153]:
# Per-step standard deviation of the new-class accuracy across repetitions
# (population form: the squared deviations are divided by the number of runs).
std_dev_accuracies_new_tb = []
n_runs = len(accuracies_tb)
for step_idx, mean in enumerate(avg_accuracies_new_tb):
    squared_error = 0
    for run in accuracies_tb:
        squared_error += (mean - run[1][step_idx]) ** 2
    std_dev_accuracies_new_tb.append(math.sqrt(squared_error / n_runs))

In [None]:
# Plot the mean accuracy of old and new classes over time, each with a
# +/- 2 standard deviation band.
plt.figure(figsize=(10, 7))

curves = (
    (avg_accuracies_old_tb, std_dev_accuracies_old_tb, "toolbox 0-7"),
    (avg_accuracies_new_tb, std_dev_accuracies_new_tb, "toolbox 8-9"),
)
for means, std_devs, label in curves:
    plt.plot(means, label=label)

    # Accuracy lies in [0, 1]: clamp both bounds of the band for both curves
    # (the old-class lower bound used to be left unclamped).
    std_low = [max(mean - 2 * std, 0) for mean, std in zip(means, std_devs)]
    std_high = [min(mean + 2 * std, 1) for mean, std in zip(means, std_devs)]

    # Take the x axis from the data length so it always matches the curves.
    plt.fill_between(list(range(len(means))), std_low, std_high, alpha=0.5)

plt.title("Incremental learning")
plt.xlabel("Time")
plt.ylabel("Accuracy")
# loc=4 places the legend in the lower-right corner.
plt.legend(loc=4)