# Semantic Segmentation using UNet on Carvana Dataset

There are four modules:
1. DataLoader
2. UNet Model
3. Utils
4. Train

## DataLoader

In [1]:
import os
from PIL import Image
import numpy as np

class CarvanaDataset:
    """Map-style dataset pairing car images with their binary masks.

    Every entry of ``image_dir`` is treated as one sample. The mask of an
    image is looked up in ``mask_dir`` under the image's file name with
    ``".jpg"`` replaced by ``"_mask.gif"``.

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the corresponding mask files.
        transform: Optional callable invoked as
            ``transform(image=image, mask=mask)`` and returning a mapping
            with ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir() yields entries in arbitrary order; sorting makes the
        # index -> sample mapping reproducible across runs and machines.
        self.images = sorted(os.listdir(image_dir))

    def __len__(self):
        """Return the number of entries found in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load the ``index``-th image/mask pair.

        Returns:
            ``(image, mask)``. Without a transform, ``image`` is the RGB
            image as a NumPy array and ``mask`` is a float32 NumPy array of
            the grayscale mask in which the value 255 has been replaced by 1.
            With a transform, both are whatever the transform returns.
        """
        file_name = self.images[index]
        img_path = os.path.join(self.image_dir, file_name)
        mask_path = os.path.join(self.mask_dir, file_name.replace(".jpg", "_mask.gif"))

        # Context managers close the underlying files as soon as the pixel
        # data has been copied into NumPy arrays.
        with Image.open(img_path) as img:
            image = np.array(img.convert("RGB"))
        with Image.open(mask_path) as mask_img:
            mask = np.array(mask_img.convert("L"), dtype=np.float32)

        # Map the foreground value 255 to 1 so the mask is a 0/1 target.
        mask[mask == 255.0] = 1.0

        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]

        return image, mask

## UNet Model

In [2]:
'''
This is my UNet model architecture.
'''
import torch
import torch.nn as nn
from functools import reduce
from operator import __add__

class Conv2dSamePadding(nn.Conv2d):
    """``nn.Conv2d`` that zero-pads its input before convolving.

    The padding is derived from the kernel size only: for a kernel extent
    ``k`` the input is padded by ``k // 2 - 1 + k % 2`` on the leading side
    and ``k // 2`` on the trailing side of the matching axis, i.e. ``k - 1``
    in total. All constructor arguments are forwarded to ``nn.Conv2d``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # nn.ZeroPad2d expects (left, right, top, bottom), so walk the kernel
        # size from the last axis (width) to the first (height).
        pad_amounts = []
        for extent in reversed(self.kernel_size):
            trailing = extent // 2
            leading = trailing + extent % 2 - 1
            pad_amounts.append(leading)
            pad_amounts.append(trailing)
        self.zero_pad_2d = nn.ZeroPad2d(tuple(pad_amounts))

    def forward(self, input):
        """Pad ``input`` with zeros, then apply the convolution."""
        padded = self.zero_pad_2d(input)
        return self._conv_forward(padded, self.weight, self.bias)
    


