In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import Dataset

import os
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
import warnings
warnings.filterwarnings('ignore')

  from .autonotebook import tqdm as notebook_tqdm


### Options

In [2]:
"""
    < How to Set Options>
    1. Select GPU
    2. Name the direcory to save the weights
    3. Select Score Calculation Method
    4. Set the weight of Cross Entropy
    5. Select the type of Distance
"""
# Settings
DEVICE = "cuda:0"
TARGET = "valence"  # valence / arousal
OUTPUT_DIR = "SAMPLE"
FORMULA = "EXP" # EXP: Expectation / BER: Bernoulli
ALPHA = 1
DIST = "absolute"
EPOCH = 1000
# Fixed
DATA_PATH = "/home/aicv/Documents/2023_EEG/DEAP_Dataset/Feature_Extraction/Base_EEG/basemean_yes_pt"
LABEL_PATH = "/home/aicv/Documents/2023_EEG/DEAP_Dataset/EEG_label/EEG_label.csv"
WRITE_PATH = "/home/aicv/Documents/2023_EEG/weights"
N_CLASSES = 9
LEARNING_RATE = 1e-4
BATCH = 2

In [3]:
def create_directory(directory):
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
    except OSError:
        print("Error: Failed to create the directory.")

In [4]:
label = pd.read_csv(LABEL_PATH)
print("Arousal sd:", np.sqrt(np.var(label.arousal)))
print("Arousal mean:", np.mean(label.arousal))
print("Valence sd:", np.sqrt(np.var(label.valence)))
print("Valence mean:", np.mean(label.valence))

Arousal sd: 2.01971007608589
Arousal mean: 5.1567109375
Valence sd: 2.1299830169143954
Valence mean: 5.2543125


### Data Loader

In [4]:
class DEAP(Dataset):
    def __init__(self, data_dir, label_path, target="valence"):
        self.data_dir = data_dir
        self.label = pd.read_csv(label_path)
        self.target = target

    def __len__(self):
        return len(self.label)

    def __getitem__(self, idx):
        name = self.label.iloc[idx, 0]
        data_path = os.path.join(self.data_dir, name)+'_win_128.pt'
        eeg_signal = torch.load(data_path)
        
        if self.target == "valence":
            valence = self.label.iloc[idx, 1]
            return name, eeg_signal.unsqueeze(1).type("torch.FloatTensor"), valence
        elif self.target == "arousal":
            arousal = self.label.iloc[idx, 2]
            return name, eeg_signal.unsqueeze(1).type("torch.FloatTensor"), arousal


In [6]:
dataset_train = DEAP(data_dir=DATA_PATH, label_path=LABEL_PATH)
trainloader = torch.utils.data.DataLoader(dataset_train, batch_size=3)

In [8]:
for name, eeg, val in trainloader:
    eeg = eeg.to(DEVICE)
    val = val.to(DEVICE)

    print("---------")
    print(name)
    print(eeg.shape)
    print(val)
    print(torch.round(val).long())
    # print(aro)


