# **Importing library**

In [None]:
# Install required libraries
# Install specific versions of PyTorch and torchvision compiled with the same CUDA version (11.7)
# Note: You should check the torchvision version compatible with PyTorch 2.0.0, and replace 'compatible_version' with the actual version number.
!pip install torch==2.0.0+cu117 torchvision==compatible_version+cu117 -f https://download.pytorch.org/whl/torch_stable.html
!pip install torchviz SimpleITK matplotlib monai 'monai[all]'
!pip install --upgrade monai
!pip install nibabel

# Import libraries
import SimpleITK as sitk
import numpy as np
import os
import glob
import matplotlib.pyplot as plt
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
from os import makedirs
from tqdm import tqdm
from google.colab import drive
from os import listdir
from os.path import join
from __future__ import print_function, division
import pandas as pd
from skimage import io, transform
import torch
import torch.nn.functional as F
import torch.optim as optim
import random
import torchvision.transforms as T
import torch.utils.data as data
import monai
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset, ConcatDataset
import torchvision
from torchvision.transforms import Compose, ToTensor, Normalize, Resize
from torchvision import datasets, models, transforms, utils
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from PIL import Image, UnidentifiedImageError
from monai.transforms import (
    Compose, LoadImaged, AddChanneld, RandFlipd, RandRotated, ToTensorD, CenterSpatialCropd,
    ScaleIntensityd, AsDiscreted
)


from sklearn.utils import shuffle
import warnings

warnings.filterwarnings("ignore")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# **Automated Cardiac Diagnosis Challenge**

Cardiac Magnetic Resonance Imaging is the most effective method used for detecting and treating cardiac disorders which are the world’s first cause of death. Manual analysis of cardiac images is time-consuming and prone to errors, necessitating a precise and automated segmentation approach. Convolutional Neural Networks (CNNs), particularly Fully Convolutional Networks (FCN) and U-Net, have emerged as successful methods for medical image segmentation. U-Net, a 2D CNN architecture, shows respectable accuracy by incorporating skip links. This work aims to solve the segmentation and classification task proposed by the ACDC challenge, utilizing the ACDC dataset, which includes expert manual segmentations and various clinical diagnoses.

## **Dataset preparation**

This cell performs a series of commands to facilitate the handling of a dataset. Firstly, it creates a new directory named "dataset" where the dataset will be stored. Then, it copies a file called "Resources.zip" from a Google Drive folder and pastes it into the "dataset" directory. Finally, it unzips the "Adaptiope.zip" file within the "dataset" directory, extracting its contents for further analysis and exploration. These commands enable the setup and preparation of the dataset for subsequent data manipulation tasks.

In [None]:
# Create a new directory called "dataset"
!mkdir dataset

# Copy the file "Resources.zip" from the Google Drive folder to the "dataset" directory
!cp "/content/drive/MyDrive/Resources" dataset/

# Unzip the "Adaptiope.zip" file in the "dataset" directory
!unzip /content/drive/MyDrive/Resources.zip

## **Segmentation**

The Automated Cardiac Diagnosis Challenge is a challenge that aims to compare the performance of automatic methods on the segmentation of the left ventricular endocardium and epicardium as well as the right ventricular endocardium for both end diastolic and end systolic phase instances. Segmentation of cardiac anatomical structures in cardiac magnetic resonance images (CMRI) is a prerequisite for automatic diagnosis and prognosis of cardiovascular diseases.

One study combined automatic segmentation and assessment of segmentation uncertainty in CMRI to detect image regions containing local segmentation failures. The experiments revealed that combining automatic segmentation with manual correction of detected segmentation failures resulted in improved segmentation and a 10-fold reduction of expert time compared to manual expert segmentation.

### **Pre-processing**

Data preprocessing for the ACDC challenge involved normalizing image intensities, cropping to 128x128 pixels, applying random flips and rotating within π/4 radians, achieved using MONAI library functions. The preprocessing pipeline ensured standardized data, augmented dataset variability, and facilitated subsequent analysis and classification.

This cell contains a function called read_data2 that reads data from a specified path, mode, and label. It retrieves paths of image files from the provided directory structure, organizes them, and returns the list of image file paths. The function streamlines the data retrieval process, making it easier to handle and analyze the dataset.

In [None]:
def read_data2(path, mode="training", label="images"):
    # Function to read data from a specified path, mode, and label

    path_img = []  # List to store paths of image files
    paths = glob.glob(path + mode + "/" + label + "/*")  # Get list of paths in the specified mode and label
    paths.sort()  # Sort the paths in alphabetical order

    for element in paths:
        #path_slice = glob.glob(element+"/")
        path_slice = element + "/"  # Get the path of the slice

        file_list = glob.glob(glob.escape(path_slice[0]) + "*.gz")  # Get the list of files in the slice
        sorted_file_list = sorted(file_list)  # Sort the files in the slice

        path_img.append(element)  # Add the image file path to the list

    return path_img  # Return the list of image file paths

In the following cell dictionaries which create a corrispondence between images and masks are created for the training, validation, and test set.

Note that for the training three paths of images and masks have been given, each of them correspond to the transformed images obtained for the purpose of data augmentation. This has been done with the code snipped "Data Augmentation" (below the section "Segmentation"). Of course this does not involved the independent test data in order of not having biased results.

In [None]:
path = "/content/drive/MyDrive/database/"  # Path to the database

# Read image and mask paths for training data with different labels
path_img = read_data2(path, mode="training", label="images")
path_mask = read_data2(path, mode="training", label="masks")

# Split the training data into train and validation sets
train_size = int(len(path_img) * 0.8)
train_img = path_img[:train_size]
train_mask = path_mask[:train_size]
val_img = path_img[train_size:]
val_mask = path_mask[train_size:]

# Read image and mask paths for augmented training data with different labels
path_img2 = read_data2(path, mode="training", label="images2")
path_mask2 = read_data2(path, mode="training", label="masks2")

path_img3 = read_data2(path, mode="training", label="images3")
path_mask3 = read_data2(path, mode="training", label="masks3")

# Combine original and augmented image and mask paths for the training set
train_img = train_img + path_img2 + path_img3
train_mask = train_mask + path_mask2 + path_mask3

# Create a DataFrame for training and validation data
train_data = pd.DataFrame({'img': train_img, 'mask': train_mask})
val_data = pd.DataFrame({'img': val_img, 'mask': val_mask})

# Convert the DataFrames to lists of dictionaries
train_dicts = train_data.to_dict('records')
val_dicts = val_data.to_dict('records')

# Read image and mask paths for testing data
path_img_test = read_data2(path, mode="testing2", label="images")
path_mask_test = read_data2(path, mode="testing2", label="masks")
test_dicts = pd.DataFrame({'img': path_img_test, 'mask': path_mask_test})  # Create a DataFrame for testing data

test_dicts = test_dicts.to_dict('records')  # Convert the DataFrame to a list of dictionaries

In this code snippet, we encounter a class called LoadAndTransform that encapsulates a data loading and transformation pipeline. This elegant pipeline facilitates the preparation and augmentation of data for further analysis. Let's explore its intricacies:

Within this method, a series of transformations are defined using the Compose function from the MONAI library. Each transformation plays a unique role in preparing the data for subsequent tasks.

The transformation pipeline begins by loading the image and mask data using the LoadImaged transformation. It gracefully handles the retrieval of these essential components, setting the stage for subsequent operations.

The next transformation, AddChanneld, augments the image and mask data by adding a channel dimension. This dimensionality enhancement ensures compatibility with subsequent processing steps.

To normalize the intensities of the image, the ScaleIntensityd transformation gracefully standardizes the data, fostering consistency and facilitating subsequent analysis.

The pipeline continues with the CenterSpatialCropd transformation, which expertly performs a centered spatial crop on both the image and mask. This operation helps focus on the essential regions of interest, promoting accurate and efficient analysis.

RandFlipd ia a transformation that randomly flips the image and mask. This augmentation technique enhances the diversity of the dataset, promoting robustness and reducing bias.

Next, the RandRotated transformation takes the stage, elegantly introducing random rotations to the image and mask. By varying the orientation, the dataset becomes more comprehensive and resilient to orientation-based discrepancies.

To enable one-hot encoding, the AsDiscreted transformation gracefully converts the mask into a one-hot representation. This representation facilitates multi-class classification tasks by encoding each class as a separate channel.

ToTensorD transformation converts the image and mask into PyTorch tensors. This conversion facilitates seamless integration with PyTorch-based modeling frameworks.

In [None]:
num_classes = 4  # Number of classes for one-hot encoding

class LoadAndTransform_train:
    def __init__(self):
        # Initialize the data loading and transformation pipeline
        self.transform = Compose([
            LoadImaged(keys=['img', 'mask']),  # Load the image and mask data
            AddChanneld(keys=['img', 'mask']),  # Add channel dimension to the image and mask
            ScaleIntensityd(keys=['img']),  # Normalize intensities of the image
            CenterSpatialCropd(keys=['img', 'mask'], roi_size=[128, 128]),  # Center crop the image and mask
            RandFlipd(keys=['img', 'mask'], prob=0.5, spatial_axis=1),  # Randomly flip the image and mask
            RandRotated(keys=['img', 'mask'], range_x=np.pi/4, prob=1, mode=['bilinear', 'nearest']),  # Randomly rotate the image and mask
            AsDiscreted(keys=['mask'], to_onehot=num_classes, num_classes=num_classes),  # Convert the mask to one-hot encoding
            ToTensorD(keys=['img', 'mask'])  # Convert the image and mask to PyTorch tensors
        ])

    def __call__(self, sample):
        # Apply the transformation pipeline on the given sample
        return self.transform(sample)

The class dataset for the training set is created.

In [None]:
train_transform = LoadAndTransform_train()  # Instantiate the training data transformation pipeline
train_dataset = monai.data.CacheDataset(train_dicts, train_transform)  # Create a training dataset with the transformed data

This cell executes the same operations of the one above, but not rotations, or flipping as we are dealing with the test and the validation.

In [None]:
num_classes = 4  # Number of classes for one-hot encoding

class LoadAndTransform:
    def __init__(self):
        # Initialize the data loading and transformation pipeline
        self.transform = Compose([
            LoadImaged(keys=['img', 'mask']),  # Load the image and mask data
            AddChanneld(keys=['img', 'mask']),  # Add channel dimension to the image and mask
            ScaleIntensityd(keys=['img']),  # Normalize intensities of the image
            CenterSpatialCropd(keys=['img', 'mask'], roi_size=[128, 128]),  # Center crop the image and mask
            AsDiscreted(keys=['mask'], to_onehot=num_classes, num_classes=num_classes),  # Convert the mask to one-hot encoding
            ToTensorD(keys=['img', 'mask'])  # Convert the image and mask to PyTorch tensors
        ])

    def __call__(self, sample):
        # Apply the transformation pipeline on the given sample
        return self.transform(sample)

Here the class dataset for validation and test is implemented.

In [None]:
validation_transform = LoadAndTransform()  # Instantiate the validation data transformation pipeline
validation_dataset = monai.data.CacheDataset(val_dicts, validation_transform)  # Create a validation dataset with the transformed data

test_transform = LoadAndTransform()  # Instantiate the testing data transformation pipeline
test_dataset = monai.data.CacheDataset(test_dicts, test_transform)  # Create a testing dataset with the transformed data

Dataloaders instantiation.

In [None]:
# Create the dataloader
dataloader_train = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True, drop_last=True)
dataloader_validation = DataLoader(dataset=validation_dataset, batch_size=16, shuffle=True, drop_last=True)
dataloader_test = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False, drop_last=True)

### **Data Augmentation**

In the field of machine learning, one of the critical factors that significantly impacts model performance is the availability and quality of the training data. However, it is not uncommon to encounter scenarios where the training dataset is relatively small, limiting the model's ability to learn complex patterns and generalize well. To mitigate this challenge, researchers and practitioners often employ data augmentation techniques to artificially expand the training set, thereby enhancing its dimensionality. This essay explores the process of data augmentation, specifically focusing on creating additional images through rotation and flipping, and the organizational steps involved in preparing the augmented dataset in Google Drive.

Data Augmentation for Image Classification:
In image classification tasks, data augmentation serves as a powerful tool to increase the diversity of the training set by applying various transformations to the existing images. By introducing such transformations, the model becomes exposed to a wider range of scenarios, leading to improved generalization and robustness. Among the commonly used data augmentation techniques, rotation and flipping have proven to be effective in expanding the dataset.

