In [1]:
#!pip install tensorflow==2.14


import os
from google.colab import drive
from google.colab import files
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/EEG_dataset')

Mounted at /content/drive


In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.callbacks import TensorBoard
from keras.layers import Dense, Dropout, LSTM
import numpy as np
import pandas as pd
from scipy import signal
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split

In [3]:
print(tf.__version__)
print(np.__version__)


2.17.1
1.26.4


In [4]:
log_dir = "logs/"

In [5]:
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

In [6]:
# Windowing parameters
window_size = 255  # 2 seconds at 256 Hz
overlap = 0.5  # 50% overlap between windows

# Function to window the EEG data
def window_data(data, window_size, overlap):
    step_size = int(window_size * (1 - overlap))
    windows = []
    for i in range(0, len(data) - window_size + 1, step_size):
        windows.append(data[i:i + window_size])
    return np.array(windows)

# Load and window data
dataframes = []
y = []
fil = ['back', 'forward', 'up', 'down', 'left', 'right']

for j in fil:
    for i in range(30):
        data = np.genfromtxt(j + '-' + str(i + 1) + '.csv', delimiter=',', skip_header=1)[:, 1:]
        windowed_data = window_data(data, window_size, overlap)
        dataframes.append(windowed_data)
        y.append([j] * len(windowed_data))  # Match labels to the windowed data

dataframes = np.vstack(dataframes)
y = np.hstack(y)

# Preprocessing: train-test split
X_train, X_test, y_train, y_test = train_test_split(dataframes, y, test_size=0.2, random_state=42)

# Mapping class labels to integers
class_mapping = {'down': 0, 'up': 1, 'left': 2, 'right': 3, 'forward': 4, 'back': 5}
y_train_int = np.array([class_mapping[label] for label in y_train])
y_test_int = np.array([class_mapping[label] for label in y_test])

In [7]:
dataframes.shape

(5220, 255, 4)

In [8]:
X_train.shape

(4176, 255, 4)

In [9]:




# Bandpass filter
lowcut, highcut, fs = 0.1, 60, 256
def butter_bandpass(lowcut, highcut, fs, order=4):
    nyquist = 0.5 * fs
    low, high = lowcut / nyquist, highcut / nyquist
    b, a = signal.butter(order, [low, high], btype='band')
    return b, a

def apply_bandpass_filter(data, lowcut, highcut, fs, order=4):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    return signal.filtfilt(b, a, data, axis=0)

X_train = np.array([apply_bandpass_filter(x, lowcut, highcut, fs) for x in X_train])
X_test = np.array([apply_bandpass_filter(x, lowcut, highcut, fs) for x in X_test])

# Convert data to frequency domain
def convert_to_frequency_domain(data, fs):
    fft_data = np.fft.fft(data, axis=0)
    frequencies = np.fft.fftfreq(data.shape[0], 1/fs)
    positive_freq_indices = frequencies > 0
    return np.abs(fft_data[positive_freq_indices])

X_train = np.array([convert_to_frequency_domain(x, fs) for x in X_train])
X_test = np.array([convert_to_frequency_domain(x, fs) for x in X_test])



# # Robust scaling
# scaler = RobustScaler()
# for i in range(X_train.shape[2]):  # Iterate over channels
#     X_train[:, :, i] = scaler.fit_transform(X_train[:, :, i])
#     X_test[:, :, i] = scaler.transform(X_test[:, :, i])




# # Robust scaling
# scaler = RobustScaler()

# # Fit the scaler on the training data first
# for i in range(X_train.shape[2]):  # Iterate over channels
#     X_train[:, :, i] = scaler.fit_transform(X_train[:, :, i])  # Fit and transform the training data
#     X_test[:, :, i] = scaler.transform(X_test[:, :, i])  # Only transform the test data, without fitting again

import joblib


# # Save the scaler to a file
# joblib.dump(scaler, '/content/scaler.pkl')

# # Download the scaler file to your laptop
# from google.colab import files
# files.download('/content/scaler.pkl')


# Checking the shape of the data before training
print("Shape of X_train:", X_train.shape)
print("Shape of X_test:", X_test.shape)

# Example: If you are using a windowing function
windowed_data = window_data(data, window_size=127, overlap=0.5)
print("Shape of windowed_data:", windowed_data.shape)




# Initialize the RobustScaler
scaler = RobustScaler()

# Reshape and scale the training data
for i in range(X_train.shape[2]):  # Iterate over channels
    # Reshape each channel from (windows, 127) to (windows*channels, 127)
    reshaped_train_data = X_train[:, :, i].reshape(-1, X_train.shape[1])
    reshaped_test_data = X_test[:, :, i].reshape(-1, X_test.shape[1])

    # Fit the scaler on the reshaped training data
    X_train[:, :, i] = scaler.fit_transform(reshaped_train_data).reshape(X_train[:, :, i].shape)
    X_test[:, :, i] = scaler.transform(reshaped_test_data).reshape(X_test[:, :, i].shape)

