In [1]:
# Load data
import numpy as np
from sklearn.model_selection import train_test_split
from Inner_Speech_Dataset.Python_Processing.Data_extractions import  Extract_data_from_subject

# Load all data for subject
def load_subject(subject_nr):
    datatype = "EEG"
    root_dir = "../dataset"

    data, description = Extract_data_from_subject(root_dir, subject_nr, datatype)
    return data, description

# Extract labels from the description
def extract_labels(desc):
    return desc[:,1]


# Test when extracting only the action interval
def extract_action_interval(data):
    return data[:,:,254:890]

def split_data(data, labels):
    trainv_data, test_data, trainv_labels, test_labels = train_test_split(data, labels, test_size = 0.2, random_state = 1)
    train_data, val_data, train_labels, val_labels = train_test_split(trainv_data, trainv_labels, test_size = 0.25, random_state = 1)

    return train_data, val_data, test_data, train_labels, val_labels, test_labels




# Load, extract and split data
data, description = load_subject(1)
labels = extract_labels(description)
#new_data = extract_action_interval(data)
#print(new_data.shape)


train_data, val_data, test_data, train_labels, val_labels, test_labels = split_data(data, labels)

# Shallow CNN
Implementation of the shallow CNN structure from (Schirrmeister et. al.) for the use on "Thinking out loud" dataset.

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time
import os
import copy

class ShallowCNN(nn.Module):
    def __init__(self):
        super(ShallowCNN, self).__init__()
        first_channels = 40
        second_channels = 40

        # Temporal convolution
        self.tempconv = nn.Conv2d(in_channels = 1, out_channels = first_channels, kernel_size = (1, 25), padding = 0, bias = False)
        # Spatial convolution
        self.spatconv = nn.Conv2d(in_channels = first_channels, out_channels = 40, kernel_size=(128,1), padding = 0, bias = False)
        # Batch normalization
        self.batchnorm = nn.BatchNorm2d(40, False)
        # ELU
        self.elu = nn.ELU()
        # Mean pooling
        self.meanpool = nn.AvgPool2d(kernel_size = (1,75), stride = (1,15)) # This 15 might be a 75 :(

        # Classifier
        self.classifier = nn.Linear(2840,4, bias = False)
        # Softmax
        self.softmax = nn.Softmax()

        
    def forward(self, x, dropout = 0.5):
        res = self.tempconv(x)
        res = self.spatconv(res)
        res = self.batchnorm(res)
        res = self.elu(res)
        res = self.meanpool(res)
        res = F.dropout(res, dropout)
        res = torch.flatten(res, start_dim=1)
        res = self.classifier(res)
        res = self.softmax(res)
        return res

    def train_model(self, data, labels, epochs, batch_size, loss_func, optimizer):
        print("Epoch\t train loss\t validation loss\t train acc\t validation acc")
        best_model = copy.deepcopy(self.state_dict())
        best_loss = 500 # bad value for dis

        for epoch in range(epochs):
            epoch_loss = 0
            self.train() # Set model to train mode
            
            for i in range(len(data)//batch_size): # BATCH SIZE MUST BE EVEN DIVIDER OF DATA LEN, otherwise we miss stuff here
                start = i*batch_size
                end = (i+1)*batch_size

                train_inputs = torch.FloatTensor(data[start:end])
                train_labels = torch.LongTensor(labels[start:end])
                train_outputs = self(train_inputs)

                loss = loss_func(train_outputs, train_labels)
                
                #print("LOSS: ", loss)
                epoch_loss += loss
                loss.backward()
                optimizer.step()
            
            # Check for new best model, this should be on val_data instead
            if epoch_loss < best_loss:
                best_model = copy.deepcopy(self.state_dict())

            print("EPOCH ", epoch, " LOSS:", epoch_loss)

        self.load_state_dict(best_model) # Set model to best performing one.



In [3]:
train_data = torch.tensor(train_data)
train_data = torch.unsqueeze(train_data,1).float()
train_labels = torch.tensor(train_labels).long()

val_data = torch.tensor(val_data)
val_data = torch.unsqueeze(val_data,1).float()
val_labels = torch.tensor(val_labels).long()

test_data = torch.tensor(test_data)
test_data = torch.unsqueeze(test_data,1).float()
test_labels = torch.tensor(test_labels).long()


# Uncomment this if running on only 1 trial
#train_data = torch.unsqueeze(torch.unsqueeze(d,0),0).float() # Unsqueeze adds a wrapper dimension of 1s

In [4]:
network = ShallowCNN()
d = torch.unsqueeze(train_data[0],0)
print(d.size())
r = network.forward(d)
print(r.size())

# Dumb checking of initial weights
def accuracy_check(data, labels):
    r = network.forward(data)
    p = torch.max(r,1)[1]
    correct = 0.0
    for i in range(len(p)):
        if p[i] == labels[i]:
            correct += 1
    print("ACCURACY:", correct/len(p))


network = ShallowCNN().float()
accuracy_check(train_data, train_labels)

print("####### TRAINING #######")
op = optim.Adam(params = network.parameters(), lr = 0.0001)
lossf = nn.NLLLoss()
network.train_model(data = train_data, labels = train_labels, epochs = 5, batch_size = 10, loss_func = lossf, optimizer = op)

accuracy_check(train_data, train_labels)

torch.Size([1, 1, 128, 1153])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x2840 and 1120x4)

In [None]:
accuracy_check(train_data, train_labels)
accuracy_check(val_data, val_labels)
accuracy_check(test_data, test_labels)

  res = self.softmax(res)


ACCURACY: 0.6766666666666666
ACCURACY: 0.22
ACCURACY: 0.29
