In [4]:
import time
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, accuracy_score, classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import warnings
warnings.filterwarnings('ignore')

In [5]:
import myimporter
from BCI_functions import *  # BCI_functions.ipynb contains some functions we might use multiple times in this tutorial

In [6]:
# ── Load & filter dataset ────────────────────────────────────────────────────
dataset_1 = np.load('1.npy')
print('dataset_1 shape:', dataset_1.shape)

dataset_1 shape: (259520, 65)


In [7]:
# Check GPU availability
gpus = tf.config.list_physical_devices('GPU')
device = 'GPU' if gpus else 'CPU'
print(f'We are using {device} now.')

We are using CPU now.


In [8]:
# Remove unwanted labels
removed_label = [2, 3, 4, 5, 6, 7, 8, 9, 10]
for ll in removed_label:
    idx = dataset_1[:, -1] != ll
    dataset_1 = dataset_1[idx]

In [9]:
# ── Hyperparameters ──────────────────────────────────────────────────────────
n_class        = int(11 - len(removed_label))
no_feature     = 64
segment_length = 16
LR             = 0.001
EPOCH          = 101
n_hidden       = 64
l2             = 0.005

In [10]:
# ── Segmentation & splitting ─────────────────────────────────────────────────
data_seg = extract(dataset_1, n_classes=n_class, n_fea=no_feature,
                   time_window=segment_length, moving=(segment_length / 2))
print('After segmentation, the shape of the data:', data_seg.shape)

no_longfeature   = no_feature * segment_length
data_seg_feature = data_seg[:, :no_longfeature]
data_seg_label   = data_seg[:, no_longfeature:no_longfeature + 1]

train_feature, test_feature, train_label, test_label = train_test_split(
    data_seg_feature, data_seg_label, shuffle=True
)

After segmentation, the shape of the data: (2440, 1025)


In [11]:
# ── Normalisation ─────────────────────────────────────────────────────────────
train_feature_2d = train_feature.reshape([-1, no_feature])
test_feature_2d  = test_feature.reshape([-1, no_feature])

scaler1          = StandardScaler().fit(train_feature_2d)
train_fea_norm1  = scaler1.transform(train_feature_2d)
test_fea_norm1   = scaler1.transform(test_feature_2d)

print('After normalization, training feature shape:', train_fea_norm1.shape,
      '\nAfter normalization, test feature shape:',     test_fea_norm1.shape)

# Reshape to 3-D: (samples, time_steps, features)
train_fea_norm1 = train_fea_norm1.reshape([-1, segment_length, no_feature]).astype(np.float32)
test_fea_norm1  = test_fea_norm1.reshape([-1, segment_length, no_feature]).astype(np.float32)

print('After reshape, training feature shape:', train_fea_norm1.shape,
      '\nAfter reshape, test feature shape:',     test_fea_norm1.shape)

train_label_flat = train_label.flatten().astype(np.int64)
test_label_flat  = test_label.flatten().astype(np.int64)

BATCH_size = test_fea_norm1.shape[0]  # mirrors original: use test size as batch size

After normalization, training feature shape: (29280, 64) 
After normalization, test feature shape: (9760, 64)
After reshape, training feature shape: (1830, 16, 64) 
After reshape, test feature shape: (610, 16, 64)


In [12]:
# ── tf.data pipelines ─────────────────────────────────────────────────────────
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((train_fea_norm1, train_label_flat))
    .shuffle(buffer_size=1024)
    .batch(BATCH_size)
    .prefetch(tf.data.AUTOTUNE)
)

test_dataset = tf.data.Dataset.from_tensor_slices(
    (test_fea_norm1, test_label_flat)
).batch(BATCH_size)

In [13]:
# ── Model definition ──────────────────────────────────────────────────────────
def build_gru_model(no_feature, n_hidden, n_class, l2_coeff):
    """
    Equivalent to the PyTorch GRU class:
      - 2-layer GRU (stacked)
      - Dropout 0.3 after GRU
      - Dense output layer
    """
    regularizer = keras.regularizers.l2(l2_coeff)

    model = keras.Sequential([
        # Layer 1 GRU – return sequences so layer 2 can see all time-steps
        layers.GRU(
            units=n_hidden,
            return_sequences=True,
            kernel_regularizer=regularizer,
            recurrent_regularizer=regularizer,
            bias_regularizer=regularizer,
            name='gru_layer_1'
        ),
        # Layer 2 GRU – return only last time-step output (mirrors [:, -1, :])
        layers.GRU(
            units=n_hidden,
            return_sequences=False,
            kernel_regularizer=regularizer,
            recurrent_regularizer=regularizer,
            bias_regularizer=regularizer,
            name='gru_layer_2'
        ),
        layers.Dropout(0.3),
        layers.Dense(n_class, name='output')          # logits; softmax applied in loss
    ])

    model.build(input_shape=(None, segment_length, no_feature))
    return model


gru_model = build_gru_model(no_feature, n_hidden, n_class, l2)
gru_model.summary()

optimizer  = keras.optimizers.Adam(learning_rate=LR)
loss_func  = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

In [14]:
# ── Custom training loop ──────────────────────────────────────────────────────
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = gru_model(x, training=True)
        loss   = loss_func(y, logits)
    grads = tape.gradient(loss, gru_model.trainable_variables)
    optimizer.apply_gradients(zip(grads, gru_model.trainable_variables))
    return loss, logits


best_acc = []
best_auc = []

start_time = time.perf_counter()

