In [1]:
import tensorflow as tf
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    for gpu in gpus:
        print("Found a GPU with the name:", gpu)
else:
    print("Failed to detect a GPU.")


Found a GPU with the name: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


In [2]:
import numpy as np
import os
import glob

In [3]:
# files = np.sort(glob.glob("./EEG FOLDER/*"))
# print("Total number of files: ", len(files))
# print("Showing first 10 files...")
# files[:10]

In [4]:
# Recursively glob for CSV files within each subdirectory of the EEG FOLDER
files = glob.glob('./EEG FOLDER/*/*.csv', recursive=True)
files.sort()  # Optional: sort the files for consistency

In [5]:
print("Total number of files: ", len(files))
print("Showing first 10 files...")
print(files[:10])


Total number of files:  360
Showing first 10 files...
['./EEG FOLDER\\Fear\\cz_eeg_data_11.csv', './EEG FOLDER\\Fear\\cz_eeg_data_15.csv', './EEG FOLDER\\Fear\\cz_eeg_data_17.csv', './EEG FOLDER\\Fear\\cz_eeg_data_18.csv', './EEG FOLDER\\Fear\\cz_eeg_data_2.csv', './EEG FOLDER\\Fear\\cz_eeg_data_5.csv', './EEG FOLDER\\Fear\\ha_eeg_data_11.csv', './EEG FOLDER\\Fear\\ha_eeg_data_15.csv', './EEG FOLDER\\Fear\\ha_eeg_data_17.csv', './EEG FOLDER\\Fear\\ha_eeg_data_18.csv']


In [6]:
import pandas as pd
import re            # To match regular expression for extracting labels

In [7]:
glob.glob("./EEG FOLDER/*")

['./EEG FOLDER\\Fear',
 './EEG FOLDER\\Happy',
 './EEG FOLDER\\Neutral',
 './EEG FOLDER\\Sad']

In [8]:
glob.glob("./EEG FOLDER/Neutral/*")[:10] # Showing first 10 files of Neutral folder

['./EEG FOLDER/Neutral\\cz_eeg_data_21.csv',
 './EEG FOLDER/Neutral\\cz_eeg_data_23.csv',
 './EEG FOLDER/Neutral\\cz_eeg_data_4.csv',
 './EEG FOLDER/Neutral\\cz_eeg_data_6.csv',
 './EEG FOLDER/Neutral\\cz_eeg_data_7.csv',
 './EEG FOLDER/Neutral\\cz_eeg_data_9.csv',
 './EEG FOLDER/Neutral\\ha_eeg_data_21.csv',
 './EEG FOLDER/Neutral\\ha_eeg_data_23.csv',
 './EEG FOLDER/Neutral\\ha_eeg_data_4.csv',
 './EEG FOLDER/Neutral\\ha_eeg_data_6.csv']

In [9]:
glob.glob("./EEG FOLDER/Sad/*")[:10]  #Sad

['./EEG FOLDER/Sad\\cz_eeg_data_1.csv',
 './EEG FOLDER/Sad\\cz_eeg_data_10.csv',
 './EEG FOLDER/Sad\\cz_eeg_data_12.csv',
 './EEG FOLDER/Sad\\cz_eeg_data_13.csv',
 './EEG FOLDER/Sad\\cz_eeg_data_14.csv',
 './EEG FOLDER/Sad\\cz_eeg_data_8.csv',
 './EEG FOLDER/Sad\\ha_eeg_data_1.csv',
 './EEG FOLDER/Sad\\ha_eeg_data_10.csv',
 './EEG FOLDER/Sad\\ha_eeg_data_12.csv',
 './EEG FOLDER/Sad\\ha_eeg_data_13.csv']

In [10]:
glob.glob("./EEG FOLDER/Fear/*")[:10]  #Fear

['./EEG FOLDER/Fear\\cz_eeg_data_11.csv',
 './EEG FOLDER/Fear\\cz_eeg_data_15.csv',
 './EEG FOLDER/Fear\\cz_eeg_data_17.csv',
 './EEG FOLDER/Fear\\cz_eeg_data_18.csv',
 './EEG FOLDER/Fear\\cz_eeg_data_2.csv',
 './EEG FOLDER/Fear\\cz_eeg_data_5.csv',
 './EEG FOLDER/Fear\\ha_eeg_data_11.csv',
 './EEG FOLDER/Fear\\ha_eeg_data_15.csv',
 './EEG FOLDER/Fear\\ha_eeg_data_17.csv',
 './EEG FOLDER/Fear\\ha_eeg_data_18.csv']

In [11]:
glob.glob("./EEG FOLDER/Happy/*")[:10]  #Happy

