In [None]:
import os
import random
import torch

import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.utils import shuffle
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.loader import DataLoader
from torch_geometric.data import Data



#### data interpretation 
* 64 channels × 21 trials per type (video/imagine) × 6000 samples (guess: sampled every 0.01 s, i.e. at 100 Hz — to be confirmed)
* reorder = ['sad4', 'sad5', 'sad8', 'dis4', 'dis5', 'dis8', 'fear4', 'fear5', 'fear8', 'neu4', 'neu5', 'neu8', 'joy4', 'joy5', 'joy8', 'ten4', 'ten5', 'ten8', 'ins4', 'ins5', 'ins8']

In [8]:
# Build the path with os.path.join so it is valid on every OS. The previous
# literal 'EEG_data\sub-...' only worked with Windows separators and relied on
# '\s' being passed through as an unrecognized escape sequence.
eeg_path = os.path.join('EEG_data', 'sub-01_ses-ima_task-emotion_reorder.npy')
data = np.load(eeg_path)
# axes follow the notes above: (channels, trials, samples)
data.shape

(64, 21, 6000)

### Brainstorm: what can we do?
* first, classification
* then, try transfer learning?
* going deeper, apply time-frequency analysis
    * classification
    * transfer learning


#### some basic funcs

In [None]:
def train(model, train_loader, optimizer, criterion):
    '''
    Run one training epoch and return the mean loss per batch.

    Args:
        model: torch.nn.Module
            Called as model(batch.x, batch.edge_index, batch.batch).
        train_loader: iterable of batches
            Every batch must expose the attributes x, edge_index, batch and y.
            Any iterable works; it does not need to support len().
        optimizer: torch.optim.Optimizer
            Optimizer holding the parameters of model.
        criterion: callable
            Loss function, called as criterion(out, batch.y).

    Returns:
        float
            Sum of the per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: if train_loader yields no batches.
    '''
    model.train()
    tol_loss = 0.0
    num_batches = 0

    for batch in train_loader:
        ## zero gradient
        optimizer.zero_grad()
        ## forward pass: call the module itself (not .forward) so that
        ## registered forward hooks are executed as well
        out = model(batch.x, batch.edge_index, batch.batch)
        ## calculate loss according to output and y
        loss = criterion(out, batch.y)
        ## backward propagation
        loss.backward()
        ## do optimization
        optimizer.step()
        ## accumulate loss
        tol_loss += loss.item()
        num_batches += 1

    ## return the mean loss
    return tol_loss / num_batches

def test(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    
    ## no need to do gradient descent in test phase  
    with torch.no_grad():
        for batch in  test_loader:
            ## forward pass
            out = model.forward(batch.x, batch.edge_index, batch.batch)
            prediction = out.argmax(dim = 1)
            correct += (prediction == batch.y).sum().item()
            total += batch.y.size(0)
            
        accuracy = correct / total
    return  accuracy   

def fit(model, train_loader, test_loader, epochs = 200, lr = 0.01):
    '''
    Train model with Adam and cross entropy, evaluating after every epoch.

    Args:
        model: torch.nn.Module
            Model to optimize; passed on to train() and test().
        train_loader: iterable of batches
            Training batches, forwarded to train().
        test_loader: iterable of batches
            Evaluation batches, forwarded to test().
        epochs: int
            Number of passes over train_loader.
        lr: float
            Learning rate of the Adam optimizer.

    Returns:
        Test accuracy measured after the last epoch, or None when
        epochs is 0 (no evaluation was run).
    '''
    ## define optimizer and criterion
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    ## here i choose cross entropy
    criterion = nn.CrossEntropyLoss()

    ## stays None if the loop body never runs
    test_acc = None
    for epoch in range(epochs):
        ## the helpers take the model as their first argument
        train_loss = train(model, train_loader, optimizer, criterion)
        test_acc = test(model, test_loader)
        print(f'Epoch {epoch:03d}, Train Loss: {train_loss:.4f}, Test Acc: {test_acc:.4f}')

    return test_acc


#### build a simple CNN

In [None]:
class simple_cnn(nn.Module):
    '''
    Small 2-D CNN: conv1 -> ReLU -> conv2 -> ReLU -> max-pool -> flatten
    -> fc1 -> ReLU -> fc2.
    '''

    def __init__(self):
        '''
        Create the layers.

        Args:
            None
        Returns:
            Nothing
        '''
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels = 1, out_channels = 32, kernel_size = 3)
        self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 3, kernel_size = 3)
        self.pool = nn.MaxPool2d(kernel_size = 2)
        ## fc1 needs exactly 9216 flattened features, i.e. for an input of
        ## height H and width W: 3 * ((H - 4) // 2) * ((W - 4) // 2) == 9216
        ## (for example H = 100, W = 132).
        ## NOTE(review): 9216 is also 64 * 12 * 12, so conv2 may have been meant
        ## to have 64 output channels for 28x28 inputs -- confirm against the
        ## real input size.
        self.fc1 = nn.Linear(in_features = 9216, out_features = 128)
        self.fc2 = nn.Linear(in_features = 128, out_features = 2)

    def forward(self, x):
        '''
        Here is a forward pass

        Args:
            x: torch.tensor
               Input feature with a single channel, shape (N, 1, H, W);
               H and W must satisfy the fc1 constraint noted in __init__.

        Returns:
            x: torch.tensor
               Output of final fully connected layer, shape (N, 2)
        '''
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        ## non-linearity between the two linear layers; the 2-D max-pool that
        ## used to be here cannot be applied to the (N, 128) output of fc1
        x = F.relu(x)
        x = self.fc2(x)

        return x
        
       

