# Deep Learning

## Import Module

In [None]:
!pip install captum

In [None]:
import os
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import recall_score, f1_score, classification_report, accuracy_score, confusion_matrix
from captum.attr import IntegratedGradients

In [None]:
# Prefer the first GPU, but fall back to the CPU so the notebook still runs
# on machines without CUDA.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Fix the seed of every random number generator used in this notebook
seed = 695
random.seed(seed)
np.random.seed(seed)
for seed_fn in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(seed)
# Ask cuDNN for deterministic kernels and disable its auto-tuner
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

## Preprocessing


### Load data

In [None]:
# control data: collect file path, name prefix and class label of every file
control_data_list = []
control_name_list = []
control_label_list = []
for dirname, subdirs, filenames in os.walk('/kaggle/input/posner-data/control'):
    # os.walk yields entries in arbitrary filesystem order. Sort the
    # directories still to be visited (in place) and the files so that the
    # row order, and with it the seeded fold split, is reproducible.
    subdirs.sort()
    for file in sorted(filenames):
        control_data_list.append(os.path.join(dirname, file))
        # The name is the part of the file name before the first underscore
        # (presumably the subject identifier - confirm against the data set).
        control_name_list.append(file.split('_')[0])
        control_label_list.append('control')
control_data_list[:10], control_name_list[:10], control_label_list[:10]

In [None]:
# ADHD data: collect file path, name prefix and class label of every file
ADHD_data_list = []
ADHD_name_list = []
ADHD_label_list = []
for dirname, subdirs, filenames in os.walk('/kaggle/input/posner-data/ADHD'):
    # os.walk yields entries in arbitrary filesystem order. Sort the
    # directories still to be visited (in place) and the files so that the
    # row order, and with it the seeded fold split, is reproducible.
    subdirs.sort()
    for file in sorted(filenames):
        ADHD_data_list.append(os.path.join(dirname, file))
        # The name is the part of the file name before the first underscore
        # (presumably the subject identifier - confirm against the data set).
        ADHD_name_list.append(file.split('_')[0])
        ADHD_label_list.append('ADHD')
ADHD_data_list[:10], ADHD_name_list[:10], ADHD_label_list[:10]

In [None]:
# TS data: collect file path, name prefix and class label of every file
TS_data_list = []
TS_name_list = []
TS_label_list = []
for dirname, subdirs, filenames in os.walk('/kaggle/input/posner-data/TS'):
    # os.walk yields entries in arbitrary filesystem order. Sort the
    # directories still to be visited (in place) and the files so that the
    # row order, and with it the seeded fold split, is reproducible.
    subdirs.sort()
    for file in sorted(filenames):
        TS_data_list.append(os.path.join(dirname, file))
        # The name is the part of the file name before the first underscore
        # (presumably the subject identifier - confirm against the data set).
        TS_name_list.append(file.split('_')[0])
        TS_label_list.append('TS')
TS_data_list[:10], TS_name_list[:10], TS_label_list[:10]

In [None]:
# Merge the three groups into one table with one row per file:
# 'data' = file path, 'name' = file-name prefix, 'label' = group name.
all_data_list = control_data_list + ADHD_data_list + TS_data_list
all_name_list = control_name_list + ADHD_name_list + TS_name_list
all_label_list = control_label_list + ADHD_label_list + TS_label_list
# Build the frame directly from the columns. Assigning a zipped frame into an
# empty DataFrame raises when no files were found, because the zipped frame
# then has no columns.
all_df = pd.DataFrame({
    'data': all_data_list,
    'name': all_name_list,
    'label': all_label_list,
})
all_df

### Padding

In [None]:
# Number of consecutive samples that are averaged into one time bin.
BIN_SIZE = 17

# First pass: find the longest recording; shorter ones are zero-padded to it.
max_len = 0
for file in all_df['data']:
    max_len = max(max_len, len(pd.read_csv(file)))
print(max_len)

# Trailing samples that do not fill a complete bin are dropped.
n_bins = max_len // BIN_SIZE

