<a href="https://colab.research.google.com/github/mrshamshir/Automated-Neurological-Disease-Classification/blob/main/SVM_ResNet_V3(final).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Imports and data loading

In [None]:
from pathlib import Path
import nibabel as nib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import pickle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, accuracy_score

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch import Tensor
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

from sklearn import svm

In [None]:
# path to train datasets and labels

train_rCBF = Path("/content/drive/MyDrive/Assignment/training_images_rcbf.nii")
train_DAT = Path("/content/drive/MyDrive/Assignment/training_images_sbr.nii")

labels = pd.read_csv("/content/drive/MyDrive/Assignment/Diagnoses_of_training_data.csv")

In [None]:
# Load NIfTI and extract image data
train_PET_rCBF = nib.load(train_rCBF)
train_data_rCBF = train_PET_rCBF.get_fdata()

train_PET_DAT = nib.load(train_DAT)
train_data_DAT = train_PET_DAT.get_fdata()

### Dataset Creation

#### Loading data

In [None]:
def create_xdata(rCBF, DAT):
    # combine two images of same subjects
    res = np.stack((rCBF, DAT), axis = 3)
    res = np.transpose(res, (4, 3, 0, 1, 2))
    return res

xdata = create_xdata(train_data_rCBF, train_data_DAT)
print(xdata.shape)

(40, 2, 64, 64, 64)


#### Define Normalization and dataset classes

In [None]:
def calculate_mean_std(data):
    """
        Calculate the mean and standard deviation for each channel across all samples in the input data.

        Args:
        - data (numpy.ndarray): Input data with shape (num_samples, num_channels, depth, height, width).

        Returns:
        - mean (numpy.ndarray): Mean values for each channel across all samples, with shape (num_channels,).
        - std (numpy.ndarray): Standard deviation for each channel across all samples, with shape (num_channels,).
    """
    mean = np.mean(data, axis=(0, 2, 3, 4))
    std = np.std(data, axis=(0, 2, 3, 4))
    return mean, std


In [None]:
# Custom transformation to convert numpy array to tensor
class ToTensor(object):
    def __call__(self, sample):
        return torch.from_numpy(sample).float()  # Convert to float tensor


class Normalize3D(torch.nn.Module):
    def __init__(self, mean, std, inplace=False):
        """
            Initializes the 3D normalization module.

            Args:
            - mean (array-like): Mean values for each channel.
            - std (array-like): Standard deviation values for each channel.
            - inplace (bool): If True, normalize the tensor in-place. Default is False.
        """
        super().__init__()
        self.mean = torch.tensor(mean, dtype=torch.float32).view(-1, 1, 1, 1)
        self.std = torch.tensor(std, dtype=torch.float32).view(-1, 1, 1, 1)
        self.inplace = inplace

    def forward(self, tensor: Tensor) -> Tensor:
        """
            Forward pass of the normalization module.

            Args:
            - tensor (Tensor): Input tensor to be normalized.

            Returns:
            - Tensor: Normalized output tensor.
        """
        if self.inplace:
            tensor.sub_(self.mean).div_(self.std)
            return tensor
        else:
            return (tensor - self.mean) / self.std

    def __repr__(self) -> str:
        """
            Returns a string representation of the normalization module.

            Returns:
            - str: String representation of the module.
        """
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std})"


In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        """
            Custom dataset class for handling input data and labels.

            Args:
            - data (array-like): Input data array.
            - labels (array-like): Labels array corresponding to the input data.
            - transform (callable, optional): Optional transform to be applied to the input data.

        """
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """
            Returns the length of the dataset.

            Returns:
            - int: Length of the dataset.
        """
        return len(self.data)

    def __getitem__(self, idx):
        """
            Retrieves an item from the dataset based on the provided index.

            Args:
            - idx (int): Index of the item to retrieve.

            Returns:
            - dict: A dictionary containing the input data and its corresponding label.
        """
        sample = {'input': self.data[idx], 'label': self.labels[idx] - 1}
        if self.transform:
            sample['input'] = self.transform(sample['input'])
        return sample


### Model creation and loading

In [None]:
class BasicBlock3D(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock3D, self).__init__()
        self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

class ResNet3D(nn.Module):
    def __init__(self, block, layers, num_classes=4, zero_init_residual=False):
        super(ResNet3D, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Conv3d(2, 64, kernel_size=(7, 7, 7), stride=(2, 2, 2), padding=(3, 3, 3), bias=False)  # Change input channels to 64
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm3d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, BasicBlock3D):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv3d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def feature_extraction(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)

        return x



Loading ResNet3D model

In [None]:
loaded_model = ResNet3D(BasicBlock3D, [2, 2, 2, 2], num_classes=4)
path = '/content/drive/MyDrive/output/ML_Task/resnet3d_ML_Task_V4_best_0.0002_SGD_best_on_min_val_loss.pt'