Rotation:
The process of rotation involves rotating an image by a certain degree within a specified range. For instance, an original image can be rotated by 90, 180, or 270 degrees to create three additional variations. By randomly selecting rotation angles, we can generate multiple images with different orientations, increasing the training set's dimensionality. This augmented dataset will enable the model to learn and classify objects irrespective of their orientation, making it more adaptable to real-world scenarios.

Flipping:
Flipping is another widely employed data augmentation technique that horizontally or vertically mirrors an image. This transformation creates new instances by reversing the original image along the specified axis. By applying horizontal flips, we can generate images that exhibit a mirrored effect, contributing to increased variation in the training set. Similarly, vertical flips provide additional diversity by introducing an upside-down perspective. The augmented dataset resulting from these flips will help the model to be more resilient to changes in object orientation and achieve better performance.

Organizational Steps in Google Drive:
Before implementing the data augmentation process, it is essential to organize the augmented dataset in a structured manner. Google Drive provides a convenient platform for creating and storing the augmented images in separate folders. By creating distinct folders for rotated and flipped images, we can maintain a clear distinction between the original and augmented dataset.

To begin, a dedicated folder can be created in Google Drive to house the training dataset. Within this main folder, separate subfolders should be created for rotated and flipped images. These subfolders serve as the destination for the augmented images generated through the data augmentation process.

Namely, inside training folders "images", "masks", "images2", "masks2", "images3", "masks3" have to be created in advance to allow the code to run.

By structuring the dataset in this manner, it becomes easier to keep track of the original and augmented images during the training process. Furthermore, it facilitates the integration of the augmented dataset with popular machine learning frameworks, as they often require the dataset to be organized into training, validation, and testing subsets.

In [None]:
def read_data(path, mode="training"):

    path_gt = []  # Initialize an empty list to store the paths of ground truth files
    path_img = []  # Initialize an empty list to store the paths of image files

    paths = glob.glob(data_path + mode + "/*")  # Get all the paths of files in the specified mode (e.g., "training")
    paths.sort()  # Sort the paths in alphabetical order

    for element in paths:  # Iterate through each path in the list of paths
        if (element[-10:-3]) == "patient":  # Check if the path corresponds to a patient
            path_slice = glob.glob(element + "/")  # Get the path of the slice directory

            file_list = glob.glob(glob.escape(path_slice[0]) + "*.gz")  # Get all the file paths within the slice directory
            sorted_file_list = sorted(file_list)  # Sort the file paths in alphabetical order

            for element in sorted_file_list:  # Iterate through each sorted file path
                if (element[-9:-7]) != "4d":  # Check if the file path does not contain "4d"
                    if (element[-9:-7]) == "gt":  # Check if the file path contains "gt"
                        path_gt.append(element)  # Append the ground truth file path to the list
                    else:
                        path_img.append(element)  # Append the image file path to the list

    dicts = pd.DataFrame({'img': path_img, 'mask': path_gt})  # Create a DataFrame with image and mask paths
    return dicts

In [None]:
data_path = '/content/drive/MyDrive/database/'

# Create a dictionary between the paths of the images and the paths of the masks within the train
train_dicts = read_data(data_path, mode='training')
train_dicts = train_dicts.to_dict('records')

In [None]:
from monai.transforms import (
    Compose, LoadImaged, AddChanneld, RandFlipd, RandRotated, ToTensorD, CenterSpatialCropd, DivisiblePadD, SqueezeDimd, AsDiscreted
)

num_classes = 4  # Number of classes in the dataset

class LoadAndTransform:
    def __init__(self):
        self.transform = Compose([
            LoadImaged(keys=['img', 'mask']),  # Load the image and mask using specified keys
            AddChanneld(keys=['img', 'mask']),  # Add a channel dimension to the image and mask
            CenterSpatialCropd(keys=['img', 'mask'], roi_size=[128, 128, 1]),  # Crop the image and mask to a specific size
            SqueezeDimd(keys=['img', 'mask'], dim=-1),  # Remove the last dimension from the image and mask
            RandFlipd(keys=['img', 'mask'], prob=0.5, spatial_axis=1),  # Randomly flip the image and mask along the spatial axis
            RandRotated(keys=['img', 'mask'], range_x=np.pi/4, prob=1, mode=['bilinear', 'nearest']),  # Randomly rotate the image and mask within a specified range
            AsDiscreted(keys=['mask'], to_onehot=num_classes, num_classes=num_classes),  # Convert the mask to one-hot encoding
            ToTensorD(keys=['img', 'mask'])  # Convert the image and mask to PyTorch tensors
        ])

    def __call__(self, sample):
        return self.transform(sample)  # Apply the defined transformation to the input sample

The following snippet is used to rotate both the original training image and mask of 45 degrees.

In [None]:
def data_rotation(dataset):
    masks = []  # Initialize an empty list to store the rotated masks
    images = []  # Initialize an empty list to store the rotated images
    metadata = []  # Initialize an empty list to store the metadata
    num = None  # Variable to check whether the index is repeated or not per iteration
    num2 = []  # Initialize an empty list

    # Load sample
    for element in range(len(dataset)):
        sample_dict = dataset[element]  # Get the sample from the dataset

        # Add channels
        add_channels_transform = monai.transforms.AddChanneld(keys=['img', 'mask'])  # Initialize the transform
        channels_sample_dict = add_channels_transform(sample_dict)  # Apply the transform
        print("Size of the image before AddChanneld transform", sample_dict["img"].shape)
        print("Size of the image after AddChanneld transform", channels_sample_dict["img"].shape)

        random_rotation_transform = monai.transforms.RandRotated(keys=['img', 'mask'], range_x=np.pi/4, prob=1, mode=['bilinear', 'nearest'])
        rotated_sample_dict = random_rotation_transform(channels_sample_dict)  # Apply the random rotation transform
        masks.append(rotated_sample_dict["mask"])  # Append the rotated mask to the masks list
        images.append(rotated_sample_dict["img"])  # Append the rotated image to the images list
        metadata.append(rotated_sample_dict["img_meta_dict"]["filename_or_obj"])  # Append the filename or object metadata
    return metadata, masks, images  # Return the metadata, masks, and images lists

In [None]:
general_data,masks,images=data_rotation(train_dataset)

The purpose of this cell is to save the generated images and masks in the NIfTI format (.nii.gz).

In [None]:
import nibabel as nib
def save_nifti(in_image, in_label, out, index, general_data):
    # Convert the torch tensors into numpy array
    volume = np.array(in_image.detach().cpu()[0, :, :, :], dtype=np.float32)
    lab = np.array(in_label.detach().cpu()[0, :, :, :], dtype=np.float32)

    # Convert the numpy array into nifti file
    volume = nib.Nifti1Image(volume, np.eye(4))
    lab = nib.Nifti1Image(lab, np.eye(4))

    # Create the path to save the images and labels
    path_out_images = os.path.join(out, 'Images')
    path_out_labels = os.path.join(out, 'Labels')

    # Make directory if not existing
    if not os.path.exists(path_out_images):
        os.mkdir(path_out_images)
    if not os.path.exists(path_out_labels):
        os.mkdir(path_out_labels)

    path_data = os.path.join(out, 'Images')
    path_label = os.path.join(out, 'Labels')
    nib.save(volume, os.path.join(path_data, f'patient_rotation'+general_data[index][53:-7]+'.nii.gz'))
    nib.save(lab, os.path.join(path_label, f'patient_rotation_'+general_data[index][53:-7]+'.nii.gz'))

    print(f'patient_generated_{index} is saved', end='\r')

In [None]:
out = '/content/drive/MyDrive/rotation/'

This again create new images and masks by rotating the previous ones by other 45 degrees.

In [None]:
def data_rotation2(dataset):
    masks=[]
    images=[]
    metadata=[]
    num=None # check whether the index is repeated or not per iteration
    num2=[]
    # Load sample
    for element in range(len(dataset)):

      sample_dict = dataset[element]

      # Add channels
      add_channels_transform = monai.transforms.AddChanneld(keys=['img', 'mask']) # Initialize the transform
      channels_sample_dict = add_channels_transform(sample_dict) # Apply the transform
      print("Size of the image before AddChanneld transform", sample_dict["img"].shape)
      print("Size of the image after AddChanneld transform", channels_sample_dict["img"].shape)

      random_rotation_transform = monai.transforms.RandRotated(keys=['img', 'mask'], range_x=3*np.pi/4, prob=1, mode=['bilinear', 'nearest']) # rotation of 135 degrees
      rotated_sample_dict = random_rotation_transform(channels_sample_dict)
      masks.append(rotated_sample_dict["mask"])
      images.append(rotated_sample_dict["img"])
      metadata.append(rotated_sample_dict["img_meta_dict"]["filename_or_obj"])
    return metadata,masks,images

Saving these new elemnts in ou2.

In [None]:
out2 = '/content/drive/MyDrive/rotation2/'
for i,element in enumerate(images):
  save_nifti(element, masks[i], out2, i,general_data)

The purpose of this following code is to read the file paths of augmented image and mask files stored in a specified directory. It assumes that the augmented images are stored in the "Images" subdirectory and the corresponding augmented masks are stored in the "Labels" subdirectory within the given path.

In [None]:
def read_augmented(path):
    path_gt = []  # Initialize an empty list to store the paths of ground truth files
    path_img = []  # Initialize an empty list to store the paths of image files

    path_images = path + "/Images"  # Path to the images directory
    path_masks = path + "/Labels"  # Path to the labels directory

    paths1 = glob.glob(path_images + "/*")  # Get all the paths of files in the images directory
    paths2 = glob.glob(path_masks + "/*")  # Get all the paths of files in the labels directory

    paths1.sort()  # Sort the paths of images in alphabetical order
    paths2.sort()  # Sort the paths of labels in alphabetical order

    for i, element in enumerate(paths1):  # Iterate through each path in the list of image paths
        path_img.append(element)  # Append the image file path to the list
        path_gt.append(paths2[i])  # Append the corresponding ground truth file path to the list

    dicts = pd.DataFrame({'img': path_img, 'mask': path_gt})  # Create a DataFrame with image and mask paths
    return dicts

In [None]:
rotated_dataset = read_augmented(out)
rotated_dataset = rotated_dataset.to_dict('records')
dataset=train_dicts+rotated_dataset
rotation_transf = LoadAndTransform()
dataset = monai.data.CacheDataset(dataset, rotation_transf)

This cell is used to flip the original images and masks to have even more data.

In [None]:
def data_flip(dataset):
    masks=[]
    images=[]
    metadata=[]
    num=None # check whether the index is repeated or not per iteration
    num2=[]
    # Load sample
    for element in range(len(dataset)):

      sample_dict = dataset[element]

      # Add channels
      add_channels_transform = monai.transforms.AddChanneld(keys=['img', 'mask']) # Initialize the transform
      channels_sample_dict = add_channels_transform(sample_dict) # Apply the transform
      print("Size of the image before AddChanneld transform", sample_dict["img"].shape)
      print("Size of the image after AddChanneld transform", channels_sample_dict["img"].shape)



      # Random flip
      # here we define the keys, the probability that the flip is performed and the axis to flip over
      random_flip_transform = monai.transforms.RandFlipd(keys=['img', 'mask'], prob=1, spatial_axis=1)
      # We put a probability of 1 to always flip the image for visualization purposes.
      # Please DO NOT DO THAT in the rest of the notebook.
      flipped_sample_dict = random_flip_transform(channels_sample_dict)
      masks.append(flipped_sample_dict["mask"])
      images.append(flipped_sample_dict["img"])
      metadata.append(flipped_sample_dict["img_meta_dict"]["filename_or_obj"])

    return metadata,masks,images

In [None]:
general_data,masks,images=data_flip(dataset)

In [None]:
out = '/content/drive/MyDrive/flipped/'
for i,element in enumerate(images):
  save_nifti(element, masks[i], out, i,general_data)

In [None]:
flipped_dataset = read_augmented(out)
flipped_dataset = flipped_dataset.to_dict('records')
dataset=train_dicts+rotated_dataset+flipped_dataset
transformation = LoadAndTransform()
dataset = monai.data.CacheDataset(dataset, transformation)

### **Model Instantiation**

The model architecture utilized for the ACDC challenge was a 2D U-Net, implemented using the MONAI library. The U-Net consisted of multiple encoding and decoding blocks, with a total of four output channels representing different classes. The U-Net architecture employed a series of
convolutional layers with increasing channel sizes (8, 16, 32, 64, and 128) and downsampling strides of 2 in each block.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'The used device is {device}')

