# Dependencies

In [95]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import os
from PIL import Image
import numpy as np

# CFG

In [96]:
# Hyperparameters etc.
LEARNING_RATE = 1e-4
BATCH_SIZE = 1
NUM_EPOCHS = 15
NUM_WORKERS = 2
IMAGE_HEIGHT = 160  # 576 originally
IMAGE_WIDTH = 240  # 768 originally
PIN_MEMORY = True
LOAD_MODEL = True
TRAIN_PERCENT = 0.7
VAL_PERCENT = 0.15
IMG_LEN = 240

IMG_DIR = "../input/btsd-test/imgs"

FILE_LIST = []

for root, dirs, files in os.walk(IMG_DIR):
    for filename in files:
        FILE_LIST.append(filename)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Model

In [97]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.conv(x)

class UNET(nn.Module):
    def __init__(
            self, in_channels=3, out_channels=1, features=[64, 128, 256, 512],
    ):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)

# Dataset

In [98]:
class BTSDDataset(Dataset):
    def __init__(self, image_dir, file_list, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.file_list = file_list
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        if index not in range(0, len(self.file_list)):
            return self.__getitem__(np.random.randint(0, self.__len__()))
        
        img_path = os.path.join(self.image_dir, self.file_list[index])
        image = np.array(Image.open(img_path).convert("RGB"))

        if self.transform is not None:
            augmentations = self.transform(image=image)
            image = augmentations["image"]

        return image, self.file_list[index]

# Utils

In [103]:
def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)

def load_checkpoint(checkpoint, model):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])

def get_loaders(
    img_dir,
    test_file_list,
    batch_size,
    test_transform,
    num_workers=4,
    pin_memory=True,
):
    test_ds = BTSDDataset(
        image_dir=img_dir,
        file_list=test_file_list,
        transform=test_transform,
    )

    test_loader = DataLoader(
        test_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=False,
    )

    return test_loader


def save_predictions_as_imgs(
    loader, model, folder="./saved_images", device="cuda"
):
    print(enumerate(loader))
    model.eval()
    for idx, (x, name) in enumerate(loader):
        print(name[0].split('.')[0])
        x = x.to(device=device)
        with torch.no_grad():
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
        torchvision.utils.save_image(
            preds, f"{name[0].split('.')[0]}.png"
        )

# Test

In [104]:
def main():

    test_transforms = A.Compose(
        [
            A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
            A.Normalize(
                mean=[0.0, 0.0, 0.0],
                std=[1.0, 1.0, 1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )

    model = UNET(in_channels=3, out_channels=1).to(DEVICE)
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    test_loader = get_loaders(
        IMG_DIR,
        FILE_LIST,
        BATCH_SIZE,
        test_transforms,
        NUM_WORKERS,
        PIN_MEMORY,
    )

    if LOAD_MODEL:
        load_checkpoint(torch.load("../input/btsd-assignment-models/3_v1.0_Baseline_10epochs_160x240.pth.tar"), model)

    # print some examples to a folder
    save_predictions_as_imgs(
        test_loader, model, folder="./saved_images", device=DEVICE
    )


if __name__ == "__main__":
    main()