In [1]:
# Torch-related imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, Subset

# Scikit-learn-related imports
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import confusion_matrix, accuracy_score

# Nibabel and Scipy imports (for handling fMRI and image processing)
import nibabel as nib
import scipy.ndimage as ndimage  # For smoothing

# NumPy, Matplotlib, and Seaborn (for data manipulation and visualization)
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# OS for file system operations
import os

In [2]:
os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

# root_dir = os.path.join('/content/drive', 'My Drive', 'UCR', '2-2024', 'InvCC', 'ADHD200', 'Datasets', 'preprocessed')

root_dir = os.path.join('preprocessed')

# Carpetas para TDC y ADHD
tdc_dir = os.path.join(root_dir, 'TDC')
adhd_dir = os.path.join(root_dir, 'ADHD')

# Para guardar el estado del autoencoder
save_path = os.path.join(root_dir, 'autoencoder.pt')

# Listas para almacenar las rutas de archivos
tdc_file_paths = [os.path.join(tdc_dir, file) for file in os.listdir(tdc_dir) if file.endswith('.nii.gz')]
adhd_file_paths = [os.path.join(adhd_dir, file) for file in os.listdir(adhd_dir) if file.endswith('.nii.gz')]

# Etiquetas correspondientes
tdc_labels = [0] * len(tdc_file_paths)
adhd_labels = [1] * len(adhd_file_paths)

# Combinar rutas de archivos y etiquetas
file_paths = tdc_file_paths + adhd_file_paths
labels = tdc_labels + adhd_labels

# Preprocessing

In [4]:
class FMRI_Dataset(Dataset):
    def __init__(self, file_paths, labels, max_shape, smoothing_sigma=1):
        self.file_paths = file_paths  # List of paths to the fMRI data files
        self.labels = labels  # Corresponding labels
        self.max_shape = max_shape  # Shape to pad all inputs to
        self.smoothing_sigma = smoothing_sigma  # Standard deviation for Gaussian smoothing

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        # Load fMRI data using NiBabel
        fmri_img = nib.load(self.file_paths[idx])
        data = fmri_img.get_fdata()  # Extract the fMRI data as a NumPy array

        # Apply smoothing
        # data = self.smooth_data(data)

        # Normalize the data
        data = self.normalize_data(data)

        # Convert NumPy array to a PyTorch tensor
        data = torch.tensor(data, dtype=torch.float32)

        # Get the label
        label = torch.tensor(self.labels[idx])

        # Pad the tensor to the max_shape
        data_padded = F.pad(data, pad=self.calculate_padding(data.shape), mode='constant', value=0)

        return data_padded, label

    def calculate_padding(self, current_shape):
        padding = []
        for current_dim, max_dim in zip(reversed(current_shape), reversed(self.max_shape)):
            pad_total = max_dim - current_dim
            padding.append(pad_total // 2)  # pad_left or pad_top, etc.
            padding.append(pad_total - (pad_total // 2))  # pad_right or pad_bottom, etc.

        return padding

    def normalize_data(self, data):
        """Normalize the data to zero mean and unit variance."""
        mean = data.mean()
        std = data.std()
        if std > 0:  # Avoid division by zero
            data = (data - mean) / std
        return data

    def smooth_data(self, data):
        """Apply Gaussian smoothing to the data."""
        return ndimage.gaussian_filter(data, sigma=self.smoothing_sigma)

In [5]:
# Determine the maximum shape across all tensors
max_shape = [1, 53, 64, 46, 512]

# Create the dataset with padded tensors
dataset = FMRI_Dataset(file_paths, labels, max_shape)

sample_data, sample_label = dataset[0]
print(sample_data.shape, sample_label)

torch.Size([53, 64, 46, 512]) tensor(0)


In [6]:
# Crear el objeto StratifiedShuffleSplit para dividir en conjuntos de entrenamiento y prueba
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# Dividir en entrenamiento y prueba
for train_val_idx, test_idx in sss.split(range(len(dataset)), dataset.labels):
    train_val_set = torch.utils.data.Subset(dataset, train_val_idx)
    testset = torch.utils.data.Subset(dataset, test_idx)

# Ahora crear un segundo StratifiedShuffleSplit para los conjuntos de entrenamiento y validación
sss_val = StratifiedShuffleSplit(n_splits=1, test_size=0.25, random_state=42)  # 0.25 de 0.8 es 0.2 total para validación

# Acceder a las etiquetas del conjunto de entrenamiento y validación
train_val_labels = [dataset.labels[i] for i in train_val_idx]  # Obtener las etiquetas usando una lista de comprensión

for train_idx, val_idx in sss_val.split(range(len(train_val_set)), train_val_labels):
    trainset = torch.utils.data.Subset(train_val_set, train_idx)
    valset = torch.utils.data.Subset(train_val_set, val_idx)

# Crear los dataloaders
batch_size = 1
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=0)
valloader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=0)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=0)