In [None]:
# Define the UNet model architecture
model = monai.networks.nets.UNet(
    spatial_dims=2,  # Number of spatial dimensions (2D in this case)
    in_channels=1,  # Number of input channels
    out_channels=4,  # Number of output channels
    channels=(8, 16, 32, 64, 128),  # Number of channels in each layer
    strides=(2, 2, 2, 2),  # Strides in each layer for downsampling
    num_res_units=2,  # Number of residual units in each layer
).to(device)  # Move the model to the specified device

The cost function defined here utilizes the Dice loss, a popular and effective loss function commonly employed in medical image segmentation tasks. Let's explore its characteristics:

The DiceLoss function, provided by the MONAI library, is specifically designed for binary or multi-class segmentation tasks. It leverages the concept of the Dice coefficient, which measures the similarity between two sets by evaluating their overlap.

In this case, the cost function is configured with the sigmoid=True parameter, indicating that the model's output is expected to be passed through a sigmoid activation function. This is particularly useful when dealing with binary segmentation tasks, where the sigmoid function can be employed to obtain probability-like values within the range of 0 to 1.

The batch=True parameter specifies that the loss function will operate on batches of data. This allows for efficient computation and enables the utilization of batch processing capabilities provided by deep learning frameworks.

By employing the Dice loss, the cost function aims to minimize the dissimilarity between the predicted segmentation outputs and the ground truth labels. It accomplishes this by encouraging high overlap and agreement between the predicted and ground truth segmentations, resulting in more accurate and reliable segmentation models.

In [None]:
cost_function =  monai.losses.DiceLoss(sigmoid=True, batch=True)

The SGD (Stochastic Gradient Descent) optimizer is a widely used optimization algorithm in deep learning. It plays a crucial role in training neural networks by iteratively adjusting the model's parameters to minimize the loss function. Let's explore its characteristics:

SGD operates on the principle of gradient descent, which involves updating the model's parameters in the direction of the steepest descent of the loss function. It does so by calculating the gradients of the parameters with respect to the loss and taking small steps towards minimizing the loss.

In [None]:
# Set the GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set some parameters
epochs = 5
learning_rate = 0.001
batch_size = 16

# Instantiate ResNet18
net = model

weight_decay=5e-4
momentum=0.9

# Optimizer
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

These code cells contain functions and computations related to evaluating the performance of a model using Dice coefficient as a metric for medical image segmentation.

metric_fn is initialized as a DiceMetric object from the MONAI library. It is specifically configured to calculate the Dice coefficient, which is a common metric used for evaluating the performance of medical image segmentation models. This particular DiceMetric object is set to exclude the background class from the computation and uses a "mean" reduction strategy.

In [None]:
metric_fn = monai.metrics.DiceMetric(include_background=False, reduction="mean")

DICE_COE is a custom function that calculates the Dice coefficient based on two input masks. It performs the following steps:

Computes the intersection of the two masks by multiplying them element-wise.
Calculates the sums of the individual masks.
Applies the formula (2 * intersection) / (sum of masks) to obtain the Dice coefficient. This formula measures the overlap between the predicted segmentation and the ground truth, providing a measure of similarity between the two.

In [None]:
def DICE_COE(mask1, mask2):
    intersect = np.sum(mask1*mask2)
    fsum = np.sum(mask1)
    ssum = np.sum(mask2)
    dice = (2 * intersect ) / (fsum + ssum)
    dice = np.mean(dice)
    dice = round(dice, 3)
    return dice

The compute_metric function plays a crucial role in evaluating the performance of a model. It begins by preparing the model for evaluation, ensuring that it operates on the specified device. The function then employs the SlidingWindowInferer technique to perform inference on the input image, enabling efficient processing using a sliding window approach.

After obtaining the output predictions from the model, a Softmax operation is applied to generate class probabilities. These probabilities facilitate the calculation of dice coefficients, which measure the overlap or similarity between the predicted labels and the ground truth labels for each class.

To assess the model's performance comprehensively, mean dice coefficients are computed for each class. These coefficients provide an average measure of accuracy for individual classes, shedding light on the model's performance in segmenting different objects or regions.

The overall estimation is obtained by averaging the mean dice coefficients across all classes. This metric serves as an indicator of the model's overall segmentation accuracy, encapsulating its ability to produce accurate and consistent predictions.

In [None]:
def compute_metric(dataloader, model, metric_fn, device):
    model.eval()
    model = model.to(device)
    inferer = monai.inferers.SlidingWindowInferer(roi_size=[128, 128])
    discrete_transform = monai.transforms.AsDiscrete(threshold=0.9)
    Softmax = torch.nn.Softmax(dim=1)  # specify the dimension for softmax

    all_outputs = []
    all_labels = []

    for sample in dataloader:
        img = sample['img'].to(device)  # move image to device
        mask = sample['mask'].to(device)  # move mask to device
        dice_coefficients1 = []
        dice_coefficients2 = []
        dice_coefficients3 = []
        dice_coefficients4 = []


        with torch.no_grad():
            output = inferer(img, network=model)
            output = Softmax(output).cpu()  # apply softmax then move to cpu

            # Check if output has the expected shape and type (Tensor)
            assert isinstance(output, torch.Tensor), "Output should be a Tensor"

            all_outputs.append(output)
            all_labels.append(mask.cpu())  # move mask back to cpu for metric computation

            prediction_label1 = np.squeeze(output,axis=0)[0,:,:]
            prediction_label2 = np.squeeze(output,axis=0)[1,:,:]
            prediction_label3 = np.squeeze(output,axis=0)[2,:,:]
            prediction_label4 = np.squeeze(output,axis=0)[3,:,:]

            ground_truth1 = np.squeeze(mask,axis=0)[0,:,:]
            ground_truth2 = np.squeeze(mask,axis=0)[1,:,:]
            ground_truth3 = np.squeeze(mask,axis=0)[2,:,:]
            ground_truth4 = np.squeeze(mask,axis=0)[3,:,:]

            dice_coef1 = DICE_COE(prediction_label1,ground_truth1)
            dice_coef2 = DICE_COE(prediction_label2,ground_truth2)
            dice_coef3 = DICE_COE(prediction_label3,ground_truth3)
            dice_coef4 = DICE_COE(prediction_label4,ground_truth4)

            dice_coefficients1.append(dice_coef1)
            dice_coefficients2.append(dice_coef2)
            dice_coefficients3.append(dice_coef3)
            dice_coefficients4.append(dice_coef4)


    # Stack tensors together for metric computation

   # mean_dice = metric_fn(torch.stack(all_outputs), torch.stack(all_labels))
    mean_dice1 = np.mean(dice_coefficients1)
    mean_dice2 = np.mean(dice_coefficients2)
    mean_dice3 = np.mean(dice_coefficients3)
    mean_dice4 = np.mean(dice_coefficients4)


    overall_estimation = np.mean([mean_dice1,mean_dice2,mean_dice3,mean_dice4])
    return  overall_estimation,all_outputs, all_labels,mean_dice1,mean_dice2,mean_dice3,mean_dice4

Early stopping is a technique used to prevent overfitting and it is based on the idea that the model's performance on the validation set will stop improving after a certain number of training epochs, and continuing to train beyond this point will only lead to overfitting.

In this case the monitor is on the perplexity: after three times we see the perplexity increasing we stop the training and get the final value.

There are several benefits to using early stopping:

It can help prevent overfitting by stopping the training process before the model starts to overfit the training data;
It can save time and resources by avoiding the need to train the model for a large number of epochs;
It can improve the generalization ability of the model by avoiding the use of models that are overfitted to the training data.

In [None]:
class EarlyStopping:

  """
    A class for early stopping of training process.

    Args:
        patience (int): Number of epochs to wait before triggering early stopping. Default: 5
        verbose (bool): Flag indicating whether to print messages when early stopping is triggered. Default: True

    Attributes:
        patience (int): Number of epochs to wait before triggering early stopping.
        verbose (bool): Flag indicating whether to print messages when early stopping is triggered.
        counter (int): Counter for the number of epochs since the last improvement.
        best_score (float): Best score seen so far. None by default.
  """

  def __init__(self, patience=5, verbose=True):

    # set the number of epochs to wait before triggering early stopping
    self.patience = patience

    # set a flag indicating whether to print messages when early stopping is triggered
    self.verbose = verbose

    # initialize the counter for the number of epochs since the last improvement
    self.counter = 0

    # initialize the best score seen so far to None
    self.best_score = None

  def step(self, val_loss):

    """
        Function to check if early stopping should be triggered.

        Args:
            val_loss (float): Current validation loss.

        Returns:
            bool: Whether early stopping should be triggered or not.
    """

    # if this is the first epoch, set the best score to the current validation loss
    if self.best_score is None:

      self.best_score = val_loss

    # if the current validation loss is worse than the best score seen so far
    elif val_loss > self.best_score:

      # increment the counter
      self.counter += 1

      # if the counter has reached the patience threshold
      if self.counter >= self.patience:

        # if verbose is True, print a message indicating that early stopping has been triggered
        if self.verbose:

          print(f'Early stopping triggered with counter {self.counter} and patience {self.patience}')

        # return True to indicate that early stopping should be triggered
        return True

    # if the current validation loss is better than the best score seen so far

    else:

      # set the best score to the current validation loss
      self.best_score = val_loss

      # reset the counter
      self.counter = 0

    # if the counter has not reached the patience threshold, return False
    return False

### **Training Step**

The training_step function is a Python function that represents a training step (training step) of a neural network. The function takes as input a neural network (net), a training set (train_loader), a validation set (valid_loader), an optimizer (optimizer), a cost function (cost_function), and a device (device) on which to run the training (by default "cuda," indicating an NVIDIA GPU).

The function first sets the neural network to training mode (net.train()), then runs a training cycle on the training set (for inputs, targets in train_loader:). For each batch of inputs and targets in the training set, the function performs the following steps:

Resizing the input image to a specific size. Moving the inputs and targets to the specified device (device). Zeroing the gradients of the optimizer (optimizer.zero_grad()). Forward pass of the neural network (outputs = net(inputs)). Calculation of the cost function (loss = cost_function(outputs, targets)). Backward pass to calculate gradients (loss.backward()). Update of neural network parameters (optimizer.step()). Calculating training accuracy (train_acc) and tracking training loss (train_loss). After running the training cycle on the training set, the function runs a validation cycle on the validation set (for inputs, targets in valid_loader:). For each batch of inputs and targets in the validation set, the function performs steps 1 through 5, calculates the validation accuracy (valid_acc) and tracks the validation loss (valid_loss).

Finally, the function returns the average training loss (train_loss), average training accuracy (train_acc), average validation loss (valid_loss) and average validation accuracy (valid_acc) as a tuple. These values can be used to monitor and evaluate the performance of the neural network during training.

In [None]:
def training_step(model, data_loader, validation_loader, optimizer, cost_function, device='cuda'):

    # Training
    model.train()
    epoch_loss = 0
    samples = 0
    for batch_data in data_loader:
        optimizer.zero_grad()
        outputs = model(batch_data['img'].float().to(device))
        samples += outputs.shape[0]
        loss = cost_function(outputs, batch_data['mask'].float().to(device))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    train_loss = epoch_loss / samples

    # Validation
    model.eval()
    samples = 0
    val_loss = 0
    with torch.no_grad():
        for batch_data in validation_loader:
            outputs = model(batch_data['img'].float().to(device))
            samples += outputs.shape[0]
            loss = cost_function(outputs, batch_data['mask'].float().to(device))
            val_loss += loss.item()
    val_loss = val_loss / samples

    return train_loss, val_loss, model

The dice_metric variable is initialized as a DiceMetric object from the MONAI library. It is configured to calculate the Dice coefficient, a commonly used metric in image segmentation tasks.

By setting include_background to True, the dice_metric includes the background class in the computation of the coefficient. This allows for a comprehensive evaluation of the model's segmentation performance across all classes, including the background.

The reduction parameter is set to 'mean', indicating that the computed Dice coefficients for each class will be averaged to obtain an overall mean coefficient. This provides a concise and interpretable measure of the model's segmentation accuracy, taking into account all classes and their contributions to the final result.

Using the dice_metric, one can conveniently assess the model's performance in terms of segmentation accuracy, accounting for both foreground and background classes. This enables a comprehensive evaluation of the model's ability to accurately delineate regions of interest in the given images.

In [None]:
dice_metric = monai.metrics.DiceMetric(include_background=True, reduction='mean')

The provided code is a Python function named "main" that implements a complete iteration of a neural network training process for classification. The function takes as input the training configuration parameters such as the number of epochs, batch size, learning rate, weight decay, momentum, training data loader, validation data loader, optimizer and cost function.

