# **HW5**

In this notebook, our focus is on employing both 2D-UNet and 3D-UNet models to perform segmentation on a set of 30 volumetric medical images. Your task involves completing the designated sections of the notebook and subsequently comparing the outcomes achieved by the 2D and 3D models in terms of segmentation performance.

In [None]:
import cv2
import torch
import numpy as np
from blocks import *
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader

# Data

In this step, your objective is to load data from the provided numpy file. Given that the images have varying numbers of slices, your task is to add zero-padded slices to ensure that all images contain a standardized total of 208 slices



In [None]:
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1DXe9f_8k2_iPdJsrz2ktMXmMw2G2qPGE' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1DXe9f_8k2_iPdJsrz2ktMXmMw2G2qPGE" -O dataset.npy && rm -rf /tmp/cookies.txt

In [None]:
dataset_path = 'dataset.npy'
labeled_images = np.load(dataset_path, allow_pickle=True)
ORGAN_idx = 6
ORGAN = 'liver'
image_shape = (128, 128)
image_slices = 208

data_X = []
data_Y = []
for idx in range(len(labeled_images)):
    xx = labeled_images[idx].get("image")
    yy = labeled_images[idx].get("label")

    yy[np.where(yy != ORGAN_idx)] = 0
    yy[np.where(yy == ORGAN_idx)] = 1

    x = []
    y = []
    for i in range(len(xx)):
        x.append(cv2.resize(xx[i,:,:], image_shape))
        y.append(cv2.resize(yy[i,:,:], image_shape))
    x = np.asarray(x)
    y = np.asarray(y)

    # -------------------------------- YOUR CODE --------------------------------

    # -------------------------------- YOUR CODE --------------------------------

data_X = np.asarray(data_X)
data_Y = np.asarray(data_Y)
print(data_X.shape)
print(data_Y.shape)

In [None]:
n_train, n_valid = 20, 10

train_X = data_X[:n_train]
train_Y = data_Y[:n_train]

# valid_X = data_X[:n_train]
# valid_Y = data_Y[:n_train]

valid_X = data_X[n_train:n_train+n_valid]
valid_Y = data_Y[n_train:n_train+n_valid]

print(train_X.shape, train_Y.shape)
print(valid_X.shape, valid_Y.shape)

In the next cell write code to visualize some of the slices randomly from the dataset

In [None]:
# -------------------------------- YOUR CODE --------------------------------

# -------------------------------- YOUR CODE --------------------------------

# Utils

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
def pad_to_shape(this, shp):
    if len(shp) == 4:
        pad = (0, shp[3] - this.shape[3], 0, shp[2] - this.shape[2])
    elif len(shp) == 5:
        pad = (0, shp[4] - this.shape[4], 0, shp[3] - this.shape[3], 0, shp[2] - this.shape[2])
    return F.pad(this, pad)

Write a function that computes the dice score between a batch of prediction and ground truths.

In [None]:
def dice_score(y_pred_bin, y_true):
    """
    Args:
        y_pred_bin: shape => (batch_size, 1, h, w)
        y_true: shape => (batch_size, 1, h, w)

    Returns:
        : shape => (batch_size, dice_score)
    """

    # -------------------------------- YOUR CODE --------------------------------

    # -------------------------------- YOUR CODE --------------------------------

# Part 1: 2D UNet

In this section we are going to use a 2D UNet to train a segmentation model.

## Part 1.1: Model Definition

First we need to implement the model architecture. The necessary modules are created in the init functions. Complete the forward method for the UNet model.

In [None]:
class UNet2D(nn.Module):

    def __init__(self, in_channels, out_channels, conv_depths=(16, 32, 64, 128, 256)):
        assert len(conv_depths) > 2, 'conv_depths must have at least 3 members'
        super(UNet2D, self).__init__()

        # defining encoder layers
        encoder_layers = []
        encoder_layers.append(First2D(in_channels, conv_depths[0], conv_depths[0]))
        encoder_layers.extend([Encoder2D(conv_depths[i], conv_depths[i + 1], conv_depths[i + 1])
                               for i in range(len(conv_depths)-2)])

        # defining decoder layers
        decoder_layers = []
        decoder_layers.extend([Decoder2D(2 * conv_depths[i + 1], 2 * conv_depths[i], 2 * conv_depths[i], conv_depths[i])
                               for i in reversed(range(len(conv_depths)-2))])
        decoder_layers.append(Last2D(conv_depths[1], conv_depths[0], out_channels))

        # encoder, center and decoder layers
        self.encoder_layers = nn.Sequential(*encoder_layers)
        self.center = Center2D(conv_depths[-2], conv_depths[-1], conv_depths[-1], conv_depths[-2])
        self.decoder_layers = nn.Sequential(*decoder_layers)

    def forward(self, x, return_all=False):

        # -------------------------------- YOUR CODE --------------------------------

        # -------------------------------- YOUR CODE --------------------------------