working_paths = []
for file in all_df['data']:
    # Only columns 3-6 of every recording are kept (the four input channels).
    data = pd.read_csv(file, usecols=list(range(3, 7)))
    # Extend the row index to max_len; the added rows, and any values that
    # were already missing, become 0.
    data_pad = data.reindex(range(max_len)).fillna(0)

    # Mirror the input path under the 'working' tree.
    path = file.replace('input', 'working')
    os.makedirs(os.path.split(path)[0], exist_ok=True)

    # Average every block of BIN_SIZE rows, column by column.
    avg = [
        data_pad.iloc[i * BIN_SIZE:(i + 1) * BIN_SIZE].mean(axis=0).values
        for i in range(n_bins)
    ]

    # Stored transposed: one row per channel, one column per time bin.
    data_final = pd.DataFrame(avg)
    data_final.T.to_csv(path, header=False, index=False)
    working_paths.append(path)

# Point the table at the preprocessed files and encode the labels as integers.
all_df['data'] = working_paths
all_df['label'] = all_df['label'].map({'control':0, 'ADHD':1, 'TS':2})

In [None]:
# Preview the first file listed in all_df['data'] (read without a header row)
pd.read_csv(all_df['data'][0], header=None)

# Define Dataset

In [None]:
class CustomImageDataset(Dataset):
    """Dataset that loads one headerless CSV file per item.

    The wrapped table is read by position: column 0 holds the file path,
    column 1 the name and column 2 the label.
    """

    def __init__(self, annotations_file, transform=None, target_transform=None):
        # `annotations_file` is the table itself (it is indexed with .iloc),
        # not a path to it.
        self.df = annotations_file
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of rows in the wrapped table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, name, label)`` for row ``idx``."""
        img_path, name, label = (self.df.iloc[idx, col] for col in range(3))
        image = pd.read_csv(img_path, header=None).values
        # Optional callables applied to the loaded array and to the label.
        image = self.transform(image) if self.transform else image
        label = self.target_transform(label) if self.target_transform else label
        return image, name, label

# Define net

In [None]:
class Network(nn.Module):
    """1-D convolutional classifier: 4 input channels in, 3 class scores out.

    Args:
        pool: output length of the adaptive average pooling that precedes the
            fully connected layers.
        fc1: width of the first fully connected layer.
        maxpool: kernel size of the two max-pooling layers.
        conv8_open: when True, the extra ``conv8`` layer is applied after
            ``conv7``.
        return_logits: when True, ``forward`` returns the raw class scores
            instead of softmax probabilities. ``nn.CrossEntropyLoss`` applies
            log-softmax itself, so it should be fed raw scores; the default
            (False) keeps the original probability output.

    Notes:
        * The layers are created in the original order so that a seeded run
          initialises the same weights.
        * ``bn1``, ``bn4``, ``bn6``, ``bn7`` and ``bn8`` are registered but
          never called in ``forward``; they are kept so that existing state
          dicts still load.
        * NOTE(review): ``bn7`` is declared with 64 features although
          ``conv7`` emits 128 channels - it must be resized before it can be
          used.
    """

    def __init__(self, pool = 6, fc1=1024, maxpool=2, conv8_open=False, return_logits=False):
        super().__init__()

        self.maxpool = maxpool

        self.conv1 = nn.Conv1d(in_channels=4, out_channels=16, kernel_size=10, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(16)

        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=10, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(32)
        self.pool = nn.MaxPool1d(self.maxpool, padding= 1)

        self.conv4 = nn.Conv1d(in_channels=32, out_channels=32, kernel_size=10, stride=1, padding=1)
        self.bn4 = nn.BatchNorm1d(32)
        self.conv5 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=10, stride=1, padding=1)
        self.bn5 = nn.BatchNorm1d(64)
        self.pool2 = nn.MaxPool1d(self.maxpool, padding= 1)

        self.conv6 = nn.Conv1d(in_channels=64, out_channels=64, kernel_size=10, stride=1, padding=1)
        self.bn6 = nn.BatchNorm1d(64)
        self.conv7 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=10, stride=1, padding=1)
        # Kept at 64 features for state-dict compatibility (see class notes).
        self.bn7 = nn.BatchNorm1d(64)
        self.conv8 = nn.Conv1d(in_channels=128, out_channels=128, kernel_size=10, stride=1, padding=1)
        self.bn8 = nn.BatchNorm1d(128)

        self.conv_open = conv8_open
        self.pools = pool
        self.ave_pool = nn.AdaptiveAvgPool1d(self.pools)

        # FC
        self.fc1_num = fc1
        self.fc1 = nn.Linear(128*self.pools, self.fc1_num)
        self.fc2 = nn.Linear(self.fc1_num, 64)
        self.fc3 = nn.Linear(64, 3)

        self.soft = nn.Softmax(dim=1)

        self.drop = nn.Dropout(0.1)

        self.return_logits = return_logits

    def forward(self, input1):
        """Return one row of 3 class probabilities (or raw scores) per sample.

        Args:
            input1: float tensor whose channel axis has size 4; the code
                assumes a (batch, 4, length) layout.
        """
        # Block 1: two convolutions, batch norm, max pooling
        output = F.celu(self.conv1(input1))
        output = F.celu(self.conv2(output))
        output = self.bn2(output)
        output = self.pool(output)

        # Block 2: same layout, with its own pooling layer (pool2 has the same
        # configuration as pool, so the result is unchanged)
        output = F.celu(self.conv4(output))
        output = F.celu(self.conv5(output))
        output = self.bn5(output)
        output = self.pool2(output)

        # Block 3: convolutions only
        output = F.celu(self.conv6(output))
        output = F.celu(self.conv7(output))

        if self.conv_open:
            output = F.elu(self.conv8(output))

        # Fixed-length summary, flattened to (batch, 128 * pool)
        output = self.ave_pool(output)
        output = output.view(-1, 128*self.pools)

        con = self.fc1(output)
        con = self.drop(con)
        con = self.fc2(con)
        con = self.fc3(con)

        if self.return_logits:
            return con
        return self.soft(con)

# Start training


### Data split

In [None]:
# Stratified 3-fold split; every entry of `folds` is (train indices, test indices)
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=695)
folds = [
    (train_idx, test_idx)
    for train_idx, test_idx in skf.split(all_df, all_df['label'])
]

In [None]:
# Class balance: number of rows per label (0 = control, 1 = ADHD, 2 = TS)
all_df['label'].value_counts()

### Define evaluation and training functions

In [None]:
def get_accuracy(dataloader):
    """Evaluate the module-level ``model`` on every batch of ``dataloader``.

    Relies on the globals ``model``, ``criterion`` and ``device`` and leaves
    the model in eval mode.

    Args:
        dataloader: iterable yielding ``(inputs, names, labels)`` batches.

    Returns:
        ``(label_list, predicted_list, loss, accuracy)``: the true and the
        predicted class indices as lists of ints, the loss averaged over the
        batches (0.0 when there are none) and the ``accuracy_score`` of the
        two lists.
    """
    model.eval()
    running_loss = 0.
    n_batches = 0
    label_list, predicted_list = [], []
    with torch.no_grad():
        for X, _, y in dataloader:
            # Remove only axis 1 (presumably the singleton axis that ToTensor
            # adds to a 2-D array). A bare squeeze() would also remove the
            # batch axis when the last batch holds a single sample, which
            # breaks the forward pass.
            images = X.squeeze(1).float().to(device)
            labels = y.reshape(-1).long().to(device)

            # calculate outputs by running images through the network
            predictions = model(images)
            running_loss += criterion(predictions, labels).item()
            n_batches += 1

            # the class with the highest score is the prediction
            predicted_list += predictions.argmax(1).cpu().tolist()
            label_list += labels.cpu().tolist()

    loss = running_loss / n_batches if n_batches else 0.
    accuracy = accuracy_score(label_list, predicted_list)
    return label_list, predicted_list, loss, accuracy

In [None]:
def training(model, criterion, optimizer, name):
    """Train ``model`` on every fold in ``folds`` and checkpoint improving epochs.

    For each fold the model is trained for 150 epochs. After every epoch the
    train and test splits are evaluated with ``get_accuracy``; whenever the
    test accuracy improves and the recall of class 0 equals 1, the weights are
    saved as ``{name}_fold_{fold}_epoch_{epoch}.pth``.

    Relies on the globals ``folds``, ``all_df`` and ``device``.
    ``get_accuracy`` evaluates the module-level ``model``, so ``model`` and
    ``criterion`` must be those same global objects.

    NOTE(review): model and optimizer are not re-initialised between folds, so
    later folds start from weights that were already trained on samples of
    their test split. Confirm whether that is intended.

    Args:
        model: network to train (updated in place).
        criterion: loss function called as ``criterion(outputs, labels)``.
        optimizer: optimizer bound to ``model``'s parameters.
        name: prefix of the checkpoint file names.

    Returns:
        ``(train_loss_list, train_acc_list, test_loss_list, test_acc_list)``
        with one entry per epoch of the last fold.
    """
    # Best result of every fold. These lists must be created outside the fold
    # loop, otherwise the means printed at the end only cover the last fold.
    higest_train = []
    higest_test = []

    # Per-epoch history; re-created for every fold below.
    train_loss_list, train_acc_list = [], []
    test_loss_list, test_acc_list = [], []

    trans_comp = transforms.Compose([transforms.ToTensor()])

    for f, (train_idx, test_idx) in enumerate(folds):
        higest_train_acc = 0
        before_acc = 0.

        # split train & test (the fold indices are positional, hence iloc)
        train_df = all_df.iloc[train_idx]
        test_df = all_df.iloc[test_idx]

        # make dataloader
        train_dataset = CustomImageDataset(train_df, transform=trans_comp)
        test_dataset = CustomImageDataset(test_df, transform=trans_comp)
        train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=True)
        test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

        train_loss_list, train_acc_list = [], []
        test_loss_list, test_acc_list = [], []

        for epoch in range(150):
            # get_accuracy() leaves the model in eval mode, so switch back to
            # training mode at the start of every epoch.
            model.train()

            for X, _, y in train_dataloader:
                # Remove only axis 1 (presumably the singleton axis that
                # ToTensor adds); a bare squeeze() could also drop the batch
                # axis.
                inputs = X.squeeze(1).float().to(device)
                labels = y.reshape(-1).long().to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            # training accuracy
            train_label_list, train_predicted_list, train_loss, train_acc = get_accuracy(train_dataloader)

            # test accuracy
            test_label_list, test_predicted_list, test_loss, test_acc = get_accuracy(test_dataloader)

            train_loss_list.append(train_loss)
            train_acc_list.append(train_acc)
            test_loss_list.append(test_loss)
            test_acc_list.append(test_acc)

            # per-class recall on the test split; index 0 is class 0
            rc_mean = recall_score(test_label_list, test_predicted_list, average=None)

            # Keep a checkpoint only when the test accuracy improves and
            # every class-0 sample is recognised.
            if test_acc > before_acc and rc_mean[0] == 1:
                print(train_acc, test_acc)
                print(rc_mean)
                before_acc = test_acc
                higest_train_acc = train_acc
                checkpoint = f"{name}_fold_{f}_epoch_{epoch}.pth"
                print(checkpoint)
                torch.save(model.state_dict(), checkpoint)

        higest_train.append(higest_train_acc)
        higest_test.append(before_acc)
        print('higest_train_acc', higest_train_acc, "before_acc", before_acc)

    print('Train_mean', np.mean(higest_train))
    print('Test_mean', np.mean(higest_test))
    print("Data Length", len(higest_train))

    return train_loss_list, train_acc_list, test_loss_list, test_acc_list

### Start training


In [None]:
# compile network
name = 'control_adhd_ts_try'
model = Network(pool=10, fc1=1024, maxpool=4, conv8_open=False)
model = model.to(device)

# Per-class loss weights, in label order (control, ADHD, TS)
weight = torch.tensor([1/5, 1/24, 1/29]).to(device)
criterion = nn.CrossEntropyLoss(weight=weight)

# lr * batch_size should be kept the same
optimizer = optim.SGD(
    model.parameters(),
    lr=0.02,
    weight_decay=0.0001,
)

history = training(model, criterion, optimizer, name)
train_loss_list, train_acc_list, test_loss_list, test_acc_list = history

### train/test accuracies plot

In [None]:
# record: one row per epoch of the history returned by training()
df_result = pd.DataFrame({
    'train_loss': train_loss_list,
    'train_acc': train_acc_list,
    'test_loss': test_loss_list,
    'test_acc': test_acc_list,
})
df_result['epoch'] = [int(x) + 1 for x in df_result.index]

# plot: accuracy curves of the first 100 epochs
first_epochs = df_result.iloc[:100]
plt.figure(figsize=(7, 3))
for column in ('train_acc', 'test_acc'):
    plt.plot(first_epochs['epoch'], first_epochs[column], label=column)
plt.legend(fontsize=15)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Accuracy plot')
plt.show()

## Reproduction

### Load best model

In [None]:
# Restore the weights of a saved checkpoint. map_location moves the stored
# tensors onto the current device, so a checkpoint saved on a GPU can also be
# loaded elsewhere.
# NOTE(review): the file name is hard-coded; it presumably has to match one of
# the checkpoint names printed during training - confirm before running.
model.load_state_dict(torch.load('/kaggle/working/control_adhd_ts_try_fold_2_epoch_0.pth', map_location=device))

### Reproduction & confusion matrix

In [None]:
# Rebuild the train/test loaders of the last fold and evaluate the model on both
last_train_idx, last_test_idx = folds[-1]
train_df = all_df.loc[last_train_idx]
test_df = all_df.loc[last_test_idx]

trans_comp = transforms.Compose([transforms.ToTensor()])
train_dataset = CustomImageDataset(train_df, transform=trans_comp)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=True)
test_dataset = CustomImageDataset(test_df, transform=trans_comp)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Same call order as before: train split first, then test split
train_result = get_accuracy(train_dataloader)
test_result = get_accuracy(test_dataloader)
train_label_list, train_predicted_list, train_loss, train_acc = train_result
test_label_list, test_predicted_list, test_loss, test_acc = test_result

In [None]:
# Number of samples evaluated from the training and from the test loader
len(train_label_list), len(test_label_list)

In [None]:
# Confusion matrices of the training and the test split, drawn the same way
class_names = ['Control', 'ADHD', 'TS']
cm_inputs = (
    ('Training CM', (4, 3), train_label_list, train_predicted_list),
    ('Test CM', (4, 3), test_label_list, test_predicted_list),
)
for cm_title, cm_figsize, actual, predicted in cm_inputs:
    cm = confusion_matrix(actual, predicted)
    plt.figure(figsize=cm_figsize)
    sns.heatmap(
        cm,
        annot=True,
        fmt='.20g',
        cmap=sns.color_palette("Blues", as_cmap=True),
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(cm_title)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()

## Explainable AI -- Integrated Gradients

In [None]:
def minmax(x):
    """Rescale ``x`` linearly so that its minimum maps to 0 and its maximum to 1.

    ``x.min()`` / ``x.max()`` decide the granularity: an array or Series is
    scaled as a whole, a DataFrame column by column. Constant input (or a
    constant column) has no range to scale by and is mapped to 0 instead of
    producing NaN through a division by zero.

    Args:
        x: object with ``min()`` / ``max()`` that supports arithmetic, e.g. a
            NumPy array, pandas Series or pandas DataFrame.

    Returns:
        The rescaled values, same container type as ``x``.
    """
    low = x.min()
    span = x.max() - low

    if np.ndim(span) == 0:
        # Single global range
        if span == 0:
            return x - low
        return (x - low) / span

    # One range per column: divide constant columns by 1 so they become 0
    span = span.where(span != 0, 1)
    return (x - low) / span

In [None]:
# Attribute the class-0 output of the model to its inputs with Integrated
# Gradients and draw one figure per sample.
model.eval()
ig = IntegratedGradients(model)

for X, _, _ in train_dataloader:
    # Remove only axis 1 (presumably the singleton axis that ToTensor adds);
    # a bare squeeze() could also drop the batch axis.
    input1 = X.squeeze(1).float().to(device)
    attributions, delta = ig.attribute(input1, target=0, return_convergence_delta=True)
    # Plain NumPy copy so the values can be stored in a DataFrame regardless
    # of device or autograd state.
    attr_np = attributions.detach().cpu().numpy()

    # One column per (sample, channel): '<sample>_cam<channel>'
    df_cam = pd.DataFrame({
        f'{nums}_cam{channel}': att[channel]
        for nums, att in enumerate(attr_np)
        for channel in range(4)
    })
    # Smooth with a 50-step rolling mean (back-filling the leading NaNs) and
    # rescale with minmax
    df_rolling = minmax(df_cam.rolling(50).mean().bfill())

    for nums in range(len(attr_np)):
        # Only the first channel is plotted
        cam0 = df_rolling[f'{nums}_cam0']
        yy = np.linspace(-0.01, 1, 100)
        # Red with an opacity proportional to the attribution (at most 0.5)
        colors = [(1, 0, 0, a*0.5) for a in cam0]

        # Draw the horizontal colour gradient: one vertical line per time step
        fig, ax = plt.subplots(figsize=(40, 15))
        for i, color in enumerate(colors):
            ax.plot(np.ones_like(yy) * i, yy, color=color, linewidth=4)

        plt.scatter(np.arange(0, len(cam0), 1), cam0, label= 'region1', c='black', linewidth=10)
        plt.show()