In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

filename = "punch_1.csv"

df = pd.read_csv("../include/" + filename)
# print(df.dtypes)

index = range(1, len(df["aX"]) + 1)

plt.rcParams["figure.figsize"] = (20, 10)

plt.plot(index, df["aX"], "g.", label="x", linestyle="solid", marker=",")
plt.plot(index, df["aY"], "b.", label="y", linestyle="solid", marker=",")
plt.plot(index, df["aZ"], "r.", label="z", linestyle="solid", marker=",")
plt.title("Acceleration")
plt.xlabel("Sample #")
plt.ylabel("Acceleration (G)")
plt.legend()
plt.show()

In [None]:
import tensorflow as tf
import keras

print(f"TensorFlow version = {tf.__version__}\n")

# Set a fixed random seed value, for reproducibility, this will allow us to get
# the same random numbers each time the notebook is run
SEED = 1337
np.random.seed(SEED)
tf.random.set_seed(SEED)

# the list of gestures
GESTURES = [
    "punch",
    "flex",
]
NUM_GESTURES = len(GESTURES)
TOTAL_RECORDS_PER_BATCH = 119

# create a one-hot encoded output matrix that is encoded to value 0 or 1
ONE_HOT_ENCODED_GESTURES = np.eye(NUM_GESTURES)
inputs = []
outputs = []

# read each csv file and push an input and output
for gesture_index in range(NUM_GESTURES):
    # get the current gesture (ex: punch or flex)
    gesture = GESTURES[gesture_index]
    print(f"Processing index {gesture_index} for gesture '{gesture}'.")

    # assign output to the corresponding input index (the output will be encoded to 0 or 1)
    output = ONE_HOT_ENCODED_GESTURES[gesture_index]

    # read input from csv file
    df = pd.read_csv("../include/" + gesture + ".csv")
    print(df.head())
    print(df.shape)
    print(df.shape[0], " -- ", TOTAL_RECORDS_PER_BATCH)

    # partition input to multiple batches --> reason: faster execution
    num_batches_per_gesture = int(df.shape[0] / TOTAL_RECORDS_PER_BATCH)

    print(f"\tThere are {num_batches_per_gesture} batches of the *{gesture}* gesture.\n")

    # go into each batch and start processing
    for batch in range(num_batches_per_gesture):
        tensor = []
        for record in range(TOTAL_RECORDS_PER_BATCH):
            index = batch * TOTAL_RECORDS_PER_BATCH + record
            # normalize the input data, between 0 to 1:
            # - acceleration is between: -2048 to +2048
            tensor += [
                (df["aX"][index] + 2048.0) / 4096.0,
                (df["aY"][index] + 2048.0) / 4096.0,
                (df["aZ"][index] + 2048.0) / 4096.0,
            ]
        inputs.append(tensor)
        outputs.append(output)

# convert the list to numpy array
inputs = np.array(inputs)
outputs = np.array(outputs)

print("Data set parsing and preparation complete.")
print(len(inputs), "\n")
print(
    "punch records length per batch = len([ax,ay,az]) * TOTAL_RECORDS_PER_BATCH = 3 * TOTAL_RECORDS_PER_BATCH = ",
    len(inputs[0]),
    "\n",
)
print("punch input: ", inputs[0], " output: ", outputs[0])
print("flex input: ", inputs[1], " output: ", outputs[1])


TensorFlow version = 2.15.0

Processing index 0 for gesture 'punch'.
    aX    aY   aZ
0 -715 -1381 -788
1 -715 -1381 -788
2 -715 -1381 -788
3 -715 -1381 -788
4 -715 -1381 -788
(1080, 3)
1080  --  119
	There are 9 batches of the *punch* gesture.

Processing index 1 for gesture 'flex'.
     aX   aY   aZ
0 -1497  138 -408
1 -1497  138 -408
2 -1497  138 -408
3 -1497  138 -408
4 -1469  155 -356
(1080, 3)
1080  --  119
	There are 9 batches of the *flex* gesture.

Data set parsing and preparation complete.
18 

punch records length per batch = len([ax,ay,az]) * TOTAL_RECORDS_PER_BATCH = 3 * TOTAL_RECORDS_PER_BATCH =  357 

