In [196]:
from torchvision import transforms as T
from torch.utils.data import DataLoader
import io
import zipfile
from torchnet import meter

In [197]:
import torch
import os
from PIL import Image
from torch.utils import data
import numpy as np
import pandas as pd
from torchvision import transforms as T
import io
import zipfile

In [198]:
from torch import nn

In [211]:
class CNN_Model(nn.Module):
    def __init__(self):
        super(CNN_Model, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1).cuda()
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1).cuda()
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3, stride=1, padding=1).cuda()
        self.conv4 = nn.Conv1d(in_channels=64, out_channels=32, kernel_size=3, stride=1, padding=1).cuda()
        self.conv5 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=3, stride=1, padding=1).cuda()
        self.classifier = nn.Sequential(
            nn.Linear(2048, 20),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(20, 1),
            nn.Sigmoid()
        ).cuda()
        # (5120x128 and 160x20)
    def forward(self, x):

        x = torch.relu(self.conv1(x))
        # print(x.shape)
        x = torch.relu(self.conv2(x))
        # print(x.shape)
        x = torch.relu(self.conv3(x))
        # print(x.shape)
        x = torch.relu(self.conv4(x))
        # print(x.shape)
        x = torch.relu(self.conv5(x))
        # print(x.shape)
        x = torch.flatten(x,1)
        x = self.classifier(x)
        
        return x

    def save(self, name=None):
        """
        save the model
        """
        if name is None:
            prefix = 'checkpoints/' + 'face_classifier_'
            name = time.strftime(prefix + '%m%d_%H:%M:%S.pth')
        torch.save(self.state_dict(), name)
        return name

In [213]:
class Deap(data.Dataset):
    def __init__(self, label = "valence", kind = "train", samples = 74040):
        self.label = label
        self.bio_path = "./data/DEAP/bio/"
        self.label_path = "./data/DEAP/labels/participant_ratings.csv"
        self.kind = kind
        self.samples = samples
        self.deap_indices_dict = {1: 2400,
                             2: 2400,
                             3: 2340,
                             4: 2400,
                             5: 2340,
                             6: 2400,
                             7: 2400,
                             8: 2400,
                             9: 2400,
                             10: 2400,
                             11: 2220,
                             12: 2400,
                             13: 2400,
                             14: 2340,
                             15: 2400,
                             16: 2400,
                             17: 2400,
                             18: 2400,
                             19: 2400,
                             20: 2400,
                             21: 2400,
                             22: 2400,23: 2400,
             24: 2400,
             25: 2400,
             26: 2400,
             27: 2400,
             28: 2400,
             29: 2400,
             30: 2400,
             31: 2400,
             32: 2400}
        

        # train_size = int(0.8 * len(indices))
        # test_size = len(indices) - train_size
        # self.train_indices, self.test_indices = torch.utils.data.random_split(indices, [train_size, test_size])
        
        self.sub_trial_second = []
        for subject in range(1,32):
            for segment in range(1,int(deap_indices_dict[subject]/60+1)):
                for second in range(1, 61):
                    self.sub_trial_second.append((subject, segment, second))

        self.train_size = int(0.8 * self.samples)
        self.test_size = self.samples - self.train_size
        # print(self.samples)
        # print(len(self.sub_trial_second))
        # print(self.train_size, self.test_size)
        # print(len(self.sub_trial_second[:self.samples]))
        self.train_indices, self.test_indices = torch.utils.data.random_split(self.sub_trial_second[:self.samples], [self.train_size, self.test_size])
        

    def __getitem__(self, index):
    
        # subject, trial , second = self.sub_trial_second[index]

        subject, trial , second = self.train_indices[index] if self.kind == "train" else self.test_indices[index]
        
        subject_path = os.path.join(self.bio_path, f's{subject}/{subject}_{trial}_{second}.npy')
        bio_data = torch.tensor(np.load(subject_path)).float()
        
        self.eeg = bio_data[:32]
        
        self.labels = pd.read_csv(self.label_path)
        
        if self.label == "valence":
            valence = 0 if self.labels[(self.labels['Participant_id']==subject) & (self.labels['Trial']==trial)]['Valence'].iloc[0] < 5 else 1
            return self.eeg, valence
        else:
            arousal = 0 if self.labels[(self.labels['Participant_id']==subject) & (self.labels['Trial']==trial)]['Arousal'].iloc[0] < 5 else 1
            return self.eeg, arousal


    def __len__(self):
        if self.kind == "train":
            return self.train_size
        return self.test_size

