# Classification of Emotional-Task Related fMRI Data

## Imports

In [None]:
from sklearn.svm import LinearSVC, SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score,confusion_matrix

import os
from tqdm.notebook import tqdm
from copy import deepcopy
from itertools import combinations
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import nibabel as nib

## Main constants

In [None]:
# Trial index file; it is read with ';' as separator by load_and_split_dataset.
DATASET_FILE = r'/kaggle/input/ds003548/trials/labels.csv'

# Subject ids 1..16 (inclusive).
SELECTED_SUBJECTS = list(range(1, 17))

# Run ids used for training; the 'test' entry holds a '-' placeholder.
RUN_SET_SPLITS = dict(train=[1, 2, 3, 4, 5], test=['-'])

# Trial-type name -> integer class index, numbered in listing order.
SELECTED_CLASSES = {
    class_name: class_index
    for class_index, class_name in enumerate(('neutral', 'happy', 'sad', 'angry'))
}

## Dataset handling

In [None]:
def load_set(set_df: pd.DataFrame, flatten: bool = True, time_first: bool = True, averaged_over_time: bool = False) -> tuple:
    """Load every image listed in ``set_df`` and build one label per loaded row.

    Each row of ``set_df`` must provide an ``'ext_frmi_pths'`` column (image
    path, rewritten below to the Kaggle input directory) and a
    ``'trial_types'`` column whose value is a key of ``SELECTED_CLASSES``.

    Args:
        set_df: Table describing the images to load.
        flatten: Collapse the non-time axes into a single feature axis.
        time_first: Move the last (time) axis to the front before flattening.
        averaged_over_time: Replace the time axis by its mean (the axis is
            kept with length 1). Only applied when ``time_first`` is true, or
            when ``flatten`` is true in the time-last layout.

    Returns:
        ``(data, labels)``: all loaded arrays concatenated along axis 0, and
        an integer array holding the class index repeated once per leading
        entry of each loaded array.

    Raises:
        ValueError: If ``set_df`` has no rows.
        KeyError: If a trial type is not a key of ``SELECTED_CLASSES``.
    """
    data = []
    labels = []
    for _, row in set_df.iterrows():
        # Load image (change paths to be compatible with Kaggle)
        data_path = row['ext_frmi_pths'].replace(r'C:\Users\tormi\Documents\Egyetem\PhD\Learn\IDA\HW\data', r'/kaggle/input/ds003548')
        data_path = data_path.replace('\\', os.sep)
        loaded_data = nib.load(data_path).get_fdata()

        # Loaded dimensions height x width x slices x times
        if time_first:
            loaded_data = loaded_data.transpose(3, 0, 1, 2)

            # Feature vector: one row per time point
            if flatten:
                loaded_data = loaded_data.reshape((loaded_data.shape[0], -1))

            if averaged_over_time:
                # numpy spells the keyword `keepdims` (`keepdim` is rejected)
                loaded_data = loaded_data.mean(axis=0, keepdims=True)

        elif flatten:
            # Feature vector: one column per time point
            loaded_data = loaded_data.reshape((-1, loaded_data.shape[-1]))

            if averaged_over_time:
                loaded_data = loaded_data.mean(axis=-1, keepdims=True)

        data.append(loaded_data)
        # NOTE(review): labels follow the leading axis of the loaded array. In
        # the time-last layouts that axis is not time, so the label count then
        # tracks a spatial/feature dimension - confirm this is intended.
        class_index = SELECTED_CLASSES[row['trial_types']]
        labels.append(np.full(len(loaded_data), class_index, dtype=np.intp))

    if not data:
        raise ValueError('load_set received an empty set_df: there is nothing to load')

    data = np.concatenate(data, axis=0)
    labels = np.concatenate(labels, axis=0)

    return data, labels

