# Time Segmentation 

IPython Notebook for segmentation of morphological structures imperative for "Tumor Micro Environemnt (TIME)" analysis.

This Code uses MONAI framework for carrying out the segmentation task

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

from urllib.request import urlopen
from io import BytesIO

import torch, torch.nn as nn, torch.nn.functional as F

import numpy as np

import matplotlib.pyplot as plt

import monai
from monai.utils import first
from monai.transforms import AddChannel, Compose, LoadImage, RandRotate90, RandSpatialCrop, ScaleIntensity, ToTensor,SplitChanneld,SplitChannel, Lambda,Lambdad,Resized,Resize
from monai.losses import DiceLoss,TverskyLoss
from monai.metrics import DiceMetric
from monai.data import ArrayDataset
from torch.utils.data import DataLoader
from monai.utils import progress_bar
from monai.networks.utils import one_hot
import random

Load the dataset from npz file

In [None]:
npz = 'Data.npz'
data = np.load('Data.npz',allow_pickle=True)

images = data["images"]  # images in BHW array order
segs = data["mask"]  # segmentations in BHW array order

# Merging some of the labels for ease 
segs[segs==2]=1
segs[segs==3]=2
segs[segs==4]=2
segs[segs==5]=3
segs[segs==6]=0

- Setup the parameters for model training
- We will split our data into a training and test sets

In [None]:
random.seed(0) 
device = torch.device("cuda:0")

# Setting up trainign parameters
batch_size = 16
num_workers = 0
num_epochs = 600
lr = 5e-2
numLabels = 3

#Randomize the data for training and testing.
indices = list(range(images.shape[0]))
random.shuffle(indices)
case_indices = 100
test_index = indices[-case_indices: -1]  # keep the last 6 cases for testing
train_index = indices[0:images.shape[0]-case_indices]

# divide the images, segmentations, and categories into train/test sets
train_images, train_segs = images[train_index,:,:], segs[train_index,:,:]
test_images, test_segs = images[test_index,:,:], segs[test_index,:,:]

We can now create a MONAI data loading object to compose batches during training, and another for validation:

In [None]:
from monai.data import CacheDataset
from monai.transforms import (
    AddChanneld,
    ScaleIntensityd,
    ToTensord,
    RandFlipd,
    RandRotate90d,
    RandZoomd,
    Rand2DElasticd,
    RandAffined,
    RandGaussianSharpend,
    RandGaussianNoised,
    Resized,
    Resize,
)

aug_prob = 0.2
keys = ("img", "seg")

# use these when interpolating binary segmentations to ensure values are 0 or 1 only
zoom_mode = monai.utils.enums.InterpolateMode.NEAREST
resize_mode = monai.utils.enums.InterpolateMode.NEAREST
elast_mode = monai.utils.enums.GridSampleMode.NEAREST


trans = Compose(
    [
        ScaleIntensityd(keys=("img",)),  # rescale image data to range [0,1]
        AddChanneld(keys=keys),  # add 1-size channel dimension
        Resized(keys = keys,spatial_size=(256,256),mode=['bilinear','nearest']),
        #RandGaussianNoised(keys=("img",), prob=aug_prob, mean=0.2, std=0.05),
        RandRotate90d(keys=keys, prob=aug_prob),
        RandFlipd(keys=keys, prob=aug_prob),
        RandZoomd(keys=keys, prob=aug_prob,mode=['bilinear','nearest']),
        Rand2DElasticd(keys=keys, prob=aug_prob, spacing=10, magnitude_range=(-2, 2),mode=['bilinear','nearest']),
        RandAffined(keys=keys, prob=aug_prob,padding_mode = 'zeros',shear_range = 0.2,rotate_range=20, translate_range=16, mode=['bilinear','nearest']),
        #RandGaussianSharpend(keys=("img",),sigma1_x = (0.05,0.1),sigma1_y = (0.05,0.1),sigma2_x = (0.1,0.2),sigma2_y = (0.1,0.2), alpha = (10.0,30.0),prob = aug_prob),
        ToTensord(keys=keys),  # convert to tensor
    ]
)


