# Lab 2: STM32 Cube AI and TensorFlow, Activity Recognition with Multi-Layer Perceptron (MLP)

## Learning objectives

- Train the first model with TensorFlow and deploy the inference on a microcontroller using STM32 Cube AI
- Project-Based Lab: Develop the activity recognition application using multi-layer perceptron (MLP)
- Evaluate with and without feature extraction. Run and evaluate the performance on a STM32 microcontroller

## Preparation

In [None]:
import serial.tools.list_ports
import sklearn
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os, random

# Directory the notebook is started from.
base_dir = os.getcwd()
# Folder holding the recorded samples (the 'letter_<x>_stride_<n>.csv' files
# written by the data acquisition cell and read back by the loading cells).
samples_dir = os.path.join(base_dir, 'Samples')

Connect the MCU to the host PC

In [None]:
# List every serial port the host currently sees so the user can pick the MCU.
print('Com ports list:')
comPorts = list(serial.tools.list_ports.comports())
for comPort in comPorts:
    print(comPort)
chooseComPort = input('Please insert port number: ').strip()
# A bare number keeps the original Windows behaviour ('3' -> 'COM3').
# Anything else (e.g. 'COM3' or '/dev/ttyACM0') is used verbatim, so the cell
# also works on Linux/macOS and when the full port name is typed in.
if chooseComPort.isdigit():
    portName = 'COM{}'.format(chooseComPort)
else:
    portName = chooseComPort
# 115200 baud must match the UART configuration of the MCU firmware.
ser = serial.Serial(portName, 115200)

Helper functions for data acquisition

In [None]:
def convert_to_list(value):
    """Turn the text form of one received serial line into a list of ints.

    The callers pass ``str(line)`` of the bytes read from the serial port.
    The leading ``b' `` marker is stripped, the remainder is split on
    ``", "`` and the final token is discarded (presumably the line
    terminator left after the last separator -- verify against the MCU
    firmware). Every remaining token is converted with ``int``.
    """
    stripped = value.replace("b' ", "")
    tokens = stripped.split(", ")[:-1]
    return [int(token) for token in tokens]

def convert_list_to_df(lst):
    """Split a flat, interleaved value list into a DataFrame with X/Y/Z columns.

    Element 0, 3, 6, ... go to column 'X', element 1, 4, 7, ... to 'Y' and
    element 2, 5, 8, ... to 'Z'.
    """
    columns = {
        axis: lst[offset::3]
        for offset, axis in enumerate(('X', 'Y', 'Z'))
    }
    return pd.DataFrame(columns)

## Data Acquisition
Input the letter of the sample you want to acquire. The MCU will send the XYZ Accelerometer data as a timeseries of length 30. Run the cell -> Press the blue button on the MCU and move it until the green LED turns off -> Insert the number 1 into the prompt to acquire the data or number 2 to exit -> Repeat

In [None]:
letter = input('Please insert letter to collect data: ')
stride = 30  # number of time steps the MCU sends per recording

# Create the samples folder up front so that saving cannot fail at the very
# end of a recording session.
os.makedirs(samples_dir, exist_ok=True)
f = os.path.join(samples_dir, 'letter_{}_stride_{}.csv'.format(letter, stride))
if os.path.exists(f):
    print('File exists and data will be appended...')
    xyz_df = pd.read_csv(f)
else:
    print('New sample, starting blank...')
    xyz_df = pd.DataFrame(columns=['X', 'Y', 'Z'])