Within the function, an Early Stopping criterion is created, which is used to stop training when the validation loss does not improve for a certain number of epochs. Next, the function iterates over the specified number of epochs and at each iteration calls the training_step function to train the network based on the training and validation data loader.

During training, the model parameters with the lowest validation loss up to that point are saved. At the end of training, the function uses the saved model parameters to perform the test on the validation data.

Finally, the function returns the training and validation losses and accuracies as output. The code also provides the ability to save the trained model to disk for later use.

In [None]:
def main(batch_size,
         device,
         learning_rate,
         weight_decay,
         momentum,
         epochs,
         train_loader,
         validation_loader,
         test_loader,
         model,
         optimizer,
         cost_function,
         metric_function,
         patience):

    model_save_path = "/content/drive/MyDrive/best_model.pth"

    # List to store outputs and labels for testing phase
    outputs_test = []
    labels_test = []

    # Create an instance of the EarlyStopping class
    early_stopping = EarlyStopping(patience=patience, verbose=True)

    # Track loss over each epoch
    training_losses = []
    validation_losses = []
    best_loss = 10e6

    # Loop over each epoch
    for e in range(epochs):

        # Training
        train_loss, val_loss, model = training_step(model, train_loader, validation_loader, optimizer, cost_function, device)
        training_losses.append(train_loss)
        validation_losses.append(val_loss)

        # Check if early stopping should be triggered
        should_stop = early_stopping.step(val_loss)

        # Save the model if validation loss has improved
        if val_loss <= early_stopping.best_score:
            torch.save(model.state_dict(), model_save_path)
            print(f'Model saved at {model_save_path}')

        # Testing
        test_loss, outputs, labels, _, _, _, _ = compute_metric(test_loader, model, metric_function, device)

        # Append outputs and labels
        outputs_test.append(outputs)
        labels_test.append(labels)

        # Logging
        print(f'Epoch: {e + 1}')
        print(f'\t Training loss: {train_loss:.5f}')
        print(f'\t Validation loss: {val_loss:.5f}')
        print(f'\t Test loss: {test_loss:.5f}')
        print('-----------------------------------------------------')

        # Break the loop if early stopping is triggered
        if should_stop:
            break

    return outputs_test, labels_test


# Example call to main function
# Specify all the required parameters before calling.


outputs_test, labels_test=main(batch_size, device, learning_rate, weight_decay, momentum, epochs,
     dataloader_train, dataloader_validation, dataloader_test, net, optimizer,
     cost_function, dice_metric, patience=5)

### **Post processing**

Once the probability matrices are obtained from the U-Net, post-processing
steps can enable the calculation of a threshold to binarize the resulting images.
A common approach consists in finding the threshold that results in a mask
that maximizes the similarity between the resulting mask and a ground truth
mask. However, it is not always possible to calculate such threshold since the
ground truth mask is not available. We propose an approach that does not rely
on ground truth data for the test set, since we assume that this dataset would
not be available. The proposed approach consists in computing an area profile
of the label of interest in an MRI image. The area profile represents the amount
of pixels found to be a foreground for each label with respect to the size of the
image. For each slice of the MRI file the area was calculated.
A label-specific area profile was computed by obtaining all the areas for each
slice per label. The training masks served to create a reference area profile for
each label, and the average area profiles was obtained for each label.
Furthermore, different thresholds were applied to the probability matrices
obtained in the test set. The thresholds ranged from 0 to the maximum pixel
intensity found in the probability matrices. Area profiles were computed for each
threshold, for each observation of the test set. For each area profile of the test set,
the mean square error (MSE) was obtained with respect to it’s corresponding
reference area profile. The threshold that corresponded to the minimum MSE
was used as final threshold to binarize the probability matrices obtained from
the test set.

Also in this case it is necessary to create the same folders in the drive in order to run the code (masks and images within testing) as we will see in the different paths recalled.

Let's start again by reading an pre-process the images from the secret test set.

In [None]:
import os

def read_png_files(folder_path):
    png_files = []
    for file in os.listdir(folder_path):
        if file.lower().endswith('.png'):
            png_files.append(os.path.join(folder_path, file))
    return png_files
def order_strings(strings):
    def custom_sort_key(string):
        # Extract the number after 'patient'
        patient_num = int(string.split('patient')[1].split('_')[0])
        # Extract the number after 'frame'
        frame_num = int(string.split('frame')[1].split('_')[0])
        # Extract the last two digits of the string
        last_two_digits = int(string[-6:-4])
        return (patient_num, frame_num, last_two_digits)

    return sorted(strings, key=custom_sort_key)

Here below the already used LoadAndTransform function.

In [None]:
from monai.transforms import (
    Compose, LoadImaged, AddChanneld, RandFlipd, RandRotated, ToTensorD, CenterSpatialCropd,
    ScaleIntensityd, AsDiscreted
)
import torch
import numpy as np

num_classes = 4

class LoadAndTransform:
    def __init__(self):
        self.transform = Compose([
            LoadImaged(keys=['img', 'mask']),
            AddChanneld(keys=['img', 'mask']),
            ScaleIntensityd(keys=['img']),
            CenterSpatialCropd(keys=['img', 'mask'], roi_size=[128, 128]),
            AsDiscreted(keys=['mask'], to_onehot=num_classes, num_classes=num_classes),
            ToTensorD(keys=['img', 'mask'])
        ])

    def __call__(self, sample):
        return self.transform(sample)

Here the paths have to be changed according to the folder in which the secret test set has been saved in.

In [None]:
import pandas as pd

# Use the read_png_files function to get a list of all the .png files in the specified folder for test images
path_img_test = read_png_files('/Users/anacordonavila/Desktop/DLMI/group_project/database/testing/images')

# Use the read_png_files function to get a list of all the .png files in the specified folder for test masks
path_mask_test = read_png_files('/Users/anacordonavila/Desktop/DLMI/group_project/database/testing/masks')

# Sort the lists of test image and mask paths using the order_strings function
path_img_test = order_strings(path_img_test)
path_mask_test= order_strings(path_mask_test)

# Create a DataFrame with two columns: 'img' and 'mask', containing the sorted lists of test image and mask paths
test_dicts = pd.DataFrame({'img':path_img_test,'mask':path_mask_test})

# Convert the DataFrame to a list of dictionaries, where each dictionary represents one row of the DataFrame
test_dicts = test_dicts.to_dict('records')

# Create an instance of the LoadAndTransform class
test_transform=LoadAndTransform()

# Create a CacheDataset using the list of dictionaries and the transform object
test_dataset = monai.data.CacheDataset(test_dicts, test_transform)

The count_patient_repetitions function takes a list of file paths as an argument and returns a dictionary that counts the number of times each combination of patient ID and frame number appears in the list.

For each file path in the input list, the function extracts the patient ID and frame number from the file name by splitting the file path on the “/” character to get the file name, then splitting the file name on the “_frame” string to get the patient ID and frame number. The patient ID and frame number are then combined into a unique key by concatenating them with an underscore.

The function then updates the count for this key in the patient_frame_counts dictionary by using the get method to retrieve the current count for the key (or 0 if it doesn’t exist yet) and adding 1 to it. This process is repeated for all file paths in the input list.

In [None]:
def count_patient_repetitions(file_paths):
    patient_frame_counts = {}
    for file_path in file_paths:
        # Extract the patient ID and frame number from the file path
        file_name = file_path.split("/")[-1]  # Extract the file name
        patient_id, frame = file_name.split("_frame")[0], file_name.split("_frame")[1].split("_")[0]

        # Create a unique key combining patient ID and frame number
        key = patient_id + "_" + frame

        # Update the count for the key
        patient_frame_counts[key] = patient_frame_counts.get(key, 0) + 1

    return patient_frame_counts

In [None]:
count_patient_repetitions(path_mask_test).values()

Instantiate the dataloader.

In [None]:
dataloader_test = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

Definition of the model previously discussed.

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

state_dict = torch.load("/Users/anacordonavila/Desktop/DLMI/group_project/database/results_0.2731249574571848.pth")
# Define your network architecture
net = monai.networks.nets.UNet(
    spatial_dims=2,
    in_channels=1,
    out_channels=4,
    channels=(8, 16, 32, 64, 128),
    strides=(2, 2, 2, 2),
    num_res_units=2,
).to(device)

# Load the state dictionary into the model
net.load_state_dict(state_dict)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = net.to(device)

from monai.transforms import (
    Compose, LoadImaged, AddChanneld, RandFlip, RandRotate, ToTensorD, CenterSpatialCropd,
    ScaleIntensityd, AsDiscrete
)
import torch
import numpy as np

class LoadAndTransform:
    def __init__(self):
        self.transform = Compose([
            LoadImaged(keys=['img']),
            AddChanneld(keys=['img']),
            ScaleIntensityd(keys=['img']),  # normalize intensities of the image
            CenterSpatialCropd(keys=['img'], roi_size=[128, 128]),
            ToTensorD(keys=['img'])
        ])

    def __call__(self, sample):
        data = {'img': sample['img']}  # Assuming 'img' is the key for the image data
        return self.transform(data)



# Set the model to evaluation mode
net.eval()

# Create a list to store the predicted masks
masks = []

# Run inference on the dataset
with torch.no_grad():
    for batch in dataloader_test:
        images = batch["img"].to(device)
        outputs=net(images)
        masks.append(outputs)

# Concatenate the predicted masks into a single tensor
masks = torch.cat(masks, dim=0)

# You can further process or visualize the predicted masks as needed
print(masks.shape)  # Print the shape of the resulting masks

In [None]:
masks=masks.numpy()

In [None]:
import cv2
shapes=[]
img=[]
for element in path_img_test:
    img.append(cv2.imread(element))
    shapes.append(np.shape(cv2.imread(element))[:-1])

The resize_image function changes the height and width of an input image to the supplied values. It generates a new image with zeros, centers the original image within it, and calculates the necessary padding values to preserve the aspect ratio. Additionally, the function sets the first channel's extra padding values to 1. After that, the resized image is given back.

In [None]:
def resize_image(image, new_height, new_width):
    original_height, original_width = image.shape[1:]

    # Calculate padding values
    pad_top = (new_height - original_height) // 2
    pad_bottom = new_height - original_height - pad_top
    pad_left = (new_width - original_width) // 2
    pad_right = new_width - original_width - pad_left

    # Create new image with zeros
    resized_image = np.zeros((4, new_height, new_width), dtype=image.dtype)

    # Calculate starting and ending indices for original image placement
    start_row = pad_top
    end_row = pad_top + original_height
    start_col = pad_left
    end_col = pad_left + original_width

    # Assign original image to the center of the new image
    resized_image[:, start_row:end_row, start_col:end_col] = image

    # Set the new added values to 1 for the first channel
    resized_image[0, :pad_top, :] = 1
    resized_image[0, end_row:, :] = 1
    resized_image[0, :, :pad_left] = 1
    resized_image[0, :, end_col:] = 1

    return resized_image

Depending on the quantity of components supplied in the slices list, the code divides a collection of photos and ground truth masks into groups. Utilizing np.cumsum, it determines the cumulative sum of the slices. Then, after repeating this process for each element in the cumulative total, it adds the matching subsets of resized_masks and gt to the lists of images_separated and gt_masks, respectively.

In [None]:
# List of slices representing the number of elements in each group
slices=[10, 10, 8, 8, 9, 9, 9, 9, 10, 10, 9, 9, 9, 9, 10, 10, 8, 8, 9, 9, 6, 6, 10, 10, 10, 10, 11, 11, 10, 10, 10, 10, 10, 10, 9, 9, 17, 17, 9, 9, 8, 8, 9, 9, 8, 8, 21, 21, 17, 17, 16, 16, 19, 19, 9, 9, 17, 17, 10, 10, 10, 10, 10, 10, 10, 10, 11, 11, 10, 10, 10, 10, 9, 9, 9, 9, 16, 16, 20, 20, 10, 10, 8, 8, 10, 10, 8, 8, 10, 10, 9, 9, 15, 15, 8, 8, 10, 10, 8, 8]

# Empty lists to store separated images and ground truth masks
images_separated=[]
gt_masks=[]

# Calculate cumulative sum of slices
cumulative_sum = np.cumsum(slices)

# Loop over the cumulative sum elements
for i,element in enumerate(cumulative_sum):
    # Check if it is the first element
    if i==0:
        # Append the corresponding subset of resized_masks and gt to the separated images and gt_masks lists
        images_separated.append(resized_masks[0:element])
        gt_masks.append(gt[0:element])
    else:
        # Append the subset from cumulative_sum[i-1] to element to the separated images and gt_masks lists
        images_separated.append(resized_masks[cumulative_sum[i-1]:element])
        gt_masks.append(gt[cumulative_sum[i-1]:element])

