In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
import time

from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split,KFold
frameNum = 60 

# load data
dfs = []
for i in range(1,33):
  for j in range(1,41):
    filename = '/kaggle/input/deap-research/DE/subject_%d_trial_%d.txt'%(i,j)
    cols = [i for i in range(frameNum)]
    df = pd.read_csv(filename, header = None, usecols = cols, delimiter=',')   
    dfs.append(df.values)
    #print('participant%dvideo%d.txt'%(i,j))

dfs = np.array(dfs)
print('dataLoaded:')
print(dfs.shape)

#dataLoaded:
#(1280, 1024, 60)

# normalize
x_min = dfs.min(axis = (1,2),keepdims=True)
x_max = dfs.max(axis = (1,2),keepdims=True)
dfs_normal = (dfs-x_min)/(x_max-x_min)
depth = 3
# divide frames ,or 60s is too long for a single 3dinput
reshape_dfs = np.split(dfs_normal, frameNum/depth, axis=2)
reshape_dfs = np.array(reshape_dfs)
reshape_dfs = np.reshape(reshape_dfs,[-1,1024,depth])
print(reshape_dfs.shape)

#(25600, 1024, 3)

# load label
cols = ['valence', 'arousal', 'dominance', 'liking']
label_df = pd.read_csv('/kaggle/input/deap-research/label.txt',
    usecols = [i for i in range(4)], header=None, delimiter=',')
print(label_df.shape)
label_df.columns = cols
label_df[label_df<4.5] = 0
label_df[label_df>=4.5] = 1

#(1280, 4)

# valence
label = label_df['arousal'].astype(int).values
label = np.tile(label,20)
print(label.shape)

class cnn_classifier(nn.Module):
  def __init__(self):
    super().__init__()
    self.conv11 = nn.Conv3d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
    self.conv12 = nn.Conv3d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1)
    self.pool1 = nn.MaxPool3d(kernel_size=2, padding=(0,0,1))
    
    self.conv21 = nn.Conv3d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
    self.conv22 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
    self.pool2 = nn.MaxPool3d(kernel_size=2, padding=(0,0,1))
    
    self.conv31 = nn.Conv3d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
    self.conv32 = nn.Conv3d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
    self.pool3 = nn.MaxPool3d(kernel_size=2, padding=0)
    

    self.fc_layer = nn.Linear(128*4*4*1, 2)
    
    self.dropout_layer = nn.Dropout(p=0.5)

  def forward(self, xb):
    h1 = self.conv11(xb)
    h1 = self.conv12(h1)
    h1 = self.dropout_layer(h1)
    h1 = self.pool1(h1)
    h1 = F.relu(h1)

    h2 = self.conv21(h1)
    h2 = self.conv22(h2)
    #h2 = self.dropout_layer(h2)
    h2 = self.pool2(h2)
    h2 = F.relu(h2) 

    h3 = self.conv31(h2)
    h3 = self.conv32(h3)
    #h3 = self.dropout_layer(h3)
    h3 = self.pool3(h3)
    h3 = F.relu(h3) 
    
    
    # flatten the output from conv layers before feeind it to FC layer
    flatten = h3.view(-1, 128*4*4*1)
    out = self.fc_layer(flatten)
    #out = self.dropout_layer(out)
    return out

def train_model(model, x_train, y_train, x_test, y_test, epochs=50 , batch_size=32, lr=0.0001, weight_decay=0):
  # data
  train_dataset = TensorDataset(x_train, y_train)
  train_data_loader = DataLoader(train_dataset, batch_size=batch_size)

  # loss function
  loss_func = F.cross_entropy

  # optimizer
  #optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay)
  optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

  # figure
  train_a = list([])
  test_a = list([])

  # training loop
  for epoch in range(epochs):
    model.train()
    tic = time.time()
    acc_train = []
    for xb, yb in train_data_loader:    
      xb, yb = xb.to(device), yb.to(device)
      pred = model(xb)
      loss = loss_func(pred, yb)
      
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      acc_train.append(pred.detach().argmax(1).eq(yb).float().mean().cpu().numpy())
    acc_train = np.mean(acc_train)
    toc = time.time()
    
    with torch.no_grad():
      model.eval()
      y_pred = model(x_test.to(device))
      acc = y_pred.argmax(1).eq(y_test.to(device)).float().mean().cpu().numpy()

    train_a.append(acc_train)
    test_a.append(acc)
    print('Loss at epoch %d : %f, train_acc: %f, test_acc: %f, running time: %d'% (epoch, loss.item(), acc_train, acc, toc-tic))
  
  train_amax = max(train_a)
  test_amax = max(test_a)

  # draw an accuray figure
  """
  plt.plot(train_a,'y.-.')
  plt.plot(test_a,'.-.')
  plt.xlabel('epoch')
  plt.ylabel('accuracy')"""
  plt.plot(train_a, 'y.-.', label='Train Accuracy')  # Add label for train accuracy
  plt.plot(test_a, '.-.', label='Test Accuracy')    # Add label for test accuracy
  plt.xlabel('Epoch')
  plt.ylabel('Accuracy')
  plt.legend()  # Enable legend
  plt.title('Train vs Test Accuracy Over Epochs')  # Add title
  #plt.show()



  return train_amax,test_amax

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, roc_curve, roc_auc_score, confusion_matrix

# Tracking variables for best fold
best_fold_idx = -1
best_test_acc = -1
best_fold_train_acc = []
best_fold_test_acc = []
best_fold_losses = []
best_y_true = None
best_y_pred = None
best_y_scores = None