class UNet(nn.Module):
    """U-Net with a 3-channel input and a single-channel output.

    The encoder is five double-convolution stages (64, 128, 256, 512, 1024
    channels) separated by 2x2 max pooling. The decoder mirrors it with four
    stride-2 transposed convolutions; after each upsampling step the result
    is concatenated with the encoder feature map of the same stage. The final
    layer maps 64 channels to 1 and applies no activation, so the network
    returns raw scores.

    Attribute names (``layer1``, ``l2`` ... ``l18``) are part of the
    state-dict keys and must not be renamed.
    """

    def __init__(self):
        super().__init__()
        # Encoder layers
        self.layer1 = self.doubleConv(3, 64)  # 3 input channels
        self.l2 = nn.MaxPool2d(2, 2, 0)
        self.l3 = self.doubleConv(64, 128)
        self.l4 = nn.MaxPool2d(2, 2, 0)
        self.l5 = self.doubleConv(128, 256)
        self.l6 = nn.MaxPool2d(2, 2, 0)
        self.l7 = self.doubleConv(256, 512)
        self.l8 = nn.MaxPool2d(2, 2, 0)
        self.l9 = self.doubleConv(512, 1024)
        # Decoder layers: each double conv takes twice the upsampled channel
        # count because the skip connection is concatenated first.
        self.l10 = nn.ConvTranspose2d(1024, 512, 2, 2, 0)
        self.l11 = self.doubleConv(1024, 512)
        self.l12 = nn.ConvTranspose2d(512, 256, 2, 2, 0)
        self.l13 = self.doubleConv(512, 256)
        self.l14 = nn.ConvTranspose2d(256, 128, 2, 2, 0)
        self.l15 = self.doubleConv(256, 128)
        self.l16 = nn.ConvTranspose2d(128, 64, 2, 2, 0)
        self.l17 = self.doubleConv(128, 64)
        self.l18 = nn.ConvTranspose2d(64, 1, 1, 1, 0)  # 1 output channel

    def doubleConv(self, in_channel, out_channel):
        """Build two 3x3 zero-padded convolutions followed by one ReLU.

        NOTE(review): there is no activation between the two convolutions;
        inserting one would shift the ``nn.Sequential`` indices and therefore
        the state-dict keys, so it is left unchanged here.
        """
        return nn.Sequential(
            Conv2dSamePadding(in_channel, out_channel, 3, 1, 0),
            Conv2dSamePadding(out_channel, out_channel, 3, 1, 0),
            nn.ReLU(),
        )

    def concatenate(self, tensor, target_tensor):
        """Centre-crop ``tensor`` to ``target_tensor``'s size and join them.

        Height and width are cropped independently, so non-square feature
        maps and odd size differences are handled.

        Args:
            tensor: Encoder feature map, at least as large as
                ``target_tensor`` along axes 2 and 3.
            target_tensor: Upsampled decoder feature map.

        Returns:
            The cropped ``tensor`` and ``target_tensor`` concatenated along
            axis 1 (cropped ``tensor`` first).
        """
        target_h, target_w = target_tensor.shape[2], target_tensor.shape[3]
        top = (tensor.shape[2] - target_h) // 2
        left = (tensor.shape[3] - target_w) // 2
        cropped = tensor[:, :, top:top + target_h, left:left + target_w]
        return torch.cat((cropped, target_tensor), 1)

    def forward(self, input):
        """Run the encoder/decoder and return the single-channel raw scores.

        TODO: add batch normalization.
        """
        # Encoder
        x1 = self.layer1(input)
        x2 = self.l2(x1)
        x3 = self.l3(x2)
        x4 = self.l4(x3)
        x5 = self.l5(x4)
        x6 = self.l6(x5)
        x7 = self.l7(x6)
        x8 = self.l8(x7)
        x9 = self.l9(x8)

        # Decoder: upsample, merge with the matching encoder output, convolve.
        x10 = self.l10(x9)
        x11 = self.l11(self.concatenate(x7, x10))
        x12 = self.l12(x11)
        x13 = self.l13(self.concatenate(x5, x12))
        x14 = self.l14(x13)
        x15 = self.l15(self.concatenate(x3, x14))
        x16 = self.l16(x15)
        x17 = self.l17(self.concatenate(x1, x16))
        x18 = self.l18(x17)

        return x18

def test():
    """Smoke test: push one random 1x3x512x512 batch through a fresh UNet.

    Prints the input and output shapes so they can be compared by eye.
    """
    net = UNet()
    sample = torch.rand(1, 3, 512, 512)
    print("shape of x: ", sample.shape)
    prediction = net(sample)
    print("shape of y: ", prediction.shape)

  from .autonotebook import tqdm as notebook_tqdm


## Utils

In [None]:
import torch
import torchvision

