In [1]:
%%capture
import mne
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from sklearn.model_selection import train_test_split
import os
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from scipy.signal import welch


In [2]:
%%capture
#Convert to DataFrame
def convert_bdf_to_dataframe(bdf_filename):
    raw_data = mne.io.read_raw_bdf(bdf_filename, preload=True)
    eeg_data_raw = raw_data.get_data()
    channel_names = raw_data.ch_names
    time_index = raw_data.times
    eeg_data = pd.DataFrame(data=eeg_data_raw.T, columns=channel_names, index=time_index)
    col_names = ['O1', 'O2', 'F3', 'F4', 'C3' ,'C4' , 'Fp1', 'Fp2']
    eeg_data = eeg_data[col_names]
    segment_size = 1024
    num_segments = len(eeg_data) // segment_size
    reduced_df = pd.DataFrame(columns=eeg_data.columns)
    for i in range(num_segments):
        start_idx = i * segment_size
        end_idx = (i + 1) * segment_size
        segment_data = eeg_data.iloc[start_idx:end_idx]    
        mean_values = segment_data.mean()
        reduced_df = reduced_df.append(mean_values, ignore_index=True)

    return reduced_df

In [3]:
%%capture
# Define a function to extract labels from filenames
def extract_label_from_filename(filename):
    # Assuming filenames are in the format: subject_task.bdf
    task = filename.split('_')[1].split('.')[0]
    sub = filename.split('_')[0]
    return task, sub

In [4]:
%%capture

# Example code to create a spectrogram image from each EEG channel
def eeg_channel_to_spectrogram(channel_data, ch_idx):
    plt.specgram(channel_data, NFFT=256, Fs=1000, noverlap=128)
    plt.axis('off')  # Turn off axis
    plt.savefig(f'{ch_idx}_channel_spectrogram.png', bbox_inches='tight', pad_inches=0)
    plt.close()
    image = Image.open(f'{ch_idx}_channel_spectrogram.png')
    path = f'{ch_idx}_channel_spectrogram.png'
    return image, path

In [5]:
%%capture

from PIL import Image
data_dir = './Rishikest_MIT_Dataset'  # Replace with the path to your .bdf data directory
eeg_channel_images = []
subject_data = {}

In [8]:
%%capture
# Load and preprocess the data

output_dir_med = './output_images/Meditation'
output_dir_think = './output_images/Thinking'
os.makedirs(output_dir_med, exist_ok=True)
os.makedirs(output_dir_think, exist_ok=True)
i = 1

# Loop through all files in the directory
for filename in os.listdir(data_dir):
    if filename.endswith('.bdf'):
        # Convert the .bdf file to a DataFrame
        eeg_data = convert_bdf_to_dataframe(os.path.join(data_dir, filename))
        
        # Extract labels from filenames
        label, sub = extract_label_from_filename(filename)
        Dirs = ''
        subject = f'sub{i}'
        if 'med'in label:
#             Dirs = f'./output_images/{subject}_Med'
            Dirs = f'{output_dir_med}/{subject}'
            os.makedirs(Dirs, exist_ok=True)
        else:
#             Dirs = f'./output_images/{subject}_Think'
            Dirs = f'{output_dir_think}/{subject}'
            os.makedirs(Dirs, exist_ok=True)
        
        
        for channel_idx in range(eeg_data.shape[1]):
            channel_data = eeg_data.iloc[:,channel_idx]
            channel_image, path = eeg_channel_to_spectrogram(channel_data, eeg_data.columns[channel_idx])
            image_path = os.path.join(Dirs, path)
            print(image_path)
            channel_image.save(image_path)
#             eeg_channel_images.append(image_filename)
            if subject not in subject_data:
                subject_data[subject] = []
            subject_data[subject].append(channel_data)
        i+=1 

In [1]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split


In [2]:
data_directory = "./output_images/"
image_size = (256, 256)  # Adjust to your desired image size
batch_size = 16  # Adjust as needed
validation_split = 0.1  # 20% of the data for validation
test_split = 0.1  # 10% of the data for testing