['./EEG FOLDER/Happy\\cz_eeg_data_16.csv',
 './EEG FOLDER/Happy\\cz_eeg_data_19.csv',
 './EEG FOLDER/Happy\\cz_eeg_data_20.csv',
 './EEG FOLDER/Happy\\cz_eeg_data_22.csv',
 './EEG FOLDER/Happy\\cz_eeg_data_24.csv',
 './EEG FOLDER/Happy\\cz_eeg_data_3.csv',
 './EEG FOLDER/Happy\\ha_eeg_data_16.csv',
 './EEG FOLDER/Happy\\ha_eeg_data_19.csv',
 './EEG FOLDER/Happy\\ha_eeg_data_20.csv',
 './EEG FOLDER/Happy\\ha_eeg_data_22.csv']

In [12]:
def data_generator(file_list, batch_size=20):
    i = 0
    while True:
        if i * batch_size >= len(file_list):  # This loop is used to run the generator indefinitely.
            i = 0
            np.random.shuffle(file_list)
        else:
            file_chunk = file_list[i * batch_size:(i + 1) * batch_size]
            data = []
            labels = []
            label_classes = ["Neutral", "Sad", "Fear", "Happy"]
            for file_path in file_chunk:
                directory_path = os.path.dirname(file_path)
                label = os.path.basename(directory_path)  # This should correctly extract the folder name as the label
                label_index = label_classes.index(label)  # Get the index of the label in label_classes
                temp = pd.read_csv(file_path)
                temp = temp.iloc[:, 1:].values.reshape(400, 62, 1)
                data.append(temp)
                labels.append(label_index)
            data = np.asarray(data).reshape(-1, 400, 62, 1)
            labels = np.asarray(labels)
            yield data, labels
            i += 1


In [13]:
generated_data = data_generator(files, batch_size = 100)

In [14]:
num = 0
for data, labels in generated_data:
    print(data.shape, labels.shape)
    print(labels, "<--Labels")  # Just to see the lables
    print()
    num = num + 1
    if num > 5: break

(100, 400, 62, 1) (100,)
[2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 3 3 3 3 3 3 3 3 3 3] <--Labels

(100, 400, 62, 1) (100,)
[3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3
 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3
 3 3 3 3 3 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] <--Labels

(100, 400, 62, 1) (100,)
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1] <--Labels

(60, 400, 62, 1) (60,)
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1] <--Labels