An image and the quantity of labels are inputs for the separate_labels function. It generates an empty image with distinct channels for each label, assigns pixels from that label to the appropriate channel in the separated image, and then deletes the empty image. It then returns the divided image, where each channel stands for a particular label.

In [None]:
def separate_labels(image, num_labels):
    # Assuming the image has shape (height, width)
    height, width,ch = image.shape

    # Create an empty image with separate channels for each label
    separated_image = np.zeros((num_labels, height, width), dtype=np.uint8)

    for label in range(num_labels):
        # Set pixels of the corresponding label to the channel of separated_image
        separated_image[label] = (image == label).astype(np.uint8)

    return separated_image

In [None]:
# Set the number of labels
num_labels = 4

# Initialize a list to store the ground truth masks
ground_truth=[]

# Loop over the patients in the gt_masks list
for patient in gt_masks:
    # Initialize a list to store the ground truth masks for the current patient
    gt1=[]

    # Loop over the slices for the current patient
    for slices in patient:
        # Use the separate_labels function to separate the labels in the current slice and append the result to gt1
        gt1.append(separate_labels(slices[:,:,0], num_labels))

    # Append the list of ground truth masks for the current patient to the ground_truth list
    ground_truth.append(gt1)

In [None]:
def dice_coefficient(image1, image2):
    # Flatten the images
    image1 = image1.flatten()
    image2 = image2.flatten()

    # Compute the intersection and sum of pixels
    intersection = np.sum(image1 * image2)
    sum_pixels = np.sum(image1) + np.sum(image2)

    # Compute the Dice coefficient
    dice = (2.0 * intersection) / (sum_pixels + 1e-6)  # Add a small constant to avoid division by zero

    return dice

Based on the number of slices supplied in the slices list, this code divides a set of images and ground truth masks into groups. It computes the cumulative sum of the slices list and loops through the components of the cumulative sum. It adds the appropriate subsets of masks and gt to the lists images_separated and gt_masks for each element.

In [None]:
# Initialize a list of the number of slices for each patient
slices=[10, 10, 8, 8, 9, 9, 9, 9, 10, 10, 9, 9, 9, 9, 10, 10, 8, 8, 9, 9, 6, 6, 10, 10, 10, 10, 11, 11, 10, 10, 10, 10, 10, 10, 9, 9, 17, 17, 9, 9, 8, 8, 9, 9, 8, 8, 21, 21, 17, 17,16 ,16 ,19 ,19 ,9 ,9 ,17 ,17 ,10 ,10 ,10 ,10 ,10 ,10 ,10 ,10 ,11 ,11 ,10 ,10 ,10 ,10 ,9 ,9 ,9 ,9 ,16 ,16 ,20 ,20 ,10 ,10 ,8 ,8 ,10 ,10 ,8 ,8 ,10 ,10 ,9 ,9 ,15 ,15 ,8 ,8]

# Initialize lists to store the separated images and ground truth masks
images_separated=[]
gt_masks=[]

# Calculate the cumulative sum of the slices list
cumulative_sum = np.cumsum(slices)

# Loop over the elements in the cumulative_sum list
for i,element in enumerate(cumulative_sum):
    # If this is the first element in the list
    if i==0:
        # Append the first element slices to the images_separated and gt_masks lists
        images_separated.append(masks[0:element])
        gt_masks.append(gt[0:element])
    else:
        # Append the slices from the previous element to the current element to the images_separated and gt_masks lists
        images_separated.append(masks[cumulative_sum[i-1]:element])
        gt_masks.append(gt[cumulative_sum[i-1]:element])

This code determines the regions for every image in a collection of distinct images. To record the areas for each patient, it initializes empty lists (patients_areas1, patients_areas2, patients_areas3).

In [None]:
# Initialize lists to store the areas for each patient
patients_areas1=[]
patients_areas2=[]
patients_areas3=[]

