In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
#drive.flush_and_unmount()

Mounted at /content/drive


In [None]:
%ls '/content/drive/MyDrive/Colab Notebooks/ml_tasks'
%cd '/content/drive/MyDrive/Colab Notebooks/ml_tasks'

best_model.pth  [0m[01;34mdata[0m/  semantic_segmentation.ipynb
/content/drive/MyDrive/Colab Notebooks/ml_tasks


In [None]:
!pip install segmentation-models-pytorch

import shutil
from pathlib import Path

import cv2
from PIL import Image
from tqdm import tqdm
import numpy as np
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torchvision import transforms as T
import segmentation_models_pytorch as smp

## Creating a dataset

In [None]:
def make_dir(path):
    path = Path(path)

    if path.is_dir():
        shutil.rmtree(path)

    path.mkdir(parents=True, exist_ok=True)

    return path
    
def prepare_dataset(imgs_dir, masks_dir, dataset_dir, image_size, test_size, seed=42):
    def save_data(data_dir, imgs_paths, masks_paths):
        data_dir = make_dir(data_dir)

        for n, (img_path, mask_path) in enumerate(zip(imgs_paths, masks_paths)):
            img = cv2.imread(str(img_path))
            mask = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)

            img = cv2.resize(img, image_size, interpolation=cv2.INTER_LINEAR)
            mask = cv2.resize(mask, image_size, interpolation=cv2.INTER_NEAREST)

            stem = data_dir / str(n).zfill(4)

            cv2.imwrite(str(stem.with_suffix('.jpg')), img)
            cv2.imwrite(str(stem.with_suffix('.png')), mask)

            print(f'save {n} / {len(imgs_paths)} to {data_dir}')

    imgs_paths = list(Path(imgs_dir).glob('**/*.jpg'))
    imgs_paths.sort()

    masks_paths = list(Path(masks_dir).glob('**/*.png'))
    masks_paths.sort()

    imgs_paths_train, imgs_paths_test, masks_paths_train, masks_paths_test = train_test_split(
        imgs_paths,
        masks_paths,
        test_size=test_size,
        random_state=seed)

    dataset_dir = make_dir(dataset_dir)

    save_data(dataset_dir / 'train', imgs_paths_train, masks_paths_train)
    save_data(dataset_dir / 'test', imgs_paths_test, masks_paths_test)

In [None]:
# params
seed = 42
test_size = 0.3
image_size = 512, 512
batch_size = 16
num_classes = 23
epochs = 100
learning_rate = 0.01
num_workers = 2
early_stopping = 5

dataset_dir = Path('data/segmentation_dataset')

In [None]:
# run only once!
# prepare_dataset('data/semantic_drone_dataset/original_images',
#                 'data/semantic_drone_dataset/label_images_semantic',
#                 dataset_dir, image_size, test_size)

In [None]:
class MaskToTensor:
    def __call__(self, mask):
        target = torch.as_tensor(np.array(mask), dtype=torch.long)
        return target


# ImageNet normalization
NORMALIZE = {
    'mean': [0.485, 0.456, 0.406],
    'std': [0.229, 0.224, 0.225]
}

class SegmentationDataset(Dataset):
    def __init__(self, data_dir):
        self.imgs_paths = list(Path(data_dir).glob('**/*.jpg'))
        self.imgs_paths.sort()

        self.masks_paths = list(Path(data_dir).glob('**/*.png'))
        self.masks_paths.sort()

        self.imgs_transform = T.Compose([
            T.RandomApply([
                T.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2, hue=0.1)
            ], p=0.5),
            T.ToTensor(),
            T.Normalize(mean=NORMALIZE['mean'], std=NORMALIZE['std'])
        ])

        self.masks_transform = T.Compose([
            MaskToTensor()
        ])

    def __len__(self):
        return len(self.imgs_paths)

    @staticmethod
    def load_image(filename, mode):
        with open(filename, 'rb') as f:
            image = Image.open(f)
            image = image.convert(mode)

        return image

    def __getitem__(self, idx):
        img = self.load_image(self.imgs_paths[idx], mode='RGB')
        mask = self.load_image(self.masks_paths[idx], mode='L')

        img = self.imgs_transform(img)
        mask = self.masks_transform(mask)

        return img, mask


In [None]:
train_dataset = SegmentationDataset(dataset_dir / 'train')
train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True,
                          drop_last=True,
                          num_workers=num_workers)