## Part 1.2: Dataset Definition

In [None]:
class Dataset2D(Dataset):
    def __init__(self, x, y, Normalization = False):

        self.Normalization = Normalization
        self.slices_x = []
        self.slices_y = []
        for i in range(x.shape[0]):
            for j in range(x.shape[2]):
                sx = x[i, :, j, :, :]
                sy = y[i, :, j, :, :]
                if sy.sum() > 0:
                    self.slices_x.append(sx)
                    self.slices_y.append(sy)

    def __len__(self):
        return len(self.slices_x) # number of samples


    def __getitem__(self, index): # sampling method. used by DataLoader.
        x = self.slices_x[index]
        y = self.slices_y[index]
        if self.Normalization:
            x = (x - x.min()) / (x.max() - x.min())
        return x, y, index # we return the index as well for future use

## Part 1.3: Train

In [None]:
model = UNet2D(in_channels=1, out_channels=2)
model.to(device).float()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-1)
epochs = 50

In [None]:
train_loader = DataLoader(
    Dataset2D(train_X, train_Y, Normalization=True),
    batch_size=32,
    shuffle=True,
    num_workers=6
)

print('Train Loader Done')

valid_loader = DataLoader(
    Dataset2D(valid_X, valid_Y, Normalization=True),
    batch_size=32,
    shuffle=False,
    num_workers=6
)

print('Validation Loader Done')

samples_count = len(train_loader.dataset)
val_samples_count = len(valid_loader.dataset)

The necesary components are created. Now write the training loop and train your model. Report validation results during training and save the training log in the notebook.

In [None]:
# -------------------------------- YOUR CODE --------------------------------

# -------------------------------- YOUR CODE --------------------------------

# Part 2: 3D UNet

Now we want to use a 3D model and see if we can get better results. Complete the specified parts and train the model.

## Part 2.1: Model Definition

In [None]:
class UNet3D(nn.Module):
    def __init__(self, in_channels, out_channels, conv_depths=(16, 32, 64, 128, 256)):
        assert len(conv_depths) > 2, 'conv_depths must have at least 3 members'

        super(UNet3D, self).__init__()

        # defining encoder layers
        encoder_layers = []
        encoder_layers.append(First3D(in_channels, conv_depths[0], conv_depths[0]))
        encoder_layers.extend([Encoder3D(conv_depths[i], conv_depths[i + 1], conv_depths[i + 1])
                               for i in range(len(conv_depths)-2)])

        # defining decoder layers
        decoder_layers = []
        decoder_layers.extend([Decoder3D(2 * conv_depths[i + 1], 2 * conv_depths[i], 2 * conv_depths[i], conv_depths[i])
                               for i in reversed(range(len(conv_depths)-2))])
        decoder_layers.append(Last3D(conv_depths[1], conv_depths[0], out_channels))

        # encoder, center and decoder layers
        self.encoder_layers = nn.Sequential(*encoder_layers)
        self.center = Center3D(conv_depths[-2], conv_depths[-1], conv_depths[-1], conv_depths[-2])
        self.decoder_layers = nn.Sequential(*decoder_layers)

    def forward(self, x, return_all=False):

        # -------------------------------- YOUR CODE --------------------------------

        # -------------------------------- YOUR CODE --------------------------------

## Part 2.2: Dataset Definition

In [None]:
class Dataset3D(Dataset):
    def __init__(self, x, y, Normalization = False):

        self.Normalization = Normalization
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x) # number of samples


    def __getitem__(self, index): # sampling method. used by DataLoader.
        x = self.x[index]
        y = self.y[index]
        if self.Normalization:
            x = (x - x.min()) / (x.max() - x.min())
        return x, y, index # we return the index as well for future use

## Part 2.3: Train

In [None]:
model = UNet3D(in_channels=1, out_channels=2)
model = model.to(device).float()
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
epochs = 50

In [None]:
train_loader = DataLoader(
    Dataset3D(train_X, train_Y, Normalization=True),
    batch_size=1,
    shuffle=True,
    num_workers=6
)

print('Train Loader Done')

valid_loader = DataLoader(
    Dataset3D(valid_X, valid_Y, Normalization=True),
    batch_size=1,
    shuffle=False,
    num_workers=6
)

print('Validation Loader Done')



In [None]:
# -------------------------------- YOUR CODE --------------------------------

# -------------------------------- YOUR CODE --------------------------------

# Visualization

In the final section visualize segmentation masks for a few random slices for both 2D and 3D model

In [None]:
# -------------------------------- YOUR CODE --------------------------------

# -------------------------------- YOUR CODE --------------------------------