# Loop over the images in the images_separated list
for indx in range(len(images_separated)):
    # Get the shape of the current image
    num,d1,d2,ch=np.shape(images_separated[indx])

    # Initialize lists to store the areas for each image
    area_1=[]
    area_2=[]
    area_3=[]

    # Loop over the number of images
    for i in range(num):
        # Create an array of threshold values from 0 to the maximum value in the current image, with 20 evenly spaced values
        thresholds=np.linspace(0,max(images_separated[idx],20)

        # Initialize lists to store the areas for each threshold value
        specific_th1=[]
        specific_th2=[]
        specific_th3=[]

        # Loop over the threshold values
        for th in thresholds:
            # Calculate the unique values and their counts in the current image, thresholded by th
            values, counts = np.unique (images_separated[indx][i][:,:,1]>th.flatten(), return_counts=True)

            # Get the height and width of the current image
            height,width=np.shape(images_separated[indx][i][:,:,1])

            # Calculate the ratio of counts to total pixels in the image
            area_ratio1 = (counts / (height*width))

            # Subtract the first element of area_ratio1 from 1 and append it to specific_th1
            area_ratio1= 1-area_ratio1[0]
            specific_th1.append(area_ratio1)

            # Repeat the above process for the second and third channels of the current image, appending to specific_th2 and specific_th3 respectively
            values, counts = np.unique (images_separated[indx][i][:,:,2]>th.flatten(), return_counts=True)
            height,width=np.shape(images_separated[indx][i][:,:,2])
            area_ratio1 = (counts / (height*width))
            area_ratio1= 1-area_ratio1[0]
            specific_th2.append(area_ratio1)

            values, counts = np.unique (images_separated[indx][i][:,:,3]>th.flatten(), return_counts=True)
            height,width=np.shape(images_separated[indx][i][:,:,3])
            area_ratio1 = (counts / (height*width))
            area_ratio1= 1-area_ratio1[0]
            specific_th3.append(area_ratio1)

        # Append the lists of areas for each threshold value to the corresponding list for each image
        area_1.append(specific_th1)
        area_2.append(specific_th2)
        area_3.append(specific_th3)

    # Append the lists of areas for each image to the corresponding list for each patient
    patients_areas1.append(area_1)
    patients_areas2.append(area_2)
    patients_areas3.append(area_3)

Path needs to be changed.

In [None]:
path=order_strings(read_png_files("/Users/anacordonavila/Desktop/DLMI/group_project/database/testing/masks"))

In [None]:
slices=[10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 11, 11, 10, 10, 10, 10, 10, 10, 10, 10, 9, 9, 10, 10, 10, 10, 10, 10, 9, 9, 10, 10, 9, 9, 8, 8, 11, 11, 8, 8, 10, 10, 7, 7, 9, 9, 8, 8, 9, 9, 10, 10, 10, 10, 10, 10, 11, 11, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 13, 13, 8, 8, 7, 7, 8, 8, 9, 9, 10, 10, 6, 6, 9, 9, 12, 12, 9, 9, 8, 8, 9, 9, 9, 9, 8, 8, 7, 7, 10, 10, 10, 10, 8, 8, 7, 7, 8, 8, 9, 9, 9, 9, 8, 8, 9, 9, 9, 9, 9, 9, 9, 9, 10, 10, 8, 8, 10, 10, 8, 8, 9, 9, 10, 10, 7, 7, 7, 7, 6, 6, 10, 10, 8, 8, 7, 7, 8, 8, 14, 14, 8, 8, 8, 8, 8, 8, 9, 9, 6, 6, 17, 17, 16, 16, 6, 6, 12, 12, 15, 15, 7, 7, 8, 8, 16, 16, 6, 6, 7, 7, 8, 8, 15, 15, 10, 10, 10, 10, 14, 14, 18, 18, 8, 8, 7, 7, 16, 16, 8, 8]
gt_masks2=[]
cumulative_sum = np.cumsum(slices)
for i,element in enumerate(cumulative_sum):
    if i==0:
        #images_separated.append(resized_masks[0:element])
        gt_masks2.append(g_t[0:element])
    else:
        #images_separated.append(resized_masks[cumulative_sum[i-1]:element])
        gt_masks2.append(g_t[cumulative_sum[i-1]:element])

In [None]:
# Initialize lists to store the areas for each patient
patients_areas1_2=[]
patients_areas2_2=[]
patients_areas3_2=[]

# Loop over the masks in the gt_masks2 list
for indx in range(len(gt_masks2)):
    # Get the shape of the current mask
    num,ch,d1,d2=np.shape(gt_masks2[indx])

    # Initialize lists to store the areas for each mask
    area_1=[]
    area_2=[]
    area_3=[]

    # Loop over the number of masks
    for i in range(num):
        # Calculate the unique values and their counts in the current mask
        values, counts = np.unique (gt_masks2[indx][i][0].flatten(), return_counts=True)

        # Get the height and width of the current mask
        height,width=np.shape(gt_masks2[indx][i][0])

        # Calculate the ratio of counts to total pixels in the mask
        area_ratio1 = (counts / (height*width))

        # Subtract the first element of area_ratio1 from 1 and append it to area_1
        area_ratio1= 1-area_ratio1[0]
        area_1.append(area_ratio1)

        # Repeat the above process for the second and third channels of the current mask, appending to area_2 and area_3 respectively
        values, counts = np.unique (gt_masks2[indx][i][1].flatten(), return_counts=True)
        height,width=np.shape(gt_masks2[indx][i][1])
        area_ratio2 = (counts / (height*width))
        area_ratio2= 1-area_ratio2[0]
        area_2.append(area_ratio2)

        values, counts = np.unique (gt_masks2[indx][i][2].flatten(), return_counts=True)
        height,width=np.shape(gt_masks2[indx][i][2])
        area_ratio3 = (counts / (height*width))
        area_ratio3= 1-area_ratio3[0]
        area_3.append(area_ratio3)

    # Append the lists of areas for each mask to the corresponding list for each patient
    patients_areas1_2.append(area_1)
    patients_areas2_2.append(area_2)
    patients_areas3_2.append(area_3)

To store areas for each patient, this code initializes three empty lists (a1, a2, and a3). Then, after performing a loop over the items in the three lists (patients_areas1_2, patients_areas2_2, and patients_areas3_2), the following actions are taken:

It enters the if statement if the length of the current element is 10.

Depending on the list being processed, it appends the current element to a1 (in the first loop), a2 (in the second loop), or a3 (in the third loop).

In [None]:
# Initialize lists to store the areas for each patient
a1=[]
a2=[]
a3=[]

# Loop over the elements in the patients_areas1_2 list
for element in patients_areas1_2:
    # If the length of the current element is 10
    if len(element)==10:
        # Append the element to the a1 list
        a1.append(element)

# Repeat the above process for the patients_areas2_2 and patients_areas3_2 lists, appending to the a2 and a3 lists respectively
for element in patients_areas2_2:
    if len(element)==10:
        a2.append(element)

for element in patients_areas3_2:
    if len(element)==10:
        a3.append(element)

The function average_lists, which is defined in this code, determines the average of matching elements from a list of sublists. The input list is represented by the function's sole input parameter, lst. Here is a list of the code's functions:



The variable num_lists is set to the number of sublists in the lst.



It determines the longest sublist in lst and stores it in the variable max_len.


It creates an empty list called result and initializes it to hold the average values.

It iterates through the exclusive range from 0 to max_len.

The variables total and count are initialized inside the loop to keep track of the sum and the quantity of elements at the current index i.

The entire lst's sublists are looped over.

The element at index i is added to total and the count is increased if the index i is inside the boundaries of the current sublist.


The average of the elements at index i is calculated by dividing total by count and is added to the result list if count is larger than 0.

In [None]:
def average_lists(lst):
    # Get the number of lists in lst
    num_lists = len(lst)

    # Get the maximum length of the sublists in lst
    max_len = max(len(sublist) for sublist in lst)

    # Initialize a list to store the result
    result = []

    # Loop over the range from 0 to max_len
    for i in range(max_len):
        # Initialize variables to store the total and count of elements at index i
        total = 0
        count = 0

        # Loop over the sublists in lst
        for sub_list in lst:
            # If index i is within the bounds of the current sublist
            if i < len(sub_list):
                # Add the element at index i to the total and increment the count
                total += sub_list[i]
                count += 1

        # If count is greater than 0, append the average of the elements at index i to the result list
        if count > 0:
            result.append(total / count)

    # Return the result list
    return result

In [None]:
# Calculate the average of the lists in a1 and store the result in reference1
reference1=average_lists(a1)

# Calculate the average of the lists in a2 and store the result in reference2
reference2=average_lists(a2)

# Calculate the average of the lists in a3 and store the result in reference3
reference3=average_lists(a3)

The function find_min_mse_list returns the index of the list inside the list of lists that has the least mean squared error (MSE) in relation to the reference list and accepts a reference list and a list of lists as inputs. The MSE between each list and the reference list is calculated, and the minimum MSE and its index are tracked as it loops across the lists in the list of lists.


An image, a new height, and a new width are passed as parameters to the resize_image function, which returns a new image with the new height and width. The new image is created by centering the previous image and filling any remaining space with zeros. The function determines the padding values for each side of the image, creates a new image that is entirely made of zeros, determines the starting and ending indices for positioning the original image in the center of the new image, places the original image there, and sets any additional values in the first channel to 1.

In [None]:
def find_min_mse_list(reference_list, list_of_lists):
    min_mse = np.inf
    min_mse_index = None

    for i, current_list in enumerate(list_of_lists):
        mse = np.mean((np.array(current_list) - np.array(reference_list)) ** 2)
        if mse < min_mse:
            min_mse = mse
            min_mse_index = i

    return min_mse_index,min_mse

def resize_image(image, new_height, new_width):
    original_height, original_width = image.shape

    # Calculate padding values
    pad_top = (new_height - original_height) // 2
    pad_bottom = new_height - original_height - pad_top
    pad_left = (new_width - original_width) // 2
    pad_right = new_width - original_width - pad_left

    # Create new image with zeros
    resized_image = np.zeros((4, new_height, new_width), dtype=image.dtype)

    # Calculate starting and ending indices for original image placement
    start_row = pad_top
    end_row = pad_top + original_height
    start_col = pad_left
    end_col = pad_left + original_width

    # Assign original image to the center of the new image
    resized_image[:, start_row:end_row, start_col:end_col] = image

    # Set the new added values to 1 for the first channel
    resized_image[0, :pad_top, :] = 1
    resized_image[0, end_row:, :] = 1
    resized_image[0, :, :pad_left] = 1
    resized_image[0, :, end_col:] = 1

    return resized_image

This code scans photos and estimates mean squared errors (MSE) for distinct elements. New_images_label1 and mses1 are two empty lists that are initialized. Then, after performing actions such as obtaining photos, locating the minimal MSE, and figuring out the design of ground truth masks, it loops over the elements again. However, the offered code snippet is incomplete and does not include further explanation of what happens after locating the minimum MSE.

In [None]:
# Initialize lists to store the new images and mean squared errors
new_images_label1=[]
mses1=[]

# Loop over the elements in the patients_areas1 list
for i,element in enumerate(patients_areas1):
    # Initialize a list to store the mean squared errors for the current element
    mse=[]

    # If the length of the current element is 10
    if len(element)==10:
        # Get the images for the current patient
        img2 = images_separated[i]

        # Initialize a list to store the new images for the current patient
        imgs=[]

        # Get the shape of the ground truth masks for the current patient
        s,new_height,new_width,ch=np.shape(gt_masks[i])

        # Use the find_min_mse_list function to find the index of the list in reorganize_lists(element) that has the minimum MSE with respect to reference2
        x,m=find_min_mse_list(reference2,reorganize_lists(element))

This program analyzes photos and computes mean squared errors (MSE) for various components. New_images_label2 and mses2 are two empty lists that are initialized. Then, it runs over the items and carries out a number of activities, such as extracting photos, figuring out a threshold, locating the minimal MSE, resizing images, and producing new binary images. The updated pictures and MSE scores are then added to the appropriate output lists.

In [None]:
# Initialize lists to store the new images and mean squared errors
new_images_label2=[]
mses2=[]

# Loop over the elements in the patients_areas2 list
for i,element in enumerate(patients_areas2):
    # Initialize a list to store the mean squared errors for the current element
    mse=[]

    # If the length of the current element is 10
    if len(element)==10:
        # Get the images for the current patient
        img2 = images_separated[i]

        # Initialize a list to store the new images for the current patient
        imgs=[]

        # Get the shape of the ground truth masks for the current patient
        s,new_height,new_width,ch=np.shape(gt_masks[i])

        # Use the find_min_mse_list function to find the index of the list in reorganize_lists(element) that has the minimum MSE with respect to reference2
        x,m=find_min_mse_list(reference2,reorganize_lists(element))

        # Get the threshold value at index x
        th=thresholds[x]

        # Append m to the mse list
        mse.append(m)

        # Loop over the slices in img2
        for slices in img2:
            # Resize the current slice and threshold it by th, then append it to imgs
            sl = resize_image(slices[2],new_height,new_width)
            imgs.append(sl[0,:,:]>th)

        # Append imgs to new_images_label2
        new_images_label2.append(imgs)

    # Append mse to mses2
    mses2.append(mse)

This program analyzes photos and computes mean squared errors (MSE) for various list elements. Two empty lists, new_images_label3 and mses3, are initialized. Then, it runs through the patients_areas3 list's components. It carries out a number of processes for each element, such as retrieving the photos for the current patient, calculating a threshold based on the minimum MSE, resizing photographs, and producing new binary images, as well as finding the minimum MSE between a reference list and the element in question. The updated pictures and MSE scores are then added to the appropriate output lists.

In [None]:
# Initialize lists to store the new images and mean squared errors
new_images_label3=[]
mses3=[]

# Loop over the elements in the patients_areas3 list
for i,element in enumerate(patients_areas3):
    # Initialize a list to store the mean squared errors for the current element
    mse=[]

    # If the length of the current element is 10
    if len(element)==10:
        # Get the images for the current patient
        img2 = images_separated[i]

        # Initialize a list to store the new images for the current patient
        imgs=[]

        # Get the shape of the ground truth masks for the current patient
        s,new_height,new_width,ch=np.shape(gt_masks[i])

        # Use the find_min_mse_list function to find the index of the list in reorganize_lists(element) that has the minimum MSE with respect to reference2
        x,m=find_min_mse_list(reference2,reorganize_lists(element))

        # Get the threshold value at index x
        th=thresholds[x]

        # Append m to the mse list
        mse.append(m)

        # Loop over the slices in img2
        for slices in img2:
            # Resize the current slice and threshold it by th, then append it to imgs
            sl = resize_image(slices[3],new_height,new_width)
            imgs.append(sl[0,:,:]>th)

        # Append imgs to new_images_label3
        new_images_label3.append(imgs)

    # Append mse to mses3
    mses3.append(mse)

This algorithm determines the Dice coefficients between each patient's fresh photos and their ground truth masks. This is accomplished by iterating through the patients in the new_images_label1, new_images_label2, and new_images_label3 lists, then iterating through the slices for each patient and calculating the Dice coefficient between the current slice and the corresponding ground truth mask using the dice_coefficient function. The dices1, dices2, and dices3 lists, respectively, provide the Dice coefficients. The Dice coefficient, which ranges from 0 (no overlap) to 1 (perfect overlap), is a metric for comparing two sets. It is applied here to gauge how closely the fresh photos resemble the ground truth masks.

In [None]:
# Initialize lists to store the Dice coefficients for each patient
dices1=[]
dices2=[]
dices3=[]

# Loop over the patients in new_images_label1
for idx, patient in enumerate(new_images_label1):
    # Initialize a list to store the Dice coefficients for the current patient
    dices_2=[]

    # Loop over the slices for the current patient
    for idx2, slices in enumerate(patient):
        # Get the corresponding ground truth mask
        slices2=gt1[idx][idx2][:,:,0]

        # Calculate the Dice coefficient between slices and slices2, then append it to dices_2
        d=dice_coefficient(np.flip(slices),slices2)
        dices_2.append(d)

    # Append dices_2 to dices1
    dices1.append((dices_2))

# Repeat the above process for new_images_label2 and new_images_label3, appending to dices2 and dices3 respectively
for idx, patient in enumerate(new_images_label2):
    dices_2=[]
    for idx2, slices in enumerate(patient):
        slices2=gt2[idx][idx2][:,:,0]
        d=dice_coefficient(slices,slices2)
        dices_2.append(d)
    dices2.append((dices_2))

for idx, patient in enumerate(new_images_label3):
    dices_2=[]
    for idx2, slices in enumerate(patient):
        slices2=gt3[idx][idx2][:,:,0]
        d=dice_coefficient(slices,slices2)
        dices_2.append(d)
    dices3.append(dices_2)

# **Testing Secret Dataset**

This snipped code has been used to test the secret test we were given.

Let's start by reading the images of the new patients.

In [None]:
def read_data2(path,mode="training",label="images"):
    path_img=[]
    total= path+mode+"/"+label
    files=os.listdir(total)
    paths=[]
    for element in files:
      paths.append(total+'/'+element)
    paths.sort()
    print(paths)

    for element in (paths):
            path_slice = element+"/"
            file_list = glob.glob(glob.escape(path_slice[0]) + "*.gz")
            sorted_file_list = sorted(file_list)
            path_img.append(element)

    return path_img

In [None]:
path = '/content/drive/MyDrive/database/'
path_img = read_data2(path,mode="testin2",label="images")

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the weights of the network trained in the training phase
state_dict = torch.load("/content/drive/MyDrive/results_0.680.pth")

# Define your network architecture
net = monai.networks.nets.UNet(
    spatial_dims=2,
    in_channels=1,
    out_channels=4,
    channels=(8, 16, 32, 64, 128),
    strides=(2, 2, 2, 2),
    num_res_units=2,
).to(device)

# Load the state dictionary into the model
net.load_state_dict(state_dict)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = net.to(device)

from monai.transforms import (
    Compose, LoadImage, AddChannel, RandFlip, RandRotate, ToTensor, CenterSpatialCropd,
    ScaleIntensity, AsDiscrete
)
import torch
import numpy as np

class LoadAndTransform:
    def __init__(self):
        self.transform = Compose([
            LoadImaged(keys=['img']),
            AddChanneld(keys=['img']),
            ScaleIntensityd(keys=['img']),  # normalize intensities of the image
            CenterSpatialCropd(keys=['img'], roi_size=[128, 128]),
            ToTensorD(keys=['img'])
        ])

    def __call__(self, sample):
        data = {'img': sample['img']}  # Assuming 'img' is the key for the image data
        return self.transform(data)


train_transform = LoadAndTransform()
train_data = pd.DataFrame({'img':path_img})

train_data = train_data.to_dict('records')
train_dataset = monai.data.CacheDataset(train_data, train_transform)
dataloader_train = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)

# Set the model to evaluation mode
net.eval()

# Create a list to store the predicted masks
masks = []

# Run inference on the dataset
with torch.no_grad():
    for batch in dataloader_train:
        images = batch["img"].to(device)
        outputs=net(images)
        masks.append(outputs)

# Concatenate the predicted masks into a single tensor
masks = torch.cat(masks, dim=0)

The code loops through each element in the path_img list and checks if a specific condition is met. The condition compares a substring of the element's filename (element[47:-10]) with the string 'patient152_frame01'. If the condition is true, it means that the element corresponds to a specific slice identified as "patient152_frame01". In such cases, the slices variable is incremented by 1, effectively counting the number of slices that meet the condition.

In [None]:
slices = 0  # Initialize a variable to count the number of slices
for element in path_img:  # Iterate through each element in the path_img list
  if element[47:-10] == 'patient152_frame01':  # Check if the specific condition is met
    slices = slices + 1  # Increment the slice count by 1

This code snippet reads an image, retrieves its shape, and also obtains the shape information of a masks array.

In [None]:
image = sitk.ReadImage(path_img[14]) # make sure the index you are setting corresponds to the subject we want to get the size from
image = sitk.GetArrayFromImage(image)
s1,s2 = np.shape(image) # get desired shape
image,channels,s1_1,s2_1=np.shape(masks)
shape = (slices, channels,s1,s2)  # Specify the desired shape of the array
print(shape)

The scope of this code is to create an empty NumPy array of a specific shape and populate it with masks from the masks array. It then saves the resulting array as a NIfTI file.

In [None]:
empty_array = np.empty(shape)

