#Download Dataset

In [4]:
!wget https://nextcloud.in.tum.de/index.php/s/9tp3yCSiLgjLWdj/download/liver_endoscopy_dataset.zip

--2022-05-25 13:03:23--  https://nextcloud.in.tum.de/index.php/s/9tp3yCSiLgjLWdj/download/liver_endoscopy_dataset.zip
Resolving nextcloud.in.tum.de (nextcloud.in.tum.de)... 131.159.0.29, 2a09:80c0::29
Connecting to nextcloud.in.tum.de (nextcloud.in.tum.de)|131.159.0.29|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2907480775 (2.7G) [application/zip]
Saving to: ‘liver_endoscopy_dataset.zip.1’


2022-05-25 13:03:43 (149 MB/s) - ‘liver_endoscopy_dataset.zip.1’ saved [2907480775/2907480775]



In [1]:
!gdown --id 1MHp64mCt2m8NxCW3-4kjD39m3Rry0ekA

Access denied with the following error:

 	Too many users have viewed or downloaded this file recently. Please
	try accessing the file again later. If the file you are trying to
	access is particularly large or is shared with many people, it may
	take up to 24 hours to be able to view or download the file. If you
	still can't access a file after 24 hours, contact your domain
	administrator. 

You may still be able to access the file from the browser:

	 https://drive.google.com/uc?id=1MHp64mCt2m8NxCW3-4kjD39m3Rry0ekA 



#Prepare Dataset

In [None]:
  !unzip -qq liver_endoscopy_dataset
  !rm liver_endoscopy_dataset.zip

: 

#Install Additional Requirements

In [6]:
!pip -qq install pytorch_lightning==1.6.2