# Save the scaler to a file
joblib.dump(scaler, '/content/scaler.pkl')

# Download the scaler file to your laptop
files.download('/content/scaler.pkl')


Shape of X_train: (4176, 127, 4)
Shape of X_test: (1044, 127, 4)
Shape of windowed_data: (59, 127, 4)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [10]:
# New CNN + LSTM Model
def create_model(input_shape, num_classes):
    model = models.Sequential()

    # First Convolutional Layer with L2 Regularization
    model.add(layers.Conv2D(32, (3, 3), activation='relu', padding="same", input_shape=input_shape,
                            kernel_regularizer=regularizers.l2(0.01)))
    model.add(layers.MaxPooling2D((2, 2),padding='same'))
    # model.add(Dropout(0.3))  # Dropout to prevent overfitting

    # Second Convolutional Layer
    model.add(layers.Conv2D(64, (5, 5), activation='relu', padding="same"))
    model.add(layers.MaxPooling2D((2, 2),padding='same'))
    # model.add(Dropout(0.3))

    # Third Convolutional Layer
    model.add(layers.Conv2D(128, (7, 7), activation='relu', padding="same"))
    model.add(layers.MaxPooling2D((2, 2),padding='same'))
    # model.add(Dropout(0.3))

    model.add(layers.Conv2D(256, (7, 7), activation='relu', padding="same"))
    model.add(layers.MaxPooling2D((2, 2),padding='same'))

    # Global Average Pooling instead of Flatten
    model.add(layers.GlobalAveragePooling2D())

    gap_output_shape = 256

    # LSTM Layer to capture temporal dependencies
    model.add(layers.Reshape((-1, gap_output_shape))) # Reshape to (1, number of filters)
    model.add(layers.LSTM(64, return_sequences=False, activation='tanh',recurrent_activation='sigmoid'))

    # Fully Connected Layers
    model.add(Dense(64, activation='relu'))
    # model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))

    return model

In [11]:
# Instead of reshaping to window_size // 2, use the actual time steps (255 in this case)
window_size = 127  # From X_train.shape
num_channels = 4

# Number of windows is already calculated as 2016
X_train_reshaped = X_train.reshape(X_train.shape[0], window_size, num_channels, 1)
X_test_reshaped = X_test.reshape(X_test.shape[0], window_size, num_channels, 1)

# print(f'Reshaped X_train shape: {X_train_reshaped.shape}')
# print(f'Reshaped X_test shape: {X_test_reshaped.shape}')


input_shape = (window_size, num_channels, 1) # Define the input shape for the model

num_classes = 6



# Create the model
model = create_model(input_shape, num_classes)

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the model
history = model.fit(X_train_reshaped, y_train_int, epochs=50, batch_size=32, validation_data=(X_test_reshaped, y_test_int), callbacks=[tensorboard_callback])

# Evaluate the model
test_loss, test_acc = model.evaluate(X_test_reshaped, y_test_int)
print(f'Test Accuracy: {test_acc}')

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 29ms/step - accuracy: 0.1676 - loss: 1.8001 - val_accuracy: 0.1810 - val_loss: 1.7677
Epoch 2/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 17ms/step - accuracy: 0.2138 - loss: 1.7497 - val_accuracy: 0.2050 - val_loss: 1.7712
Epoch 3/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 16ms/step - accuracy: 0.2351 - loss: 1.7316 - val_accuracy: 0.2711 - val_loss: 1.7028
Epoch 4/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 17ms/step - accuracy: 0.2618 - loss: 1.6961 - val_accuracy: 0.1877 - val_loss: 1.7326
Epoch 5/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 19ms/step - accuracy: 0.2871 - loss: 1.6562 - val_accuracy: 0.2969 - val_loss: 1.6220
Epoch 6/50
[1m131/131[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 17ms/step - accuracy: 0.2882 - loss: 1.6377 - val_accuracy: 0.3075 - val_loss: 1.5789
Epoch 7/50
[1m131/13

In [12]:
model.save('cnn_model.h5')



In [13]:
import tensorflow as tf

# Load the model
model = tf.keras.models.load_model('cnn_model.h5')

# Check the input shape of the model
input_shape = model.input_shape
print(f"Model expects input shape: {input_shape}")

# Convert the model to TensorFlow Lite
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Restrict to TensorFlow Lite built-in operators
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]

# Convert the model
try:
    tflite_model = converter.convert()
    # Save the TFLite model
    with open('cnn_lstm_model_no_flex.tflite', 'wb') as f:
        f.write(tflite_model)
    print("Model successfully converted without Flex Delegate!")
except Exception as e:
    print("Error during model conversion:", str(e))




Model expects input shape: (None, 127, 4, 1)
Saved artifact at '/tmp/tmpbzqt8cnz'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 127, 4, 1), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 6), dtype=tf.float32, name=None)
Captures:
  138892887117136: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887119600: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887278864: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887280976: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887292592: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887290832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887414688: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887412752: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887424192: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892887610592: TensorSpec(shape=(), dtyp

In [None]:
files.download('cnn_model.h5')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [14]:
import tensorflow as tf

# Load the model
model = tf.keras.models.load_model('cnn_model.h5')

# Check the input shape of the model
input_shape = model.input_shape
print(f"Model expects input shape: {input_shape}")

# Convert the model with custom ops allowed
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.allow_custom_ops = True  # Allow custom operations

# Specify supported ops and settings for conversion
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS
]
converter._experimental_lower_tensor_list_ops = False  # Disable lowering of tensor list ops