data = [
    {"img": train_images[i], "seg": train_segs[i]} for i in range(len(train_images))
]

ds = CacheDataset(data, trans)
loader = DataLoader(
    dataset=ds,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=True,
    pin_memory=torch.cuda.is_available(),
)

val_image_trans = Compose([ScaleIntensity(), AddChannel(),Resize(spatial_size=(256,256),mode='bilinear'), ToTensor(),])

val_seg_trans = Compose([AddChannel(),Resize(spatial_size=(256,256),mode='nearest'), ToTensor()])


val_ds = ArrayDataset(test_images, val_image_trans, test_segs, val_seg_trans)
val_loader = DataLoader(
    dataset=val_ds,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=False,
    drop_last = False,
    pin_memory=torch.cuda.is_available(),
)

View a sample

In [None]:
batch = first(loader)
im1 = batch["img"]
seg1 = batch["seg"]

print(seg1.unique())
plt.imshow(seg1[0, 0].numpy())
plt.show()
plt.imshow(im1[0, 0].numpy())
plt.show()
plt.imshow(im1[0, 0].numpy()+ 0.25*seg1[0, 0].numpy(), cmap="gray")
plt.show()

Defining the DICE loss. As our data is **partially labelled**, the Dice loss that we use **ignores the unlabelled pixels (we call them background pixels) during loss calculation**

In [None]:
import warnings
from typing import Callable, Optional, Union

import numpy as np
import torch
import torch.nn.functional as F
from torch.nn.modules.loss import _Loss

from monai.networks import one_hot
from monai.utils import LossReduction, Weight