^C
[31mERROR: Operation cancelled by user[0m


#Imports

In [1]:
import json
from collections import defaultdict
from pathlib import Path
import random
from typing import Optional

import torch
from torch import nn
import torch.nn.functional as F
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision
import pytorch_lightning as pl
pl.seed_everything(42)

Global seed set to 42


42

#Dataset
We load the liver endoscopy dataset here. This cell defines the video splits, correctly loads the dataset depending on the task.

In [2]:

video_splits = {'train': ['01', '09', '17', '20', '25', '27', '28', '35', '37'], 'val': ['12','18', '24'], 'test': ['26', '48', '52', '43', '55']}

class LiverEndoscopy(Dataset):
    def __init__(self,
                 task_type: str = 'classification', split: str = 'train', balance_data: bool = False, temporal: bool = False,
                 pil_transform: Optional[transforms.Compose] = None, tensor_transform: Optional[transforms.Compose] = None):
        assert split in ['train', 'val', 'test']
        self.split = split
        self.balance_data = balance_data
        self.task_type = task_type
        self.temporal = temporal
        self.pil_transform = pil_transform
        self.tensor_transform = tensor_transform

        export_dataset_path = Path('/home/icb/mostafa.shahhosseini/cls/data')
        self.images_path = export_dataset_path / 'images'
        self.seg_masks_path = export_dataset_path / 'seg_masks'
        with open(export_dataset_path / 'classification_annotations.json', 'r') as f:
            self.classification_annotations = json.load(f)
        with open(export_dataset_path / 'phase_annotations.json', 'r') as f:
            self.workflow_phase_annotations = json.load(f)
        with open(export_dataset_path / 'has_liver.json', 'r') as f:
            self.has_liver = json.load(f)

        if task_type == 'classification' or task_type == 'segmentation' or (task_type == 'workflow' and not temporal):
            self.image_names = []
            for image_path in sorted(self.images_path.glob('*.png')):
                video_id = image_path.name.split('_')[0].replace('video', '')
                if video_id in video_splits[split]:
                    self.image_names.append(image_path.name.replace('.png', ''))
            self.image_names = sorted(self.image_names)

        elif task_type == 'workflow' and temporal:
            self.window_size = 8
            self.downsample_factor = 5
            chunks = []
            start = -1
            end = -1
            prev_frame_number = None
            prev_video_id = None
            for image_path in sorted(self.images_path.glob('*.png')):
                video_id = image_path.name.split('_')[0].replace('video', '')
                if video_id not in video_splits[split]:
                    continue
                if prev_video_id is None:
                    prev_video_id = video_id
                elif prev_video_id != video_id:
                    chunks.append((prev_video_id, start, end))
                    start = -1
                    end = -1
                    prev_frame_number = None
                    prev_video_id = video_id

                frame_number = int(image_path.name.split('_')[1].replace('.png', ''))
                if prev_frame_number is None:
                    start = frame_number
                else:
                    if frame_number == prev_frame_number + 1:
                        end = frame_number
                    else:
                        chunks.append((video_id, start, end))
                        start = frame_number
                prev_frame_number = frame_number
            self.windows = []
            for video_id, start, end in chunks:
                for last_frame_index in range(start + self.window_size, end + 1):
                    all_frames = []
                    for frame_index in range(last_frame_index - self.window_size * self.downsample_factor, last_frame_index, self.downsample_factor):
                        if frame_index < start or frame_index > end:
                            continue
                        all_frames.append(frame_index)
                    if len(all_frames) == self.window_size:
                        self.windows.append((video_id, all_frames))

        if balance_data:
            self.do_balance_data(task_type, temporal)

    def do_balance_data(self, task_type, temporal):
        print('Balancing data by oversampling under-represented classes...')
        class_to_samples = defaultdict(list)
        if not temporal:
            for image_name in self.image_names:
                if task_type == 'classification':
                    label = self.classification_annotations[image_name]
                elif task_type == 'segmentation':
                    label = self.has_liver[image_name]
                elif task_type == 'workflow' and not temporal:
                    label = self.workflow_phase_annotations[image_name]
                class_to_samples[label].append(image_name)
            max_number = max([len(elem) for elem in class_to_samples.values()])
            self.image_names = []
            for key, value in class_to_samples.items():
                if len(value) < max_number:
                    self.image_names += random.choices(value, k=max_number)
                else:
                    self.image_names += value
            random.shuffle(self.image_names)
        else:
            for video_id, window in self.windows:
                label = self.workflow_phase_annotations[f'video{video_id}_{str(window[-1]).zfill(6)}']
                class_to_samples[label].append((video_id, window))
            max_number = max([len(elem) for elem in class_to_samples.values()])
            self.windows = []
            for key, value in class_to_samples.items():
                if len(value) < max_number:
                    self.windows += random.choices(value, k=max_number)
                else:
                    self.windows += value
            random.shuffle(self.windows)

    def __len__(self):
        if self.task_type == 'workflow' and self.temporal:
            return len(self.windows)
        else:
            return len(self.image_names)

    def phase_label_to_number(self, phase_label):
        if phase_label == 'Preparation':
            return 0
        elif phase_label == 'CalotTriangleDissection':
            return 1
        elif phase_label == 'GallbladderDissection':
            return 2
        else:
            raise ValueError('Unknown phase label: {}'.format(phase_label))

    def number_to_phase_label(self, phase_number):
        if phase_number == 0:
            return 'Preparation'
        elif phase_number == 1:
            return 'CalotTriangleDissection'
        elif phase_number == 2:
            return 'GallbladderDissection'
        else:
            raise ValueError('Unknown phase number: {}'.format(phase_number))

    def __getitem__(self, index):
        if self.task_type == 'workflow' and self.temporal:
            video_id, window = self.windows[index]
            image_paths = []
            for frame_number in window:
                image_paths.append(self.images_path / f'video{video_id}_{str(frame_number).zfill(6)}.png')

            image_tensors = []
            for image_path in image_paths:
                image = Image.open(image_path)
                if self.pil_transform is not None:
                    image = self.pil_transform(image)
                image_tensor = transforms.ToTensor()(image)
                if self.tensor_transform is not None:
                    image_tensor = self.tensor_transform(image_tensor)
                image_tensors.append(image_tensor)

            phase = self.phase_label_to_number(self.workflow_phase_annotations[image_paths[-1].name.replace('.png', '')])
            image_tensors = torch.stack(image_tensors)
            return image_tensors, phase
        else:
            image_name = self.image_names[index]
            image_path = self.images_path / f'{image_name}.png'
            seg_mask_path = self.seg_masks_path / f'{image_name}_liver_mask.png'
            image = Image.open(image_path)
            if self.pil_transform is not None:
                image = self.pil_transform(image)
            image_tensor = transforms.ToTensor()(image)
            if self.tensor_transform is not None:
                image_tensor = self.tensor_transform(image_tensor)
            seg_mask = Image.open(seg_mask_path)
            if self.pil_transform is not None:
                seg_mask = self.pil_transform(seg_mask)
            seg_mask_tensor = transforms.ToTensor()(seg_mask)[0].float()
            if self.tensor_transform is not None:
                seg_mask_tensor = self.tensor_transform(seg_mask_tensor)
            intrument_exists = int(self.classification_annotations[image_name])
            phase = self.phase_label_to_number(self.workflow_phase_annotations[image_name])

            if self.task_type == 'classification':
                return image_tensor, intrument_exists
            elif self.task_type == 'segmentation':
                return image_tensor, seg_mask_tensor
            elif self.task_type == 'workflow' and not self.temporal:
                return image_tensor, phase


In [3]:
test = LiverEndoscopy()

In [4]:
test.__getitem__(0)[0].shape

torch.Size([3, 480, 854])

# Classification 1
###### You need to define classification model with two CNN layers, ReLU and pooling operator afterwards and three Linear layers.

In [5]:
class ClassificationModel(nn.Module):
    def __init__(self):
        super().__init__()
        #######################################
        # To be filled
        #
        # define the layers
        # conv1: 3 filters, kernel_size = 5
        # maxpooling: size = 2 
        # conv1: 6 filters, kernel_size = 5
        # fc1: output features = 120
        # fc2: output features = 84
        # fc3: output features = num_classes
        #######################################
        self.conv1 = nn.Conv2d(3, 3, kernel_size=5)
        self.pool = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(3, 6, kernel_size=5)
        self.fc1 = nn.Linear(16854, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Classification 2
###### You need to define classification model with pretrained ResNet18.

In [19]:
import torchvision.models as models

class ResNetModel(nn.Module):
    def __init__(self):
        super().__init__()
        #######################################
        # To be filled
        #
        # define the backbone using ResNet18
        #######################################
        self.image_backbone = models.resnet18(pretrained = True)
        #######################################
        # To be filled
        #
        # disable the last layer, redefine it to match the number of classes
        #######################################
        self.image_backbone.fc = nn.Identity()
        self.linear = nn.Linear(512, 9)

    def forward(self, x):
        x = self.image_backbone(x)
        x = self.linear(x)
        return x

In [20]:
model = ResNetModel()

In [21]:
model.image_backbone.fc

Identity()

# Classification 3
###### Train both models with CrossEntropy loss.

In [22]:
from sklearn.metrics import classification_report
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn


class Classifier(pl.LightningModule):
    def __init__(self):
        super().__init__()
        #######################################
        # To be filled
        #
        # use the model from Classification 1
        # define the loss
        #######################################
        self.model = ClassificationModel()
        # define the loss
        self.loss = nn.CrossEntropyLoss()
        
        self.train_preds = []
        self.train_gts = []
        self.val_preds = []
        self.val_gts = []
        self.test_preds = []
        self.test_gts = []
        self.reset_metrics()

        self.train_loss = []
        self.val_loss = []
        self.test_loss = []

    def training_step(self, batch, batch_idx):
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        x, y = batch
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        
        self.update_metrics(y, y_hat, split='train')
        self.train_loss.append(loss.item())
        return {'loss': loss}


    def validation_step(self, batch, batch_idx):
        # validation_step defines the validation loop.
        x, y = batch
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        self.update_metrics(y, y_hat, split='val')
        self.val_loss.append(loss.item())
        return {'val_loss': loss}

    def test_step(self, batch, batch_idx):
        # test_step defines the test loop.
        x, y = batch
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        self.update_metrics(y, y_hat, split='test')
        self.test_loss.append(loss.item())
        return {'test_loss': loss}

    def configure_optimizers(self):
        #######################################
        # To be filled
        #
        # Define Adam optimizer with different learning rates in range (1e-2, 1e-4) 
        #######################################
        optimizer = torch.optim.Adam(self.parameters(), lr = 0.001, weight_decay=1e-5)
        return optimizer

    def reset_metrics(self, split=None):
        if split == 'train':
            self.train_preds = []
            self.train_gts = []
        elif split == 'val':
            self.val_preds = []
            self.val_gts = []
        elif split == 'test':
            self.test_preds = []
            self.test_gts = []
        else:
            self.train_preds = []
            self.train_gts = []
            self.val_preds = []
            self.val_gts = []
            self.test_preds = []
            self.test_gts = []

    def update_metrics(self, gt, pred, split='train'):
        if split == 'train':
            self.train_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.train_gts.extend(gt.detach().cpu().numpy())
        elif split == 'val':
            self.val_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.val_gts.extend(gt.detach().cpu().numpy())
        elif split == 'test':
            self.test_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.test_gts.extend(gt.detach().cpu().numpy())
        else:
            raise NotImplementedError()

    def training_epoch_end(self, outputs):
        self.evaluate_predictions(split='train')
        self.reset_metrics(split='train')

    def validation_epoch_end(self, outputs):
        self.evaluate_predictions(split='val')
        self.reset_metrics(split='val')
    
    def test_epoch_end(self, outputs):
        self.evaluate_predictions(split='test')
        self.reset_metrics(split='test')

    def evaluate_predictions(self, split):
        if split == 'train':
            preds = self.train_preds
            gts = self.train_gts
        elif split == 'val':
            preds = self.val_preds
            gts = self.val_gts
        elif split == 'test':
            preds = self.test_preds
            gts = self.test_gts
        else:
            raise NotImplementedError()

        cls_report = classification_report(gts, preds)
        print(split)
        print(cls_report)


    

In [None]:
from sklearn.metrics import classification_report
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn


class Classifier(pl.LightningModule):
    def __init__(self):
        super().__init__()
        #######################################
        # To be filled
        #
        # use the Resnet model from Classification 2
        # define the loss
        #######################################
        self.model = ResNetModel()
        # define the loss
        self.loss = nn.CrossEntropyLoss()
        
        self.train_preds = []
        self.train_gts = []
        self.val_preds = []
        self.val_gts = []
        self.test_preds = []
        self.test_gts = []
        self.reset_metrics()

        self.train_loss = []
        self.val_loss = []
        self.test_loss = []

    def training_step(self, batch, batch_idx):
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        x, y = batch
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        
        self.update_metrics(y, y_hat, split='train')
        self.train_loss.append(loss.item())
        return {'loss': loss}


    def validation_step(self, batch, batch_idx):
        # validation_step defines the validation loop.
        x, y = batch
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        self.update_metrics(y, y_hat, split='val')
        self.val_loss.append(loss.item())
        return {'val_loss': loss}

    def test_step(self, batch, batch_idx):
        # test_step defines the test loop.
        x, y = batch
        #######################################
        # To be filled
        #
        # run the model on input image to get the prediction y_hat
        # specify the loss inputs
        #######################################
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)
        self.update_metrics(y, y_hat, split='test')
        self.test_loss.append(loss.item())
        return {'test_loss': loss}

    def configure_optimizers(self):
        #######################################
        # To be filled
        #
        # Define Adam optimizer with different learning rates in range (1e-2, 1e-4) 
        #######################################
        optimizer = torch.optim.Adam(self.parameters(), lr = 0.001, weight_decay=1e-5)
        return optimizer

    def reset_metrics(self, split=None):
        if split == 'train':
            self.train_preds = []
            self.train_gts = []
        elif split == 'val':
            self.val_preds = []
            self.val_gts = []
        elif split == 'test':
            self.test_preds = []
            self.test_gts = []
        else:
            self.train_preds = []
            self.train_gts = []
            self.val_preds = []
            self.val_gts = []
            self.test_preds = []
            self.test_gts = []

    def update_metrics(self, gt, pred, split='train'):
        if split == 'train':
            self.train_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.train_gts.extend(gt.detach().cpu().numpy())
        elif split == 'val':
            self.val_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.val_gts.extend(gt.detach().cpu().numpy())
        elif split == 'test':
            self.test_preds.extend(pred.detach().cpu().numpy().argmax(1))
            self.test_gts.extend(gt.detach().cpu().numpy())
        else:
            raise NotImplementedError()

    def training_epoch_end(self, outputs):
        self.evaluate_predictions(split='train')
        self.reset_metrics(split='train')

    def validation_epoch_end(self, outputs):
        self.evaluate_predictions(split='val')
        self.reset_metrics(split='val')
    
    def test_epoch_end(self, outputs):
        self.evaluate_predictions(split='test')
        self.reset_metrics(split='test')

    def evaluate_predictions(self, split):
        if split == 'train':
            preds = self.train_preds
            gts = self.train_gts
        elif split == 'val':
            preds = self.val_preds
            gts = self.val_gts
        elif split == 'test':
            preds = self.test_preds
            gts = self.test_gts
        else:
            raise NotImplementedError()

        cls_report = classification_report(gts, preds)
        print(split)
        print(cls_report)



    

# Dataloaders 
###### Define the validation and test datasets simular to training dataset. 



In [None]:
pil_transform = transforms.Compose([transforms.Resize((224, 224))])
train_dataset = LiverEndoscopy(split='train', balance_data=True, task_type='classification', temporal=False, pil_transform=pil_transform)
#######################################
# To be filled
# Define the validation and test datasets 
# simular to training dataset
#######################################
val_dataset = None
test_dataset = None
#######################################
# To be filled
# Define loaders with batch_size = 64 and 
# shuffle=True for train loader and 
# shuffle=False for the rest, num_workers=1
#######################################
train_loader = None
val_loader = None
test_loader = None
#######################################
# To be filled
# Define the model
#######################################
model = None

In [None]:
trainer = pl.Trainer(gpus=1, max_epochs=3, num_sanity_val_steps=0)
                     
trainer.fit(model=model, train_dataloaders=train_loader, val_dataloaders=val_loader)

In [None]:
trainer.test(model, dataloaders=test_loader)