valid_dataset = SegmentationDataset(dataset_dir / 'test')
valid_loader = DataLoader(valid_dataset,
                          batch_size=batch_size,
                          shuffle=False,
                          drop_last=False,
                          num_workers=num_workers)

## Model creation and training

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using device:', device)

model = smp.Unet('mobilenet_v2', encoder_weights='imagenet', classes=num_classes)
model.train()

model.segmentation_head = nn.Sequential(
    nn.Conv2d(16, num_classes, kernel_size=1, padding=0),
)

for param in model.encoder.parameters():
    param.requires_grad = False

model.to(device)

In [None]:
optimizer = torch.optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=learning_rate)

# optimizer = torch.optim.SGD(
#         filter(lambda p: p.requires_grad, model.parameters()),
#         lr=learning_rate)

loss_fn = nn.CrossEntropyLoss()

In [None]:
def iou_fn(prediction, target, num_classes):
    prediction = F.one_hot(prediction.argmax(dim=0), num_classes)
    target = F.one_hot(target.argmax(dim=0), num_classes)

    intersection = (prediction & target).sum()
    union = (prediction | target).sum()

    iou = (intersection + 1e-6) / (union + 1e-6)

    return iou

In [None]:
best_loss = float('inf')
no_improvement = 0

train_loss = np.zeros((0,))
valid_loss = np.zeros((0,))
valid_iou = np.zeros((0,))

for epoch in range(epochs):
  train_loss_ = np.zeros((0,))
  valid_loss_ = np.zeros((0,))
  valid_iou_ = np.zeros((0,))
  
  # train stage
  progress = tqdm(train_loader, total=len(train_loader), leave=False)

  for batch, target in progress:
    output = model(batch.to(device))
    loss = loss_fn(output, target.to(device))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    train_loss_ = np.append(train_loss_, loss.item())

    progress.set_description(f'train: {epoch}/{epochs} '
                              f'loss: {train_loss_.mean():.3f}')
    
  # valid stage
  progress = tqdm(enumerate(valid_loader), total=len(valid_loader), leave=False)

  for n, (batch, target) in progress:
    with torch.no_grad():
      output = model(batch.to(device))

    target = target.to(device)
    loss = loss_fn(output, target)

    prediction = F.softmax(output, dim=1).argmax(dim=1)
    iou = iou_fn(prediction, target, num_classes)

    valid_loss_ = np.append(valid_loss_, loss.item())
    valid_iou_ = np.append(valid_iou_, iou.item())

    progress.set_description(f'valid: {epoch}/{epochs} '
                              f'loss: {valid_loss_.mean():.3f} '
                              f'IoU: {valid_iou_.mean():.3f}')
    
  loss = valid_loss_.mean()

  if loss < best_loss:
    best_loss = loss
    torch.save(model.state_dict(), 'best_model.pth')
    no_improvement = 0
  else:
    no_improvement += 1

  # early stopping
  if no_improvement > early_stopping:
    print(f'early stopping {no_improvement}, epoch {epoch}')
    break

  train_loss = np.append(train_loss, train_loss_.mean())
  valid_loss = np.append(valid_loss, valid_loss_.mean())
  valid_iou = np.append(valid_iou, valid_iou_.mean())

                                                                                   

early stopping 6, epoch 34




## Plot metrics

In [None]:

from bokeh.plotting import figure, show, output_file
from bokeh.io import output_notebook

output_notebook()

TOOLS = 'pan,wheel_zoom,box_zoom,reset,save,box_select'

ticks = np.array(range(len(train_loss)))

p1 = figure(title='Segmentation train / valid loss', tools=TOOLS)
p1.line(ticks, valid_loss, line_color='blue', line_width=1)
p1.circle(ticks, valid_loss, legend_label='valid loss', color='red', size=3)
p1.line(ticks, train_loss, line_color='gray', line_width=1)
p1.circle(ticks, train_loss, legend_label='train loss', color='green', size=3)

p2 = figure(title='IoU', tools=TOOLS)
p2.line(ticks, valid_iou, line_color='blue', line_width=1)
p2.circle(ticks, valid_iou, legend_label='IoU', color='red', size=3)
p2.legend.location = 'bottom_right'

show(p1)
show(p2)