punch input:  [0.32543945 0.1628418  0.30761719 0.32543945 0.1628418  0.30761719
 0.32543945 0.1628418  0.30761719 0.32543945 0.1628418  0.30761719
 0.32543945 0.1628418  0.30761719 0.32543945 0.1628418  0.30761719
 0.32543945 0.1628418  0.30761719 0.32543945 0.1628418  0.30761719
 0.21899414 0.09106445 0.40698242 0.21899414 0.09106445 0.40698242
 0.21899414 0.09106445 0.

In [None]:
# Randomize the order of the inputs, so they can be evenly distributed for training, testing, and validation
# https://stackoverflow.com/a/37710486/2020087
num_inputs = len(inputs)
randomize = np.arange(num_inputs)
np.random.shuffle(randomize)

# Swap the consecutive indexes (0, 1, 2, etc) with the randomized indexes
inputs = inputs[randomize]
outputs = outputs[randomize]

# Split the recordings (group of samples) into three sets: training, testing and validation
TRAIN_SPLIT = int(0.6 * num_inputs)
TEST_SPLIT = int(0.2 * num_inputs + TRAIN_SPLIT)

inputs_train, inputs_test, inputs_val = np.split(inputs, [TRAIN_SPLIT, TEST_SPLIT])
outputs_train, outputs_test, outputs_val = np.split(outputs, [TRAIN_SPLIT, TEST_SPLIT])

print(inputs_train.shape, outputs_train.shape)
print(inputs_test.shape, outputs_test.shape)
print(inputs_val.shape, outputs_val.shape)

print("Data set randomization and splitting complete.")

(10, 357) (10, 2)
(3, 357) (3, 2)
(5, 357) (5, 2)
Data set randomization and splitting complete.


In [None]:
# Build the model
model = keras.Sequential()
model.add(keras.layers.Dense(50, activation="softplus", input_shape=(357,)))  # relu is used for performance
model.add(keras.layers.Dense(15, activation="softplus"))
model.add(keras.layers.Dense(NUM_GESTURES, activation="softmax"))  # softmax is used, because we only expect one gesture to occur per input

model.layers

[<keras.src.layers.core.dense.Dense at 0x1f44322ad90>,
 <keras.src.layers.core.dense.Dense at 0x1f44321a850>,
 <keras.src.layers.core.dense.Dense at 0x1f4432db5d0>]

In [None]:
# Train Model
model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])

print(inputs_train[0].shape, inputs_train.shape)

history = model.fit(
    inputs_train,
    outputs_train,
    epochs=200,
    batch_size=1,
    validation_data=(inputs_val, outputs_val),
)

(357,) (10, 357)
Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoc

In [None]:
model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_11 (Dense)            (None, 50)                17900     
                                                                 
 dense_12 (Dense)            (None, 15)                765       
                                                                 
 dense_13 (Dense)            (None, 2)                 32        
                                                                 
