<a href="https://colab.research.google.com/github/s1m0nS/mapAI-UNet/blob/main/UNet_binary_segmentation_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Binary semantic segmentation example using U-Net
Pytorch implementation: https://www.youtube.com/watch?v=IHq1t7NxS8k

In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive/')

# Unzip dataset --> run only in Colab
import zipfile

mapai_zip = '/content/drive/MyDrive/datasets/mapai_small.zip'
zip_ref = zipfile.ZipFile(mapai_zip, 'r')
zip_ref.extractall("/tmp") # extracting dataset to tmp folder
zip_ref.close()

Mounted at /content/drive/


### DATASET: dataset.**py**

In [24]:
import os
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

class mapAIdataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, index):
        # IMAGES are TIF
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = os.path.join(self.mask_dir, self.images[index])
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
        mask[mask == 255.0] = 1.0 # convert 255 to 1

        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

### MODEL: model.py

torch.Size([3, 1, 500, 500])
torch.Size([3, 1, 500, 500])


## UTILITIES: utilities.py

In [27]:
import torch
import torchvision
#from dataset import mapAIdataset
from torch.utils.data import DataLoader

def save_checkpoint(state, filename="mapAI-checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)

def load_checkpoint(checkpoint, model):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])

def get_loaders(
    train_dir,
    train_maskdir,
    val_dir,
    val_maskdir,
    test_dir,
    test_maskdir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True
):
    
    train_ds = mapAIdataset(
        image_dir=train_dir,
        mask_dir=train_maskdir,
        transform=train_transform,
    )

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=True,
    )

    val_ds = mapAIdataset(
        image_dir=val_dir,
        mask_dir=val_maskdir,
        transform=val_transform,
    )

    val_loader = DataLoader(
        val_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=False,
    )

    return train_loader, val_loader

def check_accuracy(loader, model, device="cuda"):
    num_correct = 0
    num_pixels = 0
    dice_score = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            # if the label does not have channel use unsqueeze(1)
            y = y.to(device).unsqueeze(1) 
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
            num_correct += (preds == y).sum()
            num_pixels += torch.numel(preds)
            dice_score += (2 * (preds * y).sum()) / (
                (preds + y).sum() + 1e-8
            )

    print(
        f"Got {num_correct}/{num_pixels} with acc {num_correct/num_pixels*100:.2f}"
    )
    print(f"Dice score: {dice_score/len(loader)}")
    model.train()

def save_predictions_as_imgs(
    loader, model, folder="predictions/", device="cuda"
):
    model.eval()
    for idx, (x, y) in enumerate(loader):
        x = x.to(device=device)
        with torch.no_grad():
            print(len(dataloaders_dict['test']))
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
        torchvision.utils.save_image(
            preds, f"{folder}/pred_{idx}.png"
        )
        torchvision.utils.save_image(y.unsqueeze(1), f"{folder}{idx}.png")

    model.train()

### TRAIN: train.py

In [28]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
#from model import UNET
#from utils import (
#    load_checkpoint,
#    save_checkpoint,
#    get_loaders,
#    check_accuracy,
#    save_predictions_as_imgs,
#)

# HYPERPARAMETERS
LEARNING_RATE = 1e-3
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 8 # number of images
NUM_EPOCHS = 5
NUM_WORKERS = 2
IMAGE_HEIGHT = 500
IMAGE_WIDTH = 500
PIN_MEMORY = True
LOAD_MODEL = False

# SPECIFY PATH FOR MAPAI DATASET
TRAIN_IMG_DIR = "/tmp/mapai_small/train/images/"
TRAIN_MASK_DIR = "/tmp/mapai_small/train/masks/"
VAL_IMG_DIR = "/tmp/mapai_small/val/images/"
VAL_MASK_DIR = "/tmp/mapai_small/val/masks/"

def train_fn(loader, model, optimizer, loss_fn, scaler):
    loop = tqdm(loader)

    for batch_idx, (data, targets) in enumerate(loop):
        data = data.to(device=DEVICE)
        targets = targets.float().unsqueeze(1).to(device=DEVICE)

        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        # backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        # update tqdm to show the loss function
        loop.set_postfix(loss=loss.item())

def main():
    train_transform = A.Compose(
        [
            A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
            A.Rotate(limit=35, p=1.0),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.1),
            A.Normalize(
                mean=[0.0, 0.0, 0.0],
                std=[1.0, 1.0, 1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )

    val_transforms = A.Compose(
        [
            A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
            A.Normalize(
                mean=[0.0, 0.0, 0.0],
                std=[1.0, 1.0, 1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )

    model = UNET(in_channels=3, out_channels=1).to(DEVICE)
    loss_fn = nn.BCEWithLogitsLoss() # Binary-Cross-Entropy loss with Logits
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    
    # load train, validation dataset
    train_loader, val_loader = get_loaders(
        TRAIN_IMG_DIR,
        TRAIN_MASK_DIR,
        VAL_IMG_DIR,
        VAL_MASK_DIR,
        BATCH_SIZE,
        train_transform,
        val_transforms,
        NUM_WORKERS,
        PIN_MEMORY
        )

    if LOAD_MODEL:
        load_checkpoint(torch.load("mapAI-checkpoint.pth.tar"), model)

    check_accuracy(val_loader, model, device=DEVICE)
    scaler = torch.cuda.amp.GradScaler()
    
    epoch = 0
    for epoch in range(NUM_EPOCHS):
        print('Epoch', epoch)
        train_fn(train_loader, model, optimizer, loss_fn, scaler)

        # save model
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer":optimizer.state_dict(),
        }
        save_checkpoint(checkpoint)

        # check accuracy
        check_accuracy(val_loader, model, device=DEVICE)

        # print some examples to a folder
        save_predictions_as_imgs(
            val_loader, model, folder="/content/drive/MyDrive/predictions/", device=DEVICE
        )
        epoch = epoch + 1

    # Save model
    savedm = torch.save(model.state_dict(), "/content/drive/MyDrive/models/mapAI-torch.pth")

if __name__ == "__main__":
    main()

ValueError: ignored

In [None]:
model = torch.load("/content/drive/MyDrive/models/mapAI-torch.pth")
model.eval()

#def get_prediction(image_bytes):
#    tensor = transform_image(image_bytes=image_bytes)
#    tensor=tensor.to(device)
#    output = model.forward(tensor)
#     
#    probs = torch.nn.functional.softmax(output, dim=1)
#    conf, classes = torch.max(probs, 1)
#    return conf.item(), index_to_breed[classes.item()]
# 
#image_path="/content/test/06b3a4da7b96404349e51551bf611551.jpg"
#image = plt.imread(image_path)
#plt.imshow(image)
# 
#with open(image_path, 'rb') as f:
#    image_bytes = f.read()
# 
#    conf,y_pre=get_prediction(image_bytes=image_bytes)
#    print(y_pre, ' at confidence score:{0:.2f}'.format(conf))


AttributeError: ignored