class DiceLoss_ignoreBack(_Loss):
    """
    Compute average Dice loss between two tensors. It can support both multi-classes and multi-labels tasks.
    Input logits `input` (BNHW[D] where N is number of classes) is compared with ground truth `target` (BNHW[D]).
    Axis N of `input` is expected to have logit predictions for each class rather than being image channels,
    while the same axis of `target` can be 1 or N (one-hot format). The `smooth` parameter is a value added to the
    intersection and union components of the inter-over-union calculation to smooth results and prevent divide by 0,
    this value should be small. The `include_background` class attribute can be set to False for an instance of
    DiceLoss to exclude the first category (channel index 0) which is by convention assumed to be background.
    If the non-background segmentations are small compared to the total image size they can get overwhelmed by
    the signal from the background so excluding it in such cases helps convergence.

    Milletari, F. et. al. (2016) V-Net: Fully Convolutional Neural Networks forVolumetric Medical Image Segmentation, 3DV, 2016.

    """

    def __init__(
        self,
        include_background: bool = True,
        to_onehot_y: bool = False,
        sigmoid: bool = False,
        softmax: bool = False,
        other_act: Optional[Callable] = None,
        squared_pred: bool = False,
        jaccard: bool = False,
        reduction: Union[LossReduction, str] = LossReduction.MEAN,
    ) -> None:
        """
        Args:
            include_background: if False channel index 0 (background category) is excluded from the calculation.
            to_onehot_y: whether to convert `y` into the one-hot format. Defaults to False.
            sigmoid: if True, apply a sigmoid function to the prediction.
            softmax: if True, apply a softmax function to the prediction.
            other_act: if don't want to use `sigmoid` or `softmax`, use other callable function to execute
                other activation layers, Defaults to ``None``. for example:
                `other_act = torch.tanh`.
            squared_pred: use squared versions of targets and predictions in the denominator or not.
            jaccard: compute Jaccard Index (soft IoU) instead of dice or not.
            reduction: {``"none"``, ``"mean"``, ``"sum"``}
                Specifies the reduction to apply to the output. Defaults to ``"mean"``.

                - ``"none"``: no reduction will be applied.
                - ``"mean"``: the sum of the output will be divided by the number of elements in the output.
                - ``"sum"``: the output will be summed.

        Raises:
            TypeError: When ``other_act`` is not an ``Optional[Callable]``.
            ValueError: When more than 1 of [``sigmoid=True``, ``softmax=True``, ``other_act is not None``].
                Incompatible values.

        """
        super().__init__(reduction=LossReduction(reduction).value)
        if other_act is not None and not callable(other_act):
            raise TypeError(f"other_act must be None or callable but is {type(other_act).__name__}.")
        if int(sigmoid) + int(softmax) + int(other_act is not None) > 1:
            raise ValueError("Incompatible values: more than 1 of [sigmoid=True, softmax=True, other_act is not None].")
        self.include_background = include_background
        self.to_onehot_y = to_onehot_y
        self.sigmoid = sigmoid
        self.softmax = softmax
        self.other_act = other_act
        self.squared_pred = squared_pred
        self.jaccard = jaccard

    def forward(self, input: torch.Tensor, target: torch.Tensor, smooth: float = 1e-5) -> torch.Tensor:
        """
        Args:
            input: the shape should be BNH[WD].
            target: the shape should be BNH[WD].
            smooth: a small constant to avoid nan.

        Raises:
            ValueError: When ``self.reduction`` is not one of ["mean", "sum", "none"].

        """
        if self.sigmoid:
            input = torch.sigmoid(input)

        n_pred_ch = input.shape[1]
        
        if self.softmax:
            if n_pred_ch == 1:
                warnings.warn("single channel prediction, `softmax=True` ignored.")
            else:
                input = torch.softmax(input, 1)

        if self.other_act is not None:
            input = self.other_act(input)

        if self.to_onehot_y:
            if n_pred_ch == 1:
                warnings.warn("single channel prediction, `to_onehot_y=True` ignored.")
            else:
                if not self.include_background:
                    target = one_hot(target.permute([1,0,2,3]).view(-1,1), num_classes=n_pred_ch+1)
                else:
                    target = one_hot(target.permute([1,0,2,3]).view(-1,1), num_classes=n_pred_ch)
        
        if not self.include_background:
            if n_pred_ch == 1:
                warnings.warn("single channel prediction, `include_background=False` ignored.")
            else:
                # if skipping background, removing first channel
                mask = (target[:,0].unsqueeze(dim=1))
                target = target[:, 1:]                
                input = (torch.reshape(input.permute([0,2,3,1]),(-1,n_pred_ch)))*(1-mask)
        assert (
            target.shape == input.shape
        ), f"ground truth has differing shape ({target.shape}) from input ({input.shape})"

        
        # reducing only spatial dimensions (not batch nor channels)
        reduce_axis = 0#list(range(2, len(input.shape)))
        intersection = torch.sum(target * input, dim=reduce_axis)

        if self.squared_pred:
            target = torch.pow(target, 2)
            input = torch.pow(input, 2)

        ground_o = torch.sum(target, dim=reduce_axis)
        pred_o = torch.sum(input, dim=reduce_axis)

        denominator = ground_o + pred_o

        if self.jaccard:
            denominator = 2.0 * (denominator - intersection)

        f: torch.Tensor = 1.0 - (2.0 * intersection + smooth) / (denominator + smooth)

        if self.reduction == LossReduction.MEAN.value:
            f = torch.mean(f)  # the batch and channel average
        elif self.reduction == LossReduction.SUM.value:
            f = torch.sum(f)  # sum over the batch and channel dims
        elif self.reduction == LossReduction.NONE.value:
            pass  # returns [N, n_classes] losses
        else:
            raise ValueError(f'Unsupported reduction: {self.reduction}, available options are ["mean", "sum", "none"].')

        return f

Define the model. 

MONAI offers several segmentation model that you can pick and choose among. We used UNET model for this application.