Total params: 18697 (73.04 KB)
Trainable params: 18697 (73.04 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# use the model to predict the test inputs
print(inputs_test[0].shape)
predictions = model.predict(inputs_test)
# print the predictions and the expected ouputs
print("predictions =\n", np.round(predictions, decimals=3))
print("actual =\n", outputs_test)
print(
    len(inputs_train),
    " ",
    len(inputs_test),
    " ",
    len(inputs_val),
    " ",
    len(predictions),
)


# Convert predictions to class labels
predicted_labels = np.argmax(predictions, axis=1)
actual_labels = np.argmax(outputs_test, axis=1)

# Compare predictions with actual labels
correct_predictions = np.sum(predicted_labels == actual_labels)
total_predictions = len(actual_labels)

# Calculate accuracy
accuracy = correct_predictions / total_predictions
print("Accuracy:", accuracy)

(357,)
predictions =
 [[0.714 0.286]
 [0.    1.   ]
 [1.    0.   ]]
actual =
 [[1. 0.]
 [0. 1.]
 [1. 0.]]
10   3   5   3
Accuracy: 1.0


In [None]:
def representative_dataset():
    for data in tf.data.Dataset.from_tensor_slices((inputs)).batch(1).take(100):
        yield [tf.dtypes.cast(data, tf.float32)]

In [None]:
# Convert the model to the TensorFlow Lite format without quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# converter.representative_dataset = representative_dataset
# # converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
# converter.inference_input_type = tf.int8  # or tf.uint8
# converter.inference_output_type = tf.int8  # or tf.uint8
# converter.inference_input_type = tf.float32  # or tf.uint8
# converter.inference_output_type = tf.float32  # or tf.uint8
tflite_model = converter.convert()

# Save the model to disk
open("gesture_model.tflite", "wb").write(tflite_model)

import os

basic_model_size = os.path.getsize("gesture_model.tflite")
print("Model is %d bytes" % basic_model_size)

INFO:tensorflow:Assets written to: C:\Users\quoct\AppData\Local\Temp\tmplwp1q338\assets


INFO:tensorflow:Assets written to: C:\Users\quoct\AppData\Local\Temp\tmplwp1q338\assets


Model is 23432 bytes


In [None]:
# Get Interpreter to execute model
interpreter = tf.lite.Interpreter(model_path="gesture_model.tflite")
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print(f"input_details: {input_details}")
print(f"output_details: {output_details}")

input_data = inputs_test[0:1].astype(np.float32)

print(input_data)
# push input data to interpreter
interpreter.set_tensor(input_details[0]["index"], input_data)

# start execution
interpreter.invoke()

# get the output of this model
output_data = interpreter.get_tensor(output_details[0]["index"])

print(output_data)
print(output_data.shape)

input_details: [{'name': 'serving_default_dense_input:0', 'index': 0, 'shape': array([  1, 357]), 'shape_signature': array([ -1, 357]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
output_details: [{'name': 'StatefulPartitionedCall:0', 'index': 10, 'shape': array([1, 2]), 'shape_signature': array([-1,  2]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
[[0.27539062 0.65893555 0.25683594 0.27539062 0.65893555 0.25683594
  0.27539062 0.65893555 0.25683594 0.27539062 0.65893555 0.25683594
  0.3684082  0.14794922 0.17041016 0.3684082  0.14794922 0.17041016
  0.3684082  0.14794922 0.17041016 0.3684082  0.14794922 0.17041016
  0.2998047  0.03588867 0.19

In [None]:
def convert_tflite_to_header(tflite_path, output_header_path):
    with open(tflite_path, "rb") as tflite_file:
        tflite_content = tflite_file.read()

    hex_lines = [", ".join([f"0x{byte:02x}" for byte in tflite_content[i : i + 12]]) for i in range(0, len(tflite_content), 12)]

    hex_array = ",\n  ".join(hex_lines)

    with open(output_header_path, "w") as header_file:
        header_file.write("alignas(8) const unsigned char g_model[] = {\n  ")
        header_file.write(f"{hex_array}\n")
        header_file.write("};\n\n")


tflite_path = "gesture_model.tflite"
output_header_path = "../include/g_model.h"

convert_tflite_to_header(tflite_path, output_header_path)

In [None]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):
    c_str = ""

    # Create header guard
    c_str += "#ifndef " + var_name.upper() + "_H\n"
    c_str += "#define " + var_name.upper() + "_H\n\n"

    # Add array length at top of file
    c_str += "\nunsigned int " + var_name + "_len = " + str(len(hex_data)) + ";\n"

    # Declare C variable
    c_str += "alignas(8) const unsigned char " + var_name + "[] = {"
    hex_array = []
    for i, val in enumerate(hex_data):
        # Construct string from hex
        hex_str = format(val, "#04x")
        # hex_str = val
        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ","
        if (i + 1) % 12 == 0:
            hex_str += "\n "
        hex_array.append(hex_str)

    # Add closing brace
    c_str += "\n " + format(" ".join(hex_array)) + "\n};\n\n"

    # Close out header guard
    c_str += "#endif //" + var_name.upper() + "_H"

    return c_str


c_model_name = "g_model"
with open(c_model_name + ".h", "w") as file:
    file.write(hex_to_c_array(tflite_model, c_model_name))

In [None]:
import tensorflow as tf

model = tf.keras.models.load_model("../include/g_model.h")
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
open("converted_model.tflite", "wb").write(tflite_model)

OSError: Unable to synchronously open file (file signature not found)