Load dataset

In [1]:

from google.colab import files
import zipfile
import io
import os
import glob

# Change to true to opload new data
if not os.path.exists('/content/data.zip'):
    uploaded = files.upload()

    for fn in uploaded.keys():
        print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))

    if 'data.zip' in uploaded:
        zf = zipfile.ZipFile(io.BytesIO(uploaded['data.zip']), "r")
        zf.extractall()

training_folder = "/content/training_data"
testing_folder = "/content/testing_data"

Saving data.zip to data.zip
User uploaded file "data.zip" with length 773029 bytes


Load library

In [2]:
import tensorflow as tf
print("TensorFlow version:", tf.__version__)

import numpy as np
from tensorflow.keras import layers

TensorFlow version: 2.12.0


Create dataset

In [3]:
from tensorflow.keras.datasets import mnist

category_number = {"still" : 0, "circle" : 1, "eight" : 2, "line" : 3}

def load_to_numpy(path: str, category: dict):
    files = glob.glob(path + "/**/*.txt", recursive=True)

    final_x_data = []
    final_y_data = []

    for file in files:
        print(file)
        y_val = category_number[os.path.basename(os.path.dirname(file))]

        x_data = np.genfromtxt(file, delimiter=",")
        x_data = x_data[:, :-1] / (2**12)

        y_data = [y_val]
        y_data = tf.keras.utils.to_categorical(y_data, num_classes=4)

        final_x_data.append(x_data)
        final_y_data.append(y_data)

    return final_x_data, final_y_data

def train_generator(list_x_data, list_y_data):
    while True:
        sequence_length = np.random.randint(50, 1000)

        final_x_train = None
        final_y_train = None

        for x_data, y_data in zip(list_x_data, list_y_data):

            size_first_axis = x_data.shape[0] // sequence_length
            cropped_x_data = x_data[:size_first_axis * sequence_length, :]

            # Reshape the cropped array
            x_train = cropped_x_data.reshape((size_first_axis, sequence_length, x_data.shape[1]))
            y_train = np.tile(y_data, (size_first_axis, 1))

            if final_x_train is None:
                final_x_train = x_train
            else:
                final_x_train = np.vstack((final_x_train, x_train))

            if final_y_train is None:
                final_y_train = y_train
            else:
                final_y_train = np.vstack((final_y_train, y_train))

        yield final_x_train, final_y_train

def test_generator(list_x_data, list_y_data):
    for x_data, y_data in zip(list_x_data, list_y_data):
        yield x_data.reshape(1, x_data.shape[0], x_data.shape[1]), y_data

list_x_train, list_y_train = load_to_numpy(training_folder, category_number)

print(list_x_train[0][0])

list_x_test, list_y_test = load_to_numpy(testing_folder, category_number)

gen = train_generator(list_x_train, list_y_train)
x, y = next(gen)
print(f"{x.shape = } and {y.shape = }")


gen = test_generator(list_x_test, list_y_test)

for x, y in gen:
    print(f"{x.shape = } and {y.shape = }")

/content/training_data/eight/eight_pattern_0.txt
/content/training_data/eight/eight_pattern_1.txt
/content/training_data/eight/eight_pattern_2.txt
/content/training_data/line/line_3.txt
/content/training_data/line/line_2.txt
/content/training_data/line/line_0.txt
/content/training_data/line/line_1.txt
/content/training_data/circle/circle_2.txt
/content/training_data/circle/circle_0.txt
/content/training_data/circle/circle_1.txt
/content/training_data/still/still_1.txt
/content/training_data/still/still_0.txt
[-0.05126953  0.18579102  0.46166992  0.66040039  0.4362793   0.30078125]
/content/testing_data/eight/eight2.txt
/content/testing_data/eight/eight1.txt
/content/testing_data/eight/eight0.txt
/content/testing_data/eight/eight3.txt
/content/testing_data/line/line0.txt
/content/testing_data/line/line1.txt
/content/testing_data/line/line3.txt
/content/testing_data/line/line2.txt
/content/testing_data/circle/circle1.txt
/content/testing_data/circle/circle3.txt
/content/testing_data/circ