# Perform the conversion
try:
    tflite_model = converter.convert()
    with open('cnn_lstm_model.tflite', 'wb') as f:
        f.write(tflite_model)
    print("Model converted successfully!")
except Exception as e:
    print("Error during model conversion:", str(e))




Model expects input shape: (None, 127, 4, 1)
Saved artifact at '/tmp/tmp2ebgg9w2'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 127, 4, 1), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 6), dtype=tf.float32, name=None)
Captures:
  138892570919152: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892570925312: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571019040: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571023968: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571032064: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571032768: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571106944: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571105008: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892570920736: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138892571109056: TensorSpec(shape=(), dtyp

In [15]:
with open('cnn_lstm_model.tflite', 'wb') as f:
    f.write(tflite_model)
print("Model saved as cnn_lstm_model.tflite")


Model saved as cnn_lstm_model.tflite


In [None]:
files.download('cnn_lstm_model.tflite')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [16]:
import numpy as np

# Step 1: Load CSV data
# Replace 'your_data.csv' with the path to your actual CSV file
data = np.loadtxt('forward-10.csv', delimiter=',', skiprows=1)  # Adjust delimiter if necessary

# Step 2: Flatten the array to 1D
flat_data = data.flatten()

# Step 3: Print the data in C++ array format
print("float input_data[] = {")
print(", ".join(map(str, flat_data)))
print("};")

float input_data[] = {
0.0, 6245.94911884655, -8928.829005817057, -60131.15467204508, -4064.441211753037, 1.0, 6244.809179879329, -8939.043753033133, -60124.65031440858, -4107.580078551779, 2.0, 6467.0302232539925, -8972.05727959362, -60151.91944264406, -3898.591267894658, 3.0, 6737.732200352216, -8997.515916528215, -60183.971844192965, -3581.8894007074123, 4.0, 6603.867602809382, -8981.869695409501, -60191.5043820744, -3664.702613914325, 5.0, 6236.04729605285, -8931.734732596247, -60142.15173031709, -4053.578263947757, 6.0, 6229.207662249525, -8922.726979580759, -60121.90104984058, -4107.378912851681, 7.0, 6458.625967338797, -8955.159360785408, -60143.76105591787, -3904.492128430859, 8.0, 6734.37943868392, -8988.463460023817, -60179.657957513096, -3590.114842666965, 9.0, 6626.576975175974, -8982.115564598509, -60188.598655295216, -3648.542302673138, 10.0, 6260.902435887151, -8948.610299660004, -60139.82714889374, -4037.328545728749, 11.0, 6238.573043176299, -8947.492712437239, -60118.

In [17]:
# Load a single dataset file
data = np.genfromtxt('up-13.csv', delimiter=',', skip_header=1)[:, 1:]

# Apply the same preprocessing steps used for training data
windowed_data = window_data(data, window_size, overlap)

# Apply bandpass filter
filtered_data = apply_bandpass_filter(windowed_data, lowcut, highcut, fs)

# Convert to frequency domain
frequency_domain_data = convert_to_frequency_domain(filtered_data, fs)

# Robust scaling
for i in range(frequency_domain_data.shape[2]):
    frequency_domain_data[:, :, i] = scaler.transform(frequency_domain_data[:, :, i])

# Reshape for the model
input_data = frequency_domain_data.reshape(frequency_domain_data.shape[0], window_size, num_channels, 1)

# Make predictions
predictions = model.predict(input_data)

# Get predicted classes
predicted_classes = np.argmax(predictions, axis=1)

# You can then map the predicted classes back to labels if needed
predicted_labels = [list(class_mapping.keys())[i] for i in predicted_classes]

single_output = list(class_mapping.keys())[np.argmax(np.bincount(predicted_classes))]

print(single_output)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 311ms/step
up