def check_accuracy(loader, model, device="cuda"):
    """Print pixel accuracy and mean Dice score of ``model`` over ``loader``.

    Model outputs are passed through a sigmoid and thresholded at 0.5 before
    being compared with the targets. The Dice score of a batch is
    ``2 * sum(pred * y) / (sum(pred + y) + 1e-8)``.

    Args:
        loader: Iterable of ``(x, y)`` batches; ``y`` gets a channel axis
            inserted at position 1 before comparison.
        model: Module to evaluate. It is put in eval mode for the duration of
            the call and switched back to train mode afterwards, even if
            evaluation raises.
        device: Device the batches are moved to.

    Returns:
        The Dice score summed over all batches (not the mean that is
        printed); ``0`` when ``loader`` yields no batches.
    """
    num_correct = 0
    num_pixels = 0
    dice_score = 0
    num_batches = 0  # counted here so loaders without __len__ also work
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device)
                y = y.to(device).unsqueeze(1)
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
                num_correct += (preds == y).sum()
                num_pixels += torch.numel(preds)
                dice_score += (2 * (preds * y).sum()) / (
                    (preds + y).sum() + 1e-8
                )
                num_batches += 1
    finally:
        model.train()

    if num_batches == 0 or num_pixels == 0:
        # Nothing was evaluated: report it instead of dividing by zero.
        print("Got 0/0 pixels: the loader yielded no data to evaluate")
        return dice_score

    print(
        f"Got {num_correct}/{num_pixels} with acc {num_correct/num_pixels*100:.2f}"
    )
    print(f"Dice score: {dice_score/num_batches}")
    return dice_score

def save_predictions_as_imgs(
    loader, model, folder="saved_images/", device="cuda"
):
    """Write thresholded predictions and their targets to ``folder``.

    For batch number ``idx`` the sigmoid of the model output, thresholded at
    0.5, is saved as ``pred_{idx}.png`` and the target (with a channel axis
    inserted at position 1) as ``{idx}.png``.

    Args:
        loader: Iterable of ``(x, y)`` batches.
        model: Module used for prediction. It is put in eval mode for the
            duration of the call and switched back to train mode afterwards,
            even if saving raises.
        folder: Output directory, with or without a trailing separator. It is
            created if it does not exist.
        device: Device the inputs are moved to.
    """
    import os  # local import: this module only imports torch/torchvision

    if folder:
        os.makedirs(folder, exist_ok=True)

    model.eval()
    try:
        for idx, (x, y) in enumerate(loader):
            x = x.to(device=device)
            with torch.no_grad():
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
            # Build both paths the same way so neither depends on ``folder``
            # ending with a separator.
            torchvision.utils.save_image(
                preds, os.path.join(folder, f"pred_{idx}.png")
            )
            torchvision.utils.save_image(
                y.unsqueeze(1), os.path.join(folder, f"{idx}.png")
            )
    finally:
        model.train()

## Train

In [3]:
import albumentations as A
from torch.utils.data import DataLoader
from albumentations.pytorch import ToTensorV2
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from utils import check_accuracy, save_predictions_as_imgs

# Hyperparameters and run configuration read by main().
LEARNING_RATE = 1e-4  # learning rate passed to the Adam optimizer
BATCH_SIZE = 8  # batch size of both the training and validation loaders
NUM_EPOCHS = 3  # upper bound of the epoch range iterated by main()
IMAGE_HEIGHT = 512 # resize target height; 160 was used earlier, 1280 originally
IMAGE_WIDTH = 512 # resize target width; 240 was used earlier, 1918 originally
TRAIN_IMG_DIR = "data/train_images/"
TRAIN_MASK_DIR = "data/train_masks/"
VAL_IMG_DIR = "data/val_images/"
VAL_MASK_DIR = "data/val_masks/"
LOAD_MODEL = False  # when True, main() loads the checkpoint at model_path first
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # prefer the GPU when present
NUM_WORKERS = 2  # forwarded to DataLoader(num_workers=...)
PIN_MEMORY = True  # forwarded to DataLoader(pin_memory=...)
# model_path = 'models/train_02_d = '+str(discount)+'.pt'
model_path = "my_checkpoint.pth.tar"  # checkpoint file written after every epoch
# Created with default arguments when this module is loaded; main() logs the
# training loss to it.
lossLog = SummaryWriter()