# get sizes to reseize
m,n,s1_target,s2_target=(np.shape(empty_array))
count=0
for element in range(len(path_img)):
  if path_img[element][47:-10]=='patient152_frame01':
      mask=masks[count,:,:,:].detach().cpu().numpy()
      # threshold that can be implemented we can modify to get better results

      mask[0,:,:]=mask[0,:,:]<0
      mask[1,:,:]=mask[1,:,:]>0
      mask[2,:,:]=mask[2,:,:]>0
      mask[3,:,:]=mask[3,:,:]>0

     # desired shape for the masks

      desired_shape = (s1_target, s2_target)
      pad_height = max(0, (desired_shape[0] - 128) // 2)
      pad_width = max(0, (desired_shape[1] - 128) // 2)
      mask1= np.pad(mask[0,:,:], ((pad_height, pad_height), (pad_width, pad_width)), mode='constant')
      mask2 = np.pad(mask[1,:,:], ((pad_height, pad_height), (pad_width, pad_width)), mode='constant')
      mask3 = np.pad(mask[2,:,:], ((pad_height, pad_height), (pad_width, pad_width)), mode='constant')
      mask4 = np.pad(mask[3,:,:], ((pad_height, pad_height), (pad_width, pad_width)), mode='constant')

    # fill in array

      empty_array[count,0,:,:]=mask1
      empty_array[count,1,:,:]=mask2
      empty_array[count,2,:,:]=mask3
      empty_array[count,3,:,:]=mask4
      count=count+1

path1='/content/drive/MyDrive/database/testin2/masks/'
path2='patient152_frame01'+"_seg"+".nii.gz"

affine = np.eye(4)
nifti_file = nibabel.Nifti1Image(empty_array, affine)
nibabel.save(nifti_file, path1+path2)

Printing and visualizing masks obteined.

In [None]:
image = sitk.ReadImage('/content/drive/MyDrive/database/testin2/masks/patient152_frame01_seg.nii.gz')
image = sitk.GetArrayFromImage(image)
print(np.shape(image))
plt.imshow(image[:,:,2,8])

# **Classification**

Different typologies of CNNs have been tested for this task. Instead of using the actual images as inputs to the neural network, segmentation masks that emphasize the regions of interest were employed. This made sure that the network focused on the important areas and was not diverted by noise or background information. SqueezeNet, VGG, AlexNet, DenseNet121, and ResNet18 have been tried out. Namely ResNet have been implemented it from scratch with its regularization techniques: dropout, early stopping, batch normalization, and L2 regularization.

Due to the limitation of GPU we were not able to train the models properly (particularly ResNet 18 from scatch which is not pre-trained).

A future work would be to feed the networks with more information (like weight and height of the patients) in order to make it able to distinguish doubtful situations.

As in segmentation we need first of all to create proper dictionaries, this time reading the labels provided for each patient.

In [None]:
def read_data_classification(path, mode="training"):
    path_mask=[]
    group_labels=[]
    paths=glob.glob(data_path + mode + "/*")
    paths.sort()

    # define the group-to-index mapping
    group_to_idx = {'NOR': 0, 'MINF': 1, 'DCM': 2, 'HCM': 3, 'RV': 4} # modify this according to your groups

    for element in paths:
        if (element[-10:-3]) == "patient":
            path_slice = glob.glob(element + "/")
            cfg_path = os.path.join(element, "Info.cfg")

            # read group label from the cfg file
            group = None
            with open(cfg_path, 'r') as cfg_file:
                lines = cfg_file.readlines()
                for line in lines:
                    if 'Group' in line:
                        group = line.split(":")[1].strip()
                        break

            # convert group to integer index
            group_idx = group_to_idx[group]

            file_list = glob.glob(glob.escape(path_slice[0]) + "*.gz")
            sorted_file_list = sorted(file_list)
            for element in sorted_file_list:
                if (element[-9:-7])=="gt":
                    path_mask.append(element)
                    group_labels.append(group_idx)  # assign group index to the mask

    dicts = pd.DataFrame({'mask':path_mask, 'label':group_labels })
    return dicts

In [None]:
from sklearn.model_selection import train_test_split

data_path = '/content/'

# Read data for training
train_data_classification = read_data_classification(data_path, mode='training')

# Split the training data into train and validation sets
train_data, val_data = train_test_split(train_data_classification, test_size=0.2, random_state=42)

# Convert the DataFrames to lists of dictionaries
train_dicts_classification = train_data.to_dict('records')
val_dicts_classification = val_data.to_dict('records')

# Read data for testing
test_dicts_classification = read_data_classification(data_path, mode='testing')
test_dicts_classification = test_dicts_classification.to_dict('records')

In [None]:
from monai.transforms import (
    Compose, LoadImaged, AddChanneld, RandFlipd, RandRotated, ToTensorD, CenterSpatialCropd, SqueezeDimd
)

class LoadAndTransform_classification:
    def __init__(self):
        self.transform = Compose([
            LoadImaged(keys=['mask']),  # Load the mask using the specified key
            AddChanneld(keys=['mask']),  # Add a channel dimension to the mask
            CenterSpatialCropd(keys=['mask'], roi_size=[128, 128, 1]),  # Crop the mask to a specific size
            SqueezeDimd(keys=['mask'], dim=-1),  # Remove the last dimension from the mask
            ToTensorD(keys=['mask'])  # Convert the mask to a PyTorch tensor
        ])

    def __call__(self, sample):
        return self.transform(sample)  # Apply the defined transformation to the input sample

train_transform = LoadAndTransform_classification()  # Instantiate the LoadAndTransform_classification class for training data
train_dataset_classification = monai.data.CacheDataset(train_dicts_classification, train_transform)  # Create a dataset using the training transformation

val_transform = LoadAndTransform_classification()  # Instantiate the LoadAndTransform_classification class for validation data
val_dataset_classification = monai.data.CacheDataset(val_dicts_classification, val_transform)

test_transform = LoadAndTransform_classification()  # Instantiate the LoadAndTransform_classification class for test data
test_dataset_classification = monai.data.CacheDataset(test_dicts_classification, test_transform)  # Create a dataset using the test transformation

In [None]:
# Create the dataloader
dataloader_train_classification = DataLoader(dataset=train_dataset_classification, batch_size=16, shuffle=True)
dataloader_val_classification = DataLoader(dataset=val_dataset_classification, batch_size=16, shuffle=True)
dataloader_test_classification = DataLoader(dataset=test_dataset_classification, batch_size=16, shuffle=False)

The first framework used for the purpose of image classification is ResNet 18, here implemented from the scratch.

ResNet18 is a convolutional neural network architecture part of the ResNet family of models, which was designed to overcome the problem of vanishing gradients in very deep neural networks. The vanishing gradient problem occurs when gradients become very small during backpropagation, making it difficult to train deep neural networks.

ResNet18 is made up of 18 layers, including several residual blocks. Residual blocks are a key feature of the ResNet architecture and are designed to enable the efficient training of very deep neural networks. In a residual block, the input to a layer is added to the output of the layer after passing through one or more convolutional layers. This so-called skip connection enables the network to learn residual functions, which can be easier to optimize than the original functions.

The techniques of Droupout as well as L2-normalization are added as regularization techniques in order to prevent the phenomenon of Early Stopping.

In [None]:
class ResidualBlock(nn.Module):

    def __init__(self, inchannel, outchannel, stride=1):

        super(ResidualBlock, self).__init__()

        # First convolutional layer
        self.conv1 = nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(outchannel)

        # Second convolutional layer
        self.conv2 = nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(outchannel)

        # Shortcut connection to add to output
        self.shortcut = nn.Sequential()
        if stride != 1 or inchannel != outchannel:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel)
            )

    def forward(self, x):

        # First convolutional layer
        out = F.relu(self.bn1(self.conv1(x)))

        # Second convolutional layer
        out = self.bn2(self.conv2(out))

        # Shortcut connection
        out = out + self.shortcut(x)

        # ReLU activation
        out = F.relu(out)

        return out


class ResNet(nn.Module):

    def __init__(self, ResidualBlock, num_classes=12):
        super(ResNet, self).__init__()

        self.inchannel = 6

        self.conv1 = nn.Sequential(
            # define first convolution layer

            nn.Conv2d(3, 6, kernel_size=(2, 2), stride=1, padding=1, bias=False),
            nn.BatchNorm2d(6),
            nn.ReLU()
        )

        # define the first residual block layer
        self.layer1 = self.make_layer(ResidualBlock, 32, 2, stride=1)

        # define the second residual block layer
        self.layer2 = self.make_layer(ResidualBlock, 64, 2, stride=2)

        # define the third residual block layer
        self.layer3 = self.make_layer(ResidualBlock, 128, 2, stride=2)

        # define the fourth residual block layer
        self.layer4 = self.make_layer(ResidualBlock, 512, 2, stride=2)

        # define global average pooling layer
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # define the fully connected layer
        self.fc = nn.Linear(512, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(p=0.5)


    def make_layer(self, block, channels, num_blocks, stride):

        strides = [stride] + [1] * (num_blocks - 1)

        layers = []

        for stride in strides:

            # create a residual block layer and append it to layers
            layers.append(block(self.inchannel, channels, stride))
            self.inchannel = channels

        # create a sequential layer using the created residual blocks
        return nn.Sequential(*layers)

    def forward(self, x):

        # pass the input through the first convolution layer
        out = self.conv1(x)

        # pass the input through the first residual block layer
        out = self.layer1(out)

        # pass the input through the second residual block layer
        out = self.layer2(out)

        # pass the input through the third residual block layer
        out = self.layer3(out)

        # pass the input through the fourth residual block layer
        out = self.layer4(out)

        # perform global average pooling on the output of the last residual block layer
        out = self.global_avg_pool(out)

        # flatten the output tensor
        out = out.view(out.size(0), -1)

        # L2 normalization
        out = F.normalize(out, p=2, dim=1, eps=1e-12)

        # Dropout
        out = self.dropout(out)

        # pass the output tensor through the fully connected layer
        out = self.fc(out)
        return out

def ResNet18():
  return ResNet(ResidualBlock)

In the following code cell we want to test other frameworks imported directly from the PyTorch library. Specifically we use Alexnet, VGG, Squeezenet, and Densenet.

AlexNet is a deep convolutional neural network architecture that was proposed by Alex Krizhevsky, Ilya Sutskever, and Geoffrey Hinton in 2012. Some of its most important features include:

AlexNet has a total of 8 layers, including 5 convolutional layers, 2 fully connected layers, and 1 softmax layer;
It uses the rectified linear unit (ReLU) activation function, which helps to improve training time and reduce the risk of the vanishing gradient problem;
AlexNet introduced the concept of overlapping pooling, which means that the pooling regions of adjacent pooling layers overlap instead of being disjoint;
The architecture also includes local response normalization (LRN), which helps to normalize the response of adjacent neurons and improves the generalization of the model;
AlexNet was trained on the ImageNet dataset, which contains millions of labeled images across 1,000 different categories
VGG (Visual Geometry Group) is a convolutional neural network architecture that was proposed by Karen Simonyan and Andrew Zisserman from the University of Oxford in 2014. The VGG architecture is known for its simplicity and effectiveness in deep learning for computer vision tasks. Some key features of VGG are:

VGG has a total of 19 layers, including 16 convolutional layers and 3 fully connected layers;
The convolutional layers in VGG are composed of 3x3 filters, which are stacked on top of each other to form deeper representations;
The pooling layers in VGG use max pooling with a 2x2 filter size and a stride of 2;
VGG uses the rectified linear unit (ReLU) activation function, which helps to improve training time and reduce the risk of the vanishing gradient problem;
VGG was also trained on the ImageNet dataset, which contains millions of labeled images across 1,000 different categories.
SqueezeNet is a convolutional neural network architecture that was proposed by Forrest N. Iandola, Song Han, Matthew W. Moskewicz, Khalid Ashraf, William J. Dally, and Kurt Keutzer from the University of California, Berkeley in 2016. The SqueezeNet architecture is known for its small size, which makes it well-suited for resource-constrained environments such as mobile devices and embedded systems. Here are some key features of SqueezeNet:

SqueezeNet is a small network with a total of only 23 layers, which is much smaller than other popular architectures such as AlexNet and VGG;
It achieves its small size by using a combination of 1x1 convolutional filters and a technique called "fire modules," which are designed to balance the number of filters and the computational cost;
SqueezeNet also uses a technique called "deep compression," which reduces the size of the network even further by applying pruning and quantization techDespite its small size, SqueezeNet achieves competitive accuracy on the ImageNet dataset, which contains millions of labeled images across 1,000 different categories.
The following function is to use the pre-trained model only to extract image features. This can be useful to save computational time and resources, as there is no need to calculate gradients and update model weights.

In [None]:
def set_parameter_requires_grad(model, feature_extracting):

    """
    A function that sets the requires_grad attribute of model parameters
    to False if feature_extracting is True, and to True otherwise.

    Args:
    - model: a PyTorch model
    - feature_extracting: a boolean value that indicates whether we want
      to extract features from the model (True) or fine-tune the model (False)

    Returns:
    - None
    """

    # If we're only extracting features, we don't want to update the model parameters
    if feature_extracting:
        # Loop over all parameters in the model and set their requires_grad attribute to False
        for param in model.parameters():
            param.requires_grad = False

In [None]:
# Models to choose from [resnet, alexnet, vgg, squeezenet, densenet]
model_name = "resnet"

# Number of classes in the dataset
num_classes = 5

# Flag for feature extracting. When False, we finetune the whole model,
#   when True we only update the reshaped layer params
feature_extract = True

def initialize_model(model_name, num_classes, feature_extract, use_pretrained):
    # Initialize these variables which will be set in this if statement. Each of these
    #   variables is model specific.
    model_ft = None
    input_size = 0

    if model_name == "resnet":
        """ Resnet18
        """
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 256

    elif model_name == "alexnet":
        """ Alexnet
        """
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 128

    elif model_name == "vgg":
        """ VGG11_bn
        """
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        """ Squeezenet
        """
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == "densenet":
        """ Densenet
        """
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "ResNet18":
        model_ft = ResNet18()
        input_size = 224

    else:
        print("Invalid model name, exiting...")
        exit()

    return model_ft, input_size

# Initialize the model for this run
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=False)