def load_and_split_dataset(dataset_file: str, selected_classes: list, selected_subjects: list, train_runs: list, test_runs: list, scale: bool = True):
    """Read the trial index, select rows, and load train/test data and labels.

    Rows are selected positionally: column 1 is compared with
    ``selected_subjects``, column 4 with ``selected_classes`` and column 2
    with ``train_runs`` / ``test_runs``. Each list must contain at least one
    value. The selected rows are passed to ``load_set``.

    Args:
        dataset_file: Path of the ';'-separated index file.
        selected_classes: Values accepted in column 4.
        selected_subjects: Values accepted in column 1.
        train_runs: Column-2 values that form the training split.
        test_runs: Column-2 values that form the test split.
        scale: Standardise the loaded data with ``StandardScaler``.

    Returns:
        ``(train_data, train_labels, test_data, test_labels)``; the two test
        entries are ``None`` when no row matches ``test_runs``.
    """
    index_df = pd.read_csv(dataset_file, sep=';')
    index_rows = index_df.to_numpy()

    def matches_any(column: int, wanted: list):
        # Combine one equality mask per wanted value into a single row mask.
        mask = index_rows[:, column] == wanted[0]
        for value in wanted[1:]:
            mask += (index_rows[:, column] == value)
        return mask

    subject_mask = matches_any(1, selected_subjects)
    class_mask = matches_any(4, selected_classes)
    train_mask = matches_any(2, train_runs)
    test_mask = matches_any(2, test_runs)

    # A row must satisfy the subject, class and run conditions at once.
    train_df = pd.DataFrame(
        data=index_rows[subject_mask * class_mask * train_mask],
        columns=index_df.columns,
    )
    test_df = pd.DataFrame(
        data=index_rows[subject_mask * class_mask * test_mask],
        columns=index_df.columns,
    )

    # Load data sets and labels; the test split is optional.
    train_data, train_labels = load_set(train_df)
    has_test_rows = not test_df.empty
    if has_test_rows:
        test_data, test_labels = load_set(test_df)
    else:
        test_data, test_labels = None, None

    if scale:
        train_data = StandardScaler().fit_transform(train_data)

        if has_test_rows:
            # NOTE(review): the test split is standardised with statistics
            # fitted on the test split itself, not with the training scaler.
            # Confirm that this per-split normalisation is intended.
            test_data = StandardScaler().fit_transform(test_data)

    return train_data, train_labels, test_data, test_labels

## Linear Kernel SVM Subject-Wise Classification

In [None]:
# Dict for saving metrics and best fold-model
subject_linear_svcs = dict()

# Fixed label order: every confusion matrix gets one row/column per selected
# class even when a class is missing from a fold, so per-class metrics of
# different folds stay aligned.
class_ids = list(SELECTED_CLASSES.values())

folds = len(RUN_SET_SPLITS['train'])
with tqdm(total=folds * len(SELECTED_SUBJECTS), leave=True) as pbar:
    for subject in SELECTED_SUBJECTS:
        print(f'Subject {subject}')

        # Subject-related metrics (one entry per successfully evaluated fold)
        subject_linear_svcs[str(subject)] = {
            'model': None,
            'accuracies': [],
            'recall': [],
            'precision': [],
            'fscore': []
        }

        # Leave-one-run-out folds
        best_acc = 0.
        for fold in range(folds):
            print(f'Fold {fold}')
            try:
                # Hold out the fold-th configured run, train on the others
                held_out_run = RUN_SET_SPLITS['train'][fold]
                train_runs = [run for run in RUN_SET_SPLITS['train'] if run != held_out_run]
                test_runs = [held_out_run]

                # Load folds
                fold_train_data, fold_train_labels, fold_test_data, fold_test_labels = \
                    load_and_split_dataset(DATASET_FILE, list(SELECTED_CLASSES.keys()), [subject], train_runs, test_runs)

                # Subject-wise SVM
                linear_svc = SVC(verbose=0, C=0.1, kernel='linear', max_iter=1000)
                linear_svc.fit(fold_train_data, fold_train_labels)
                predictions = linear_svc.predict(fold_test_data)

                # Metric calculations based on confusion matrix
                # (rows: true class, columns: predicted class)
                fold_acc = accuracy_score(fold_test_labels, predictions)
                conf_matrix = confusion_matrix(fold_test_labels, predictions, labels=class_ids)
                true_positives = np.diag(conf_matrix).astype(float)
                true_counts = conf_matrix.sum(axis=1)
                predicted_counts = conf_matrix.sum(axis=0)

                # A metric whose denominator is zero is reported as 0 instead of NaN
                recall = np.divide(
                    true_positives, true_counts,
                    out=np.zeros_like(true_positives), where=true_counts > 0
                )
                precision = np.divide(
                    true_positives, predicted_counts,
                    out=np.zeros_like(true_positives), where=predicted_counts > 0
                )
                precision_plus_recall = precision + recall
                fscore = np.divide(
                    2 * precision * recall, precision_plus_recall,
                    out=np.zeros_like(true_positives), where=precision_plus_recall > 0
                )

                # Save metrics; keep a copy of the model with the best fold accuracy
                print(f'Fold accuracy: {fold_acc * 100}%')
                if fold_acc > best_acc:
                    best_acc = fold_acc
                    subject_linear_svcs[str(subject)]['model'] = deepcopy(linear_svc)
                subject_linear_svcs[str(subject)]['accuracies'].append(fold_acc)
                subject_linear_svcs[str(subject)]['fscore'].append(fscore.tolist())
                subject_linear_svcs[str(subject)]['recall'].append(recall.tolist())
                subject_linear_svcs[str(subject)]['precision'].append(precision.tolist())

            except Exception as e:
                # Best effort: report the failed fold and continue with the next one
                print('Warning! Something went wrong!')
                print(str(e))

            pbar.update(1)

