In [34]:
import os
import glob
os.environ["TF_CPP_MIN_LOG_LEVEL"] = '3'
import tensorflow as tf
import tensorflow.keras.models as tfm
import tensorflow.keras.layers as tfl
import tensorflow.keras.callbacks as tfc
import tensorflow.keras.utils as tfu
import sklearn.model_selection as skms
import sklearn.metrics as skm
import numpy as np
import matplotlib.pyplot as plt
import random
import re
import keras_flops as kf
import time
import functools
import IPython.display as ipd
import pandas as pd

SEED = 42
IMAGE_HEIGHT = 40
IMAGE_WIDTH = 40
SAMPLES_PER_MEASUREMENT = 119
LINES_PER_MEASUREMENT = SAMPLES_PER_MEASUREMENT + 1
IMAGE_WIDTH_HEIGHT_INDEX = IMAGE_WIDTH - 1
NUMBER_OF_LABELS = 5
LABELS = ["Avada Kedavra", "Locomotor", "Arresto Momentum", "Revelio", "Alohomora"]

np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# create input/output directories and download data
os.makedirs("figures", exist_ok=True)
os.makedirs("models", exist_ok=True)
os.makedirs("results", exist_ok=True)
os.makedirs("data", exist_ok=True)
if not os.path.isfile("data/spells.zip"):
    os.system("wget -P data/ https://github.com/xmihol00/embedded_ML/raw/main/data/spells.zip")
    os.system("unzip data/spells.zip")

def representative_dataset(data_set):
    for sample in data_set:
        yield [np.expand_dims(sample, 0)]

def collect_model_summary(summary_line, model_dict):
    match = re.match(r"(.*?): ([\d,]+)", summary_line)
    if match:
        match = match.groups()
        model_dict[match[0].replace("params", "parameters")] = int(match[1].replace(',', ''))

def get_stroke_samples(data):
    orientation_samples = np.zeros((SAMPLES_PER_MEASUREMENT, 3))
    stroke_samples = np.zeros((SAMPLES_PER_MEASUREMENT, 2))
    rows_of_samples = [list(map(lambda x: float(x), line.split(','))) for line in data.split('\n') if line]

    for i in range(0, len(rows_of_samples), SAMPLES_PER_MEASUREMENT): 
        measurment = np.array(rows_of_samples[i: i+SAMPLES_PER_MEASUREMENT])
        acceleration_average = np.average(measurment[:, 0:3], axis=0)

        # calcualte orientation
        previous_orientation = np.zeros(3)
        for j, gyro_sample in enumerate(measurment[:, 3:6]):
            orientation_samples[j] = previous_orientation + gyro_sample / SAMPLES_PER_MEASUREMENT
            previous_orientation = orientation_samples[j]     
        orientation_avg = np.average(orientation_samples, axis=0) # average orientation

        # calculate stroke
        acceleration_magnitude = np.sqrt(acceleration_average.dot(acceleration_average.T)) # dot product insted of squaring
        acceleration_magnitude += (acceleration_magnitude < 0.0001) * 0.0001 # prevent division by 0
        normalzied_acceleration = acceleration_average / acceleration_magnitude
        normalized_orientation = orientation_samples - orientation_avg
        stroke_samples[:, 0] = -normalzied_acceleration[1] * normalized_orientation[:, 1] - normalzied_acceleration[2] * normalized_orientation[:, 2]
        stroke_samples[:, 1] =  normalzied_acceleration[1] * normalized_orientation[:, 2] - normalzied_acceleration[2] * normalized_orientation[:, 1]
        yield stroke_samples