# Print the model we just instantiated
print(model_ft)

In [None]:
# Gather the parameters to be optimized/updated in this run. If we are finetuning we will be updating all parameters. However, if we are doing
# feature extract method, we will only update the parameters that we have just initialized, i.e. the parameters with requires_grad is True

def update_params_to_learn(model_ft, feature_extract):

    """
    A function that determines which parameters of a PyTorch model should be
    updated during training, based on whether we want to fine-tune the model
    (feature_extract = False) or extract features (feature_extract = True).

    Args:
    - model_ft: a PyTorch model
    - feature_extract: a boolean value that indicates whether we want to
      extract features from the model (True) or fine-tune the model (False)

    Returns:
    - params_to_update: a list of parameters to be updated during training
    """

    # Get all parameters of the model
    params_to_update = model_ft.parameters()

    # If we're only extracting features, we only want to update certain parameters
    if feature_extract:
        # Create an empty list to store the parameters we want to update
        params_to_update = []
        # Loop over all named parameters in the model and check if requires_grad is True
        for name, param in model_ft.named_parameters():
            if param.requires_grad == True:
                # If the parameter should be updated, append it to the list
                params_to_update.append(param)
                print("\t", name)
    # If we're fine-tuning the model, update all parameters with requires_grad=True
    else:
        for name, param in model_ft.named_parameters():
            if param.requires_grad == True:
                print("\t", name)

    return params_to_update

Here below we can choose which one between the neural networks architectures previously mentioned we want to test, just changing the parameter "model_name"

In [None]:
# Set the GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set some parameters
epoch = 50
learning_rate = 0.001
batch_size=1
momentum=0.9
weight_decay=5e-4

# Set the model we are going to use
model_ft = model_ft.to(device)

# Loss funtion
cost_function = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.SGD(model_ft.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

Early stopping is a technique used to prevent overfitting and it is based on the idea that the model's performance on the validation set will stop improving after a certain number of training epochs, and continuing to train beyond this point will only lead to overfitting.

In this case the monitor is on the perplexity: after three times we see the perplexity increasing we stop the training and get the final value.

There are several benefits to using early stopping:

It can help prevent overfitting by stopping the training process before the model starts to overfit the training data;
It can save time and resources by avoiding the need to train the model for a large number of epochs;
It can improve the generalization ability of the model by avoiding the use of models that are overfitted to the training data.

In [None]:
class EarlyStopping:

  """
    A class for early stopping of training process.

    Args:
        patience (int): Number of epochs to wait before triggering early stopping. Default: 5
        verbose (bool): Flag indicating whether to print messages when early stopping is triggered. Default: True

    Attributes:
        patience (int): Number of epochs to wait before triggering early stopping.
        verbose (bool): Flag indicating whether to print messages when early stopping is triggered.
        counter (int): Counter for the number of epochs since the last improvement.
        best_score (float): Best score seen so far. None by default.
  """

  def __init__(self, patience=5, verbose=True):

    # set the number of epochs to wait before triggering early stopping
    self.patience = patience

    # set a flag indicating whether to print messages when early stopping is triggered
    self.verbose = verbose

    # initialize the counter for the number of epochs since the last improvement
    self.counter = 0

    # initialize the best score seen so far to None
    self.best_score = None

  def step(self, val_loss):

    """
        Function to check if early stopping should be triggered.

        Args:
            val_loss (float): Current validation loss.

        Returns:
            bool: Whether early stopping should be triggered or not.
    """

    # if this is the first epoch, set the best score to the current validation loss
    if self.best_score is None:

      self.best_score = val_loss

    # if the current validation loss is worse than the best score seen so far
    elif val_loss > self.best_score:

      # increment the counter
      self.counter += 1

      # if the counter has reached the patience threshold
      if self.counter >= self.patience:

        # if verbose is True, print a message indicating that early stopping has been triggered
        if self.verbose:

          print(f'Early stopping triggered with counter {self.counter} and patience {self.patience}')

        # return True to indicate that early stopping should be triggered
        return True

    # if the current validation loss is better than the best score seen so far

    else:

      # set the best score to the current validation loss
      self.best_score = val_loss

      # reset the counter
      self.counter = 0

    # if the counter has not reached the patience threshold, return False
    return False

The training_step function is a Python function that represents a training step (training step) of a neural network. The function takes as input a neural network (net), a training set (train_loader), a validation set (valid_loader), an optimizer (optimizer), a cost function (cost_function), and a device (device) on which to run the training (by default "cuda," indicating an NVIDIA GPU).

The function first sets the neural network to training mode (net.train()), then runs a training cycle on the training set (for inputs, targets in train_loader:). For each batch of inputs and targets in the training set, the function performs the following steps:

Resizing the input image to a specific size. Moving the inputs and targets to the specified device (device). Zeroing the gradients of the optimizer (optimizer.zero_grad()). Forward pass of the neural network (outputs = net(inputs)). Calculation of the cost function (loss = cost_function(outputs, targets)). Backward pass to calculate gradients (loss.backward()). Update of neural network parameters (optimizer.step()). Calculating training accuracy (train_acc) and tracking training loss (train_loss). After running the training cycle on the training set, the function runs a validation cycle on the validation set (for inputs, targets in valid_loader:). For each batch of inputs and targets in the validation set, the function performs steps 1 through 5, calculates the validation accuracy (valid_acc) and tracks the validation loss (valid_loss).

Finally, the function returns the average training loss (train_loss), average training accuracy (train_acc), average validation loss (valid_loss) and average validation accuracy (valid_acc) as a tuple. These values can be used to monitor and evaluate the performance of the neural network during training.

In [None]:
def training_step(net, train_loader, val_loader, optimizer, cost_function, device='cuda'):

    # set the network to training mode
    net.train()

    # train loop
    train_loss = 0.0
    train_acc = 0.0
    num_train_samples = 0

    # iterate over training data
    for batch in train_loader:

        # move inputs and targets to specified device
        inputs = batch['mask'].float().to(device)
        targets = batch['label'].to(device)
        # repeat the grayscale channel 3 times to create a 3-channel image
        inputs = inputs.repeat(1, 3, 1, 1)

        # zero the optimizer gradients
        optimizer.zero_grad()

        # forward pass
        outputs = net(inputs)

        # compute the loss
        loss = cost_function(outputs, targets)

        # backward pass
        loss.backward()

        # update model parameters
        optimizer.step()

        # compute training accuracy
        _, predicted = outputs.max(1)
        num_train_samples += targets.size(0)
        train_acc += predicted.eq(targets).sum().item()

        # track training loss
        train_loss += loss.item()

    # compute average training loss and accuracy
    train_loss /= num_train_samples
    train_acc /= num_train_samples

    # set the network to evaluation mode
    net.eval()

    # validation loop
    val_loss = 0.0
    val_acc = 0.0
    num_val_samples = 0

    with torch.no_grad():
        for batch in val_loader:
            inputs = batch['mask'].float().to(device)
            targets = batch['label'].to(device)
            inputs = inputs.repeat(1, 3, 1, 1)

            outputs = net(inputs)
            loss = cost_function(outputs, targets)

            _, predicted = outputs.max(1)
            num_val_samples += targets.size(0)
            val_acc += predicted.eq(targets).sum().item()

            val_loss += loss.item()

    val_loss /= num_val_samples
    val_acc /= num_val_samples

    return train_loss, train_acc, val_loss, val_acc

The code cell in question defines a Python function called test_step, which is used to test a neural network during the validation or testing phase.

The function takes as input the neural network (net), a data loader (data_loader), a cost function (cost_function) and a device (device) on which to run the test.

At the beginning of the function, the neural network is set to evaluation mode (net.eval()). Next, a for loop is created to run through the data loader, which contains the test data set. For each batch of inputs and targets in the data loader, the function performs the following steps:

Moving the inputs and targets to the specified device (device). Forward pass of the neural network (outputs = net(inputs)). Calculating the cost function (loss = cost_function(outputs, targets)). Update of evaluation metrics, specifically: Calculating total accuracy (total_acc) by adding the total number of samples in the batch (targets.size(0)) to the variable total_acc. Calculation of correct accuracy (correct_acc) by adding the number of correct predictions ((predicted == targets).sum().item()) to the correct_acc variable. Calculating total loss (loss_acc) by summing the batch loss (loss.item()) multiplied by the total number of batch samples (targets.size(0)) to the variable loss_acc. At the end of the function, the function returns the average test loss (loss_acc / total_acc) and average test accuracy (correct_acc / total_acc) as a tuple. These values can be used to evaluate the performance of the neural network on the test data.

In [None]:
def test_step(net, data_loader, cost_function, device):
  net.eval()

  test_loss = 0.0
  test_acc = 0.0
  num_test_samples = 0

  with torch.no_grad():
      for batch in data_loader:
          inputs = batch['mask'].float().to(device)
          targets = batch['label'].to(device)
          inputs = inputs.repeat(1, 3, 1, 1)

          outputs = net(inputs)
          loss = cost_function(outputs, targets)

          _, predicted = outputs.max(1)
          num_test_samples += targets.size(0)
          test_acc += predicted.eq(targets).sum().item()

          test_loss += loss.item()

  test_loss /= num_test_samples
  test_acc /= num_test_samples

  return test_loss, test_acc #loss_acc / total_acc, correct_acc / total_acc

The provided code is a Python function named "main" that implements a complete iteration of a neural network training process for classification. The function takes as input the training configuration parameters such as the number of epochs, batch size, learning rate, weight decay, momentum, training data loader, validation data loader, test data loader, optimizer and cost function.

Within the function, an Early Stopping criterion is created, which is used to stop training when the validation loss does not improve for a certain number of epochs. Next, the function iterates over the specified number of epochs and at each iteration calls the training_step function to train the network based on the training and validation data loader.

During training, the model parameters with the lowest validation loss up to that point are saved. At the end of training, the function uses the saved model parameters to perform the final test on the test data.

Finally, the function returns the training, validation and test losses and accuracies as output. The code also provides the ability to save the trained model to disk for later use.



In [None]:
import copy

def main(batch_size,
         device,
         learning_rate,
         weight_decay,
         momentum,
         epochs,
         train_loader,
         val_loader,
         test_loader,
         optimizer,
         cost_function):

    # Create the early stopping criterion
    #criterion = EarlyStopping(patience=5, verbose=True)

    # initialize variables to keep track of best validation loss and corresponding model parameters
    best_loss = float('inf')
    best_params = None

    # iterate over the number of epochs
    for e in range(epochs):

        # train & log
        train_loss, train_acc, val_loss, val_acc = training_step(model_ft, train_loader, val_loader, optimizer, cost_function, device)

        # check if the validation loss has improved
        if val_loss < best_loss :
            best_loss = val_loss
            best_params = copy.deepcopy(model_ft.state_dict())

        # log epoch results
        print(f'Epoch {e+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Val Loss: {val_loss:.4f}, , Val Accuracy: {val_acc:.4f}')

    # load the best model parameters
    model_ft.load_state_dict(best_params)

    # test the model on the test set
    test_loss, test_acc = test_step(model_ft, test_loader, cost_function, device)

    # log test results
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')

    return train_loss, train_acc, val_loss, val_acc, test_loss, test_acc

In [None]:
ltrain_loss = []
train_loss, train_acc, val_loss, val_acc, test_loss, test_acc = main(batch_size, device, learning_rate, weight_decay, momentum, epoch, dataloader_train_classification, dataloader_val_classification, dataloader_test_classification, optimizer, cost_function)