(100, 400, 62, 1) (100,)
[1 3 1 0 2 2 1 2 0 3 0 3 2 0 0 0 1 1 1 3 1 0 3 3 0 1 3 3 1 2 2 2 1 2 2 2 2
 0 1 2 3 1 2 3 1 0 3 3 

In [15]:
import tensorflow as tf


In [22]:
def tf_data_generator(file_list, batch_size = 20):
    i = 0
    while True:
        if i*batch_size >= len(file_list):  
            i = 0
            np.random.shuffle(file_list)
        else:
            file_chunk = file_list[i*batch_size:(i+1)*batch_size] 
            data = []
            labels = []
            label_classes = tf.constant(["Neutral", "Sad", "Fear", "Happy"]) # This line has changed.
            for file in file_chunk:
                directory_path = os.path.dirname(file)
                label = os.path.basename(directory_path)  # This should correctly extract the folder name as the label
                label_index = tf.where(tf.equal(label_classes, label))  # Find index of label in label_classes
                label_index = tf.squeeze(label_index)  # Remove extra dimensions
                temp = pd.read_csv(open(file,'r'))
                temp = temp.iloc[:, 1:].values  # Convert DataFrame to numpy array after dropping the first column

                # Normalize each feature to zero mean and unit variance
                mean = np.mean(temp, axis=0)
                std = np.std(temp, axis=0)
                normalized_temp = (temp - mean) / (std + 1e-8)  # Adding epsilon to avoid division by zero

                # Reshape for the model
                normalized_temp = normalized_temp.reshape(400, 62, 1)
                data.append(normalized_temp)
           
                labels.append(label_index)

            data = np.asarray(data).reshape(-1,400,62,1)
            labels = np.asarray(labels)
            yield data, labels
            i = i + 1

In [23]:
check_data = tf_data_generator(files, batch_size = 100)

In [24]:
num = 0
for data, labels in check_data:
    print(data.shape, labels.shape)
    print(labels, "<--Labels")
    print()
    num = num + 1
    if num > 5: break


(100, 400, 62, 1) (100,)
[3 0 2 1 0 0 3 2 3 1 1 1 3 1 1 3 0 3 1 1 2 2 3 2 3 0 1 2 3 1 0 0 1 3 0 2 0
 1 0 2 2 2 1 2 3 3 3 2 1 0 0 1 2 3 2 2 2 3 3 3 2 2 3 1 0 0 1 3 1 0 0 3 1 3
 1 0 0 0 0 0 2 0 1 0 3 0 2 1 3 0 3 0 0 0 3 3 1 3 1 1] <--Labels

(100, 400, 62, 1) (100,)
[0 2 0 1 2 1 3 1 2 2 2 3 3 1 1 2 2 3 2 2 1 0 3 3 3 1 3 1 0 3 1 2 0 2 1 1 0
 0 1 2 1 1 2 0 3 1 3 1 0 3 3 1 3 3 3 1 2 3 0 0 2 3 3 1 1 0 2 0 3 2 2 3 1 0
 2 1 3 2 2 2 2 2 0 0 3 1 0 3 0 3 3 2 1 2 3 3 3 2 1 0] <--Labels

(100, 400, 62, 1) (100,)
[3 0 1 3 2 1 3 0 3 2 2 0 2 3 2 2 1 2 0 0 1 3 1 0 3 0 0 0 1 0 0 0 1 1 1 1 0
 2 0 2 1 3 2 0 2 1 2 2 1 0 1 2 2 0 1 0 1 0 3 3 3 3 3 0 3 1 3 1 0 1 2 0 0 2
 1 2 0 2 3 1 3 1 1 0 1 3 3 2 3 1 2 0 2 0 3 2 3 0 2 1] <--Labels

(60, 400, 62, 1) (60,)
[3 2 3 2 3 3 1 1 2 2 2 2 3 0 0 2 2 1 0 3 1 1 0 0 2 3 2 3 1 1 0 0 0 0 1 3 3
 2 0 2 1 1 1 3 2 2 0 2 1 1 0 2 3 2 1 2 2 0 0 0] <--Labels

(100, 400, 62, 1) (100,)
[2 0 1 3 1 2 1 1 1 3 2 2 0 3 1 2 1 2 2 2 0 0 1 2 0 0 1 0 3 3 2 3 3 0 2 0 0
 1 2 1 1 0 0 1 2 1 0 1 

In [25]:
batch_size = 150
dataset = tf.data.Dataset.from_generator(tf_data_generator,args= [files, batch_size],output_types = (tf.float32, tf.float32),
                                                output_shapes = ((None,400,62,1),(None,)))

In [26]:
# Check whether dataset works or not.
num = 0
for data, labels in dataset:
    print(data.shape, labels.shape)
    print(labels)
    print()
    num = num + 1
    if num > 7: break


(150, 400, 62, 1) (150,)
tf.Tensor(
[2. 0. 1. 3. 1. 2. 1. 1. 1. 3. 2. 2. 0. 3. 1. 2. 1. 2. 2. 2. 0. 0. 1. 2.
 0. 0. 1. 0. 3. 3. 2. 3. 3. 0. 2. 0. 0. 1. 2. 1. 1. 0. 0. 1. 2. 1. 0. 1.
 1. 1. 2. 3. 2. 0. 1. 2. 0. 0. 2. 2. 3. 1. 2. 2. 1. 3. 2. 2. 3. 1. 0. 0.
 2. 2. 1. 3. 0. 3. 3. 0. 1. 3. 0. 3. 2. 2. 2. 2. 0. 0. 0. 0. 1. 2. 1. 2.
 2. 0. 0. 1. 0. 0. 3. 3. 3. 1. 3. 1. 3. 0. 3. 3. 3. 0. 1. 1. 0. 2. 1. 2.
 2. 3. 3. 1. 3. 3. 1. 3. 2. 1. 3. 0. 2. 2. 2. 2. 1. 1. 3. 1. 2. 3. 0. 1.
 2. 0. 1. 1. 1. 2.], shape=(150,), dtype=float32)

(150, 400, 62, 1) (150,)
tf.Tensor(
[1. 2. 1. 3. 3. 3. 3. 3. 3. 3. 2. 2. 2. 1. 1. 3. 3. 1. 0. 3. 2. 2. 0. 1.
 1. 0. 1. 2. 3. 1. 1. 1. 3. 0. 1. 2. 1. 1. 3. 1. 0. 2. 3. 0. 1. 2. 0. 0.
 3. 0. 1. 3. 1. 3. 0. 3. 3. 0. 1. 3. 2. 3. 0. 2. 0. 3. 0. 2. 1. 2. 3. 2.
 3. 2. 1. 0. 2. 3. 0. 0. 0. 3. 2. 0. 1. 0. 1. 3. 0. 0. 2. 2. 2. 0. 3. 2.
 3. 2. 2. 0. 2. 0. 0. 2. 1. 1. 1. 3. 0. 2. 3. 1. 0. 1. 3. 1. 3. 0. 2. 0.
 1. 3. 3. 0. 1. 0. 0. 1. 2. 0. 2. 1. 1. 1. 2. 1. 3. 3. 3. 3. 2. 3. 1. 3.
 

In [27]:
# Building data pipeline and training CNN model¶
import shutil

In [28]:
Neutral_files = glob.glob("./EEG FOLDER/Neutral/*")
Sad_files = glob.glob("./EEG FOLDER/Sad/*")
Fear_files = glob.glob("./EEG FOLDER/Fear/*")
Happy_files = glob.glob("./EEG FOLDER/Happy/*")


In [29]:
from sklearn.model_selection import train_test_split


In [30]:
Neutral_train, Neutral_test = train_test_split(Neutral_files, test_size = 20, random_state = 5)
Sad_train, Sad_test = train_test_split(Sad_files, test_size = 20, random_state = 54)
Fear_train, Fear_test = train_test_split(Fear_files, test_size = 20, random_state = 543)
Happy_train, Happy_test = train_test_split(Happy_files, test_size = 20, random_state = 5432)


In [31]:
Neutral_train, Neutral_val = train_test_split(Neutral_train, test_size = 10, random_state = 1)
Sad_train, Sad_val = train_test_split(Sad_train, test_size = 10, random_state = 12)
Fear_train, Fear_val = train_test_split(Fear_train, test_size = 10, random_state = 123)
Happy_train, Happy_val = train_test_split(Happy_train, test_size = 10, random_state = 1234)


In [32]:
train_file_names = Neutral_train + Sad_train + Fear_train + Happy_train 
validation_file_names = Neutral_val + Sad_val + Fear_val + Happy_val
test_file_names = Neutral_test + Sad_test + Fear_test + Happy_test 

In [33]:
print("Number of train_files:" ,len(train_file_names))
print("Number of validation_files:" ,len(validation_file_names))
print("Number of test_files:" ,len(test_file_names))

Number of train_files: 240
Number of validation_files: 40
Number of test_files: 80


In [34]:
batch_size = 100
train_dataset = tf.data.Dataset.from_generator(tf_data_generator, args = [train_file_names, batch_size], 
                                              output_shapes = ((None,400,62,1),(None,)),
                                              output_types = (tf.float32, tf.float32))

validation_dataset = tf.data.Dataset.from_generator(tf_data_generator, args = [validation_file_names, batch_size],
                                                   output_shapes = ((None,400,62,1),(None,)),
                                                   output_types = (tf.float32, tf.float32))

test_dataset = tf.data.Dataset.from_generator(tf_data_generator, args = [test_file_names, batch_size],
                                             output_shapes = ((None,400,62,1),(None,)),
                                             output_types = (tf.float32, tf.float32))

In [35]:
# Now create the model.
from tensorflow.keras import layers

In [36]:
model = tf.keras.Sequential([
    layers.Conv2D(16, 3, activation = "relu", input_shape = (400,62,1)),
    layers.MaxPool2D(2),
    layers.Conv2D(62, 3, activation = "relu"),
    layers.MaxPool2D(2),
    layers.Flatten(),
    layers.Dense(16, activation = "relu"),
    layers.Dense(5, activation = "softmax")
])
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 398, 60, 16)       160       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 199, 30, 16)      0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 197, 28, 62)       8990      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 98, 14, 62)       0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 85064)             0         
                                                                 
 dense (Dense)               (None, 16)                1

In [37]:
# Compile the model.
model.compile(loss = "sparse_categorical_crossentropy", optimizer = "adam", metrics = ["accuracy"])


In [38]:
steps_per_epoch = int(np.ceil(len(train_file_names)/batch_size))
validation_steps = int(np.ceil(len(validation_file_names)/batch_size))
steps = int(np.ceil(len(test_file_names)/batch_size))
print("steps_per_epoch = ", steps_per_epoch)
print("validation_steps = ", validation_steps)
print("steps = ", steps)

steps_per_epoch =  3
validation_steps =  1
steps =  1


In [None]:
model.fit(train_dataset, validation_data = validation_dataset, steps_per_epoch = steps_per_epoch,
         validation_steps = validation_steps, epochs = 100)

Epoch 1/100

In [69]:
test_loss, test_accuracy = model.evaluate(test_dataset, steps = 10)



In [70]:
print("Test loss: ", test_loss)
print("Test accuracy:", test_accuracy)

Test loss:  939.6615600585938
Test accuracy: 0.27000001072883606