### build a simple GCN
* GCN1 → ReLU → GCN2 → ReLU → mean pool → Linear1 → ReLU → Linear2 → ReLU → output layer (classification)


####  define GCN first

In [None]:
class SimpleGCN(nn.Module):
    '''
    Graph classifier: two GCN layers, global mean pooling over the nodes of
    each graph, two fully connected layers with dropout, and a linear
    classifier.
    '''

    def __init__(self, in_channels, hidden_channels, fc_hidden, num_classes):
        '''
        Args:
            in_channels: number of input features per node
            hidden_channels: output width of both GCN layers
            fc_hidden: width of the two fully connected layers
            num_classes: number of output classes
        '''
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc1 = nn.Linear(hidden_channels, fc_hidden)
        self.fc2 = nn.Linear(fc_hidden, fc_hidden)
        self.classifier = nn.Linear(fc_hidden, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x, edge_index, batch):
        '''
        Compute class scores for every graph in the batch.

        Args:
            x: node feature tensor passed to the first GCN layer
            edge_index: edge list tensor passed to both GCN layers
            batch: graph assignment of every node, used by the mean pooling

        Returns:
            Output of the classifier layer (one row of scores per graph).
        '''
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        ## collapse the node embeddings of each graph into one vector
        x = global_mean_pool(x, batch=batch)

        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = F.relu(x)
        x = self.dropout(x)

        x = self.classifier(x)

        return x

    def train_epoch(self, train_loader, optimizer, criterion):
        '''
        Run one training epoch and return the mean loss per batch.

        This method is deliberately NOT called train: nn.Module.train(mode)
        is what self.train() and self.eval() rely on, and overriding it with
        a different signature makes both of them raise TypeError.

        Args:
            train_loader: iterable of batches exposing x, edge_index, batch, y
            optimizer: optimizer holding the parameters of this model
            criterion: loss function, called as criterion(out, batch.y)

        Returns:
            Sum of the per-batch losses divided by the number of batches.

        Raises:
            ZeroDivisionError: if train_loader yields no batches.
        '''
        ## switch to training mode (enables dropout)
        self.train()
        tol_loss = 0.0
        num_batches = 0

        for batch in train_loader:
            ## zero gradient
            optimizer.zero_grad()
            ## forward pass through __call__ so that hooks are honoured
            out = self(batch.x, batch.edge_index, batch.batch)
            ## calculate loss according to output and y
            loss = criterion(out, batch.y)
            ## backward propagation
            loss.backward()
            ## do optimization
            optimizer.step()
            ## accumulate loss
            tol_loss += loss.item()
            num_batches += 1

        ## return the mean loss
        return tol_loss / num_batches

    def test(self, test_loader):
        '''
        Evaluate classification accuracy over test_loader.

        Args:
            test_loader: iterable of batches exposing x, edge_index, batch, y

        Returns:
            Number of correct predictions divided by the number of labels.

        Raises:
            ZeroDivisionError: if test_loader yields no samples.
        '''
        ## switch to evaluation mode (disables dropout)
        self.eval()
        correct = 0
        total = 0

        ## no gradients are needed in the test phase
        with torch.no_grad():
            for batch in test_loader:
                ## forward pass
                out = self(batch.x, batch.edge_index, batch.batch)
                prediction = out.argmax(dim=1)
                correct += (prediction == batch.y).sum().item()
                total += batch.y.size(0)

        return correct / total

    def fit(self, train_loader, test_loader, epochs = 200, lr = 0.01):
        '''
        Train with Adam and cross entropy, evaluating after every epoch.

        Args:
            train_loader: iterable of training batches
            test_loader: iterable of evaluation batches
            epochs: number of passes over train_loader
            lr: learning rate of the Adam optimizer

        Returns:
            Test accuracy measured after the last epoch, or None when
            epochs is 0 (no evaluation was run).
        '''
        ## define optimizer and criterion
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        ## here i choose cross entropy
        criterion = nn.CrossEntropyLoss()

        ## stays None if the loop body never runs
        test_acc = None
        for epoch in range(epochs):
            train_loss = self.train_epoch(train_loader, optimizer, criterion)
            test_acc = self.test(test_loader)
            print(f'Epoch {epoch:03d}, Train Loss: {train_loss:.4f}, Test Acc: {test_acc:.4f}')

        return test_acc
                