# Save training metrics and SVMs
from joblib import dump
# joblib.dump takes the object first and the target path second; the results
# dict is deliberately not rebound to dump's return value.
dump(subject_linear_svcs, r'/kaggle/working/subject_linear_svcs.joblib')

## Metric visualizations

In [None]:
# Load training metrics and SVMs
from joblib import load
subject_linear_svcs = load(r'/kaggle/working/subject_linear_svcs.joblib')

# Subject-wise accuracy scores: one table row per (subject, fold) pair
for_df = {'accuracy': [], 'subject': []}
for subject in SELECTED_SUBJECTS:
    fold_accuracies = subject_linear_svcs[str(subject)]['accuracies']
    for_df['accuracy'].extend(fold_accuracies)
    for_df['subject'].extend([f's{subject}'] * len(fold_accuracies))
df = pd.DataFrame.from_dict(for_df)

# Plot scores: one bar per subject, y axis limited to [0, 1]
sns.set_theme(style="whitegrid")
bar_options = dict(kind="bar", palette="dark", alpha=.7, height=8)
g = sns.catplot(data=df, x="subject", y="accuracy", **bar_options)
g.despine(left=True)
g.set_axis_labels("Patient", "Accuracy")
g.fig.suptitle("Classification Accuracies of Separate SVMs per Subject")
g.set(ylim=(0, 1))


In [None]:
# Class-wise other metrics: one table row per (subject, class, fold)
for_df = {'fscores': [], 'precisions': [], 'recalls': [], 'classes': []}
for subject in SELECTED_SUBJECTS:
    subject_metrics = subject_linear_svcs[str(subject)]
    # Each array is indexed as [fold, class]
    recalls = np.array(subject_metrics['recall'])
    precisions = np.array(subject_metrics['precision'])
    fscores = np.array(subject_metrics['fscore'])
    for class_idx, class_id in enumerate(SELECTED_CLASSES):
        for_df['fscores'] += list(fscores[:, class_idx])
        for_df['precisions'] += list(precisions[:, class_idx])
        for_df['recalls'] += list(recalls[:, class_idx])
        # Repeat the class name once per fold of this subject
        for_df['classes'] += [f'{class_id}'] * recalls.shape[0]
df = pd.DataFrame.from_dict(for_df)

def plot_df(df, x, y, x_title, title):
    """Draw a bar chart of column ``y`` of ``df`` grouped by column ``x``.

    Note that, despite its name, ``x_title`` is applied as the label of the
    y axis; the x axis is always labelled "Class". ``title`` becomes the
    figure title and the y axis is limited to [0, 1]. Nothing is returned.
    """
    sns.set_theme(style="whitegrid")
    bar_options = dict(kind="bar", palette="dark", alpha=.7, height=8)
    grid = sns.catplot(data=df, x=x, y=y, **bar_options)
    grid.despine(left=True)
    grid.set_axis_labels("Class", x_title)
    grid.fig.suptitle(title)
    grid.set(ylim=(0, 1))

