In [2]:
import random
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy as sk
import pickle
import mne
import tqdm
from tqdm import trange

import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

from torch import nn

In [2]:
def extract_data(sub_range,verbose=False,feature_func=None,**kwargs):
    first=sub_range[0]
    last=sub_range[-1]

    for sub in trange(first,last+1): 
        #read data
        if sub <=9:
            path=f'/home/pigmaster96/openneuroNMAdata/ds005540-download/derivatives/sub-0{sub}/ses-vid/eeg/sub-0{sub}_ses-vid_task-emotion_reorder.npy'
        else:
            path=f'/home/pigmaster96/openneuroNMAdata/ds005540-download/derivatives/sub-{sub}/ses-vid/eeg/sub-{sub}_ses-vid_task-emotion_reorder.npy'
        datatemp=np.load(path,allow_pickle=True)
        datatemp=np.permute_dims(datatemp,[1,0,2])

        #if we want to convert features
        if not feature_func==None:
            datatemp=feature_func(datatemp,info=info)

        #cat data along new dimension
        datatemp=torch.tensor(datatemp).unsqueeze(0)
        if sub==first:
            data=datatemp
        else:
            data=torch.cat([data,datatemp],axis=0)

        if verbose:
            print(f'subject {sub} data extracted, vector size: {data.shape}')
    return data

def shuffle_and_split_data(X,y):
    """
    Helper function to shuffle and split data

    Args:
        X: torch.tensor
        Input data
        y: torch.tensor
        Corresponding target variables
        seed: int
        Set seed for reproducibility

    Returns:
        X_test: torch.tensor
        Test data [20% of X]
        y_test: torch.tensor
        Labels corresponding to above mentioned test data
        X_train: torch.tensor
        Train data [80% of X]
        y_train: torch.tensor
        Labels corresponding to above mentioned train data
    """
    
    N=X.size(0)
    shuffled_indices=torch.randperm(N) #get shuffled indices
    X=X[shuffled_indices]
    y=y[shuffled_indices]

    # split by 20% into train-test set
    test_size=int(0.2*N)
    X_train=X[test_size:]
    y_train=y[test_size:]
    X_test=X[:test_size]
    y_test=y[:test_size]

    return X_test,y_test,X_train,y_train

#create labels
#sad-dis-fear-neu-joy-ten-ins correspond to 1-7 respectively, each sample has 21 trials, for (7 emotions x 3 trials)
labels=np.array([])
for i in range(1,8):
    for n in range(0,3):
        labels=np.concatenate([labels,np.array([i])],axis=0)
labels=torch.tensor(labels)

In [3]:
data=extract_data([1,5])

  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [00:00<00:00,  8.32it/s]


In [4]:
def process_data(data):
    '''further process into trials (1 second trials, 200 points)'''
    for trial in tqdm.tqdm(range(int(data.size(3)/200))):
        datatrial=data[:,:,:,trial*200:(trial+1)*200]
        datatrial=torch.unsqueeze(datatrial,2)
        if trial==0:
            newdata=datatrial
        else:
            newdata=torch.concatenate((newdata,datatrial),2)
    return newdata

In [5]:
data=process_data(data)
data.shape #subject,label,trials,electrodes,timepoints

100%|██████████| 30/30 [00:01<00:00, 23.35it/s]


torch.Size([5, 21, 30, 64, 200])

In [6]:
for label in tqdm.tqdm(range(21)): #combine labels and trials
    temp=data[:,label,:,:,:]
    if label==0:
        X=temp
        y=labels[label]*torch.ones(30,1)
    else:
        X=torch.concatenate((X,temp),dim=1)
        y=torch.concatenate((y,labels[label]*torch.ones(30,1)),dim=0)

#combine subjects
for subject in tqdm.tqdm(range(X.size(0))):
    temp=X[subject,:,:,:]
    if subject==0:
        X_full=temp
        y_full=y
    else:
        X_full=torch.concatenate((X_full,temp),dim=0)
        y_full=torch.concatenate((y_full,y),dim=0)

print(X_full.shape)
print(y_full.shape)

#split data
X_test,y_test,X_train,y_train=shuffle_and_split_data(X_full.unsqueeze(1),y_full.squeeze())

print(X_test.shape,y_test.shape,X_train.shape,y_train.shape)

100%|██████████| 21/21 [00:00<00:00, 23.65it/s]
100%|██████████| 5/5 [00:00<00:00, 21.14it/s]

torch.Size([3150, 64, 200])
torch.Size([3150, 1])
torch.Size([630, 1, 64, 200]) torch.Size([630]) torch.Size([2520, 1, 64, 200]) torch.Size([2520])





In [7]:
#now the model
class Net(nn.Module):
    def __init__(self):
        super(Net,self).__init__()
        self.conv1=nn.Conv2d(1,32,3,stride=1,padding=1)
        self.conv2=nn.Conv2d(32,16,3,stride=1,padding=1)
        self.fc1=nn.Linear(16*12800,128)
        self.fc2=nn.Linear(128,7)

    def forward(self,x):
        x=self.conv1(x)
        x=nn.functional.relu(x)
        x=self.conv2(x)
        x=nn.functional.relu(x)
        x=torch.flatten(x,1)
        x=self.fc1(x)
        x=nn.functional.relu(x)
        x=self.fc2(x)
        return x
    


def train_test(net,epochs,train_loader,test_loader,device):
    criterion=nn.CrossEntropyLoss()
    optimizer=optim.Adam(net.parameters(),lr=3e-4)
    train_acc=[]
    train_loss=[]
    test_acc=[]
    test_loss=[]
    net.to(device)
    for epoch in tqdm.tqdm(range(epochs)):
        net.train()
        running_loss=0.0
        correct,total=0,0
        for i,data in enumerate(train_loader,start=0):
            inputs,labels=data
            inputs=inputs.to(device).float()
            labels=labels.to(device)
            

            #train
            optimizer.zero_grad()
            outputs=net.forward(inputs)
            loss=criterion(outputs,labels)
            loss.backward()
            optimizer.step()

            running_loss+=loss.item()
            #training accuracy
            _,predicted=torch.max(outputs,1)
            total+=labels.size(0)
            correct+=(predicted==labels).sum()
        train_loss.append(running_loss/len(train_loader))
        train_acc.append(correct/total)
        print(f"epoch {epoch} --> TRAIN loss: {running_loss/len(train_loader)}, TRAIN accuracy: {correct/total}")

        #eval on test
        net.eval()
        running_loss=0.0
        correct,total=0,0
        for inputs,labels in test_loader:
            inputs,labels=inputs.to(device),labels.to(device)
            outputs=net.forward(inputs)
            loss=criterion(outputs,labels)
            running_loss+=loss.item()

            #test acc
            _,predicted=torch.max(outputs,1)
            total+=labels.size(0)
            correct+=(predicted==labels).sum()
        test_loss.append(running_loss/len(test_loader))
        test_acc.append(correct/total)
        print(f"epoch {epoch} --> TEST loss: {running_loss/len(train_loader)}, TEST accuracy: {correct/total}")

    return train_loss,train_acc,test_loss,test_acc


batch_size=150
test_data = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_data, batch_size=batch_size,
                         shuffle=False
                         )

train_data = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_data,
                          batch_size=batch_size,
                          drop_last=False,
                          shuffle=True
                          )

In [8]:
net=Net()
train_loss,train_acc,test_loss,test_acc=train_test(net,10,train_loader=train_loader,test_loader=test_loader,device=DEVICE)

  0%|          | 0/10 [00:01<?, ?it/s]


RuntimeError: m_device->CreateOperator(&opDesc, IID_PPV_ARGS(&op))