In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch import Tensor
import torch.nn.functional as F
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter

![image.png](attachment:image.png)

In [None]:
# TensorBoard writer created with default arguments; the training loop logs
# accuracy and loss through it via writer.add_scalars.
writer = SummaryWriter()

In [None]:
def _load_matrix(csv_path):
    """Read a CSV file without treating any row as a header and return a NumPy array."""
    frame = pd.read_csv(csv_path, header=None)
    return frame.to_numpy()


# Features and labels of both splits live in separate CSV files.
train_set = _load_matrix("data/final_format/train_set.csv")
train_label = _load_matrix("data/final_format/train_label.csv")
test_set = _load_matrix("data/final_format/test_set.csv")
test_label = _load_matrix("data/final_format/test_label.csv")

In [None]:
# Sanity check: shapes of the four arrays right after loading.
print(*(arr.shape for arr in (train_set, train_label, test_set, test_label)))

In [None]:
# Delete the first row of every array.
# NOTE(review): presumably that row is a header/index line that was read as
# data because the CSVs are loaded with header=None -- confirm against the files.
# This cell is not idempotent: every re-run removes one more row.
train_set, train_label, test_set, test_label = (
    arr[1:] for arr in (train_set, train_label, test_set, test_label)
)
print(train_set.shape, train_label.shape, test_set.shape, test_label.shape)

In [None]:
# Give every sample an explicit single-channel 2-D layout: (N, 1, 64, 64).
# The leading -1 lets NumPy infer the number of samples.
train_set = np.reshape(train_set, (-1, 1, 64, 64))
test_set = np.reshape(test_set, (-1, 1, 64, 64))
print(train_set.shape, train_label.shape, test_set.shape, test_label.shape)

In [None]:
# Collapse the label arrays to 1-D so there is exactly one entry per sample.
train_label = train_label.ravel()
test_label = test_label.ravel()

print(train_set.shape, train_label.shape, test_set.shape, test_label.shape)

In [None]:
# Hyper parameters
num_epochs = 3000  # number of full passes over the training loader
num_classes = 4  # number of target classes (size of the classifier output)
batch_size = 64  # mini-batch size used by both the train and the test DataLoader
learning_rate = 1e-3  # initial Adam learning rate; halved at each MultiStepLR milestone

In [None]:
# Wrap the NumPy arrays in tensors: float32 features and int64 class labels
# (nn.CrossEntropyLoss expects integer class indices as targets).
# torch.tensor always copies, so the tensors never alias the NumPy arrays.
train_set_tensor = torch.tensor(train_set, dtype=torch.float32)
train_label_tensor = torch.tensor(train_label, dtype=torch.long)

train_dataset = TensorDataset(train_set_tensor, train_label_tensor)
# Reshuffle the training samples every epoch so that batch composition does not
# depend on the row order of the CSV file.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_set_tensor = torch.tensor(test_set, dtype=torch.float32)
test_label_tensor = torch.tensor(test_label, dtype=torch.long)

test_dataset = TensorDataset(test_set_tensor, test_label_tensor)
# Evaluation order does not influence the accuracy, so the test set is not shuffled.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Device configuration: use the first CUDA GPU when one is available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
device