In [None]:
'''
class SegNet(nn.Module):
    def __init__(self):
        super().__init__()

        self.model = nn.Sequential(
            # layer 1: convolution, normalization, downsampling
            nn.Conv2d(1, 2, 3, 1, 1),
            nn.BatchNorm2d(2),
            nn.ReLU(),
            nn.MaxPool2d(3, 2, 1),
            # layer 2
            nn.Conv2d(2, 4, 3, 1, 1),
            # layer 3
            nn.ConvTranspose2d(4, 2, 3, 2, 1, 1),
            nn.BatchNorm2d(2),
            nn.ReLU(),
            # layer 4: output
            nn.Conv2d(2, 1, 3, 1, 1),
        )

    def forward(self, x):
        return self.model(x)
#SegNET MODEL
net = monai.networks.nets.SegResNet(spatial_dims=2, 
                                    init_filters=32, 
                                    in_channels=1, 
                                    out_channels=2,
                                    dropout_prob=0.5, 
                                    norm_name='group', 
                                    num_groups=8, 
                                    use_conv_final=True, 
                                    blocks_down=(1, 2, 2, 4), 
                                    blocks_up=(1, 1, 1), upsample_mode='trilinear')
# VNET MODEL
net = monai.networks.nets.VNet(spatial_dims=2, 
                               in_channels=1, 
                               out_channels=3, 
                               act=('elu', {'inplace': True}), 
                               dropout_prob=0.5, 
                               dropout_dim=(2,3))
#HighResNET MODEL
net = monai.networks.nets.HighResNet(spatial_dims=2, 
                                    in_channels=1, 
                                    out_channels=3, 
                                    norm_type='instance', 
                                    dropout_prob=0.5)
'''
#UNET MODEL
net = monai.networks.nets.UNet(dimensions = 2, 
                               in_channels = 1 , 
                               out_channels = 3, 
                               channels = [32,64,128,256,512], 
                               strides = [1,2,2,2,1], 
                               kernel_size=3, 
                               up_kernel_size=3, 
                               num_res_units=2, 
                               act='PRELU', 
                               norm='INSTANCE', 
                               dropout=0.2)

m = nn.Softmax(dim=1)

In [None]:
#See a summary of the model.
from torchsummary import summary
summary(net.to(device),(1,256,256))


Training Cycle

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR
from monai.optimizers import Novograd

#Choice among different optimizers
#opt = Novograd(net.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-08, weight_decay=1e-2, grad_averaging=False, amsgrad=False)
#opt = torch.optim.Adam(net.parameters(), lr)
opt = torch.optim.SGD(net.parameters(),lr,momentum =0.9, weight_decay = 0.01,nesterov=True)

#Choise different lost functions.
#loss = TverskyLoss(softmax=False, to_onehot_y = True,alpha = 0.25,beta = 0.75,include_background=False,reduction="mean")
loss = DiceLoss_ignoreBack(softmax=False, to_onehot_y = True,include_background=False,reduction="mean")

#Choice among different Metrics
#metric = DiceMetric(include_background=False, to_onehot_y=True, reduction="mean")
metric = DiceMetric(include_background=True, to_onehot_y=False, reduction="mean")

scheduler = CosineAnnealingLR(opt, (num_epochs), eta_min=lr/1000.)


step_losses = []
epoch_metrics = []
total_step = 0


