In [1]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from braindecode.models import Deep4Net,EEGITNet
from braindecode.preprocessing import preprocess,Preprocessor
from sklearn.preprocessing import LabelEncoder,MinMaxScaler,StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score,f1_score
from torchvision import models
import librosa
import torchvision
from scipy.signal import butter, filtfilt, iirnotch
import warnings
warnings.filterwarnings('ignore')


In [2]:
torch.cuda.is_available()

True

In [3]:
import random

seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [4]:
columns = [ 'FZ', 'C3', 'CZ', 'C4', 'PZ', 'PO7', 'OZ', 'PO8', 'AccX',
       'AccY', 'AccZ', 'Gyro1', 'Gyro2', 'Gyro3']
le = LabelEncoder()
scaler = StandardScaler()
TASK = "MI"
TARGETS = 2
if TASK == "SSVEP":
    TARGETS = 4
fs = 250

In [5]:
class EEGData(Dataset):
    
    def __init__(self,path:str,task:str,datatype:str):
        """
            path: string
            type: Literal["train","test"]
        """
        self.dataframe = pd.read_csv(path)
        self.dataframe = self.dataframe[self.dataframe['task'] == task]
        self.datatype = datatype

        if self.datatype != 'test':
          self.dataframe['label'] = le.fit_transform(self.dataframe['label'])


    def remove_base_line(self,signal):
        ecfft = np.fft.fft(signal)
        frequencies = np.fft.fftfreq(len(ecfft), d=1/250)
        ecfft[np.abs(frequencies) < 0.5] = 0
        baseline_removed = np.fft.ifft(ecfft).real
        return baseline_removed
    
    def power_line_interface_removal(self,signal):
        notch_freq = 50  # or 60 depending on your country
        quality_factor = 30
        b_notch, a_notch = iirnotch(notch_freq, quality_factor, fs)
        powerline_removed = filtfilt(b_notch, a_notch, signal)
        return powerline_removed
    
    def remove_high_freq(self,signal):
        cutoff = 40  # Hz
        b_lp, a_lp = butter(4, cutoff / (0.5 * fs), btype='low')
        smoothed_signal = filtfilt(b_lp, a_lp, signal)
        return smoothed_signal

    def __getitem__(self,index):
        path = self.datatype
        datapoint = self.dataframe.iloc[index]

        eeg = pd.read_csv(f'../data/{datapoint['task']}/{path}/{datapoint['subject_id']}/{datapoint['trial_session']}/EEGdata.csv')

        eeg.sort_values(by='Time',inplace=True)

        mag = len(eeg)
        trial  = int(datapoint['trial'])
        lower = mag*(trial-1)//10
        upper = mag*trial//10
        
        eeg = eeg[lower:upper]
        eeg = eeg[columns]

        for c in columns:
            eeg[c] = self.remove_high_freq(self.power_line_interface_removal(eeg[c]))

        eeg = np.array(eeg,dtype='float32')
        eeg = torch.from_numpy(eeg).permute(1,0)
        S = librosa.feature.melspectrogram(y=np.array(eeg), sr=250, n_fft=250, hop_length=50, n_mels=128)
        S_db = librosa.power_to_db(S, ref=np.max)
        S_db = (S_db - S_db.min()) / (S_db.max() - S_db.min())

        S_db = np.concatenate(S_db,axis=0)

        S_db = np.array([S_db,S_db,S_db])



        eeg = torch.from_numpy(S_db)
        if self.datatype != 'test':
            return eeg,int(datapoint['label'])
        else: 
            return eeg


    def __len__(self):
        return len(self.dataframe)

In [6]:
train_mi = EEGData('../data/train.csv',TASK,'train') 
train_mi_loader = DataLoader(train_mi,batch_size=128,shuffle=False) 

val_mi = EEGData('../data/validation.csv',TASK,'validation')
val_mi_loader = DataLoader(val_mi,batch_size=128,shuffle=False)

test_mi = EEGData('../data/test.csv',TASK,datatype='test')

In [7]:
model = models.resnet18(pretrained=True)


In [8]:
for param in model.parameters():
    param.requires_grad = False

In [9]:
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features,out_features=256),
    nn.ReLU(),
    nn.Linear(in_features=256,out_features=64),
    nn.ReLU(),
    nn.Linear(in_features=64,out_features=TARGETS)
    )

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [10]:
import torch.nn as nn
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [None]:
n_epochs = 20
losses = []
accurecies = [] 
val_accurecies = []
for epoch in range(n_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for X_batch, y_batch in train_mi_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == y_batch).sum().item()
        total += y_batch.size(0)

    acc = correct / total
    losses.append(total_loss)
    print(f"Epoch {epoch+1}: Train Loss={total_loss:.4f}, Accuracy={acc:.4f}")
    accurecies.append(acc)
    # Validation
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_mi_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            X_batch = (X_batch - X_batch.mean(dim=2, keepdim=True)) / (X_batch.std(dim=2, keepdim=True) + 1e-6)

            outputs = model(X_batch)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
    val_acc = correct / total
    val_accurecies.append(val_acc)
    print(f"          Val Accuracy={val_acc:.4f}")


Epoch 1: Train Loss=19.3741, Accuracy=0.5138
          Val Accuracy=0.4400
Epoch 2: Train Loss=13.4069, Accuracy=0.4933
          Val Accuracy=0.4400
Epoch 3: Train Loss=13.2582, Accuracy=0.4929
          Val Accuracy=0.4400
Epoch 4: Train Loss=13.1722, Accuracy=0.5054
          Val Accuracy=0.4400
Epoch 5: Train Loss=13.1733, Accuracy=0.5054
          Val Accuracy=0.4400


In [None]:
sns.lineplot(x=range(len(accurecies)),y=accurecies)

In [None]:
sns.lineplot(x=range(len(val_accurecies)),y=val_accurecies)


In [None]:
sns.lineplot(x=range(len(losses)),y=losses)


In [None]:
test = torch.from_numpy(np.array([point for point in test_mi]))

In [None]:
test = test.to(device)

In [None]:
pred = model(test)

In [None]:
_, predicted = torch.max(pred, 1)

In [None]:
test_frame = pd.read_csv('../data/test.csv')

In [None]:
test_frame = test_frame[test_frame['task']==TASK]

In [None]:
test_frame 

In [None]:
sub = test_frame[['id']]
sub['label'] = le.inverse_transform(predicted.to('cpu'))

In [None]:
predicted

In [None]:
sub.to_csv('submission_'+TASK+'.csv')