class_names = ["TDC", "ADHD"]

# Iterate over the dataloaders to verify they are being processed correctly
for batch in trainloader:
    if batch[0] is not None:
        images, _ = batch
        break
        # print(f"Shape of inputs: {images.shape}, Shape of labels: {_}")


for batch in testloader:
    if batch[0] is not None:
        images, _ = batch
        break
        # print(f"Shape of inputs: {images.shape}, Shape of labels: {_}")


In [7]:
print(f"Number of samples in valloader: {len(valloader.dataset)}")

Number of samples in valloader: 34


In [8]:
print(f"Number of samples in trainloader: {len(trainloader.dataset)}")

Number of samples in trainloader: 102


In [9]:
print(f"Number of samples in testloader: {len(testloader.dataset)}")

Number of samples in testloader: 34


# CNN-AE

In [7]:
class CNN_Autoencoder(nn.Module):
    def __init__(self):
        super(CNN_Autoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv3d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout3d(0.2),
            nn.Conv3d(16, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout3d(0.2),
            nn.Conv3d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout3d(0.2),
            nn.Conv3d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose3d(256, 128, kernel_size=3, stride=2, padding=(1, 1, 1), output_padding=(0, 1, 0)),
            nn.ReLU(),
            nn.ConvTranspose3d(128, 64, kernel_size=3, stride=2, padding=(1, 1, 1), output_padding=(1, 0, 1)),
            nn.ReLU(),
            nn.ConvTranspose3d(64, 16, kernel_size=3, stride=2, padding=(1, 0, 0), output_padding=(0, 0, 1)),
            nn.ReLU(),
            nn.ConvTranspose3d(16, 1, kernel_size=3, stride=2, padding=(1, 0, 0), output_padding=(0, 1, 1)),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Example usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder = CNN_Autoencoder().to(device)

# Generate random input matching new shape [1, 1, 53, 64] (Batch size 1)
inputs = torch.rand((1, 53, 64, 46)).to(device)  # Example input
output = autoencoder(inputs)
print(output.shape)  # should match the input shape [1, 53, 64, 46]

# Loss and optimizer
criterion = nn.MSELoss()  # Since it's an autoencoder, Mean Squared Error is commonly used
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001, weight_decay=1e-5)

torch.Size([1, 53, 64, 46])


# CNN

### Using pretrained weights

In [21]:
# Model definition

class CNNOnEncoder(nn.Module):
    def __init__(self, autoencoder, num_classes):
        super(CNNOnEncoder, self).__init__()
        self.encoder = autoencoder.encoder  # Use the encoder from the autoencoder
        self.conv1 = nn.Conv3d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1)
        self.dropout1 = nn.Dropout(0.2)  # Dropout after first convolution
        self.conv2 = nn.Conv3d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=1)
        self.dropout2 = nn.Dropout(0.2)  # Dropout after second convolution
        self.conv3 = nn.Conv3d(in_channels=1024, out_channels=512, kernel_size=3, stride=1, padding=1)
        self.dropout3 = nn.Dropout(0.2)  # Dropout after second convolution
        self.pool = nn.AdaptiveMaxPool3d(output_size=(4, 4, 3))
        self.fc1 = nn.Linear(512 * 4 * 4 * 3, 256)
        self.dropout4 = nn.Dropout(0.2)  # Dropout after first fully connected layer
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        # print(x.shape)
        x = self.encoder(x)
        # print(f'Encoder {x.shape}')
        x = self.pool(F.relu(self.conv1(x)))  # First conv + pooling
        x = self.dropout1(x)
        # print(f'First conv + pooling {x.shape}')
        x = self.pool(F.relu(self.conv2(x)))  # Second conv + pooling
        x = self.dropout2(x)
        # print(f'Second conv + pooling {x.shape}')
        x = self.pool(F.relu(self.conv3(x)))  # Second conv + pooling
        x = self.dropout3(x)
        x = x.unsqueeze(0)
        x = x.view(x.size(0), -1)  # Flatten
        # print(f'Flattened {x.shape}')
        x = F.relu(self.fc1(x))  # First fully connected layer
        x = self.dropout4(x)
        x = self.fc2(x)  # Output layer
        return x