for epoch in range(num_epochs):
    net.train()
    
    metric_train = []
    batchLoss = 0
    numBatch = 0
    # train network with training images
    for batch in loader:
        bimages = batch["img"].to(device)
        bsegs = batch["seg"].to(device)
        #bsegs = one_hot(batch["seg"],num_classes=4).to(device)
        #bimages = bimages.to(device)
        #bsegs = bsegs.to(device)

        opt.zero_grad()

        prediction = m(net(bimages))
        loss_val = loss(prediction, bsegs)        
        loss_val.backward()
        opt.step()

        step_losses.append((total_step, loss_val.item()))
        total_step += 1
        numBatch += 1
        batchLoss = batchLoss+loss_val.item()
        GT = one_hot(bsegs,num_classes=numLabels+1)
        mask = GT[:,0,:,:].unsqueeze(dim=1)
        GT = GT[:,1:,:,:]
        train_metric = metric(prediction*(1-mask), GT)
        metric_train.append(train_metric.item())

    net.eval()
    metric_vals = []
    # test our network using the validation dataset
    with torch.no_grad():
        for bimages, bsegs in val_loader:
            bimages = bimages.to(device)
            bsegs = bsegs.to(device)
            #bsegs = one_hot(bsegs,num_classes=4).to(device)

            prediction = m(net(bimages))
            GT = one_hot(bsegs,num_classes=numLabels+1)
            mask = GT[:,0,:,:].unsqueeze(dim=1)
            GT = GT[:,1:,:,:]
            pred_metric = metric(prediction*(1-mask), GT)
            metric_vals.append(pred_metric.item())

    epoch_metrics.append((total_step, np.average(metric_vals)))
    progress_bar(epoch + 1, num_epochs, f"Validation Metric: {epoch_metrics[-1][1]:.3}")
    scheduler.step()
    if epoch%10 == 0:

        print('\n Batch Loss is: %s and Training Metric is: %s '%(batchLoss/numBatch,np.average(metric_train)))
        print('Current Learning rate: %s '%(np.str(scheduler.get_lr())))

Save the model

In [None]:
torch.save(model, 'TimeSeg_Unet_B64.pth')
#torch.save(model.state_dict(), 'TimeSeg_Unet_B64.pth')

INFERENCE PORTION OF THE CODE

In [None]:
#Image Saving Routine
def saveSeg(img,pred,fname):
    im = Image.fromarray(np.repeat(np.expand_dims(np.uint8(img.squeeze().cpu().numpy()*255),axis=2),3,axis =2))
    im1 = pred.squeeze().cpu().numpy()>0.5
    im1 = Image.fromarray(np.uint8(np.transpose(im1,(1,2,0))*255))
    alphaBlended1 = Image.blend(im,im1, alpha=1)
    alphaBlended1.save(fname)

In [None]:
#Inference Code
net = torch.load('TimeSeg_Unet_B64.pth')
net.to(device)
numLabels = 3
metric = DiceMetric(include_background=True, to_onehot_y=False, reduction="mean")
m = nn.Softmax(dim=1)
net.eval()
metric_vals1 = []
metric_vals2 = []
metric_vals3 = []
# test our network using the validation dataset
counter = []
predictionCount = 0
with torch.no_grad():
    for bimages1, bsegs1 in val_loader:
        bimages1 = bimages1.to(device)
        bsegs1 = bsegs1.to(device)
        #bsegs = one_hot(bsegs,num_classes=4).to(device)

        prediction = m(net(bimages1))
        GT = one_hot(bsegs1,num_classes=numLabels+1)
        mask = GT[:,0,:,:].unsqueeze(dim=1)
        
        for i in range(prediction.shape[0]):
            fname = ("./Labels_v5/Res_%.5d.bmp"% predictionCount)
            predictionCount +=1
            saveSeg(bimages1[i,0,:,:],prediction[i,:,:,:],fname)
           
        pred_metric = metric(prediction[:,0,:,:]*(1-torch.squeeze(mask)), GT[:,1,:,:])
        metric_vals1.append(pred_metric.item())
        pred_metric = metric(prediction[:,1,:,:]*(1-torch.squeeze(mask)), GT[:,2,:,:])
        metric_vals2.append(pred_metric.item())
        pred_metric = metric(prediction[:,2,:,:]*(1-torch.squeeze(mask)), GT[:,3,:,:])
        metric_vals3.append(pred_metric.item())
        counter.append(prediction.shape[0])

print(np.sum(np.array(counter)*np.array(metric_vals1))/np.sum(np.array(counter)))
print(np.sum(np.array(counter)*np.array(metric_vals2))/np.sum(np.array(counter)))
print(np.sum(np.array(counter)*np.array(metric_vals3))/np.sum(np.array(counter)))