Create network

In [4]:

def create_model():
    # Hyperparameters
    drop = 0.3
    rec_drop = 0.3

    model = tf.keras.Sequential(name="LSTM-Model")

    model.add(tf.keras.Input(shape=(None, 6)))
    model.add(layers.LSTM(32, activation="tanh", recurrent_activation='hard_sigmoid', dropout=drop, recurrent_dropout=rec_drop, return_sequences=True))
    model.add(layers.LSTM(16, activation="tanh", recurrent_activation='hard_sigmoid', dropout=drop, recurrent_dropout=rec_drop))
    model.add(tf.keras.layers.Dense(4, activation="softmax"))

    return model

model = create_model()

print(model.summary())

# Restore the weights
#model.load_weights('/content/my_checkpoint')


model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)


Model: "LSTM-Model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, None, 32)          4992      
                                                                 
 lstm_1 (LSTM)               (None, 16)                3136      
                                                                 
 dense (Dense)               (None, 4)                 68        
                                                                 
Total params: 8,196
Trainable params: 8,196
Non-trainable params: 0
_________________________________________________________________
None


Train model

In [12]:
model.fit(train_generator(list_x_train, list_y_train), steps_per_epoch=10, epochs=20, verbose=2)

Epoch 1/20
10/10 - 15s - loss: 0.8505 - accuracy: 0.6795 - 15s/epoch - 2s/step
Epoch 2/20
10/10 - 14s - loss: 0.8029 - accuracy: 0.7193 - 14s/epoch - 1s/step
Epoch 3/20
10/10 - 19s - loss: 0.8534 - accuracy: 0.7075 - 19s/epoch - 2s/step
Epoch 4/20
10/10 - 17s - loss: 0.6396 - accuracy: 0.7822 - 17s/epoch - 2s/step
Epoch 5/20
10/10 - 18s - loss: 0.8629 - accuracy: 0.5984 - 18s/epoch - 2s/step
Epoch 6/20
10/10 - 24s - loss: 0.9870 - accuracy: 0.5313 - 24s/epoch - 2s/step
Epoch 7/20
10/10 - 18s - loss: 0.8120 - accuracy: 0.6873 - 18s/epoch - 2s/step
Epoch 8/20
10/10 - 18s - loss: 0.7406 - accuracy: 0.7462 - 18s/epoch - 2s/step
Epoch 9/20
10/10 - 15s - loss: 1.1327 - accuracy: 0.6182 - 15s/epoch - 1s/step
Epoch 10/20
10/10 - 17s - loss: 0.7442 - accuracy: 0.7539 - 17s/epoch - 2s/step
Epoch 11/20
10/10 - 17s - loss: 0.7042 - accuracy: 0.7677 - 17s/epoch - 2s/step
Epoch 12/20
10/10 - 17s - loss: 0.8330 - accuracy: 0.7030 - 17s/epoch - 2s/step
Epoch 13/20
10/10 - 16s - loss: 0.7025 - accuracy

<keras.callbacks.History at 0x7f29d88b2800>

In [13]:
model.evaluate(test_generator(list_x_test, list_y_test), verbose=2)

16/16 - 3s - loss: 0.5347 - accuracy: 0.8125 - 3s/epoch - 196ms/step


[0.534674346446991, 0.8125]

Save weights to c-style txt files

In [14]:
def write_values(file, weights, last=True, top=True):
    file.write('{')
    for i in range(weights.shape[0]):
        if 1 == len(weights.shape):
            file.write(str(weights[i]))
            if i != weights.shape[0]-1:
                file.write(', ')
        else:
            write_values(file, weights[i], (i==weights.shape[0]-1), top=False)
    file.write('}')
    if top:
        file.write(';')

    if last:
        file.write('\n')
    else:
        file.write(', \n')