In [12]:
# Load trained autoencoder
trained_autoencoder = CNN_Autoencoder()
trained_autoencoder.load_state_dict(torch.load(save_path, weights_only=True))
trained_autoencoder.to(device)
trained_autoencoder.eval() # Set the model to evaluation mode

# Create an instance of the new model
cnn_with_ae_model = CNNOnEncoder(trained_autoencoder, num_classes=2).to(device)

# Optionally, freeze the encoder layers, Frozen= false, Unfrozen= true
for param in cnn_with_ae_model.encoder.parameters():
    param.requires_grad = True

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn_with_ae_model.parameters(), lr=0.005)

In [13]:
num_epochs = 100
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training phase
    cnn_with_ae_model.train()
    total_loss = 0.0
    all_labels = []
    all_preds = []

    for batch_idx, (inputs, labels) in enumerate(trainloader):
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = cnn_with_ae_model(inputs)

        loss = criterion(output, labels)

        # Backward pass
        loss.backward()

        # Update model parameters
        optimizer.step()

        # Accumulate the loss
        total_loss += loss.item()

        # Store predictions and labels for accuracy calculation
        _, preds = torch.max(output, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    # Calculate training accuracy and average loss for the epoch
    train_accuracy = accuracy_score(all_labels, all_preds)
    avg_train_loss = total_loss / len(trainloader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # Validation phase
    cnn_with_ae_model.eval()
    val_loss = 0.0
    val_labels = []
    val_preds = []

    with torch.no_grad():
        for val_inputs, val_labels_batch in valloader:
            val_inputs_reduced = torch.mean(val_inputs, dim=-1)
            val_inputs = val_inputs_reduced.to(device)
            val_labels_batch = val_labels_batch.to(device)

            # Forward pass for validation data
            val_outputs = cnn_with_ae_model(val_inputs)
            loss = criterion(val_outputs, val_labels_batch)
            val_loss += loss.item()

            # Store predictions and labels for accuracy calculation
            _, val_preds_batch = torch.max(val_outputs, 1)
            val_labels.extend(val_labels_batch.cpu().numpy())
            val_preds.extend(val_preds_batch.cpu().numpy())

    # Calculate validation accuracy and average loss
    avg_val_loss = val_loss / len(valloader)
    val_accuracy = accuracy_score(val_labels, val_preds)

    print(f'Epoch [{epoch+1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # Save the model if validation loss improves
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(cnn_with_ae_model.state_dict(), 'best_cnn_with_ae.pt')
        print(f'Model saved at epoch {epoch+1} with validation loss: {avg_val_loss:.4f}')

    # Save the model after each epoch
    torch.save(cnn_with_ae_model.state_dict(), f'cnn_with_ae_epoch{epoch}.pt')

# Save the final trained model
torch.save(cnn_with_ae_model.state_dict(), 'cnn_with_ae_final.pt')

Epoch [1/100], Train Loss: 350522.7190, Train Accuracy: 0.5686
Epoch [1/100], Val Loss: 4.8463, Val Accuracy: 0.5588
Model saved at epoch 1 with validation loss: 4.8463
Epoch [2/100], Train Loss: 121.9081, Train Accuracy: 0.5686
Epoch [2/100], Val Loss: 1.9188, Val Accuracy: 0.4412
Model saved at epoch 2 with validation loss: 1.9188
Epoch [3/100], Train Loss: 25.8219, Train Accuracy: 0.4804
Epoch [3/100], Val Loss: 0.6865, Val Accuracy: 0.5588
Model saved at epoch 3 with validation loss: 0.6865


KeyboardInterrupt: 

In [None]:
# Testing loop

# Load trained autoencoder
trained_autoencoder = CNN_Autoencoder()
trained_autoencoder.load_state_dict(torch.load(save_path, weights_only=True))
trained_autoencoder.to(device)
trained_autoencoder.eval() # Set the model to evaluation mode

# Load the model
num_classes = 2

classifier_model = CNNOnEncoder(trained_autoencoder, num_classes=2).to(device)  # Initialize your classifier model
model_state_dict = torch.load('cnn_with_ae_final.pt', weights_only=False)
classifier_model.load_state_dict(model_state_dict)

# Set the model to evaluation mode
classifier_model.eval()

# Define criterion (loss function)
criterion = nn.CrossEntropyLoss()

# Test the model
correct = 0
total = 0
total_loss = 0.0
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = classifier_model(inputs)
        # print(outputs.shape)

        loss = criterion(output, labels)

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and labels for confusion matrix
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Calculate accuracy and average loss
accuracy = 100 * correct / total
average_loss = total_loss / len(testloader)

# Calculate confusion matrix
conf_matrix = confusion_matrix(all_labels, all_preds)

# Plot confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", ax=ax,
            xticklabels=[f'Class {i}' for i in range(num_classes)],
            yticklabels=[f'Class {i}' for i in range(num_classes)])
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')

# Print accuracy and average loss
print(f'Accuracy of the CNN with autoencoder on the test images: {accuracy:.2f}%')
print(f'Average loss on the test images: {average_loss:.4f}')


### Not using pretrained weights

In [15]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.dropout1 = nn.Dropout(0.2)  # Dropout after first convolution
        self.conv2 = nn.Conv3d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.dropout2 = nn.Dropout(0.2)  # Dropout after second convolution
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv3d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.dropout3 = nn.Dropout(0.2)  # Dropout after third convolution
        self.fc1 = nn.Linear(64 * 6 * 8 * 5, 256)
        self.dropout4 = nn.Dropout(0.2)  # Dropout after first fully connected layer
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # First conv + pooling
        x = self.dropout1(x)
        # print(f'First conv + pooling {x.shape}')
        x = self.pool(F.relu(self.conv2(x)))  # Second conv + pooling
        x = self.dropout2(x)
        # print(f'Second conv + pooling {x.shape}')
        x = self.pool(F.relu(self.conv3(x)))  # Second conv + pooling
        x = self.dropout3(x)
        # print(f'Third conv + pooling {x.shape}')
        x = x.unsqueeze(0)
        x = x.view(x.size(0), -1)  # Flatten
        # print(f'Flattened {x.shape}')
        x = F.relu(self.fc1(x))  # First fully connected layer
        x = self.dropout4(x)
        x = self.fc2(x)  # Output layer
        return x

In [19]:
simple_cnn_model = SimpleCNN(num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(simple_cnn_model.parameters(), lr=0.005, momentum=0.8)

example_input = torch.randn(1, 53, 64, 46)
example_output = simple_cnn_model(example_input.to(device))
print("Output shape:", example_output.shape)

Output shape: torch.Size([1, 2])


In [None]:
# Training loop

num_epochs = 100
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training phase
    simple_cnn_model.train()
    total_loss = 0.0
    all_labels = []
    all_preds = []

    for batch_idx, (inputs, labels) in enumerate(trainloader):
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = simple_cnn_model(inputs)

        loss = criterion(output, labels)

        # Backward pass
        loss.backward()

        # Update model parameters
        optimizer.step()

        # Accumulate the loss
        total_loss += loss.item()

        # Store predictions and labels for accuracy calculation
        _, preds = torch.max(output, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    # Calculate training accuracy and average loss for the epoch
    train_accuracy = accuracy_score(all_labels, all_preds)
    avg_train_loss = total_loss / len(trainloader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # Validation phase
    simple_cnn_model.eval()
    val_loss = 0.0
    val_labels = []
    val_preds = []

    with torch.no_grad():
        for val_inputs, val_labels_batch in valloader:
            val_inputs_reduced = torch.mean(val_inputs, dim=-1)
            val_inputs = val_inputs_reduced.to(device)
            val_labels_batch = val_labels_batch.to(device)

            # Forward pass for validation data
            val_outputs = simple_cnn_model(val_inputs)
            loss = criterion(val_outputs, val_labels_batch)
            val_loss += loss.item()

            # Store predictions and labels for accuracy calculation
            _, val_preds_batch = torch.max(val_outputs, 1)
            val_labels.extend(val_labels_batch.cpu().numpy())
            val_preds.extend(val_preds_batch.cpu().numpy())

    # Calculate validation accuracy and average loss
    avg_val_loss = val_loss / len(valloader)
    val_accuracy = accuracy_score(val_labels, val_preds)

    print(f'Epoch [{epoch+1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # Save the model if validation loss improves
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(simple_cnn_model.state_dict(), 'best_simple_cnn.pt')
        print(f'Model saved at epoch {epoch+1} with validation loss: {avg_val_loss:.4f}')

    # Save the model after each epoch
    torch.save(simple_cnn_model.state_dict(), f'simple_cnn_epoch{epoch}.pt')

# Save the final trained model
torch.save(simple_cnn_model.state_dict(), 'simple_cnn_final.pt')

In [None]:
# Testing loop

# Load the model
classifier_model = SimpleCNN(num_classes=2).to(device)  # Initialize your classifier model
model_state_dict = torch.load('simple_cnn_final.pt', weights_only=False)
classifier_model.load_state_dict(model_state_dict)

# Set the model to evaluation mode
classifier_model.eval()

# Define criterion (loss function)
criterion = nn.CrossEntropyLoss()

# Test the model
correct = 0
total = 0
total_loss = 0.0
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = simple_cnn_model(inputs)
        # print(outputs.shape)

        loss = criterion(output, labels)

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and labels for confusion matrix
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Calculate accuracy and average loss
accuracy = 100 * correct / total
average_loss = total_loss / len(testloader)

# Calculate confusion matrix
conf_matrix = confusion_matrix(all_labels, all_preds)

# Plot confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(18, 16))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", ax=ax,
            xticklabels=[f'Class {i}' for i in range(num_classes=2)],
            yticklabels=[f'Class {i}' for i in range(num_classes=2)])
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')

# Print accuracy and average loss
print(f'Accuracy of the simple CNN on the test images: {accuracy:.2f}%')
print(f'Average loss on the test images: {average_loss:.4f}')


# CNN-LSTM

### Using pretrained weights

In [None]:
class CNNLSTMOnAutoencoder(nn.Module):
    def __init__(self, autoencoder, hidden_size, num_classes):
        super(CNNLSTMOnAutoencoder, self).__init__()
        self.encoder = autoencoder.encoder
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.dropout1 = nn.Dropout(0.3)
        self.conv2 = nn.Conv3d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.dropout2 = nn.Dropout(0.3)
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2)
        self.lstm = None  # To be defined later after determining input size
        self.fc = None  # To be defined later after determining LSTM output size

    def forward(self, x):
        # x should have shape (batch_size, channels, height, width, depth)
        # batch_size, channels, H, W, D = x.size()  # (1, 1, 53, 64, 46)

        # Apply convolutional layers with ReLU and max pooling
        x = self.pool(F.relu(self.conv1(x)))  # Shape: (batch_size, 16, new_H, new_W, new_D)
        x = self.pool(F.relu(self.conv2(x)))  # Shape: (batch_size, 32, new_H, new_W, new_D)

        # Calculate the LSTM input size based on the output shape from convolutions
        conv_output_shape = x.shape  # (batch_size, channels, new_height, new_width, new_depth)
        lstm_input_size = conv_output_shape[1] * conv_output_shape[2] * conv_output_shape[3] * conv_output_shape[4]  # channels * height * width * depth

        # Reshape for LSTM input (seq_len should be the number of slices or frames)
        # Here, we can treat one of the dimensions as the sequence length. Assuming D is the sequence length.
        x = x.view(batch_size, -1, lstm_input_size)

        # Define LSTM and Linear layers with dynamic input size
        if self.lstm is None:
            self.lstm = nn.LSTM(input_size=lstm_input_size, hidden_size=128, batch_first=True).to(device)
            self.fc = nn.Linear(in_features=128, out_features=2).to(device)

        # LSTM layer
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])  # Take the output of the last time step
        return out


In [None]:
# Load trained autoencoder
trained_autoencoder = CNN_Autoencoder().to(device)
trained_autoencoder.load_state_dict(torch.load(save_path, weights_only=True))
trained_autoencoder.eval() # Set the model to evaluation mode

# Model with pretrained weights
cnn_lstm_with_ae = CNNLSTMOnAutoencoder(trained_autoencoder, 128, 2).to(device)

# Optionally, freeze the encoder layers, Frozen= false, Unfrozen= true
for param in cnn_lstm_with_ae.encoder.parameters():
    param.requires_grad = True

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(cnn_lstm_with_ae.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Training loop

num_epochs = 100
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training phase
    cnn_lstm_with_ae.train()
    total_loss = 0.0
    all_labels = []
    all_preds = []

    for batch_idx, (inputs, labels) in enumerate(trainloader):
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = cnn_lstm_with_ae(inputs)

        loss = criterion(output, labels)

        # Backward pass
        loss.backward()

        # Update model parameters
        optimizer.step()

        # Accumulate the loss
        total_loss += loss.item()

        # Store predictions and labels for accuracy calculation
        _, preds = torch.max(output, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    # Calculate training accuracy and average loss for the epoch
    train_accuracy = accuracy_score(all_labels, all_preds)
    avg_train_loss = total_loss / len(trainloader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # Validation phase
    cnn_lstm_with_ae.eval()
    val_loss = 0.0
    val_labels = []
    val_preds = []

    with torch.no_grad():
        for val_inputs, val_labels_batch in valloader:
            val_inputs_reduced = torch.mean(val_inputs, dim=-1)
            val_inputs = val_inputs_reduced.to(device)
            val_labels_batch = val_labels_batch.to(device)

            # Forward pass for validation data
            val_outputs = cnn_lstm_with_ae(val_inputs)
            loss = criterion(val_outputs, val_labels_batch)
            val_loss += loss.item()

            # Store predictions and labels for accuracy calculation
            _, val_preds_batch = torch.max(val_outputs, 1)
            val_labels.extend(val_labels_batch.cpu().numpy())
            val_preds.extend(val_preds_batch.cpu().numpy())

    # Calculate validation accuracy and average loss
    avg_val_loss = val_loss / len(valloader)
    val_accuracy = accuracy_score(val_labels, val_preds)

    print(f'Epoch [{epoch+1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # Save the model if validation loss improves
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(cnn_lstm_with_ae.state_dict(), 'best_cnnlstm_with_ae.pt')
        print(f'Model saved at epoch {epoch+1} with validation loss: {avg_val_loss:.4f}')

    # Save the model after each epoch
    torch.save(cnn_lstm_with_ae.state_dict(), f'cnnlstm_with_ae_epoch{epoch}.pt')

# Save the final trained model
torch.save(cnn_lstm_with_ae.state_dict(), 'cnnlstm_with_ae_final.pt')


In [None]:
# Testing loop

# Load trained autoencoder
trained_autoencoder = CNN_Autoencoder().to(device)
trained_autoencoder.load_state_dict(torch.load(save_path, weights_only=True))
trained_autoencoder.eval() # Set the model to evaluation mode

# Load the model
classifier_model = CNNLSTMOnAutoencoder(trained_autoencoder, hidden_size=128, num_classes=2).to(device)  # Initialize your classifier model
model_state_dict = torch.load('cnnlstm_with_ae_final.pt', weights_only=False)
classifier_model.load_state_dict(model_state_dict)

# Set the model to evaluation mode
classifier_model.eval()

# Define criterion (loss function)
criterion = nn.CrossEntropyLoss()

# Test the model
correct = 0
total = 0
total_loss = 0.0
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = simple_cnn_model(inputs)
        # print(outputs.shape)

        loss = criterion(output, labels)

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and labels for confusion matrix
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Calculate accuracy and average loss
accuracy = 100 * correct / total
average_loss = total_loss / len(testloader)

# Calculate confusion matrix
conf_matrix = confusion_matrix(all_labels, all_preds)

# Plot confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(18, 16))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", ax=ax,
            xticklabels=[f'Class {i}' for i in range(num_classes)],
            yticklabels=[f'Class {i}' for i in range(num_classes)])
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')

# Print accuracy and average loss
print(f'Accuracy of the CNN-LSTM with autoencoder on the test images: {accuracy:.2f}%')
print(f'Average loss on the test images: {average_loss:.4f}')

### Not using pretrained weights

In [None]:
class SimpleCNNLSTM(nn.Module):
    def __init__(self, hidden_size, num_classes):
        super(SimpleCNNLSTM, self).__init__()
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.dropout1 = nn.Dropout(0.3)
        self.conv2 = nn.Conv3d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.dropout2 = nn.Dropout(0.3)
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2)
        self.lstm = None  # To be defined later after determining input size
        self.fc = None  # To be defined later after determining LSTM output size

    def forward(self, x):
        # x should have shape (batch_size, channels, height, width, depth)
        # batch_size, channels, H, W, D = x.size()  # (1, 1, 53, 64, 46)

        # Apply convolutional layers with ReLU and max pooling
        x = self.pool(F.relu(self.conv1(x)))  # Shape: (batch_size, 16, new_H, new_W, new_D)
        x = self.pool(F.relu(self.conv2(x)))  # Shape: (batch_size, 32, new_H, new_W, new_D)

        # Calculate the LSTM input size based on the output shape from convolutions
        conv_output_shape = x.shape  # (batch_size, channels, new_height, new_width, new_depth)
        lstm_input_size = conv_output_shape[1] * conv_output_shape[2] * conv_output_shape[3] * conv_output_shape[4]  # channels * height * width * depth

        # Reshape for LSTM input (seq_len should be the number of slices or frames)
        # Here, we can treat one of the dimensions as the sequence length. Assuming D is the sequence length.
        x = x.view(batch_size, -1, lstm_input_size)  # Reshape to (batch_size, seq_len, features)

        # Define LSTM and Linear layers with dynamic input size
        if self.lstm is None:
            self.lstm = nn.LSTM(input_size=lstm_input_size, hidden_size=128, batch_first=True).to(device)
            self.fc = nn.Linear(in_features=128, out_features=2).to(device)

        # LSTM layer
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])  # Take the output of the last time step
        return out

In [None]:
# Model without pretrained weights
simple_cnnlstm = SimpleCNNLSTM(hidden_size=128, num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(simple_cnnlstm.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Training loop

num_epochs = 100
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training phase
    simple_cnnlstm.train()
    total_loss = 0.0
    all_labels = []
    all_preds = []

    for batch_idx, (inputs, labels) in enumerate(trainloader):
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = simple_cnnlstm(inputs)

        loss = criterion(output, labels)

        # Backward pass
        loss.backward()

        # Update model parameters
        optimizer.step()

        # Accumulate the loss
        total_loss += loss.item()

        # Store predictions and labels for accuracy calculation
        _, preds = torch.max(output, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    # Calculate training accuracy and average loss for the epoch
    train_accuracy = accuracy_score(all_labels, all_preds)
    avg_train_loss = total_loss / len(trainloader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # Validation phase
    simple_cnnlstm.eval()
    val_loss = 0.0
    val_labels = []
    val_preds = []

    with torch.no_grad():
        for val_inputs, val_labels_batch in valloader:
            val_inputs_reduced = torch.mean(val_inputs, dim=-1)
            val_inputs = val_inputs_reduced.to(device)
            val_labels_batch = val_labels_batch.to(device)

            # Forward pass for validation data
            val_outputs = simple_cnnlstm(val_inputs)
            loss = criterion(val_outputs, val_labels_batch)
            val_loss += loss.item()

            # Store predictions and labels for accuracy calculation
            _, val_preds_batch = torch.max(val_outputs, 1)
            val_labels.extend(val_labels_batch.cpu().numpy())
            val_preds.extend(val_preds_batch.cpu().numpy())

    # Calculate validation accuracy and average loss
    avg_val_loss = val_loss / len(valloader)
    val_accuracy = accuracy_score(val_labels, val_preds)

    print(f'Epoch [{epoch+1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # Save the model if validation loss improves
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(simple_cnnlstm.state_dict(), 'best_simple_cnnlstm.pt')
        print(f'Model saved at epoch {epoch+1} with validation loss: {avg_val_loss:.4f}')

    # Save the model after each epoch
    torch.save(simple_cnnlstm.state_dict(), f'simple_cnnlstm_epoch{epoch}.pt')

# Save the final trained model
torch.save(simple_cnnlstm.state_dict(), 'simple_cnnlstm_final.pt')

In [None]:
# Testing loop

# Load the model
classifier_model = SimpleCNNLSTM(hidden_size=128, num_classes=2).to(device)  # Initialize your classifier model
model_state_dict = torch.load('simple_cnnlstm_final.pt', weights_only=False)
classifier_model.load_state_dict(model_state_dict)

# Set the model to evaluation mode
classifier_model.eval()

# Define criterion (loss function)
criterion = nn.CrossEntropyLoss()

# Test the model
correct = 0
total = 0
total_loss = 0.0
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs_reduced = torch.mean(inputs, dim=-1)
        inputs = inputs_reduced

        # Send inputs and labels to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = simple_cnn_model(inputs)
        # print(outputs.shape)

        loss = criterion(output, labels)

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and labels for confusion matrix
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Calculate accuracy and average loss
accuracy = 100 * correct / total
average_loss = total_loss / len(testloader)

# Calculate confusion matrix
conf_matrix = confusion_matrix(all_labels, all_preds)

# Plot confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(18, 16))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", ax=ax,
            xticklabels=[f'Class {i}' for i in range(num_classes)],
            yticklabels=[f'Class {i}' for i in range(num_classes)])
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')

# Print accuracy and average loss
print(f'Accuracy of the simple CNN-LSTM on the test images: {accuracy:.2f}%')
print(f'Average loss on the test images: {average_loss:.4f}')