---------
('s01_trial01', 's01_trial02', 's01_trial03')
torch.Size([3, 7680, 1, 9, 9])
tensor([7.7100, 8.1000, 8.5800], device='cuda:0', dtype=torch.float64)
tensor([8, 8, 9], device='cuda:0')
---------
('s01_trial04', 's01_trial05', 's01_trial06')
torch.Size([3, 7680, 1, 9, 9])
tensor([4.9400, 6.9600, 8.2700], device='cuda:0', dtype=torch.float64)
tensor([5, 7, 8], device='cuda:0')
---------
('s01_trial07', 's01_trial08', 's01_trial09')
torch.Size([3, 7680, 1, 9, 9])
tensor([7.4400, 7.3200, 4.0400], device='cuda:0', dtype=torch.float64)
tensor([7, 7, 4], device='cuda:0')
---------
('s01_trial10', 's01_trial11', 's01_trial12')
torch.Size([3, 7680, 1, 9, 9])
tensor([1.9900, 2.9900, 2.7100], device='cuda:0', dtype=torch.float64)
tensor([2, 3, 3], device='cuda:0')
---------
('s01_trial13', 's01_trial14', 's01_trial15')
torch.Size([3, 7680, 1, 9, 9])
tensor([1.9500, 4.1800, 3.1700], device='cuda:0', dtype=torch.float64)
tensor([2, 4, 3], device='cuda:0')
---------
('s01_trial16', 's01_tria

In [45]:
true.unsqueeze(0)

tensor([[4, 7]], dtype=torch.int32)

In [48]:
F.one_hot(true, num_classes=9)

tensor([[0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0]])

In [49]:
true = torch.round(val).type("torch.LongTensor")
loss(result, true, cal.calculate(result), cal.calculate(F.one_hot(true, num_classes=9)))

RuntimeError: The size of tensor a (5) must match the size of tensor b (2) at non-singleton dimension 0

### Model

In [5]:
class depthwise_separable_conv(nn.Module):
    def __init__(self, nin, nout, kernel=(5, 5), stride=1, padding=1):
        super(depthwise_separable_conv, self).__init__()
        self.depthwise = nn.Conv2d(nin, nin, kernel, stride=stride, padding=padding, groups=nin)
        nn.init.kaiming_normal_(self.depthwise.weight)  # Parameter Initialization
        self.pointwise = nn.Conv2d(nin, nout, kernel_size=1)
        nn.init.kaiming_normal_(self.pointwise.weight)  # Parameter Initialization
    def forward(self, x):
        out = self.depthwise(x)
        out = self.pointwise(out)
        return out

In [6]:
class Model(nn.Module):
    def __init__(self, device='cpu', input_image=torch.zeros(1, 7680, 1, 9, 9), kernel=(5, 5), stride=1, padding=1, n_classes=9, n_units=128):
        """
        EmoticonNet
            - Input: Electroencephalogram Signals 
                    [Batch Size, Sequence, Image Size]
            - Return: Probabilities for each class 
                    [Batch Size, n_classes]
        """
        super(Model, self).__init__()

        self.device = device
        n_channel = input_image.shape[2]
        
        # EmoticonNet 2D ConvNet
        self.ENet2D = nn.Sequential(
            depthwise_separable_conv(n_channel, 32, kernel, stride=stride, padding=padding),
            nn.BatchNorm2d(32),
            nn.ELU(),
            depthwise_separable_conv(32, 64, kernel, stride=stride, padding=padding),
            nn.BatchNorm2d(64),
            nn.ELU(),
            depthwise_separable_conv(64, 128, kernel, stride=stride, padding=padding),
            nn.BatchNorm2d(128),
            nn.ELU()
        ).to(self.device)

        # EmoticonNet Temporal 1D Convolution
        self.ENet1D = nn.Conv1d(3*3*128, 128, input_image.shape[1], stride=stride, padding=0).to(self.device)
        nn.init.kaiming_normal_(self.ENet1D.weight)  # Parameter Initialization

        # FC layers
        self.fc = nn.Linear(128, 128).to(self.device)
        self.fc1 = nn.Linear(128, 128).to(self.device)
        self.fc2 = nn.Linear(128, n_classes).to(self.device)
        self.max = nn.Softmax().to(self.device)
    
    def forward(self, x):
        print("--------")
        tmp = x.reshape(x.shape[0]*x.shape[1], x.shape[2], x.shape[3], x.shape[4]).to(self.device)
        print(tmp.device)
        tmp = self.ENet2D(tmp).to(self.device)
        print(tmp.device)
        tmp = tmp.reshape(x.shape[0], x.shape[1], 128, 3, 3)
        print(tmp.device)
        temp_conv = F.elu(self.ENet1D(tmp.reshape(x.shape[0], 3*3*128, x.shape[1])))
        print(temp_conv.device)
        temp_conv = temp_conv.reshape(temp_conv.shape[0], -1)
        print(temp_conv.device)
        del tmp

        out = temp_conv/torch.max(temp_conv)
        print(out.device)
        out1 = F.sigmoid(self.fc(out))
        print(out1.device)
        out = torch.mul(out, out1)
        print(out.device)
        out = F.elu(self.fc1(out))
        print(out.device)
        out = self.fc2(out)
        print(out.device)
        out = self.max(out)
        print(out.device)

        return out


In [7]:
class Calculator():
    def __init__(self, cal_type, penalty=1):
        self.cal_type = cal_type
        self.penalty = penalty
    def calculate(self, probs):
        if self.cal_type == "EXP":
            return self.expectation(probs)
        elif self.cal_type == "BER":
            return self.bernoulli(probs)
    def expectation(self, probs):
        return torch.sum(torch.tensor(range(1,10)).to(probs.device)*probs, 1)
    def bernoulli(self, probs):
        return torch.sum(torch.tensor(range(1,10)).to(probs.device)*probs - self.penalty*(1-probs)*probs, 1)

In [50]:
model = Model(device=DEVICE)
sample = torch.rand(5, 7680, 1, 9, 9)
result = model(sample.to(DEVICE))

torch.Size([5, 128, 1])


In [59]:
model(eeg.to("cuda:1"))

torch.Size([2, 128, 1])


tensor([[0.0986, 0.1088, 0.1134, 0.1139, 0.1119, 0.1265, 0.1027, 0.1185, 0.1057],
        [0.1049, 0.0999, 0.1005, 0.1095, 0.1168, 0.1199, 0.1164, 0.1203, 0.1118]],
       device='cuda:1', grad_fn=<SoftmaxBackward0>)

In [13]:
cal = Calculator(cal_type=FORMULA)
cal.calculate(result)

tensor([4.9802, 4.9597, 4.8923, 5.0023, 4.9262], device='cuda:1',
       grad_fn=<SumBackward1>)

In [14]:
cal2 = Calculator(cal_type="BER")
cal2.calculate(result)

tensor([4.0917, 4.0710, 4.0040, 4.1139, 4.0379], device='cuda:1',
       grad_fn=<SumBackward1>)

In [15]:
del model

### Loss Function

In [8]:
class Custom_Loss(nn.Module):
    def __init__(self, device='cpu', alpha=1, dist_type=None, bernoulli_penalty=False, n_classes=9):
        super(Custom_Loss, self).__init__()
        self.device = device
        self.dist_type = dist_type
        self.n_classes = n_classes
        self.alpha = alpha
        # self.ce = nn.CrossEntropyLoss()
        # self.mse = nn.MSELoss()
    def forward(self, pred_probs, true_class, pred_points, true_points):
        """
            pred_probs: [B x n_classes]
            true_classes: [B x 1]
            pred_points: [B x 1]
            pred_points: [B x 1]
        """
        MSE = nn.MSELoss()
        CE = nn.CrossEntropyLoss()

        mse = MSE(pred_points, true_points).to(self.device)
        ce = torch.tensor(0, dtype=torch.float64).to(self.device)
        # ce = -1 * (torch.log(1 - pred_probs)+1e-6) * (1-nn.functional.one_hot(true_class, num_classes=self.n_classes))
        # ce = torch.nan_to_num(ce, posinf=1e10, neginf=-1e10, nan= 0)

        # No Distance
        if not self.dist_type:
            weight = torch.tensor([1]*pred_probs.shape[0]).to(self.device)
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * CE(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        # 1(:Offset) + Abs. Distance
        elif self.dist_type == "absolute":
            weight = torch.abs(torch.subtract(torch.argmax(pred_probs, 1), true_class)) + 1
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * CE(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        # 1(:Offset) + Sqr. Distance
        elif self.dist_type == "square":
            weight = torch.subtract(torch.argmax(pred_probs, 1), true_class) ** 2 + 1
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * CE(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        
        ce = ce / pred_probs.shape[0]

        loss = mse + self.alpha*ce

        return loss, mse, ce
        


In [9]:
class Custom_Loss(nn.Module):
    def __init__(self, device='cpu', alpha=1, dist_type=None, bernoulli_penalty=False, n_classes=9):
        super(Custom_Loss, self).__init__()
        self.device = device
        self.dist_type = dist_type
        self.n_classes = n_classes
        self.alpha = alpha
        self.ce = nn.CrossEntropyLoss()
        self.mse = nn.MSELoss()
    def forward(self, pred_probs, true_class, pred_points, true_points):
        """
            pred_probs: [B x n_classes]
            true_classes: [B x 1]
            pred_points: [B x 1]
            pred_points: [B x 1]
        """
        mse = self.mse(pred_points, true_points).to(self.device)
        ce = torch.tensor(0, dtype=torch.float64).to(self.device)
        # ce = -1 * (torch.log(1 - pred_probs)+1e-6) * (1-nn.functional.one_hot(true_class, num_classes=self.n_classes))
        # ce = torch.nan_to_num(ce, posinf=1e10, neginf=-1e10, nan= 0)

        true_class = true_class.long()
        true_points = true_points.long()

        # No Distance
        if not self.dist_type:
            weight = torch.tensor([1]*pred_probs.shape[0]).to(self.device)
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * self.ce(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        # 1(:Offset) + Abs. Distance
        elif self.dist_type == "absolute":
            weight = torch.abs(torch.subtract(torch.argmax(pred_probs, 1), true_class)) + 1
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * self.ce(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        # 1(:Offset) + Sqr. Distance
        elif self.dist_type == "square":
            weight = torch.subtract(torch.argmax(pred_probs, 1), true_class) ** 2 + 1
            for index, pred in enumerate(pred_probs):
                ce += weight[index] * self.ce(pred.unsqueeze(0), true_class[index].unsqueeze(0))
        
        ce = ce / pred_probs.shape[0]

        loss = mse + self.alpha*ce

        return loss.float(), mse, ce
        


In [12]:
loss = Custom_Loss(device=DEVICE, alpha=1, dist_type="absolute")

In [268]:
loss(result, true, cal.calculate(result), cal.calculate(F.one_hot(true, num_classes=9)))

(tensor(12.0389, device='cuda:0', dtype=torch.float64, grad_fn=<AddBackward0>),
 tensor(3.6991, device='cuda:0', grad_fn=<MseLossBackward0>),
 tensor(8.3398, device='cuda:0', dtype=torch.float64, grad_fn=<DivBackward0>))

### Train

Conduct 5-Fold Cross Validation for DEAP Dataset

In [14]:
print(torch.__version__)

1.13.1+cu116


In [10]:
def get_5fold(label_path):
    label = pd.read_csv(label_path)
    trial = label.iloc[:,0].str.split("_trial").str[1].astype('int64')
    test_indexes = [
        ([idx for idx in range(len(trial)) if trial[idx] not in range(1, 9)], [idx for idx in range(len(trial)) if trial[idx] in range(1, 9)]),
        ([idx for idx in range(len(trial)) if trial[idx] not in range(9, 17)], [idx for idx in range(len(trial)) if trial[idx] in range(9, 17)]),
        ([idx for idx in range(len(trial)) if trial[idx] not in range(17, 25)], [idx for idx in range(len(trial)) if trial[idx] in range(17, 25)]),
        ([idx for idx in range(len(trial)) if trial[idx] not in range(25, 33)], [idx for idx in range(len(trial)) if trial[idx] in range(25, 33)]),
        ([idx for idx in range(len(trial)) if trial[idx] not in range(33, 41)], [idx for idx in range(len(trial)) if trial[idx] in range(33, 41)])]
    
    return test_indexes


In [11]:
kfold = get_5fold(LABEL_PATH)
cal = Calculator(cal_type=FORMULA)

dataset = DEAP(data_dir=DATA_PATH, label_path=LABEL_PATH, target=TARGET)
# torch.backends.cudnn.enabled = False

for fold, (train_idx, test_idx) in enumerate(kfold):
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_idx)

    trainloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH, sampler=train_subsampler) 
    testloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH, sampler=test_subsampler) 

    # save_path = os.path.join(WRITE_PATH, OUTPUT_DIR, f'ALPHA_{ALPHA}', f'fold_{fold+1}')
    # create_directory(save_path)

    # f = open(os.path.join(output_dir, f'BIN_{BIN}', f'ALPHA_{ALPHA}', f'fold_{fold+1}', 'output_t5.csv'), 'w')
    # f.write('epoch,loss,MSE,CE,RMSE,v_loss,v_MSE,v_CE,v_RMSE\n')

    print(f'\nTraining for fold {fold+1}')

    LOSS = Custom_Loss(device=DEVICE, alpha=1, dist_type="absolute")

    model = Model(device=DEVICE)
    optimizer = Adam(model.parameters(), lr=1e-4)

    for epoch in range(EPOCH):
        model.train()

        optimizer.zero_grad()
        total_mse = 0
        total_ce = 0
        total_loss = 0
        num_data = 0

        for name, eeg, score in trainloader:
            num_data += eeg.shape[0]

            print(score)
            true = torch.round(score).long().to(DEVICE)
            print(true)

            if torch.sum(true) > 9*BATCH:
                print(">>> Overflow Occured!!")
                continue
            
            pred = model(eeg.to(DEVICE))
            print(pred)

            loss, mse, ce = LOSS(pred, true, cal.calculate(pred), cal.calculate(F.one_hot(true, num_classes=9)))

            total_mse += mse
            total_ce += ce
            total_loss += loss
            
            loss.backward()
            optimizer.step()

            print(f'>>> {loss}={mse}+{ce}')

        total_mse /= num_data
        total_ce /= num_data
        total_loss /= num_data

        print(f'>>> --- {total_loss}={total_mse}+{total_ce}')






Training for fold 1
tensor([4.4700, 3.0300], dtype=torch.float64)
tensor([4, 3], device='cuda:0')
--------
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
cuda:0
tensor([[0.1104, 0.1138, 0.1091, 0.1229, 0.1055, 0.1024, 0.0999, 0.1248, 0.1111],
        [0.1103, 0.1077, 0.1099, 0.1130, 0.1078, 0.0993, 0.1111, 0.1293, 0.1116]],
       device='cuda:0', grad_fn=<SoftmaxBackward0>)


RuntimeError: Found dtype Long but expected Float

In [16]:
score = torch.tensor([8.84, 7.21], dtype=torch.float64)

In [20]:
torch.round(score.to(DEVICE)).float().to(DEVICE)

RuntimeError: numel: integer multiplication overflow

In [261]:
trial

0        1
1        2
2        3
3        4
4        5
        ..
1275    36
1276    37
1277    38
1278    39
1279    40
Name: Unnamed: 0, Length: 1280, dtype: int64

In [264]:
[idx for idx in range(len(trial)) if trial[idx] in range(1, 9)]

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 200,
 201,
 202,
 203,
 204,
 205,
 206,
 207,
 240,
 241,
 242,
 243,
 244,
 245,
 246,
 247,
 280,
 281,
 282,
 283,
 284,
 285,
 286,
 287,
 320,
 321,
 322,
 323,
 324,
 325,
 326,
 327,
 360,
 361,
 362,
 363,
 364,
 365,
 366,
 367,
 400,
 401,
 402,
 403,
 404,
 405,
 406,
 407,
 440,
 441,
 442,
 443,
 444,
 445,
 446,
 447,
 480,
 481,
 482,
 483,
 484,
 485,
 486,
 487,
 520,
 521,
 522,
 523,
 524,
 525,
 526,
 527,
 560,
 561,
 562,
 563,
 564,
 565,
 566,
 567,
 600,
 601,
 602,
 603,
 604,
 605,
 606,
 607,
 640,
 641,
 642,
 643,
 644,
 645,
 646,
 647,
 680,
 681,
 682,
 683,
 684,
 685,
 686,
 687,
 720,
 721,
 722,
 723,
 724,
 725,
 726,
 727,
 760,
 761,
 762,
 763,
 764,
 765,
 766,
 767,
 800,
 801,
 802,
 803,
 804,
 805,
 806,
 807,
 840,
 841,
 842,
 843,


In [266]:
get_5fold(LABEL_PATH)

[[0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  80,
  81,
  82,
  83,
  84,
  85,
  86,
  87,
  120,
  121,
  122,
  123,
  124,
  125,
  126,
  127,
  160,
  161,
  162,
  163,
  164,
  165,
  166,
  167,
  200,
  201,
  202,
  203,
  204,
  205,
  206,
  207,
  240,
  241,
  242,
  243,
  244,
  245,
  246,
  247,
  280,
  281,
  282,
  283,
  284,
  285,
  286,
  287,
  320,
  321,
  322,
  323,
  324,
  325,
  326,
  327,
  360,
  361,
  362,
  363,
  364,
  365,
  366,
  367,
  400,
  401,
  402,
  403,
  404,
  405,
  406,
  407,
  440,
  441,
  442,
  443,
  444,
  445,
  446,
  447,
  480,
  481,
  482,
  483,
  484,
  485,
  486,
  487,
  520,
  521,
  522,
  523,
  524,
  525,
  526,
  527,
  560,
  561,
  562,
  563,
  564,
  565,
  566,
  567,
  600,
  601,
  602,
  603,
  604,
  605,
  606,
  607,
  640,
  641,
  642,
  643,
  644,
  645,
  646,
  647,
  680,
  681,
  682,
  683,
  684,
  685,
  686,
  687,
  720,
  721,
  722,
  7

In [None]:
kf = KFold(n_splits=5, shuffle=True)

In [None]:
dataset_train = DEAP(data_dir=DATA_PATH, label_path=LABEL_PATH)
trainloader = torch.utils.data.DataLoader(dataset_train, batch_size=3)

In [None]:
for fold, (train_idx, test_idx) in enumerate(kf.split(wav_files, dx_label)):