In [270]:
from glob import glob
import os
import mne
import numpy as np
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torchvision
from torch.utils.data import Dataset, DataLoader
import math

In [64]:
all_eeg_path = glob('Music Thinking/sub-*/ses-*/eeg/*.set')
len(all_eeg_path)

240

In [65]:
behaviour_data = np.genfromtxt('Music Thinking/stimuli/Behavioural_data.txt')
behaviour_data = behaviour_data[1:]

labels = behaviour_data[:,2]
for i in range(labels.shape[0]):
    # Enjoyment level 1 or 2 -> enjoy the most
    if labels[i] <= 2:
        labels[i] = 1
        
    # Neutral or does not enjoy (enjoyment levels 3 to 5)
    else:
        labels[i] = 0
        
# The proportion of people liking/disliking the music is relatively the same.
print(np.count_nonzero(labels == 1)) # LIKE
print(np.count_nonzero(labels == 0)) # DISLIKE

112
128


In [66]:
# Associate each path to the corresponding class (binary classification)
like_path = []
dislike_path = []

for i in range(labels.shape[0]):
    if labels[i] == 1:
        like_path = np.append(like_path, all_eeg_path[i])
    else:
        dislike_path = np.append(dislike_path, all_eeg_path[i])

## TEST
# 20 INDIVIDUALS FOR TESTING

like_path = like_path[0:10]
dislike_path = dislike_path[0:10]
        
print(like_path.shape[0], dislike_path.shape[0])

10 10


In [164]:
# Function that reads EEG Data (set extension file)
def read_set_data(path):
    music_data = mne.io.read_raw_eeglab(path, preload = True)
    #music_data.set_eeg_reference()
    
    epochs = mne.make_fixed_length_epochs(music_data, duration = 3, overlap = 2, preload = True)
    music_array = epochs.get_data()
    music_array = music_array[:,:,:750]
    
    number_epochs = 4
    array_epochs = np.empty(number_epochs, dtype = object)

    for i in range(number_epochs):
        chosen_number = random.randint(0, music_array.shape[0]-1)
        print(chosen_number)
        array_epochs[i] = music_array[chosen_number]
    
    return array_epochs

In [165]:
%%capture
random.seed(0)

like_epoch_array = np.empty((len(like_path)), dtype = object)
dislike_epoch_array = np.empty((len(dislike_path)), dtype = object)

for i in range(len(like_path)):
    like_epoch_array[i] = read_set_data(like_path[i])
    
for i in range(len(dislike_path)):
    dislike_epoch_array[i] = read_set_data(dislike_path[i])

In [166]:
# Assign the labels for each epoch
like_epoch_labels = np.empty((len(like_path)), dtype = object)
dislike_epoch_labels = np.empty((len(dislike_path)), dtype = object)

for i in range(len(like_path)):
    like_epoch_labels[i] = len(like_epoch_array[i]) * [1]
    
for i in range(len(dislike_path)):
    dislike_epoch_labels[i] = len(dislike_epoch_array[i]) * [0]
    
print(len(like_epoch_labels), len(dislike_epoch_labels))

10 10


In [167]:
dislike_epoch_array[6][0][0].shape

(750,)

In [219]:
X_train = np.hstack(np.append(like_epoch_array[0:6], dislike_epoch_array[0:6]))
X_val = np.hstack(np.append(like_epoch_array[6:], dislike_epoch_array[6:]))

y_train = np.hstack(np.append(like_epoch_labels[0:6], dislike_epoch_labels[0:6]))
y_val = np.hstack(np.append(like_epoch_labels[6:], dislike_epoch_labels[6:]))

X_train[:].shape

(48,)

In [216]:
X_train_tensor = np.zeros((X_train.shape[0], 1, 129, 750))
X_val_tensor = np.zeros((X_val.shape[0], 1, 129, 750))

for i in range(X_train_tensor.shape[0]):
    X_train_tensor[i,0,:,:] = X_train[i]
    
for i in range(X_val_tensor.shape[0]):
    X_val_tensor[i,0,:,:] = X_val[i]

print(X_train_tensor.shape, X_val_tensor.shape, y_train.shape, y_val.shape)

(48, 1, 129, 750) (32, 1, 129, 750) (48,) (32,)
<class 'numpy.ndarray'>


In [304]:
class EEGTrain(Dataset):
    
    def __init__(self):
        #data loading
        self.x = torch.from_numpy(X_train_tensor).float()
        self.y = torch.from_numpy(y_train).long()
        self.n_samples = len(y_train)
        
    def __getitem__(self, index):
        return self.x[index], self.y[index]
    
    def __len__(self):
        # len(dataset)
        return self.n_samples