checkpoint = torch.load(path, map_location=torch.device('cpu'))
loaded_model.load_state_dict(checkpoint)

<All keys matched successfully>

## K-Fold experiment

####Create K-Folds

In [None]:
from sklearn.model_selection import StratifiedKFold
skf = StratifiedKFold(n_splits=5, random_state=13, shuffle=True)

In [None]:
res=skf.split(xdata, np.array(labels['diagnose']))

In [None]:
folds_info=[]

for i, (train_index, test_index) in enumerate(skf.split(xdata, np.array(labels['diagnose']))):
  print(f"Fold {i}:")
  print(f"  Train: index={train_index}")
  print(f"  Test:  index={test_index}")
  train_fold_x=[]
  train_fold_y=[]
  val_fold_x=[]
  val_fold_y=[]
  for idx in train_index:
    train_fold_x.append(xdata[idx])
    train_fold_y.append(labels['diagnose'][idx])

  for idx in test_index:
    val_fold_x.append(xdata[idx])
    val_fold_y.append(labels['diagnose'][idx])

  folds_info.append((np.array(train_fold_x), np.array(val_fold_x), np.array(train_fold_y), np.array(val_fold_y)))


Fold 0:
  Train: index=[ 0  1  2  3  4  5  7  9 10 11 14 16 18 19 20 21 22 23 24 25 26 27 28 29
 30 31 32 33 35 37 38 39]
  Test:  index=[ 6  8 12 13 15 17 34 36]
Fold 1:
  Train: index=[ 1  2  3  4  5  6  7  8  9 10 12 13 14 15 16 17 18 19 20 21 23 24 26 27
 28 29 30 31 33 34 36 39]
  Test:  index=[ 0 11 22 25 32 35 37 38]
Fold 2:
  Train: index=[ 0  1  3  5  6  7  8  9 10 11 12 13 14 15 17 18 19 20 22 23 24 25 26 28
 32 33 34 35 36 37 38 39]
  Test:  index=[ 2  4 16 21 27 29 30 31]
Fold 3:
  Train: index=[ 0  1  2  3  4  6  8  9 10 11 12 13 15 16 17 21 22 24 25 26 27 28 29 30
 31 32 34 35 36 37 38 39]
  Test:  index=[ 5  7 14 18 19 20 23 33]
Fold 4:
  Train: index=[ 0  2  4  5  6  7  8 11 12 13 14 15 16 17 18 19 20 21 22 23 25 27 29 30
 31 32 33 34 35 36 37 38]
  Test:  index=[ 1  3  9 10 24 26 28 39]


In [None]:
len(folds_info), len(folds_info[0])

(5, 4)

In [None]:
fold_0_X_train=folds_info[0][0]
fold_0_y_train=folds_info[0][2]

print(fold_0_X_train.shape)
print(fold_0_y_train.shape)

fold_1_X_train=folds_info[1][0]
fold_1_y_train=folds_info[1][2]

print(fold_1_X_train.shape)
print(fold_1_y_train.shape)

(32, 2, 64, 64, 64)
(32,)
(32, 2, 64, 64, 64)
(32,)


#### select fold

fold_1

In [None]:
i=0
X_train, X_test, y_train, y_test=folds_info[i][0], folds_info[i][1], folds_info[i][2], folds_info[i][3]

fold_2

In [None]:
i=1
X_train, X_test, y_train, y_test=folds_info[i][0], folds_info[i][1], folds_info[i][2], folds_info[i][3]

fold_3

In [None]:
i=2
X_train, X_test, y_train, y_test=folds_info[i][0], folds_info[i][1], folds_info[i][2], folds_info[i][3]

fold_4

In [None]:
i=3
X_train, X_test, y_train, y_test=folds_info[i][0], folds_info[i][1], folds_info[i][2], folds_info[i][3]

fold_5

In [None]:
i=4
X_train, X_test, y_train, y_test=folds_info[i][0], folds_info[i][1], folds_info[i][2], folds_info[i][3]

###Create dataloaders and extract features

In [None]:
mean, std = calculate_mean_std(X_train)
print("Mean for each channel:", mean)
print("Standard deviation for each channel:", std)

Mean for each channel: [12.50556362  3.101349  ]
Standard deviation for each channel: [26.98893801 17.80137844]


In [None]:
# Define transformations for data augmentation
# We only need and use validation transformation
val_transforms = transforms.Compose([
    ToTensor(),
    Normalize3D(mean, std)
])

In [None]:
# Create custom datasets with transformations
train_dataset = CustomDataset(X_train, y_train, transform=val_transforms)
val_dataset = CustomDataset(X_test, y_test, transform=val_transforms)  # Apply validation transformation