# K-fold validation
x_train = reshape_dfs
y_train = label
kf = KFold(n_splits=10)

fold_idx = 0
for train_index, test_index in kf.split(x_train, y_train):  
    print(f"Fold {fold_idx}:")
    
    # Splitting the data
    x_train_fold = x_train[train_index]
    y_train_fold = y_train[train_index]
    x_test_fold = x_train[test_index]
    y_test_fold = y_train[test_index]
    
    # Convert to tensors
    x_train_fold = torch.from_numpy(x_train_fold).float()
    y_train_fold = torch.from_numpy(y_train_fold).long()
    x_test_fold = torch.from_numpy(x_test_fold).float()
    y_test_fold = torch.from_numpy(y_test_fold).long()
    
    # Initialize model
    model = cnn_classifier()
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)

    # Lists to store accuracies and losses for each fold
    train_acc_per_fold = []
    test_acc_per_fold = []
    losses_per_fold = []

    # Training loop
    def train_model(model, x_train, y_train, x_test, y_test, epochs=50 , batch_size=32, lr=0.0001, weight_decay=0):
        train_dataset = TensorDataset(x_train, y_train)
        train_data_loader = DataLoader(train_dataset, batch_size=batch_size)

        # Loss function
        loss_func = F.cross_entropy
        # Optimizer
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

        # Training loop
        for epoch in range(epochs):
            model.train()
            toc = time.time()
            acc_train = []
            epoch_loss = 0
            for xb, yb in train_data_loader:    
                xb, yb = xb.to(device), yb.to(device)
                pred = model(xb)
                loss = loss_func(pred, yb)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                acc_train.append(pred.detach().argmax(1).eq(yb).float().mean().cpu().numpy())
                epoch_loss += loss.item()

            acc_train = np.mean(acc_train)
            tic = time.time()
            train_acc_per_fold.append(acc_train)
            losses_per_fold.append(epoch_loss / len(train_data_loader))  # Avg loss

            # Evaluate on test set
            model.eval()
            with torch.no_grad():
                y_pred = model(x_test.to(device)).argmax(1)
                acc_test = y_pred.eq(y_test.to(device)).float().mean().cpu().numpy()
                test_acc_per_fold.append(acc_test)
                
            print('Loss at epoch %d : %f, train_acc: %f, test_acc: %f, running time: %d'% (epoch, loss.item(), acc_train, acc_test, toc-tic))

    # Train the model on the current fold
    train_model(model, x_train_fold.view(-1, 1, 32, 32, depth), y_train_fold,
                x_test_fold.view(-1, 1, 32, 32, depth), y_test_fold)
    
    # After training, check if this fold has the best test accuracy
    if max(test_acc_per_fold) > best_test_acc:
        best_test_acc = max(test_acc_per_fold)
        best_fold_idx = fold_idx
        best_fold_train_acc = train_acc_per_fold
        best_fold_test_acc = test_acc_per_fold
        best_fold_losses = losses_per_fold

        # Save the best fold's predictions for further evaluation
        best_y_true = y_test_fold.cpu().numpy()
        best_y_pred = model(x_test_fold.view(-1, 1, 32, 32, depth).to(device)).argmax(dim=1).cpu().numpy()
        best_y_scores = model(x_test_fold.view(-1, 1, 32, 32, depth).to(device)).softmax(dim=1)[:, 1].detach().cpu().numpy()

    
    fold_idx += 1

# After k-fold, evaluate and plot results for the best fold
print(f"Best Fold: {best_fold_idx}, Test Accuracy: {best_test_acc}")

# Plot training and test accuracy for the best fold
plt.figure(figsize=(14, 6))

# Plot accuracy
plt.subplot(1, 2, 1)
plt.plot(range(1, len(best_fold_train_acc) + 1), best_fold_train_acc, label='Train Accuracy', color='blue')
plt.plot(range(1, len(best_fold_test_acc) + 1), best_fold_test_acc, label='Test Accuracy', color='orange')
plt.title('Accuracy vs Epochs of Wavelet Energy (Valence)')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (Kernel 5x5)')
plt.legend()

# Plot loss
plt.subplot(1, 2, 2)
plt.plot(range(1, len(best_fold_losses) + 1), best_fold_losses, label='Loss', color='red')
plt.title('Loss vs Epochs of Wavelet Energy (Valence)')
plt.xlabel('Epochs')
plt.ylabel('Loss (Kernel 5x5)')
plt.legend()

plt.tight_layout()
plt.savefig('accuracy_loss_Wavelet_Energy_valence_5ker.png')
plt.show()

# Confusion Matrix for best fold
def plot_confusion_matrix(y_true, y_pred, title='Confusion Matrix'):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig('confusion_matrix_Wavelet_Energy_valence_5ker.png')
    plt.show()

plot_confusion_matrix(best_y_true, best_y_pred, title='Confusion Matrix of Wavelet Energy (Valence) with Kernel 5x5')

# Classification Report for best fold
print('Classification Report of Wavelet Energy (Valence) with Kernel 5x5:')
print(classification_report(best_y_true, best_y_pred))

# ROC Curve for best fold
def plot_roc_curve(y_true, y_scores):
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = roc_auc_score(y_true, y_scores)
    np.savetxt('roc_curve_Wavelet_Energy_valence_5ker.txt', np.c_[fpr, tpr], header='FPR TPR', comments='')
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='blue', label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='red', linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) of Wavelet Energy with Kernel 5x5 (Valence)')
    plt.legend(loc='lower right')
    plt.savefig('roc_curve_Wavelet_Energy_valence_5ker.png')
    plt.show()

plot_roc_curve(best_y_true, best_y_scores)