def main():
    """Train the UNet, checkpointing and validating after every epoch.

    Steps:
        1. Build the train/validation augmentation pipelines, datasets and
           data loaders.
        2. Create the model, the loss function and the optimizer, optionally
           resuming from the checkpoint at ``model_path``.
        3. Train for the remaining epochs. After each epoch the checkpoint is
           saved, the mean training loss is logged to TensorBoard, the
           validation accuracy is printed and example predictions are written
           to ``saved_images/``.
    """
    import os  # local import: only needed to create the output folder

    # --------------------------------------------------------
    # 1. Data augmentation pipeline. The transforms are defined here and then
    #    handed to the dataset class, which applies them per sample.
    train_transform = A.Compose([A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
                                 A.Rotate(limit=35, p=1.0),
                                 A.HorizontalFlip(p=0.5),
                                 A.VerticalFlip(p=0.1),
                                 A.Normalize(mean=[0.0, 0.0, 0.0],
                                             std=[1.0, 1.0, 1.0],
                                             max_pixel_value=255.0),
                                 ToTensorV2()
                                 ])

    # Validation uses only the deterministic resize/normalize steps.
    val_transform = A.Compose([A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
                               A.Normalize(mean=[0.0, 0.0, 0.0],
                                           std=[1.0, 1.0, 1.0],
                                           max_pixel_value=255.0),
                               ToTensorV2()
                               ])

    train_ds = CarvanaDataset(image_dir=TRAIN_IMG_DIR, mask_dir=TRAIN_MASK_DIR, transform=train_transform)
    val_ds = CarvanaDataset(image_dir=VAL_IMG_DIR, mask_dir=VAL_MASK_DIR, transform=val_transform)

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS,
                              pin_memory=PIN_MEMORY, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS,
                            pin_memory=PIN_MEMORY, shuffle=False)

    # --------------------------------------------------------
    # 2. Create the model, define the loss function and the optimizer.
    model = UNet().to(DEVICE)
    # The model emits one raw score per pixel and the targets are 0/1 masks,
    # so this is a binary problem: use binary cross-entropy on logits.
    # CrossEntropyLoss over a single channel is identically zero because the
    # softmax over one class is always 1.
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    start_epoch = 0
    if LOAD_MODEL:
        print("--- Training from the last checkpoint")
        # map_location lets a checkpoint saved on GPU be resumed on CPU.
        checkpoint = torch.load(model_path, map_location=DEVICE)

        # Checkpoints written below store the weights under "model";
        # "state_dict" is accepted as a fallback for other checkpoint files.
        weights = checkpoint["model"] if "model" in checkpoint else checkpoint["state_dict"]
        model.load_state_dict(weights)
        if "optimizer" in checkpoint:
            optimizer.load_state_dict(checkpoint["optimizer"])
        # The stored epoch was fully completed, so continue with the next one.
        start_epoch = checkpoint["epoch"] + 1

    # Make sure the folder for example predictions exists before it is used.
    os.makedirs("saved_images/", exist_ok=True)

    # --------------------------------------------------------
    # 3. Start the training.
    for epoch in tqdm(range(start_epoch, NUM_EPOCHS)):
        running_loss = 0.0
        num_batches = 0
        for data, targets in tqdm(train_loader):
            data = data.to(device=DEVICE)
            # Add a channel axis so the targets match the prediction's shape.
            targets = targets.float().unsqueeze(1).to(device=DEVICE)

            # forward
            predictions = model(data)
            loss = loss_fn(predictions, targets)

            # backward
            optimizer.zero_grad()
            loss.backward()

            # update the model parameters
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # Saving the model
        print("=> Saving Checkpoint")
        checkpoint = {"model": model.state_dict(),
                      "optimizer": optimizer.state_dict(),
                      "epoch": epoch,
                      }
        torch.save(checkpoint, model_path)

        # Log the mean loss of the epoch (not just the last batch's loss).
        lossLog.add_scalar("Training loss per epoch", running_loss / max(num_batches, 1), epoch)

        # check accuracy
        check_accuracy(val_loader, model, device=DEVICE)

        # print some examples to a folder
        save_predictions_as_imgs(val_loader, model, folder="saved_images/", device=DEVICE)


# Start training only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()

  0%|          | 0/3 [00:00<?, ?it/s]