In [9]:
subfolder_names = os.listdir(data_directory)

num_classes = 2  # Specify the number of classes (in this case, 2)

# Split data into train, validation, and test sets
subfolder_names = os.listdir(data_directory)
train_data, test_data = train_test_split(
    subfolder_names,
    test_size=test_split,
    random_state=42,
    stratify=None  # No need to stratify with only two classes
)

train_data, val_data = train_test_split(
    train_data,
    test_size=validation_split,
    random_state=42,
    stratify=None  # No need to stratify with only two classes
)

# Create data generators
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

train_generator = train_datagen.flow_from_directory(
    data_directory,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',  # Use 'binary' for binary classification
    subset='training',
    shuffle=True  # Shuffle training data
)

validation_generator = train_datagen.flow_from_directory(
    data_directory,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',  # Use 'binary' for binary classification
    subset='validation',
    shuffle=False  # Do not shuffle validation data
)

test_datagen = ImageDataGenerator(rescale=1./255)

test_generator = test_datagen.flow_from_directory(
    data_directory,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',  # Use 'binary' for binary classification
    subset=None,  # Test set should not be a subset of train/val
    shuffle=False  # Do not shuffle test data
)

Found 64 images belonging to 8 classes.
Found 0 images belonging to 8 classes.
Found 64 images belonging to 8 classes.


In [10]:
NUM_CLASSES = 2

# Fixed for Cats & Dogs color images
CHANNELS = 3

IMAGE_RESIZE = 256
RESNET50_POOLING_AVERAGE = 'avg'
DENSE_LAYER_ACTIVATION = 'softmax'
OBJECTIVE_FUNCTION = 'categorical_crossentropy'

# Common accuracy metric for all outputs, but can use different metrics for different output
LOSS_METRICS = ['accuracy']

# EARLY_STOP_PATIENCE must be < NUM_EPOCHS
NUM_EPOCHS = 100
EARLY_STOP_PATIENCE = 10

STEPS_PER_EPOCH_TRAINING = 10
STEPS_PER_EPOCH_VALIDATION = 10

# These steps value should be proper FACTOR of no.-of-images in train & valid folders respectively
# NOTE that these BATCH* are for Keras ImageDataGenerator batching to fill epoch step input
BATCH_SIZE_TRAINING = 100
BATCH_SIZE_VALIDATION = 100

# Using 1 to easily manage mapping between test_generator & prediction for submission preparation
BATCH_SIZE_TESTING = 1

In [11]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

#resnet_weights_path =  r"D:\ML\ML-codes\resnet50_weights_.h5"
model = Sequential()

# 1st layer as the lumpsum weights from resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
# NOTE that this layer will be set below as NOT TRAINABLE, i.e., use it as is
model.add(ResNet50(include_top = False, pooling = RESNET50_POOLING_AVERAGE, weights = 'imagenet',input_shape=(256,256,3)))

# 2nd layer as Dense for 2-class classification, i.e., dog or cat using SoftMax activation
model.add(Dense(NUM_CLASSES, activation = DENSE_LAYER_ACTIVATION))

# Say not to train first layer (ResNet) model as it is already trained
model.layers[0].trainable = False

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [12]:
from tensorflow.keras import optimizers

sgd = optimizers.SGD(learning_rate = 0.01,  momentum = 0.9, nesterov = True)
model.compile(optimizer = sgd, loss = OBJECTIVE_FUNCTION, metrics = LOSS_METRICS)


from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing.image import ImageDataGenerator

image_size = IMAGE_RESIZE

# data_generator = ImageDataGenerator(preprocessing_function=preprocess_input)

# train_generator = data_generator.flow_from_directory(
#         r"/workspace/train",
#         target_size=(image_size, image_size),
#         batch_size=BATCH_SIZE_TRAINING,
#         class_mode='categorical')

# validation_generator = data_generator.flow_from_directory(
#         r"/workspace/test",
#         target_size=(image_size, image_size),
#         batch_size=BATCH_SIZE_VALIDATION,
#         class_mode='categorical') 

