<a href="https://colab.research.google.com/github/mehdi-or/VT2PFC/blob/main/DATRACE_with_sklearn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import sys
from google.colab import drive
drive.mount('/content/gdrive')
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
from torchvision import datasets, transforms
import torchvision.transforms as transforms
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import h5py
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset
import multiprocessing
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

Mounted at /content/gdrive


##finction to set the random see for reproducibility purposes

In [2]:
def set_seed(seed_value=42):
    """Set seed for reproducibility."""
    random.seed(seed_value)  # Python random module
    np.random.seed(seed_value)  # Numpy module
    torch.manual_seed(seed_value)  # PyTorch random number generator for CPU

    # If you are using CUDA
    torch.cuda.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)  # if you are using multi-GPU.

    # Additional configurations to enhance reproducibility
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

##setting up the architecture of the network

In [3]:
class DATRACE(nn.Module):
    def __init__(self, VTC_dim, PFC_dim, hidden_dim, bottleneck_dim, num_classes):
        super(DATRACE, self).__init__()
        # Encoding layers for the VTC
        self.encoder_VTC = nn.Linear(VTC_dim, hidden_dim)

        # Shared bottleneck layer
        self.shared_bottleneck = nn.Linear(hidden_dim, bottleneck_dim)

        # Decoding layers for PFC
        self.decoder = nn.Linear(bottleneck_dim, hidden_dim)
        self.prediction_PFC = nn.Linear(hidden_dim, PFC_dim)

        # Classification layer attached to the shared bottleneck
        self.classifier = nn.Linear(bottleneck_dim, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(p=0.2)

    def encode(self, x):
        # Encoder VTC
        enocded_VTC = torch.tanh(self.encoder_VTC(x))
        enocded_VTC = self.dropout(enocded_VTC)
        BN_shared = torch.tanh(self.shared_bottleneck(enocded_VTC))
        return BN_shared

    def decode(self, x):
        # Decoder PFC
        decoded = torch.tanh(self.decoder(x))
        decoded = self.dropout(decoded)
        predicted_PFC = self.prediction_PFC(decoded)
        return predicted_PFC

    def logis(self, x):
        # Classifier
        logits = self.classifier(x)
        probabilities = logits
        return probabilities


    def forward(self, x):
        # Encoder VTC
        BN = self.encode(x)
        # Decoder PFC
        predicted_PFC = self.decode(BN)
        # Classifier
        probabilities = self.logis(BN)

        return predicted_PFC, probabilities

##importing the data

In [4]:
def load_data_VTC(subject):
    with h5py.File(r'/content/gdrive/MyDrive/Colab Notebooks/CNC data/hrfAll_VT_PETERS.hdf5', 'r') as hdf:
        data0 = hdf.get('items/'+str(subject)+'/rcargs/items/0')
        data_vtc = np.array(data0)
        data_vtc = np.delete(data_vtc,np.where(~data_vtc.any(axis=0))[0],axis=1)
    return(data_vtc)

def load_data_PFC(subject):
    with h5py.File(r'/content/gdrive/MyDrive/Colab Notebooks/CNC data/hrfAll_DLPFC_PETERS.hdf5', 'r') as hdf:
        data0_pfc = hdf.get('items/'+str(subject)+'/rcargs/items/0')
        data_pfc = np.array(data0_pfc)
        data_pfc = np.delete(data_pfc,np.where(~data_pfc.any(axis=0))[0],axis=1)
    return(data_pfc)

def preprocessign (data, labels2categ):
  shuffle_index = np.arange(0,3600)
  data_train, data_test, y_categ_train, y_categ_test, map_train_index, map_test_index = train_test_split(data, labels2categ, shuffle_index, random_state=42)
  #scaler = StandardScaler()
  scaler = MinMaxScaler(feature_range=(-1,1))
  X_train = scaler.fit_transform(data_train)
  X_test = scaler.transform(data_test)
  return X_train, X_test, y_categ_train, y_categ_test, map_train_index, map_test_index

#setting the labels for pytorch is differen from keras
# the way it works is that we need to assign a number to each categorical class
unique_labels = pd.read_csv('/content/gdrive/MyDrive/Colab Notebooks/CNC data/unique_aranged.csv', header=None).values[:,1]
labels = pd.read_csv('/content/gdrive/MyDrive/Colab Notebooks/CNC data/label.csv')['y'].values
label_to_index = {label: idx for idx, label in enumerate(unique_labels)} #mapping form label to its numeric value
index_to_label = {idx: label for label, idx in label_to_index.items()} #mapping from numeric label to the name of the label

#turning label file into its numeric values
numeric_labels = []
for label in labels:
  numeric_labels.append(label_to_index[label])

numeric_labels = np.array(numeric_labels)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#converting all the numpy array inot pytorch tensor

# y_train and y_test is the same for every subject therefore
VTC0 = load_data_VTC(37)
_, _, y_train, y_test, map_train_index, map_test_index = preprocessign(VTC0, numeric_labels)
y_tensor_train = torch.tensor(y_train).to(device)
y_tensor_test =torch.tensor(y_test).to(device)
trainidx = int(len(y_tensor_test)*0.8)
y_tensor_test_train, y_tensor_test_test = y_tensor_test[:trainidx], y_tensor_test[trainidx:]

def load_subject(subject):
  VTC0 = load_data_VTC(subject)
  #VTC0 = np.random.normal(0,1, (3600, 2450)) #giving random noise as VTC data
  PFC0 = load_data_PFC(subject)
  VTC_dim = VTC0.shape[1]
  PFC_dim = PFC0.shape[1]
  num_classes = len(np.unique(labels))
  shuffle_index = np.arange(0,3600)
  VTC_train, VTC_test, _, _, _, _ = preprocessign(VTC0, numeric_labels)
  PFC_train, PFC_test, _, _, _, _ = preprocessign(PFC0, numeric_labels)

  VTC_tensor_train = torch.tensor(VTC_train, dtype=torch.float32).to(device)
  VTC_tensor_test = torch.tensor(VTC_test, dtype=torch.float32).to(device)
  PFC_tensor_train = torch.tensor(PFC_train, dtype=torch.float32).to(device)
  PFC_tensor_test = torch.tensor(PFC_test, dtype=torch.float32).to(device)

  train_dataset = TensorDataset(VTC_tensor_train, PFC_tensor_train, y_tensor_train)
  test_dataset = TensorDataset(VTC_tensor_test, PFC_tensor_test, y_tensor_test)
  return train_dataset, test_dataset, VTC_tensor_test, PFC_tensor_test

##Function for calculating the metrics


In [5]:
def fidelity(x, x_pred):
  x = x.detach().cpu().numpy()
  #x_pred = x_pred.cpu().numpy()
  #calculating recons fidelity
  corr_x = np.corrcoef(x, x_pred)[:x.shape[0],x.shape[0]:]
  corr_x = np.diag(corr_x)
  recons_fidelity = np.mean(corr_x)
  return recons_fidelity

## function for training the model

In [6]:
#train and val the network
def train_and_val_network (train_loader, val_loader, num_classes=40, BN_dim=30, num_epochs=300, alpha=0.1):
  data_iterator = iter(val_loader)
  input_VTC, input_PFC, labels = next(data_iterator)
  VTC_dim = input_VTC.shape[1]
  PFC_dim = input_PFC.shape[1]
  input_PFC = input_PFC.shape[1]
  # Define the model, optimizer, and loss functions
  model = DATRACE(VTC_dim=VTC_dim, PFC_dim=PFC_dim, hidden_dim=500, bottleneck_dim=BN_dim, num_classes=num_classes).to(device)
  optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
  #mse_loss_fn = nn.MSELoss()
  mse_loss_fn = nn.L1Loss()
  #mse_loss_fn = CorrelatonLoss()
  classification_loss_fn = nn.CrossEntropyLoss()
  train_loss_PFC_hist = []
  train_loss_class_hist = []
  val_PFC_loss_hist = []
  train_loss_hist = []
  val_loss_hist = []
  for epoch in range(num_epochs):
      train_loss = 0
      predicted_loss_PFC = 0
      classification_loss = 0
      set_seed(42 + epoch)
      model.train()
      for input_VTC, input_PFC, labels in train_loader:
          optimizer.zero_grad()
          predicted_PFC, probabilities = model(input_VTC)
          # Calculate losses
          predicted_loss_PFC0 = mse_loss_fn(predicted_PFC, input_PFC)
          classification_loss0 = classification_loss_fn(probabilities, labels)
          # Total loss
          total_loss = predicted_loss_PFC0 + alpha*classification_loss0
          # Backpropagation and optimizer step
          total_loss.backward()
          optimizer.step()
          predicted_loss_PFC += predicted_loss_PFC0.item()
          classification_loss += classification_loss0.item()
          train_loss += total_loss.item()
      predicted_loss_PFC /= len(train_loader)
      classification_loss /= len(train_loader)
      train_loss /= len(train_loader)
      train_loss_PFC_hist.append(predicted_loss_PFC)
      train_loss_class_hist.append(classification_loss)
      train_loss_hist.append(train_loss)

      # Validation
      model.eval()
      val_loss = 0
      val_PFC_loss = 0
      with torch.no_grad():
          for input_VTC, input_PFC, labels in val_loader:
              reconstructed, probabilities = model(input_VTC)
              PFC_loss = mse_loss_fn(reconstructed, input_PFC)
              classification_loss = classification_loss_fn(probabilities, labels)
              loss = PFC_loss + alpha*classification_loss
              val_PFC_loss += PFC_loss.item()
              val_loss += loss.item()

      val_PFC_loss /= len(val_loader)
      val_loss /= len(val_loader)
      val_PFC_loss_hist.append(val_PFC_loss)
      val_loss_hist.append(val_loss)

      if (epoch+1)%(num_epochs/10)==0:
        print(f"Epoch {epoch+1}/{num_epochs} completed for alpha={alpha}.")
  return model, train_loss_hist, val_loss_hist


def subject_run(subject, ytrain=y_tensor_test_train, ytest=y_tensor_test_test):
  # writting a for loop to run the model for different bottleneck dimensions
  # And calculating the metrics we want
  train_dataset, test_dataset, VTC_tensor_test, PFC_tensor_test= load_subject(subject)

  set_seed(42) # to get the same training and testset everytime
  train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
  val_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
  data_iterator = iter(val_loader)
  input_VTC, input_PFC, labels = next(data_iterator)
  VTC_dim = input_VTC.shape[1]
  PFC_dim = input_PFC.shape[1]
  input_PFC = input_PFC.shape[1]

  alpha = 0.1 # Classifier weigth of the network
  num_epochs = 300
  num_classes = 40
  BN_dims = [10, 20, 30, 50, 100, 250, 500]
  accuracies = []
  pfc_fidels = []
  for BN_dim in BN_dims:
    model, train_loss_hist, val_loss_hist = train_and_val_network(train_loader, val_loader, num_classes, BN_dim, num_epochs, alpha)
    # Fetch a batch of test images
    predicted_pfc, pred_labels = model(VTC_tensor_test)
    predicted_pfc = predicted_pfc.detach().cpu().numpy()

    #fidelity
    pfc_fidel = fidelity(PFC_tensor_test, predicted_pfc)
    pfc_fidels.append(pfc_fidel)

    # training the seperate classifier
    X_train2, X_test2, y_train2, y_test2= train_test_split(predicted_pfc, y_test, random_state=42)
    model = LogisticRegression(multi_class='multinomial', solver='lbfgs', max_iter=100)
    model.fit(X_train2, y_train2)
    y_test_pred2 = model.predict(X_test2)
    # Calculate the accuracy on the test set
    test_accuracy = accuracy_score(y_test2, y_test_pred2)
    accuracies.append(test_accuracy)

    #saving the data
  column_labels = np.array(['dim', 'fidelity', 'prediction classifier accuracy'])
  all_data = np.array(np.column_stack((BN_dims, pfc_fidels, accuracies)))
  all_data_df = pd.DataFrame(all_data, columns=column_labels)
  all_data_df.to_csv('/content/gdrive/MyDrive/Colab Notebooks/VT2PFC/VTC2PFC/defense/DATRACE_subject_' + str(subject) + '.csv', index=False)
  return all_data_df

In [None]:
subjects = list(range(50, 71))
for subject in subjects:
  all_data_df = subject_run(subject, ytrain=y_tensor_test_train, ytest=y_tensor_test_test)
  all_data_df