In [None]:
# Per-class recall, untitled figure
plot_df(df, x='classes', y='recalls', x_title='Recall', title='')

In [None]:
# Per-class precision, untitled figure
plot_df(df, x='classes', y='precisions', x_title='Precision', title='')

In [None]:
# Per-class F-score, untitled figure
plot_df(df, x='classes', y='fscores', x_title='F-Score', title='')

## SVM coefficient visualization

In [None]:
import matplotlib.pyplot as plt
from skimage.util import montage
# NOTE(review): `color` is not used anywhere in this cell.
from skimage import color

# Gather coefficients and data w.r.t a subject or subjects
subj_coefs = []
subj_data = []

# Subjects whose best-fold model and training data are visualised
subjects = [1]
for subject in subjects:
    # Unscaled data of all training runs; '-' is passed as the test-run value
    # and the test outputs are discarded.
    fold_train_data, _, _, _ = \
        load_and_split_dataset(DATASET_FILE, list(SELECTED_CLASSES.keys()), [subject], RUN_SET_SPLITS['train'], ['-'], scale=False)
    # Split the rows into shape[0] // 15 equal chunks and average inside each
    # chunk. Presumably 15 is the number of time points per loaded image -
    # TODO confirm.
    fold_train_data = np.split(fold_train_data, fold_train_data.shape[0]//15)
    fold_train_data = np.mean(fold_train_data, axis=1)
    subj_data.append(fold_train_data)
    
    # Coefficients of the stored model, with a leading per-subject axis added.
    # 'model' must not be None here.
    subj_coefs.append(
        np.expand_dims(subject_linear_svcs[str(subject)]['model'].coef_, axis=0)
    )

# Average the data over all chunks of all subjects, then restore the volume.
# The 61 x 61 x 37 shape is hard-coded; the transpose puts the last axis first
# so that montage tiles one 61 x 61 image per entry of that axis.
subj_data = np.concatenate(subj_data, axis=0)
subj_data = np.mean(subj_data, axis=0)
subj_data = subj_data.reshape(61, 61, 37).transpose(2, 0, 1)

# Average the coefficients over subjects, then over the rows of coef_, and
# reshape them exactly like the data.
subj_coefs = np.concatenate(subj_coefs, axis=0)
subj_coefs = np.mean(np.mean(subj_coefs, axis=0), axis=0)
subj_coefs = subj_coefs.reshape(61, 61, 37).transpose(2, 0, 1)
# Separate the signs: positive weights as they are, negative weights as magnitudes
subj_coefs_pos = np.where(subj_coefs > 0, subj_coefs, 0)
subj_coefs_neg = np.where(subj_coefs < 0, np.abs(subj_coefs), 0)

# Visualize activation maps based on SVM Coefficients
# ([1:] leaves out the first image of each stack)
plt.figure(figsize=(20, 20))
plt.imshow(montage(subj_coefs_pos[1:]))
# NOTE(review): this image is drawn without alpha on the same axes, so it
# likely hides the positive map above, and the colorbar below presumably
# refers to this negative map only - confirm the intended overlay.
plt.imshow(montage(subj_coefs_neg[1:]))
plt.colorbar()
# Half-transparent grey overlay of the averaged data
plt.imshow(montage(subj_data[1:]), cmap="gray", alpha=0.5)
plt.grid(None)
plt.show()

In [None]:
# Visualize weighted activation maps based on SVM Coefficients:
# each coefficient map is multiplied element-wise with the averaged data.
plt.figure(figsize=(20, 20))
plt.imshow(montage(subj_coefs_pos[1:] * subj_data[1:]))
# NOTE(review): drawn without alpha on the same axes, so this likely hides the
# positive map above; the colorbar presumably refers to this image - confirm.
plt.imshow(montage(subj_coefs_neg[1:] * subj_data[1:]))
plt.colorbar()
# Half-transparent grey overlay of the averaged data
plt.imshow(montage(subj_data[1:]), cmap="gray", alpha=0.5)
plt.grid(None)
plt.show()