In [None]:
class EEGNet(nn.Module):
    """Small convolutional classifier for inputs of shape (N, 1, 64, 64).

    Layout:

    * ``block_1`` -- (1, 5) convolution with 'same' padding (1 -> 8 channels)
      followed by batch normalisation.
    * ``block_2`` -- grouped (64, 1) convolution (8 groups, 8 -> 16 channels)
      that collapses the height axis, then batch norm, ELU, (1, 4) average
      pooling and dropout.
    * ``block_3`` -- depthwise (1, 3) convolution plus 1x1 pointwise
      convolution (a separable convolution), then batch norm, ELU, (1, 8)
      average pooling and dropout.
    * ``out`` -- linear layer mapping the 16 * 2 flattened features to
      ``classes_num`` logits.

    Args:
        classes_num: Number of output classes.
    """

    def __init__(self, classes_num):
        super().__init__()
        self.drop_out = 0.25

        # (N, 1, 64, 64) -> (N, 8, 64, 64)
        width_conv = nn.Conv2d(1, 8, kernel_size=(1, 5), padding='same', bias=False)
        self.block_1 = nn.Sequential(width_conv, nn.BatchNorm2d(8))

        # (N, 8, 64, 64) -> (N, 16, 1, 64) -> pooled to (N, 16, 1, 16)
        height_conv = nn.Conv2d(8, 16, kernel_size=(64, 1), groups=8, bias=False)
        self.block_2 = nn.Sequential(
            height_conv,
            nn.BatchNorm2d(16),
            nn.ELU(),
            nn.AvgPool2d((1, 4)),
            nn.Dropout(self.drop_out),
        )

        # (N, 16, 1, 16) -> (N, 16, 1, 16) -> pooled to (N, 16, 1, 2)
        depthwise_conv = nn.Conv2d(
            16, 16, kernel_size=(1, 3), groups=16, padding='same', bias=False
        )
        pointwise_conv = nn.Conv2d(16, 16, kernel_size=(1, 1), bias=False)
        self.block_3 = nn.Sequential(
            depthwise_conv,
            pointwise_conv,
            nn.BatchNorm2d(16),
            nn.ELU(),
            nn.AvgPool2d((1, 8)),
            nn.Dropout(self.drop_out),
        )

        # 16 channels * 2 remaining positions -> one logit per class
        self.out = nn.Linear(16 * 2, classes_num)

    def forward(self, x):
        """Return the raw class logits, shape (N, classes_num), for a batch ``x``."""
        for stage in (self.block_1, self.block_2, self.block_3):
            x = stage(x)
        flat = x.view(x.shape[0], -1)  # (N, 16, 1, 2) -> (N, 32)
        return self.out(flat)


In [None]:
# Smoke test on the CPU: push a random batch through a freshly built network
# and verify that it yields one logit per class for every sample.
model = EEGNet(num_classes)
a = torch.randn((32, 1, 64, 64))  # dummy batch of 32 single-channel 64x64 inputs
b = model(a)
assert b.shape == (a.size(0), num_classes), b.shape
b.shape

In [None]:
# Build the network that is actually trained and move its parameters to the
# selected device. The output size comes from the num_classes hyperparameter
# instead of a repeated literal.
model = EEGNet(num_classes).to(device)

In [None]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-3) 
#optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 
milestones = [50,100,150,200,250]
milestones = [a * len(train_loader) for a in milestones]
scheduler = MultiStepLR(optimizer, milestones=milestones, gamma=0.5)

In [None]:

# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    # Put dropout / batch norm into training mode explicitly: an evaluation cell
    # that calls model.eval() may have been executed before this cell is re-run.
    model.train()
    correct = 0
    total = 0
    running_loss = 0.0
    for i, (X, Y) in enumerate(train_loader):
        X = X.to(device)
        Y = Y.to(device)

        # Forward pass
        outputs = model(X)
        loss = criterion(outputs, Y)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # The milestones were scaled by len(train_loader), so the scheduler is
        # advanced once per batch (after the optimizer step).
        scheduler.step()

        # Accumulate the statistics of this epoch
        _, predicted = outputs.max(1)
        total += Y.size(0)
        correct += predicted.eq(Y).sum().item()
        running_loss += loss.item()

    # Report once per epoch: one console line and one TensorBoard point per
    # global step instead of one per batch. Skipped when the loader is empty.
    if total:
        accu = 100. * correct / total
        train_loss = running_loss / total_step  # mean batch loss of the epoch
        print('Epoch [{}/{}], Training Accuracy: {:.4f}%, Training Loss: {:.4f}'.format(epoch+1, num_epochs, accu, train_loss))

        writer.add_scalars(f'train/accuracy_loss', {
            'accuracy': accu,
            'loss': train_loss,
        }, epoch)

# Make sure all pending events are written to disk.
writer.flush()


In [None]:

# Test the model
model.eval()  # evaluation mode: batch norm uses its running statistics, dropout is inactive
correct = 0
total = 0
with torch.no_grad():  # no gradients are needed for inference
    for X, Y in test_loader:
        X, Y = X.to(device), Y.to(device)
        outputs = model(X)
        predicted = outputs.max(dim=1).indices  # index of the largest logit per sample
        total += Y.size(0)
        correct += predicted.eq(Y).sum().item()

print('Test Accuracy : {} %'.format(100 * correct / total))

# Save the model checkpoint
#torch.save(model.state_dict(), 'model.ckpt')


#Test Accuracy : 52.946081156197884 %