#for w in range(1, len(model.layers)):
for layer in range(0, 2):
    
    weight_filename = "/content/layer_" + str(layer) + "_weights.txt" 

    with open(weight_filename, 'w') as file: # clear file
    
        file.write(f"#define layer_{layer}_input_size {model.layers[layer].weights[0].numpy().shape[0]} \n")
        file.write(f"#define layer_{layer}_kernel_size {model.layers[layer].weights[0].numpy().shape[1]} \n")
        file.write(f"const data_t layer_{layer}_kernel_weights[layer_{layer}_input_size][layer_{layer}_kernel_size] = ")
        write_values(file, model.layers[layer].weights[0].numpy())

        file.write(f"#define layer_{layer}_recurrent_kernel_size_0 {model.layers[layer].weights[1].numpy().shape[0]} \n")
        file.write(f"#define layer_{layer}_recurrent_kernel_size_1 {model.layers[layer].weights[1].numpy().shape[1]} \n")
        file.write(f"const data_t layer_{layer}_recurrent_kernel_weights[layer_{layer}_recurrent_kernel_size_0][layer_{layer}_recurrent_kernel_size_1] = ")
        write_values(file, model.layers[layer].weights[1].numpy())

        file.write(f"#define layer_{layer}_bias_size {model.layers[layer].weights[2].numpy().shape[0]} \n")
        file.write(f"const data_t layer_{layer}_bias_weights[layer_{layer}_bias_size] = ")
        write_values(file, model.layers[layer].weights[2].numpy())


layer = 2
weight_filename = "/content/layer_" + str(layer) + "_weights.txt"

with open(weight_filename, 'w') as file: # clear file

    file.write(f"#define layer_{layer}_input_size {model.layers[layer].weights[0].numpy().shape[0]} \n")
    file.write(f"#define layer_{layer}_kernel_size {model.layers[layer].weights[0].numpy().shape[1]} \n")
    file.write(f"const data_t layer_{layer}_weights[layer_{layer}_input_size][layer_{layer}_kernel_size] = ")
    write_values(file, model.layers[layer].weights[0].numpy())

    file.write(f"#define layer_{layer}_bias_size {model.layers[layer].weights[1].numpy().shape[0]} \n")
    file.write(f"const data_t layer_{layer}_bias_weights[layer_{layer}_bias_size] = ")
    write_values(file, model.layers[layer].weights[1].numpy())

Zip weights

In [15]:
# Directory path where the files are located
directory = '/content'

# Find all files ending with "weights.txt"
file_paths = glob.glob(directory + '/*weights.txt')

# Zip the files
with zipfile.ZipFile('/content/weights.zip', 'w') as zip_file:
    for file_path in file_paths:
        zip_file.write(file_path, arcname=file_path.split('/')[-1])

In [16]:
model.save_weights('/content/my_checkpoint')

For debugging layers

In [18]:
from tensorflow.keras.models import Model

test_model = create_model()

print(model.summary())
# Restore the weights
test_model.load_weights('/content/my_checkpoint')

XX = test_model.input 
YY = test_model.layers[0].output
new_model = Model(XX, YY)

for i in range(10):
    x = np.full((1, i + 1, 6), 5.0/4096)
    y = test_model(x)
    print(y.numpy())
    print(np.argmax(y.numpy()))
    

Model: "LSTM-Model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, None, 32)          4992      
                                                                 
 lstm_1 (LSTM)               (None, 16)                3136      
                                                                 
 dense (Dense)               (None, 4)                 68        
                                                                 
Total params: 8,196
Trainable params: 8,196
Non-trainable params: 0
_________________________________________________________________




None
[[0.346277   0.23488112 0.24178158 0.17706029]]
0
[[0.33470246 0.23595706 0.25155118 0.1777893 ]]
0
[[0.31555048 0.23938486 0.26574817 0.17931646]]
0
[[0.29011178 0.24390319 0.28416643 0.18181865]]
0
[[0.26049334 0.24788085 0.30615917 0.18546669]]
2
[[0.22941634 0.24946567 0.33055726 0.19056082]]
2
[[0.19968942 0.24693656 0.3557511  0.19762294]]
2
[[0.17353098 0.23915485 0.37996903 0.20734519]]
2
[[0.15210976 0.22591941 0.401646   0.2203248 ]]
2
[[0.13554733 0.20807372 0.4197126  0.23666638]]
2