In [305]:
class EEGVal(Dataset):
    
    def __init__(self):
        #data loading
        self.x = torch.from_numpy(X_val_tensor).float()
        self.y = torch.from_numpy(y_val).long()
        self.n_samples = len(y_val)
        
    def __getitem__(self, index):
        return self.x[index], self.y[index]
    
    def __len__(self):
        # len(dataset)
        return self.n_samples

In [264]:
'''
TESTING
# Batch-size: let's test with 4 for now
train_data = EEGTrain()
train_dl = DataLoader(dataset = train_data, batch_size = 4, shuffle = True)

train_features, train_labels = next(iter(train_dl))
print(train_features.size(), train_labels.size())

train_labels

num_epochs = 2
total_samples = len(train_data)
n_iterations = math.ceil(total_samples/4)
print(n_iterations)

for epoch in range(num_epochs):
    for i, (inputs, labels) in enumerate(train_dl):
        #forward and backward pass, update our weights
        print(f'epoch {epoch + 1}/{num_epochs}, step {i + 1}/{n_iterations}, inputs {inputs.shape}')
        print(labels)
'''


12


## ATTEMPT FOR RNN IMPLEMENTATION HAHA... :(

In [319]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# HYPERPARAMETERS
hidden_size = 128
num_classes = 2
num_epochs = 1
batch_size = 5
learning_rate = 0.001

input_size = 129
sequence_length = 750
num_layers = 2

# DATASETS
train_data = EEGTrain()
train_dl = DataLoader(dataset = train_data, batch_size = batch_size, shuffle = True)

val_data = EEGVal()
val_dl = DataLoader(dataset = val_data, batch_size = batch_size, shuffle = True)

In [320]:
# NETWORK
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first = True)
        # x -> (batch_size, sequence_length, input_size)
        
        # or:
        #self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        #self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
        self.fc = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.rnn(x, h0)
        # out -> (batch_size, sequence_length, hidden_size)
        # out -> (N, 129, 128) ->> NOTE: CHANGE SEQUENCE_LENGTH AS 750 LATER (TRANPOSE THE TENSOR)
        out = out[:, -1, :]
        # out -> (N, 128)
        out = self.fc(out)
        return out

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)

In [321]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  

In [322]:
# Train the model
n_total_steps = len(train_dl)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_dl):  
        # origin shape: [N, 1, 28, 28]
        # resized: [N, 28, 28]
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{n_total_steps}], Loss: {loss.item():.4f}')


Epoch [1/1], Step [1/10], Loss: 0.6650
Epoch [1/1], Step [2/10], Loss: 0.5961
Epoch [1/1], Step [3/10], Loss: 0.6840
Epoch [1/1], Step [4/10], Loss: 0.6978
Epoch [1/1], Step [5/10], Loss: 1.0363
Epoch [1/1], Step [6/10], Loss: 0.8185
Epoch [1/1], Step [7/10], Loss: 0.6746
Epoch [1/1], Step [8/10], Loss: 0.9143
Epoch [1/1], Step [9/10], Loss: 0.7299
Epoch [1/1], Step [10/10], Loss: 0.6694


In [323]:
# Test the model
# In test phase, we don't need to compute gradients (for memory efficiency)
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for images, labels in val_dl:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        # max returns (value ,index)
        
        _, predicted = torch.max(outputs.data, 1)
        
        print(predicted, labels)
        n_samples += labels.size(0)
        n_correct += (predicted == labels).sum().item()

    acc = 100.0 * n_correct / n_samples
    print(f'Accuracy of the network: {acc} %')

tensor([0, 0, 0, 0, 0]) tensor([0, 1, 0, 0, 1])
tensor([0, 0, 0, 0, 0]) tensor([1, 1, 1, 0, 1])
tensor([0, 0, 0, 0, 0]) tensor([0, 1, 0, 1, 0])
tensor([0, 0, 0, 0, 0]) tensor([1, 0, 1, 1, 1])
tensor([0, 0, 0, 0, 0]) tensor([0, 0, 0, 1, 0])
tensor([0, 0, 0, 0, 0]) tensor([1, 0, 1, 0, 0])
tensor([0, 0]) tensor([1, 0])
Accuracy of the network: 50.0 %


In [None]:
'''
DOCUMENTATION:

Accomplished tasks:
-> Set datasets and dataloaders for Pytorch Neural networks' inputs
-> Testing our very first RNN model ever with a very small sample_size (20 individuals to be exact)

Upcoming steps...
-> Transpose the matrix 129-750 (129 electrodes x 750 time points).
-> Experiment and add new layers in RNN (add dropout rate, etc.). The model implemented was only one hidden layer.
-> Experiment with a larger sample size in the cluster.
'''