def load_as_images(one_hot=True):
    data = ""
    labels = []
    for i, file_name in enumerate(glob.glob("data/*.csv")):
        file = open(file_name, "r")
        file.readline() # skip header
        read_lines = file.read()
        labels += [i] * (read_lines.count("\n") // LINES_PER_MEASUREMENT)
        data += read_lines
        file.close()

    colors = np.linspace(255 - 2 * SAMPLES_PER_MEASUREMENT + 2, 255, SAMPLES_PER_MEASUREMENT) / 255
    images = np.zeros((len(labels), IMAGE_HEIGHT, IMAGE_WIDTH, 1), dtype=np.float32)

    for i, stroke_samples in enumerate(get_stroke_samples(data)): 
        # rasterize stroke
        stroke_samples -= np.min(stroke_samples, axis=0) # make samples in range from 0 to x
        pixels = np.round(stroke_samples * IMAGE_WIDTH_HEIGHT_INDEX / np.max(stroke_samples, axis=0), 0).astype(np.uint8) # normalize samples to the whole image
        image = np.zeros((IMAGE_WIDTH, IMAGE_HEIGHT))
        image[pixels[:, 1], pixels[:, 0]] = colors
        images[i] = image.reshape(IMAGE_WIDTH, IMAGE_HEIGHT, 1).astype(np.float32)

    X_train, X_test, y_train, y_test = skms.train_test_split(images, labels, test_size=0.2, random_state=SEED)
    if one_hot:
        # one-hot encoding of labels
        y_train = tfu.to_categorical(y_train, num_classes=5)

    return X_train, X_test, y_train, np.array(y_test)

def load_as_array(one_hot=True):
    data = ""
    labels = []
    for i, file_name in enumerate(glob.glob("data/*.csv")):
        file = open(file_name, "r")
        file.readline() # skip header
        read_lines = file.read()
        labels += [i] * (read_lines.count("\n") // LINES_PER_MEASUREMENT)
        data += read_lines
        file.close()

    arrays = np.zeros((len(labels), 2 * SAMPLES_PER_MEASUREMENT), dtype=np.float32)

    for i, stroke_samples in enumerate(get_stroke_samples(data)): 
        stroke_samples -= np.min(stroke_samples, axis=0) # make samples in range from 0 to x
        stroke_samples /= np.max(stroke_samples, axis=0) # normalize values from 0 to 1
        arrays[i] = stroke_samples.reshape(-1)

    X_train, X_test, y_train, y_test = skms.train_test_split(arrays, labels, test_size=0.2, random_state=SEED)
    if one_hot:
        # one-hot encoding of labels
        y_train = tfu.to_categorical(y_train, num_classes=5)

    return X_train, X_test, y_train, np.array(y_test)


hidden_activation = tf.keras.layers.LeakyReLU(0.1)
droput_1 = 0.4
droput_2 = 0.3
droput_3 = 0.25

models = [
    tfm.Sequential([
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Dense(units=100, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Dense(units=125, activation=hidden_activation),
        tfl.Dense(units=75, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Dense(units=150, activation=hidden_activation),
        tfl.Dense(units=100, activation=hidden_activation),
        tfl.Dense(units=50, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=8, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Flatten(),
        tfl.Dense(units=64, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Flatten(),
        tfl.Dense(units=128, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=8, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Flatten(),
        tfl.Dense(units=64, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=16, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Flatten(),
        tfl.Dense(units=128, activation=hidden_activation),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=8, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Conv2D(filters=32, kernel_size=(1, 1), activation=hidden_activation, padding="same"),
        tfl.Conv2D(filters=5, kernel_size=(1, 1), activation="softmax", padding="same"),
        tfl.Reshape([5])
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=16, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=128, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Conv2D(filters=64, kernel_size=(1, 1), activation=hidden_activation, padding="same"),
        tfl.Conv2D(filters=5, kernel_size=(1, 1), activation="softmax", padding="same"),
        tfl.Reshape([5])
    ]),

    tfm.Sequential([
        tfl.Dense(units=100, activation=hidden_activation),
        tfl.Dropout(droput_1),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Dense(units=125, activation=hidden_activation),
        tfl.Dropout(droput_1),
        tfl.Dense(units=75, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Dense(units=150, activation=hidden_activation),
        tfl.Dropout(droput_1),
        tfl.Dense(units=100, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=50, activation=hidden_activation),
        tfl.Dropout(droput_3),
        tfl.Dense(units=5, activation="softmax")
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=8, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Flatten(),
        tfl.Dropout(droput_1),
        tfl.Dense(units=64, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="same"),
        tfl.MaxPool2D(),
        tfl.Flatten(),
        tfl.Dropout(droput_1),
        tfl.Dense(units=128, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=8, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=16, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Flatten(),
        tfl.Dropout(droput_1),
        tfl.Dense(units=64, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=5, activation="softmax"),
    ]),

    tfm.Sequential([
        tfl.Conv2D(filters=16, kernel_size=(5, 5), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=32, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.MaxPool2D(),
        tfl.Conv2D(filters=64, kernel_size=(3, 3), activation=hidden_activation, padding="valid"),
        tfl.Flatten(),
        tfl.Dropout(droput_1),
        tfl.Dense(units=128, activation=hidden_activation),
        tfl.Dropout(droput_2),
        tfl.Dense(units=5, activation="softmax"),
    ]),
]

model_names = [
    "baseline_linear",
    "only_DENS_S", "only_DENS_M", "only_DENS_L",
    "CONV_DENS_1_S", "CONV_DENS_1_L", 
    "CONV_DENS_2_S", "CONV_DENS_2_L", 
    "only_CONV_S", "only_CONV_L",
    "only_DENS_S_DO", "only_DENS_M_DO", "only_DENS_L_DO",
    "CONV_DENS_1_S_DO", "CONV_DENS_1_L_DO", 
    "CONV_DENS_2_S_DO", "CONV_DENS_2_L_DO"
]

model_data_set = [
    0,
    0, 0, 0,
    1, 1,
    1, 1,
    1, 1,
    0, 0, 0,
    1, 1,
    1, 1,
]

table_header = ["Total parameters", "Trainable parameters", "Non-trainable parameters", "Size", "Optimized size", 
                "Training time GPU", "Epochs", "FLOPS", "Full model accuracy", "Optimized model accuracy"]

data_sets = [load_as_array(), load_as_images()]
results = {}

for model, model_name, data_set in zip(models, model_names, model_data_set):
    X_train, X_test, y_train, y_test = data_sets[data_set]
    results[model_name] = {}
    results_model = results[model_name]

    # get weights for the given seed
    model.build(X_train.shape)
    weights = model.get_weights()

    # get the best number of epochs based on validation data set
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    history = model.fit(X_train, y_train, epochs=100, validation_split=0.2, batch_size=16, verbose=2,
                        callbacks=[tfc.EarlyStopping(monitor="val_accuracy", patience=3, mode="max", restore_best_weights=False)]).history
    model.set_weights(weights)
    epochs = len(history["loss"]) - 3
    results_model["Epochs"] = epochs
    
    # train on the whole train data set
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    train_start = time.time()
    model.fit(X_train, y_train, epochs=epochs, validation_split=0.0, batch_size=16, verbose=2)
    results_model["Training time GPU"] = f"{time.time() - train_start:.2f} s"

    # predict and evaluate the prediction
    predictions = model.predict(X_test, verbose=2)
    predictions = np.argmax(predictions, axis=1)
    results_model["Full model accuracy"] = f"{(predictions == y_test).sum() / y_test.shape[0] * 100:.2f} \\%"

    # plot the full model confusion metrix
    figure, axis = plt.subplots(2, 1, figsize=(12, 18))
    figure.suptitle(f"{model_name} confusion matrices", fontsize=16) 
    confusion_matrix = tf.math.confusion_matrix(y_test, predictions).numpy()
    confusion_matrix = skm.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=LABELS)
    axis[0].set_title(f"Full model")
    confusion_matrix.plot(cmap="Blues", ax=axis[0])

    # get the summary of the model
    model.summary(print_fn=lambda x, y=results_model: collect_model_summary(x, y))
    results_model["FLOPS"] = kf.get_flops(model, batch_size=1)

    # convert the model without optimiziation (evaluation is not necessary, the results after conversion are the same)
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()
    results_file = open(f"models/{model_name}.tflite", "wb")
    results_file.write(tflite_model)
    results_file.close()
    results_model["Size"] = os.path.getsize(f"models/{model_name}.tflite")
    os.system(f'echo "const unsigned char model[] = {{" > models/{model_name}.h && cat models/{model_name}.tflite | xxd -i >> models/{model_name}.h && echo "}};" >> models/{model_name}.h && rm -f models/{model_name}.tflite')
    del tflite_model

    # convert the model with optimization 
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    converter.representative_dataset = lambda x=X_train: representative_dataset(x)
    tflite_model_opt = converter.convert()
    results_file = open(f"models/{model_name}.tflite", "wb")
    results_file.write(tflite_model_opt)
    results_file.close()
    results_model["Optimized size"] = os.path.getsize(f"models/{model_name}.tflite")
    os.system(f'echo "const unsigned char model[] = {{" > models/{model_name}_opt.h && cat models/{model_name}.tflite | xxd -i >> models/{model_name}_opt.h && echo "}};" >> models/{model_name}_opt.h && rm -f models/{model_name}.tflite')

    # predict using the optimized model and evaluate the prediction
    interpreter = tf.lite.Interpreter(model_content=tflite_model_opt)
    interpreter.allocate_tensors()
    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]
    input_scale, input_zero_point = interpreter.get_output_details()[0]["quantization"]
    predictions = np.zeros((y_test.shape[0]))
    for i, sample in enumerate(X_test):
        interpreter.set_tensor(input_index, np.expand_dims(sample / input_scale + input_zero_point, 0).astype(np.int8))
        interpreter.invoke()
        predictions[i] = np.argmax(interpreter.get_tensor(output_index)[0]) # rescaling is not needed
    results_model["Optimized model accuracy"] = f"{(predictions == y_test).sum() / y_test.shape[0] * 100:.2f} \\%"
    
    # plot the confusuion matrix of the optimized model
    confusion_matrix = tf.math.confusion_matrix(y_test, predictions).numpy()
    confusion_matrix = skm.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=LABELS)
    axis[1].set_title(f"Optimized model")
    confusion_matrix.plot(cmap="Blues", ax=axis[1])
    plt.savefig(f"figures/{model_name}_confusion_matrix.png", dpi=300)
    plt.close()
    del tflite_model_opt

    # clear cell output after each model
    ipd.clear_output()

# export colected statistics to LaTex table and pandas data frame
data_frame = pd.DataFrame()
with open("results/statistics.tex", "w") as results_file:
    print = functools.partial(print, file=results_file)
    row_end = "\\\\"
    backslash_underscore = "\\_"
    print("\\begin{table}[ht]", "\\tiny", "\\center", "\\begin{tabular}{ |c|c|c|c|c|c|c|c|c|c| }", sep="\n")        
    print("\\hline")

    print("& ", end="")
    for header in table_header[:-1]:
        print(f"\\thead{{{header.replace(' ', row_end)}}} & ", end="")
    print(f"\\thead{{{table_header[-1].replace(' ', row_end)}}} {row_end}")
    print("\\hline")

    for model_name in model_names:
        results_model = results[model_name]
        print(f"\\thead{{{model_name.replace('_', backslash_underscore)}}} & ", end="")
        for header in table_header[:-1]:
            print(f"{results_model[header]} & ", end="")
            data_frame.at[model_name, header] = results_model[header]
        print(f"{results_model[table_header[-1]]} {row_end}")
        data_frame.at[model_name, table_header[-1]] = results_model[table_header[-1]]

    print("\\hline")
    print("\\end{tabular}", "\\end{table}", sep="\n")

data_frame.replace("\\\%", '%', regex=True)

Unnamed: 0,Total parameters,Trainable parameters,Non-trainable parameters,Size,Optimized size,Training time GPU,Epochs,FLOPS,Full model accuracy,Optimized model accuracy
baseline_linear,1195.0,1195.0,0.0,6104.0,2736.0,0.30 s,3.0,2410.0,80.77 %,79.23 %
only_DENS_S,24405.0,24405.0,0.0,99532.0,26968.0,0.57 s,15.0,48730.0,96.15 %,96.92 %
only_DENS_M,39705.0,39705.0,0.0,161304.0,43280.0,0.53 s,12.0,79230.0,96.92 %,96.92 %
only_DENS_L,56255.0,56255.0,0.0,228076.0,60832.0,0.53 s,12.0,112230.0,97.69 %,97.69 %
CONV_DENS_1_S,57477.0,57477.0,0.0,234828.0,64888.0,1.78 s,4.0,2221534.0,93.85 %,95.38 %
CONV_DENS_1_L,228869.0,228869.0,0.0,920396.0,237984.0,1.07 s,5.0,8334238.0,96.92 %,97.69 %
CONV_DENS_2_S,80133.0,80133.0,0.0,325264.0,87328.0,0.66 s,5.0,1618270.0,94.62 %,95.38 %
CONV_DENS_2_L,319237.0,319237.0,0.0,1281680.0,328136.0,1.00 s,6.0,5374622.0,96.92 %,96.92 %
only_CONV_S,26757.0,26757.0,0.0,113628.0,38424.0,2.45 s,12.0,1512638.0,98.46 %,98.46 %
only_CONV_L,105989.0,105989.0,0.0,430556.0,121760.0,1.32 s,8.0,4950366.0,96.15 %,96.92 %