for epoch in range(EPOCH):
    # ── training ──
    for step, (train_x, train_y) in enumerate(train_dataset):
        loss, train_logits = train_step(train_x, train_y)

    # ── evaluation every 10 epochs ──
    if epoch % 10 == 0:
        # Test inference (no dropout)
        test_logits = gru_model(test_fea_norm1, training=False)
        test_loss   = loss_func(test_label_flat, test_logits).numpy()

        # Softmax probabilities for AUC
        pred_score    = tf.nn.softmax(test_logits, axis=1).numpy()
        test_y_onehot = one_hot(test_label_flat)          # your helper from BCI_functions
        auc_score     = roc_auc_score(test_y_onehot, pred_score)

        # Hard predictions
        pred_y     = np.argmax(pred_score, axis=1)
        pred_train = np.argmax(tf.nn.softmax(train_logits, axis=1).numpy(), axis=1)

        test_acc  = accuracy_score(test_label_flat, pred_y)
        train_acc = accuracy_score(train_y.numpy(), pred_train)

        print(
            f'Epoch: {epoch} | train loss: {loss.numpy():.4f}  train ACC: {train_acc:.4f}'
            f' | test loss: {test_loss:.4f}  test ACC: {test_acc:.4f} | AUC: {auc_score:.4f}'
        )
        best_acc.append(test_acc)
        best_auc.append(auc_score)

running_time = time.perf_counter() - start_time

print(classification_report(test_label_flat, pred_y))
print(f'BEST TEST ACC: {max(best_acc)}, AUC: {max(best_auc)}')
print(f'Total Running Time: {round(running_time, 2)} seconds')

Epoch: 0 | train loss: 0.7150  train ACC: 0.5016 | test loss: 0.6889  test ACC: 0.5377 | AUC: 0.5685
Epoch: 10 | train loss: 0.4184  train ACC: 0.8344 | test loss: 0.4594  test ACC: 0.8033 | AUC: 0.8722
Epoch: 20 | train loss: 0.1810  train ACC: 0.9459 | test loss: 0.2215  test ACC: 0.9230 | AUC: 0.9683
Epoch: 30 | train loss: 0.0965  train ACC: 0.9803 | test loss: 0.2111  test ACC: 0.9180 | AUC: 0.9791
Epoch: 40 | train loss: 0.0418  train ACC: 0.9918 | test loss: 0.1844  test ACC: 0.9607 | AUC: 0.9824
Epoch: 50 | train loss: 0.0572  train ACC: 0.9852 | test loss: 0.1605  test ACC: 0.9557 | AUC: 0.9859
Epoch: 60 | train loss: 0.0378  train ACC: 0.9885 | test loss: 0.1745  test ACC: 0.9492 | AUC: 0.9858
Epoch: 70 | train loss: 0.1003  train ACC: 0.9672 | test loss: 0.1974  test ACC: 0.9459 | AUC: 0.9857
Epoch: 80 | train loss: 0.0174  train ACC: 0.9918 | test loss: 0.1518  test ACC: 0.9541 | AUC: 0.9893
Epoch: 90 | train loss: 0.0170  train ACC: 0.9885 | test loss: 0.1856  test ACC: 0.

In [15]:
# ── Model inspection (mirrors the final notebook cells) ──────────────────────
print(gru_model.summary())

# Print each layer and its weights (equivalent to named_parameters())
for layer in gru_model.layers:
    for weight in layer.weights:
        print(f"{weight.name}: {weight.shape}")

None
kernel: (64, 192)
recurrent_kernel: (64, 192)
bias: (2, 192)
kernel: (64, 192)
recurrent_kernel: (64, 192)
bias: (2, 192)
kernel: (64, 2)
bias: (2,)


In [16]:
tflite_model_name = 'gru_model'  # Will be given .tflite suffix
c_model_name = 'gru_model'       # Will be given .h suffix

In [18]:
converter = tf.lite.TFLiteConverter.from_keras_model(gru_model)
converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]

# Add these two lines to fix the TensorListReserve error
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS
]
converter._experimental_lower_tensor_list_ops = False

tflite_model = converter.convert()
open(tflite_model_name + '.tflite', 'wb').write(tflite_model)



INFO:tensorflow:Assets written to: C:\Users\ajrbe\AppData\Local\Temp\tmpbpzcy3c0\assets


INFO:tensorflow:Assets written to: C:\Users\ajrbe\AppData\Local\Temp\tmpbpzcy3c0\assets


Saved artifact at 'C:\Users\ajrbe\AppData\Local\Temp\tmpbpzcy3c0'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 16, 64), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 2), dtype=tf.float32, name=None)
Captures:
  1798697846736: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697845776: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697846160: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697845008: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697847120: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697846544: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697847696: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1798697847504: TensorSpec(shape=(), dtype=tf.resource, name=None)




83280

In [19]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

  c_str = ''

  # Create header guard
  c_str += '#ifndef ' + var_name.upper() + '_H\n'
  c_str += '#define ' + var_name.upper() + '_H\n\n'

  # Add array length at top of file
  c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

  # Declare C variable
  c_str += 'unsigned char ' + var_name + '[] = {'
  hex_array = []
  for i, val in enumerate(hex_data) :

    # Construct string from hex
    hex_str = format(val, '#04x')

    # Add formatting so each line stays within 80 characters
    if (i + 1) < len(hex_data):
      hex_str += ','
    if (i + 1) % 12 == 0:
      hex_str += '\n '
    hex_array.append(hex_str)

  # Add closing brace
  c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

  # Close out header guard
  c_str += '#endif //' + var_name.upper() + '_H'

  return c_str

In [20]:
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
  file.write(hex_to_c_array(tflite_model, c_model_name))