while input('1 - acquire sample, 2 - exit: ') == '1':
    line = ser.readline()
    lineList = convert_to_list(str(line))
    if len(lineList) != 3 * stride:
        # The file is later cut into consecutive chunks of `stride` rows, so a
        # truncated or over-long recording would shift every sample after it.
        print('Discarding sample: expected {} values, got {}'.format(3 * stride, len(lineList)))
        continue
    new_df = convert_list_to_df(lineList)
    print('New data acquired:\n', new_df.describe())
    # Concatenating onto an empty frame is deprecated in pandas and may change
    # the column dtype, so the first recording simply becomes the frame.
    if xyz_df.empty:
        xyz_df = new_df
    else:
        xyz_df = pd.concat([xyz_df, new_df], ignore_index=True)
    print('Total Data count:', xyz_df.shape[0] // stride)

print('Saving data to:', f)
print('Total data of sample {}:\n'.format(letter), xyz_df.describe())
xyz_df.to_csv(f, index=False)

Load all raw samples. Note that a single accelerometer recording contains only 30 time steps, so to split the stored data into individual recordings we must use a matching "stride" of 30.

In [None]:
# os.listdir() returns entries in arbitrary order, and the position of a file
# in this list becomes its class label. Sorting keeps the labels stable across
# runs and machines. endswith() avoids picking up names such as 'x.csv.bak'.
data_files = sorted(file for file in os.listdir(samples_dir) if file.endswith('.csv'))

stride = 30  # time steps per recording
data = []    # one (3, stride) array per recording, rows ordered X, Y, Z
labels = []  # class index (position of the source file in data_files)
for idx, file in enumerate(data_files):
    df = pd.read_csv(os.path.join(samples_dir, file))
    x = df['X'].to_numpy()
    y = df['Y'].to_numpy()
    z = df['Z'].to_numpy()

    # Each file is a concatenation of recordings; cut it into chunks of
    # `stride` rows (a trailing partial chunk is ignored).
    for i in range(df.shape[0] // stride):
        base_idx = i * stride
        batch = np.array([axis[base_idx:base_idx + stride] for axis in (x, y, z)])
        data.append(batch)
        labels.append(idx)

    print('Added {} data to the data list with label: {}'.format(file, idx))

Let's plot one data sample.

In [None]:
def plot_single_sample(data_sample, label='Not Specified'):
    """Plot the X, Y and Z traces of one recording in three stacked subplots.

    data_sample: array indexed as [axis, time step]; rows 0, 1, 2 are drawn
        as X, Y and Z.
    label: value shown in the figure title.
    """
    scaling = 2**10 #The STM ADC is 10bit so scale to get [g]
    sample_period_ms = 100 #Accelerometer sampled with 100ms
    fig, axs = plt.subplots(3)
    # One tick per sample, exactly sample_period_ms apart (0, 100, 200, ...).
    t = np.arange(data_sample.shape[1]) * sample_period_ms
    axs[0].set_title(label='Single Data Sample of Label {}'.format(label))
    for row, (ax, axis_name) in enumerate(zip(axs, ('X', 'Y', 'Z'))):
        ax.plot(t, data_sample[row]/scaling, c='m')
        ax.set_ylabel('{} [g]'.format(axis_name))
        if row < 2:
            # Only the bottom subplot keeps its tick labels.
            plt.setp(ax.get_xticklabels(), visible=False)
    axs[2].set_xlabel('Time [ms]')
    plt.show()
    
# Pick one recording uniformly at random and show it with its class index.
idx = random.randrange(len(data))
plot_single_sample(label=labels[idx], data_sample=data[idx])

## Model creation and training
Here, we create a neural network and train it! It is very small and simple.

In [None]:
x_train, y_train = sklearn.utils.shuffle(np.array(data), np.array(labels))
# Count the classes BEFORE one-hot encoding: afterwards y_train only contains
# 0s and 1s, so np.unique() would report 2 no matter how many classes exist.
num_classes = len(np.unique(y_train))
y_train = tf.keras.utils.to_categorical(y_train, num_classes)

# Small MLP on the flattened raw recording: (3, stride) -> 30 -> 20 -> classes.
model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(3, stride)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(30, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(20, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            # One output unit per class, matching the one-hot width of y_train.
            tf.keras.layers.Dense(num_classes, activation="softmax")
        ]
    )

model.summary()

In [None]:
# Train the raw-data model; the last 40 % of the (already shuffled) training
# arrays are held out by Keras for validation.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
history = model.fit(
    x_train,
    y_train,
    epochs=200,
    batch_size=1,
    validation_split=0.4,
)

# Keep the trained network on disk in HDF5 format.
model.save('raw_model.h5')

In [None]:
# Training vs. validation curves: accuracy on the current figure, loss on a
# second, freshly created figure.
panels = (
    ('accuracy', 'Accuracy', [0, 1]),
    ('loss', 'Loss', [0, 1000]),
)
for panel_no, (metric, y_label, y_limits) in enumerate(panels):
    if panel_no > 0:
        plt.figure()
    plt.plot(history.history[metric], label=metric)
    plt.plot(history.history['val_' + metric], label='val ' + metric)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.ylim(y_limits)
    plt.legend(loc='lower right')

The default learning rate of the Adam optimizer is 0.001. If the learning rate is too high, the model is likely to overshoot the minima. On the other hand, if the learning rate is too small, the model approaches the minima too slowly.

In [None]:
# High learning rate (lr = 1000)
# clone_model() copies the architecture only, so training starts from freshly
# initialised weights.
model_hlr = tf.keras.models.clone_model(model)

optimizer = tf.optimizers.Adam(learning_rate=1000)
model_hlr.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
history = model_hlr.fit(
    x_train,
    y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.4,
)

# Training and validation loss per epoch.
for history_key in ('loss', 'val_loss'):
    plt.plot(history.history[history_key], label=history_key.replace('_', ' '))
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim([0, 1000])
plt.legend(loc='lower right')

In [None]:
# Low learning rate (lr = 0.0001)
# clone_model() copies the architecture only, so training starts from freshly
# initialised weights.
model_llr = tf.keras.models.clone_model(model)

optimizer = tf.optimizers.Adam(learning_rate=0.0001)
model_llr.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
history = model_llr.fit(
    x_train,
    y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.4,
)

# Training and validation loss per epoch.
for history_key in ('loss', 'val_loss'):
    plt.plot(history.history[history_key], label=history_key.replace('_', ' '))
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim([0, 1000])
plt.legend(loc='lower right')

## Model inference
Here we perform a single inference of the model. 
Run the cell -> Draw one of your letter classes with the MCU again (try to keep the movements similar as before) -> Press enter to acquire the MCU data -> view the model prediction. Did it do well?

In [None]:
# Wait for the user, then read one recording from the MCU.
input('Press Enter once MCU is ready')
line = ser.readline()
lineList = convert_to_list(str(line))
new_df = convert_list_to_df(lineList)
print('New data acquired:\n', new_df.describe())

# Same [axis, time step] layout as the training data: rows X, Y, Z.
x, y, z = (new_df[axis].to_numpy() for axis in ('X', 'Y', 'Z'))
inf_data = np.array([x, y, z])
plot_single_sample(data_sample=inf_data.reshape((3, stride)))

# The model expects a leading batch dimension, here a batch of one recording.
inf_data = inf_data.reshape((1, 3, stride))
pred = model.predict(inf_data)
# Index of the highest softmax output = predicted class.
print('Model Prediction: ', np.argmax(pred))

## Feature extraction

The performance of the previously trained model is rather poor, because we don't have much data to train with. We will now show the powerful effect of feature extraction.

![Feature Extraction](feature.png)

Instead of directly using the raw data to train a network, in some applications better results are achieved by first preprocessing the data into more abstract features, to _condense_ the contained information into fewer data points. This not only leads to less memory required for the features, but also to potentially smaller networks and better performance. Finding good features can be very hard, but for many applications you can read papers about which features perform well and how to calculate them.

In this task a window of 6 samples is moved over the accelerometer data in non-overlapping steps to calculate the piecewise mean and standard deviation of each axis.


In [None]:
def _window_stats(signal, start, length, window):
    """Return (means, stds) of the consecutive `window`-sized chunks of signal[start:start + length]."""
    starts = range(start, start + length, window)
    means = np.array([np.mean(signal[s:s + window]) for s in starts])
    stds = np.array([np.std(signal[s:s + window]) for s in starts])
    return means, stds


# os.listdir() returns entries in arbitrary order, and the position of a file
# in this list becomes its class label. Sorting keeps the labels stable across
# runs and machines. endswith() avoids picking up names such as 'x.csv.bak'.
data_files = sorted(file for file in os.listdir(samples_dir) if file.endswith('.csv'))

stride = 30            # time steps per recording
slidingWindowExt = 6   # samples per feature window (windows do not overlap)
feature_data = []      # one (6, stride / slidingWindowExt) array per recording
feature_labels = []    # class index (position of the source file in data_files)
for idx, file in enumerate(data_files):
    df = pd.read_csv(os.path.join(samples_dir, file))
    x = df['X'].to_numpy()
    y = df['Y'].to_numpy()
    z = df['Z'].to_numpy()

    for i in range(df.shape[0] // stride):
        base_idx = i * stride
        stats = [_window_stats(axis, base_idx, stride, slidingWindowExt) for axis in (x, y, z)]

        # Row order: X/Y/Z means first, then X/Y/Z standard deviations.
        batch = np.array([means for means, _ in stats] + [stds for _, stds in stats])
        feature_data.append(batch)
        feature_labels.append(idx)

    print('Added {} data to the feature data list with label: {}'.format(file, idx))

In [None]:
def plot_single_feature_sample(data_sample, label='Not Specified'):
    """Plot the windowed mean (line) and +/- one std (shaded band) per axis.

    data_sample: array with six rows; rows 0-2 are the X/Y/Z window means and
        rows 3-5 the matching X/Y/Z window standard deviations.
    label: value shown in the figure title.
    """
    fig, axs = plt.subplots(3)
    # One x position per feature window: 0, 1, 2, ...
    t = np.arange(data_sample.shape[1])
    scaling = 2**10 #The STM ADC is 10bit so scale to get [g]
    # Use the caller-supplied label; the globals `feature_labels`/`idx` belong
    # to an unrelated training sample when this is called for inference data.
    axs[0].set_title(label='Single Data Sample of Label {}'.format(label))

    for row, (ax, axis_name) in enumerate(zip(axs, ('X', 'Y', 'Z'))):
        means = data_sample[row] / scaling
        stds = data_sample[row + 3] / scaling
        ax.grid()
        ax.plot(t, means, c='m')
        ax.fill_between(t, means - stds, means + stds, alpha=0.2, color='m')
        ax.set_ylabel('{} [g]'.format(axis_name))
        if row < 2:
            # Only the bottom subplot keeps its tick labels.
            plt.setp(ax.get_xticklabels(), visible=False)

    axs[2].set_xlabel('Sample')
    plt.show()
    
# Pick one feature sample uniformly at random and show it with its class index.
idx = random.randrange(len(feature_data))
plot_single_feature_sample(label=feature_labels[idx], data_sample=feature_data[idx])

## Feature-based model
The neural network basically uses the same architecture as before. Simply the input shape has changed. So we can show the effect that preprocessing your data has on the model.

In [None]:
x_train, y_train = sklearn.utils.shuffle(np.array(feature_data), np.array(feature_labels))
# Count the classes BEFORE one-hot encoding: afterwards y_train only contains
# 0s and 1s, so np.unique() would report 2 no matter how many classes exist.
num_classes = len(np.unique(y_train))
y_train = tf.keras.utils.to_categorical(y_train, num_classes)

# Shape of one feature sample: (feature rows, windows per recording).
data_shape = x_train[0].shape
model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=data_shape),
            tf.keras.layers.Flatten(),
            # First hidden layer is as wide as the flattened input.
            tf.keras.layers.Dense(data_shape[0] * data_shape[1], activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(20, activation="relu"),
            # One output unit per class, matching the one-hot width of y_train.
            tf.keras.layers.Dense(num_classes, activation="softmax")
        ]
    )

model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Train the feature-based model; 40 % of the training arrays are held out for validation.
model.fit(
    x_train,
    y_train,
    epochs=500,
    batch_size=32,
    validation_split=0.4,
)

## Feature-based model inference
Here we perform a single inference of the model, but with the feature based model! 
Run the cell -> Draw one of your letter classes with the MCU again (try to keep the movements similar as before) -> Press enter to acquire the MCU data -> view the model prediction. How does it perform now?

In [None]:
# Wait for the user, then read one recording from the MCU.
input('Press Enter once MCU is ready')
line = ser.readline()
lineList = convert_to_list(str(line))
new_df = convert_list_to_df(lineList)
print('New data acquired:\n', new_df.describe())

x, y, z = (new_df[axis].to_numpy() for axis in ('X', 'Y', 'Z'))

# Start index of every feature window inside the recording.
window_starts = range(0, stride, slidingWindowExt)

# Mean feature
x_mean_ext, y_mean_ext, z_mean_ext = (
    np.array([np.mean(axis[s:s + slidingWindowExt]) for s in window_starts])
    for axis in (x, y, z)
)
# STD feature
x_std_ext, y_std_ext, z_std_ext = (
    np.array([np.std(axis[s:s + slidingWindowExt]) for s in window_starts])
    for axis in (x, y, z)
)

# Same row order as the training features: means first, then stds.
inf_data = np.array([x_mean_ext, y_mean_ext, z_mean_ext, x_std_ext, y_std_ext, z_std_ext])
plot_single_feature_sample(data_sample=inf_data)
#For inference we have to explicitly tell the model that the data has a batchsize of 1
inf_data = inf_data.reshape((1, data_shape[0], data_shape[1]))

pred = model.predict(inf_data)
# Index of the highest softmax output = predicted class.
print('Model Prediction: ', np.argmax(pred))

Let's save the feature model and some input data together with the respective output for STM Cube AI.

In [None]:
# Dump the feature inputs and their one-hot targets as .npy files, then store
# the trained feature model in HDF5 format.
for npy_name, array in (('test.npy', x_train), ('test_out.npy', y_train)):
    with open(npy_name, 'wb') as f:
        np.save(f, array)

model.save('feature_mlp.h5')