print(f"There are {len(train_dataset)} train images and {len(val_dataset)} val images")

There are 32 train images and 8 val images


In [None]:
print('train dataset', np.unique(train_dataset.labels, return_counts=True))
print('test dataset', np.unique(val_dataset.labels, return_counts=True))

train dataset (array([1, 2, 3, 4]), array([8, 8, 8, 8]))
test dataset (array([1, 2, 3, 4]), array([2, 2, 2, 2]))


In [None]:
# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=1, num_workers=0, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, num_workers=0, shuffle=False)

#### Feature extraction

In [None]:
loaded_model.eval()  # Set the model to evaluation mode

# Define a function to extract features from the ResNet model
def extract_features(model, dataloader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    features = []
    labels = []
    with torch.no_grad():
        for i, batch in enumerate(dataloader):
        # for inputs, targets in dataloader:
            inputs, targets = batch['input'].to(device), batch['label'].to(device)
            # Forward pass through the ResNet model
            outputs = model(inputs)
            temp = model.feature_extraction(inputs)
            # Extract features before the fully connected layer
            features.append(temp.numpy())
            labels.append(targets.numpy())
    return torch.tensor(features), torch.tensor(labels)

# Extract features from the training and validation datasets
train_features, train_labels = extract_features(loaded_model, train_loader)

val_features, val_labels = extract_features(loaded_model, val_loader)

# Flatten the features
train_features = train_features.view(train_features.size(0), -1)
train_labels = train_labels.view(train_labels.size(0))
val_features = val_features.view(val_features.size(0), -1)
val_labels = val_labels.view(val_labels.size(0))

print(train_features.shape)
print(train_labels.shape)
print(val_features.shape)
print(val_labels.shape)

torch.Size([32, 512])
torch.Size([32])
torch.Size([8, 512])
torch.Size([8])


### SVM classifier

In [None]:
from sklearn.metrics import confusion_matrix, precision_score, recall_score

# Train an SVM classifier
svm_classifier = svm.SVC(kernel='linear')
svm_classifier.fit(train_features.numpy(), train_labels.numpy())

# Predict using the SVM classifier
predicted_labels = svm_classifier.predict(val_features.numpy())

# Calculate accuracy
accuracy = accuracy_score(val_labels.numpy(), predicted_labels)
print("Validation Accuracy:", accuracy)

# Calculate confusion matrix
conf_matrix = confusion_matrix(val_labels.numpy(), predicted_labels)
print("Confusion Matrix:")
print(conf_matrix)

# Calculate precision and recall
precision = precision_score(val_labels.numpy(), predicted_labels, average=None)
recall = recall_score(val_labels.numpy(), predicted_labels, average=None)

print("Precision:", precision)
print("Recall:", recall)

Validation Accuracy: 1.0
Confusion Matrix:
[[2 0 0 0]
 [0 2 0 0]
 [0 0 2 0]
 [0 0 0 2]]
Precision: [1. 1. 1. 1.]
Recall: [1. 1. 1. 1.]


In [None]:
test_preds5 = svm_classifier.predict(test_features)
print(test_preds5.shape)

(41,)


In [None]:
print(test_preds1+1)
print(test_preds2+1)
print(test_preds3+1)
print(test_preds4+1)
print(test_preds5+1)

[3 2 3 1 1 1 2 2 3 2 3 1 2 4 2 4 1 1 1 3 1 3 3 2 3 3 4 2 2 2 1 2 2 1 1 3 3
 1 3 3 4]
[3 2 3 1 1 1 1 3 3 2 3 1 2 1 2 4 1 1 1 3 1 3 3 2 3 2 4 1 2 2 1 2 2 1 1 3 3
 1 1 3 4]
[3 2 3 1 1 1 1 3 3 2 3 1 2 1 2 4 1 1 1 3 1 3 3 2 3 2 4 1 2 2 1 2 2 1 1 3 3
 1 1 3 2]
[2 2 3 1 1 1 1 3 3 2 1 1 2 1 2 4 1 1 1 3 1 3 3 2 3 2 4 1 2 2 1 2 2 1 1 3 3
 1 1 3 4]
[3 2 3 1 1 1 1 3 3 2 3 1 2 1 2 4 1 1 1 3 1 3 3 2 3 3 2 1 2 2 1 2 2 1 1 3 3
 1 1 3 2]


Ensemble test results prediction

In [None]:
pred1_np = np.array(test_preds1+1)
pred2_np = np.array(test_preds2+1)
pred3_np = np.array(test_preds3+1)
pred4_np = np.array(test_preds4+1)
pred5_np = np.array(test_preds5+1)


preds = np.column_stack((pred1_np, pred2_np,pred3_np,pred4_np,pred5_np))
print(preds.shape)

(41, 5)


In [None]:
from scipy.stats import mode
modes, counts = mode(preds, axis=1)
most_frequent_values = modes.squeeze()

print(most_frequent_values)

[3 2 3 1 1 1 1 3 3 2 3 1 2 1 2 4 1 1 1 3 1 3 3 2 3 2 4 1 2 2 1 2 2 1 1 3 3
 1 1 3 4]


Save the DataFrame to a CSV file, including patient number

In [None]:
index_array = np.arange(1, most_frequent_values.shape[0]+1)
combined_array = np.column_stack((index_array, most_frequent_values))

df = pd.DataFrame(combined_array, columns=['patient_number', 'SVM_Kfold'])
df.to_csv("patient_predictions.csv", index=False)


## Final model
Train our final model on all of training data.
Predict labels for test data that we don't have their labels and save them on CSV file.

### Load the test data

In [None]:
# path to train datasets, labels and VoI template files

test_rCBF = Path("/content/drive/MyDrive/Assignment/test_images_rcbf.nii")
test_DAT = Path("/content/drive/MyDrive/Assignment/test_images_sbr.nii")

In [None]:
# Load NIfTI and extract image data

test_PET_rCBF = nib.load(test_rCBF)
test_data_rCBF = test_PET_rCBF.get_fdata()

test_PET_DAT = nib.load(test_DAT)
test_data_DAT = test_PET_DAT.get_fdata()

### Dataset creation, feature engineering and feature normalization

In [None]:
xdata = create_xdata(train_data_rCBF, train_data_DAT)
print(xdata.shape)

(40, 2, 64, 64, 64)


In [None]:
xdata_test = create_xdata(test_data_rCBF, test_data_DAT)
print(xdata_test.shape)

(41, 2, 64, 64, 64)


In [None]:
# This time we are going to train our model on all of training data,
# so we calcualte mean and std for all of them.

mean, std = calculate_mean_std(xdata)
print("Mean for each channel:", mean.shape)
print("Standard deviation for each channel:", std.shape)

Mean for each channel: (2,)
Standard deviation for each channel: (2,)


In [None]:
train_dataset = CustomDataset(xdata, np.array(labels['diagnose']), transform=val_transforms)
test_dataset = CustomDataset(xdata_test, np.zeros((xdata_test.shape[0])), transform=val_transforms)

print(f"There are {len(train_dataset)} train images and {len(test_dataset)} test images")

There are 40 train images and 41 test images


In [None]:
print('train dataset', np.unique(train_dataset.labels, return_counts=True))
print('test_dataset', np.unique(test_dataset.labels, return_counts=True))

train dataset (array([1, 2, 3, 4]), array([10, 10, 10, 10]))
test_dataset (array([0.]), array([41]))


In [None]:
train_loader = DataLoader(train_dataset, batch_size=1, num_workers=0, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, num_workers=0, shuffle=False)

In [None]:
test_dataset = CustomDataset(xdata_test, np.zeros((xdata_test.shape[0])), transform=val_transforms)

print(f"There are {len(test_dataset)} test images")

There are 41 test images


In [None]:
test_loader = DataLoader(test_dataset, batch_size=1, num_workers=0, shuffle=False)
print('test dataset', np.unique(test_dataset.labels, return_counts=True))

test dataset (array([0.]), array([41]))


In [None]:
test_features, test_labels = extract_features(loaded_model, test_loader)

test_features = test_features.view(test_features.size(0), -1)
test_labels = test_labels.view(test_labels.size(0))

print(test_features.shape)
print(test_labels.shape)

torch.Size([41, 512])
torch.Size([41])


In [None]:
train_features, train_labels = extract_features(loaded_model, train_loader)
test_features, test_labels = extract_features(loaded_model, test_loader)

# Flatten the features
train_features = train_features.view(train_features.size(0), -1)
train_labels = train_labels.view(train_labels.size(0))
test_features = test_features.view(test_features.size(0), -1)
test_labels = test_labels.view(test_labels.size(0))

print(train_features.shape)
print(train_labels.shape)
print(test_features.shape)
print(test_labels.shape)

torch.Size([40, 512])
torch.Size([40])
torch.Size([41, 512])
torch.Size([41])


### Train SVM model on all the train data, predict on test and save to file

In [None]:
# Train an SVM classifier
svm_classifier2 = svm.SVC(kernel='linear')
svm_classifier2.fit(train_features.numpy(), train_labels.numpy()+1)

In [None]:
test_preds2 = svm_classifier2.predict(test_features)
print(test_preds2.shape)

(41,)


In [None]:
# Save the DataFrame to a CSV file, including patient number

index_array = np.arange(1, test_preds2.shape[0]+1)
combined_array = np.column_stack((index_array, test_preds2))

df = pd.DataFrame(combined_array, columns=['patient_number', 'SVM_predication'])
df.to_csv("patient_predictions.csv", index=False)
