In [None]:
import pickle
import h5py
import os
import torch
import random
import torch.utils.data as data
import torch.nn as nn
from torch.autograd import Variable
import time
import numpy


In [None]:
# Set HyperParameter
# learning_rate = 0.00001
learning_rate = 0.00001
gradient_direction = 0.9
batch_size = 310
epochs = 250
test_range = 32
total_process_count = test_range * epochs

TEST_TYPE = 0

if TEST_TYPE == 0:
    _type = 'Valence'
elif TEST_TYPE == 1:
    _type = 'Arousal'
else:
    raise ValueError('TEST_TYPE must be 1 or 2')

In [None]:
class DEAP_DataSet(data.Dataset):
    
    VALID_SPLIT = ('train', 'validation', 'test')
    DATASET = 'data/'
    
    def __init__(self, test_target, test_type=0, split='train', data_dir='processed_data.pkl', label_dir='y_train.pkl'):
        super(DEAP_DataSet, self).__init__()
        
        if split not in self.VALID_SPLIT:
            raise ValueError('Unknown split {:s}'.format(split))
        if not os.path.exists(self.DATASET + data_dir):
            raise ValueError('{:s} does not exist'.format(data_dir))
        if not os.path.exists(self.DATASET + label_dir):
            raise ValueError('{:s} does not exist'.format(label_dir))
            
        data_path = '{}/{}'.format(self.DATASET, data_dir)
        label_path = '{}/{}'.format(self.DATASET, label_dir)
        
        self.split = split
        self.data = pickle.load(open(data_path, 'rb'))
        self.labels = pickle.load(open(label_path, 'rb'))
        
        self.test_target = test_target
        self.test_type = test_type
        self.target_data = []
        
        # create train data set
        if self.split == self.VALID_SPLIT[0]:
            counter = 0
            for pp, pp_data in enumerate(self.data):
                if self.test_target is not pp:
                    for c, data in enumerate(pp_data):
                        counter += 1
                        temp_label = self.labels[pp][c][self.test_type]
                        temp_data = data.flatten()
                        self.target_data.append((temp_label, temp_data))
            
        # craete test data set
        if self.split == self.VALID_SPLIT[2]:
            counter = 0
            for pp, pp_data in enumerate(self.data):
                if self.test_target == pp:
                    for c, data in enumerate(pp_data):
                        counter += 1
                        temp_label = self.labels[pp][c][self.test_type]
                        temp_data = data.flatten()
                        self.target_data.append((temp_label, temp_data))
            
    def __getitem__(self, index):
        return self.target_data[index]
    
    def __len__(self):
        return len(self.target_data)

In [None]:
class DNN(nn.Module):
    def __init__(self):
        super(DNN, self).__init__()
        self.layer = nn.Sequential(
            nn.Linear(4040, 5000),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(5000, 500),
            nn.ReLU(),
            nn.Dropout(0.50),
            nn.Linear(500, 1000),
            nn.ReLU(),
            nn.Dropout(0.50),
            nn.Linear(1000, 3),
            #nn.Dropout(0.50),
            nn.Softmax(dim=1)
        )
    def forward(self, x):
        out = self.layer(x)
        return out

In [None]:
def run_train(test_num, test_type):
    train_model = DNN().double().cuda()
    
    DEAP_train = DEAP_DataSet(test_num, test_type=test_type, split='train')
    train_loader = torch.utils.data.DataLoader(DEAP_train, batch_size=batch_size, shuffle=False, num_workers=2, drop_last=False)
    
#     optimizer = torch.optim.RMSprop(train_model.parameters(), lr=learning_rate, alpha=gradient_direction)
    optimizer = torch.optim.SGD(train_model.parameters(), lr=learning_rate, momentum=0.9)
    loss_func = nn.CrossEntropyLoss().double()
    
    for epoch in range(epochs):
        count = 0
        for label, data in train_loader:
            x = Variable(data).double().cuda()
            y_ = Variable(label).type(torch.LongTensor).cuda()

            optimizer.zero_grad()
            output = train_model.forward(x)
            loss = loss_func(output, y_).double()
            loss.backward()
            optimizer.step()

            count += 1
    return train_model

In [None]:
def run_validation(model, test_num, test_type):
    DEAP_test = DEAP_DataSet(test_num, test_type=test_type, split='test')
    test_loader = torch.utils.data.DataLoader(DEAP_test, batch_size=batch_size, shuffle=False, num_workers=2, drop_last=False)
    
    total = 0
    correct = 0
    
    for label, data in test_loader:
        x = Variable(data, requires_grad=False).double().cuda()
        y_ = Variable(label).type(torch.LongTensor).cuda()
        
        with torch.no_grad():
            output = model.forward(x)
            _, output_index = torch.max(output, 1)

            total += label.size(0)
            correct += (output_index == y_).sum().float()
    return total, correct

In [None]:
total = 0
correct = 0
for test_num in range(test_range):
    
    print('# > [{}/{}] train start.'.format(test_num + 1, test_range))
    model = run_train(test_num, TEST_TYPE)
    print('# > [{}/{}] train end.'.format(test_num + 1, test_range))
    
    sub_total, sub_correct = run_validation(model, test_num, TEST_TYPE)
    current_process_count = (test_num + 1) * epochs
    print('# > [{}/{}] correct : {}% ({} / {}) -> {}%'.format(test_num + 1, test_range, 100 * sub_correct / sub_total, sub_correct, sub_total, 100 * current_process_count / total_process_count))
    
    total += sub_total
    correct += sub_correct
    print('# > total :  correct : {}% ({} / {})'.format(100 * correct / total, correct, total))

In [None]:
print('# > train and validation end')
print('# > final report')
print('type : {}, result : {}% ({} / {})'.format(_type, 100 * correct / total, correct, total))

In [None]:

TEST_TYPE = 1

if TEST_TYPE == 0:
    _type = 'Valence'
elif TEST_TYPE == 1:
    _type = 'Arousal'
else:
    raise ValueError('TEST_TYPE must be 1 or 2')

In [None]:
total = 0
correct = 0
for test_num in range(test_range):
    
    print('# > [{}/{}] train start.'.format(test_num + 1, test_range))
    model = run_train(test_num, TEST_TYPE)
    print('# > [{}/{}] train end.'.format(test_num + 1, test_range))
    
    sub_total, sub_correct = run_validation(model, test_num, TEST_TYPE)
    current_process_count = (test_num + 1) * epochs
    print('# > [{}/{}] correct : {}% ({} / {}) -> {}%'.format(test_num + 1, test_range, 100 * sub_correct / sub_total, sub_correct, sub_total, 100 * current_process_count / total_process_count))
    
    total += sub_total
    correct += sub_correct
    print('# > total :  correct : {}% ({} / {})'.format(100 * correct / total, correct, total))

In [None]:
print('# > train and validation end')
print('# > final report')
print('type : {}, result : {}% ({} / {})'.format(_type, 100 * correct / total, correct, total))