In [214]:
def train_model(lr = 0.001, batch_size = 32, epochs = 50, samples = 74040):

    train_eeg = Deap(kind = "train", samples = samples)
    test_eeg = Deap(kind = "test", samples = samples)

    train_loader = DataLoader(train_eeg, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(test_eeg, batch_size=batch_size, shuffle=True)

    device = torch.device('cuda')
    

    model = CNN_Model()
    
    criterion = torch.nn.BCELoss()
    lr = lr 
    optimizer = torch.optim.Adam(model.parameters(), lr = lr)

    loss_meter = meter.AverageValueMeter()
    
    #  creating a file results 
    try : 
        os.mkdir("./results")
    except Exception as E:
        print("File results made")
    best_accuracy = 0

    file_name = "./results/result.txt"
    for epoch in range(epochs):
        pred_label = []
        true_label = []

        loss_meter.reset()
        for ii, (data, label) in enumerate(train_loader):
        
            input_ = data.float().to(device)
            label = label.float().to(device)

            optimizer.zero_grad()
            
            pred = model(input_).float()
            pred = pred.squeeze().float()
            # If there's only one element, enclose it in brackets, to be tensor of one-dimension
            if pred.dim() == 0:
                pred = pred.unsqueeze(0)

            # print(pred, label, ii)
            loss = criterion(pred, label)
            loss.backward()
            optimizer.step()

            # meters update
            loss_meter.add(loss.item())

            pred = (pred >= 0.5).float().to(device).data
            pred_label.append(pred)
            true_label.append(label)

        pred_label = torch.cat(pred_label,0)
        true_label = torch.cat(true_label,0)

        print(epoch, torch.sum(pred_label == true_label), true_label.size(0))
        train_accuracy = torch.sum(pred_label == true_label).type(torch.FloatTensor) / true_label.size(0)
        out_put('Epoch: ' + 'train' + str(epoch) + '| train accuracy: ' + str(train_accuracy.item()), file_name)

        val_accuracy = val(model, val_loader, use_gpu = True)
        out_put('Epoch: ' + 'train' + str(epoch) + '| train loss: ' + str(loss_meter.value()[0]) +
              '| val accuracy: ' + str(val_accuracy.item()), file_name)

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_epoch = epoch
            model.save(f"{file_name}_best.pth")

    model.save(f'{file_name}.pth')

    perf = f"best accuracy is {best_accuracy} in epoch {best_epoch}" + "\n"
    out_put(perf,file_name)

    return best_accuracy

In [215]:
x = train_model() 

File results made
0 tensor(33729, device='cuda:0') 59232
tensor(8488, device='cuda:0') 14808
1 tensor(35468, device='cuda:0') 59232
tensor(9091, device='cuda:0') 14808
2 tensor(37377, device='cuda:0') 59232
tensor(9844, device='cuda:0') 14808
3 tensor(39762, device='cuda:0') 59232
tensor(10403, device='cuda:0') 14808
4 tensor(41315, device='cuda:0') 59232
tensor(10713, device='cuda:0') 14808
5 tensor(42676, device='cuda:0') 59232
tensor(11028, device='cuda:0') 14808
6 tensor(43681, device='cuda:0') 59232
tensor(11244, device='cuda:0') 14808
7 tensor(44468, device='cuda:0') 59232
tensor(11478, device='cuda:0') 14808
8 tensor(45422, device='cuda:0') 59232
tensor(11797, device='cuda:0') 14808
9 tensor(46210, device='cuda:0') 59232
tensor(11915, device='cuda:0') 14808
10 tensor(46719, device='cuda:0') 59232
tensor(12066, device='cuda:0') 14808
11 tensor(47401, device='cuda:0') 59232
tensor(12217, device='cuda:0') 14808
12 tensor(47717, device='cuda:0') 59232
tensor(12267, device='cuda:0') 

In [203]:
@torch.no_grad()
def val(model, dataloader, use_gpu):
    model.eval()
    if use_gpu:
      device = torch.device('cuda')
    else:
      device = torch.device('cpu')

    pred_label = []
    true_label = []

    for ii, (data, label) in enumerate(dataloader):
        
        input_ = data.float().to(device)
        label = label.to(device)
        pred = model(input_).float()

        pred = (pred >= 0.5).float().to(device).data
        pred = pred.squeeze().float()
        
        # If there's only one element, enclose it in brackets, to be tensor of one-dimension
        if pred.dim() == 0:
            pred = pred.unsqueeze(0)
        pred_label.append(pred)
        true_label.append(label)

    # print(pred_label, len(pred_label))
    # print(true_label,len(true_label))
    pred_label = torch.cat(pred_label, 0)
    true_label = torch.cat(true_label, 0)
    
    print(torch.sum(pred_label == true_label), true_label.size(0))
    val_accuracy = torch.sum(pred_label == true_label).type(torch.FloatTensor) / true_label.size(0)

    model.train()

    return val_accuracy

In [137]:
def out_put(string, verbose):
    '''
    Help function for verbose,
    output the string to destination path

    Parameters
    ----------
    string  :str,  the string to output
    verbose :str, the path to store the output
    '''
    with open(f"{verbose}.txt", "a") as f:
        f.write(string + "\n")