In [1]:
import sys
sys.path.append("./MNE-Python tutorial")

In [2]:
from bnci import *
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np

from mne.filter import filter_data

In [3]:
SFREQ = 250 #250Hz
LOWCUT = 4
HIGHCUT = 40

LEARNING_RATE = 0.001
BATCH_SIZE = 16
EPOCHS = 10

데이터 준비

In [12]:
#Training Data
t_datas= []
t_labels = []
for subject in range(1,10):
    data, label = get_data_2a(subject, True)
    # bandpass filter
    data = np.array([filter_data(data=d, sfreq=SFREQ, l_freq=LOWCUT, h_freq=HIGHCUT,
                                        verbose=False, fir_design='firwin') for d in data])
    # crop the data (imagery part -- 2s to 6s)
    data = data[:, :, int(SFREQ * 2):SFREQ * 6]

    t_datas.append(data)
    t_labels.append(label)

#Evaluation Data
e_datas=[]
e_labels=[]
for subject in range(1,10):
    data, label = get_data_2a(subject, False)
    # bandpass filter
    data = np.array([filter_data(data=d, sfreq=SFREQ, l_freq=LOWCUT, h_freq=HIGHCUT,
                                        verbose=False, fir_design='firwin') for d in data])
    # crop the data (imagery part -- 2s to 6s)
    data = data[:, :, int(SFREQ * 2):SFREQ * 6]

    e_datas.append(data)
    e_labels.append(label)    

https://tutorials.pytorch.kr/beginner/basics/buildmodel_tutorial.html  
https://tutorials.pytorch.kr/beginner/blitz/cifar10_tutorial.html  
참조

In [5]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")


Using cuda device


Data normalization 함수

In [14]:
def _normalize_data(data):
        '''

        :param data: 3-D EEG data in ndarray (shape: (n_samples, n_channels, n_times))
        :return: normalized data in 3-D
        '''
        data_mean = np.mean(data, axis=(1, 2), keepdims=True)
        data_std = np.std(data, axis=(1, 2), keepdims=True)
        # divide with data_std with rows that have non-zero std (avoid dividing by zero), std=0 comes from channel_padding
        data_std[data_std == 0] = 1
        data = (data - data_mean) / data_std
        return data

Shallow ConvNet 정의

In [29]:
class ShallowConvNet(nn.Module):
    def __init__(self, ):
        super().__init__()
        self.tempConv = nn.Conv2d(1, 40, (1,25))
        self.spaConv = nn.Conv2d(40, 40, (22,1)) 
        # Batch Normalization",
        #self.batch_norm = nn.BatchNorm2d(40, momentum=0.1, affine=True, eps=1e-5)

        self.pool = nn.AvgPool2d((1,75), (1,15))
        self.dropout1 = nn.Dropout(p=0.3)
        self.flatten = nn.Flatten()

        # Linear Layer for Classification
        self.fc = nn.Linear(2440, 4)
    
    def forward(self, x):
        if len(x.shape)==3:
            x = x.unsqueeze(1)
        # Temporal Convolution
        x = self.tempConv(x)
        x = self.spaConv(x)
        #x = self.batch_norm(x)
        x = torch.square(x)
        # Mean Pooling
        x = self.pool(x)
        x = torch.log(torch.clamp(x,min=1e-6))
        x = self.dropout1(x)
        x = self.flatten(x)
        
        # Linear Classification
        x = self.fc(x)
        
        return x


In [30]:


for subject in range(9):
    print(f"for {subject + 1}th subject")
    #Network
    net = ShallowConvNet()
    net.to(device) #using gpu

    #prepairing datasets
    x_train = _normalize_data(t_datas[subject])
    y_train = t_labels[subject]
    x_eval = _normalize_data(e_datas[subject])
    y_eval = e_labels[subject]

    x_eval, y_eval = torch.from_numpy(x_eval).type(torch.FloatTensor), torch.from_numpy(y_eval).type(torch.LongTensor)

    test_dataset = torch.utils.data.TensorDataset(x_eval, y_eval)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
    del test_dataset

    #k-fold cross validation은 생략
    x_train, y_train = torch.from_numpy(x_train).type(torch.FloatTensor), torch.from_numpy(y_train).type(torch.LongTensor)

    train_dataset = torch.utils.data.TensorDataset(x_train, y_train)
    train_loader = torch.utils.data.DataLoader(train_dataset)
    del train_dataset

    #Define a loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

    criterion.to(device)

    #####Training#####
    for epoch in range(EPOCHS):
        net.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for _, (data,labels) in enumerate(train_loader):
            ##need to implement train_loader
            data, labels = data.to(device), labels.to(device)
            
            optimizer.zero_grad()  # Zero the parameter gradients
            outputs = net(data)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            
            loss.backward()  # Backward pass (compute gradients)
            optimizer.step()  # Update weights
            
            train_loss += loss.item()*labels.size(0)
            _, predicted = torch.max(outputs.data, 1)
            train_total += labels.size(0)
            train_correct += float((predicted == labels).cpu().numpy().astype(int).sum())
        train_acc = train_correct / train_total
        train_loss = train_loss / train_total
        
        ###Test(Eval)###
        test_correct = 0
        test_loss=0.0
        test_total = 0

        for _, (data,labels) in enumerate(test_loader):
            ##need to implement train_loader
            data, labels = data.to(device), labels.to(device)
            outputs = net(data)
            loss = criterion(outputs, labels)
            test_loss += loss.item()*labels.size(0)

            #가장 높은 energy 가지는 class를 정답으로 선택
            _, predicted = torch.max(outputs.data,1)
            test_total += labels.size(0)
            test_correct += float((predicted == labels).cpu().numpy().astype(int).sum())

        test_acc = test_correct / test_total
        test_loss = test_loss / test_total

        print('Epoch:', epoch,
                          '  Train accuracy %.6f' % train_acc,
                          '  Train loss: %.6f' % train_loss,
                          '  Test accuracy is %.6f' % test_acc,
                          '  Test loss: %.6f' % test_loss,)

for 1th subject
Epoch: 0   Train accuracy 0.368056   Train loss: 1.576887   Test accuracy is 0.454861   Test loss: 1.159610
Epoch: 1   Train accuracy 0.565972   Train loss: 1.088603   Test accuracy is 0.510417   Test loss: 1.073150
Epoch: 2   Train accuracy 0.673611   Train loss: 0.821886   Test accuracy is 0.503472   Test loss: 1.145994
Epoch: 3   Train accuracy 0.708333   Train loss: 0.666591   Test accuracy is 0.461806   Test loss: 1.629284
Epoch: 4   Train accuracy 0.777778   Train loss: 0.545241   Test accuracy is 0.541667   Test loss: 1.278384
Epoch: 5   Train accuracy 0.826389   Train loss: 0.521090   Test accuracy is 0.503472   Test loss: 1.867642
Epoch: 6   Train accuracy 0.857639   Train loss: 0.378194   Test accuracy is 0.482639   Test loss: 1.816792
Epoch: 7   Train accuracy 0.885417   Train loss: 0.313712   Test accuracy is 0.493056   Test loss: 1.683508
Epoch: 8   Train accuracy 0.913194   Train loss: 0.221519   Test accuracy is 0.493056   Test loss: 1.607452
Epoch: 9   T