#### Prepare the dataset
* channels and sequences
* 'Fp1', 'Fpz', 'Fp2', 'AF7', 'AF3','AF4','AF8', 'F7', 'F5','F3','F1','Fz', 'F2', 'F4', 'F6', 'F8', 'FT7', 'FC5', 'FC3', 'FC1','FCz','FC2','FC4', 'FC6', 'FT8', 'T7','C5', 'C3', 'C1', 'Cz', 'C2', 'C4', 'C6', 'T8', 'TP7', 'CP5', 'CP3', 'CP1','CPz','CP2', 'CP4','CP6', 'TP8', 'P7','P5', 'P3', 'P1', 'Pz','P2', 'P4', 'P6', 'P8', 'PO7', 'PO3','POz', 'PO4','PO8', 'O1','Oz','O2', 'F9', 'F10', 'TP9', 'TP10'

In [None]:
num_features = 4        ## per-node features: [mean, std, skewness, kurtosis]

# Undirected connections between channels, each pair listed once.
## only a small hand-written graph is used here (the real data has 64
## channels); the full adjacency would be too long to write out by hand
undirected_edges = [
    (0, 1), (0, 3), (1, 2), (2, 6), (3, 7), (4, 9), (5, 13), (6, 15),
    (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15),
]

# An undirected graph is stored as two directed edges per pair, so emit both
# directions. This also supplies (0, 3), the reverse of (3, 0), which was
# missing from the hand-written list.
start_nodes = [u for u, v in undirected_edges] + [v for u, v in undirected_edges]
goal_nodes = [v for u, v in undirected_edges] + [u for u, v in undirected_edges]
edge_index = torch.tensor([start_nodes, goal_nodes], dtype=torch.long)

# Derive the sizes from the edge list so the node features always cover every
# node index that the edges refer to (the edges use nodes 0..15, i.e. 16 nodes).
num_channels = int(edge_index.max()) + 1
num_edges = edge_index.size(1)

x = torch.randn((num_channels, num_features))   # placeholder features; should be the exact values ([mean, std, skewness, kurtosis])
y = torch.tensor([2])                           # real emotion

# build Data
data = Data(x=x, edge_index=edge_index, y=y)

print(data)


####  now run this model

In [None]:
# in_channels must equal the number of features per node in data.x
# NOTE(review): the trial list in the notes names 7 emotions (sad, dis, fear,
# neu, joy, ten, ins) while num_classes is 6 -- confirm the label set.
model = SimpleGCN(in_channels=data.x.size(1), hidden_channels=64, fc_hidden=128, num_classes=6)

# Smoke test only: the single example graph built above is used for both
# training and evaluation until the real train/test split is prepared.
train_loader = DataLoader([data], batch_size=1, shuffle=True)
test_loader = DataLoader([data], batch_size=1)

res_acc = model.fit(train_loader=train_loader, test_loader=test_loader, epochs=50, lr=0.01)