In [13]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50 (Functional)       (None, 2048)              23587712  
                                                                 
 dense (Dense)               (None, 2)                 4098      
                                                                 
Total params: 23,591,810
Trainable params: 4,098
Non-trainable params: 23,587,712
_________________________________________________________________


In [14]:
for i, layer in enumerate(model.layers):
    # Check if the layer is trainable
    if layer.trainable:
        print(f"Layer {i} ({layer.name}) is trainable")
    else:
        print(f"Layer {i} ({layer.name}) is not trainable")

Layer 0 (resnet50) is not trainable
Layer 1 (dense) is trainable


In [15]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

cb_early_stopper = EarlyStopping(monitor = 'val_loss', patience = EARLY_STOP_PATIENCE)
cb_checkpointer = ModelCheckpoint(filepath = r'/workspace/resnet50.hdf5', monitor = 'val_loss', save_best_only = True, mode = 'auto')


fit_history = model.fit_generator(
        train_generator,
        steps_per_epoch=STEPS_PER_EPOCH_TRAINING,
        epochs = NUM_EPOCHS,
        validation_data=validation_generator,
        validation_steps=STEPS_PER_EPOCH_VALIDATION,
        callbacks=[cb_checkpointer, cb_early_stopper]
)

  fit_history = model.fit_generator(


Epoch 1/100


ValueError: in user code:

    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\training.py", line 1160, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\training.py", line 1146, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\training.py", line 1135, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\training.py", line 994, in train_step
        loss = self.compute_loss(x, y, y_pred, sample_weight)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\training.py", line 1052, in compute_loss
        return self.compiled_loss(
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\engine\compile_utils.py", line 265, in __call__
        loss_value = loss_obj(y_t, y_p, sample_weight=sw)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\losses.py", line 152, in __call__
        losses = call_fn(y_true, y_pred)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\losses.py", line 272, in call  **
        return ag_fn(y_true, y_pred, **self._fn_kwargs)
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\losses.py", line 1990, in categorical_crossentropy
        return backend.categorical_crossentropy(
    File "C:\Users\LENOVOi7\anaconda3\lib\site-packages\keras\backend.py", line 5529, in categorical_crossentropy
        target.shape.assert_is_compatible_with(output.shape)

    ValueError: Shapes (None, 1) and (None, 2) are incompatible


In [None]:
print(fit_history.history.keys())

plt.figure(1, figsize = (15,8)) 

plt.subplot(221)  
plt.plot(fit_history.history['accuracy'])  
plt.plot(fit_history.history['val_accuracy'])  
plt.title('model accuracy')  
plt.ylabel('accuracy')  
plt.xlabel('epoch')  
plt.legend(['train', 'valid']) 

plt.figure(1, figsize = (15,8)) 
plt.subplot(222)  
plt.plot(fit_history.history['loss'])  
plt.plot(fit_history.history['val_loss'])  
plt.title('model loss')  
plt.ylabel('loss')  
plt.xlabel('epoch')  
plt.legend(['train', 'valid']) 

plt.show()

In [None]:
test_generator = data_generator.flow_from_directory(
    directory = r"/workspace/test",
    target_size = (image_size, image_size),
    batch_size = BATCH_SIZE_TESTING,
    class_mode = None,
    shuffle = False,
    seed = 123
)

test_generator.reset()

pred = model.predict_generator(test_generator, steps = len(test_generator), verbose = 1)

predicted_class_indices = np.argmax(pred, axis = 1)


In [None]:
true_classes = test_generator.classes
class_indices = train_generator.class_indices
class_indices = dict((v,k) for k,v in class_indices.items())

res_preds = model.predict(test_generator)
res_pred_classes = np.argmax(res_preds, axis=1)

In [None]:
from sklearn.metrics import accuracy_score

res_acc = accuracy_score(true_classes, res_pred_classes)
print("ResNet50 Model Accuracy with Fine-Tuning: {:.2f}%".format(res_acc * 100))