In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


### Requirements
- keras >= 2.2.0 or tensorflow >= 1.13
- segmentation-models==1.0.*
- albumentations==0.3.0

In [None]:
# Install required libs

# Albumentations must be >= 0.3.0 for `Lambda` transform support; this installs
# the current development version straight from GitHub.
!pip install -U git+https://github.com/albu/albumentations --no-cache-dir

# Remove and reinstall opencv-python.
# NOTE(review): presumably works around a conflict with the preinstalled OpenCV build — confirm.
!pip uninstall -y opencv-python
!pip install opencv-python

In [None]:
# !git clone https://github.com/qubvel/segmentation_models.pytorch

In [None]:
%cd ./gdrive/MyDrive/Colab Notebooks/Solar Panels

!pip install -r requirements.txt
!ls

# Loading dataset

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import albumentations as A

import segmentation_models_pytorch as smp

from pathlib import Path

from torch.utils.data import DataLoader
from torch.utils.data import Dataset

print(smp.__version__)
# os.environ['CUDA_VISIBLE_DEVICES'] = '0'

0.2.1


# Dataloader and utility functions 

In [None]:
from src.models.segmentation.datasets import *


# # classes for data loading and preprocessing
# class SolarPanelsDataset(Dataset):
#     """CamVid Dataset. Read images, apply augmentation and preprocessing transformations.
#
#     Args:
#         images_dir (str): path to images folder
#         masks_dir (str): path to segmentation masks folder
#         class_values (list): values of classes to extract from segmentation mask
#         augmentation (albumentations.Compose): data transformation pipeline
#             (e.g. flip, scale, etc.)
#         preprocessing (albumentations.Compose): data preprocessing
#             (e.g. normalization, shape manipulation, etc.)
#
#     """
#
#     CLASSES = ['solar_panel']
#
#     def __init__(
#             self,
#             images_dir,
#             masks_dir,
#             classes=None,
#             augmentation=None,
#             preprocessing=None,
#     ):
#         self.ids = os.listdir(images_dir)
#         self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
#         self.masks_fps = [os.path.join(masks_dir, image_id.split('.')[0]+'_label.png') for image_id in self.ids]
#
#         # convert str names to class values on masks
#         self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]
#
#         self.augmentation = augmentation
#         self.preprocessing = preprocessing
#
#
#     def __getitem__(self, i):
#         # read data
#         image = cv2.imread(self.images_fps[i])
#         image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
#         mask = cv2.imread(self.masks_fps[i],0)
#         mask = cv2.threshold(mask, 0, 255, cv2.THRESH_BINARY)[1]
#
#         # extract certain classes from mask (e.g. cars)
#         masks = [(mask != v) for v in self.class_values]
#         # masks = [(mask == v) for v in self.class_values]
#         mask = np.stack(masks, axis=-1).astype('float')
#
#         # add background if mask is not binary
#         if mask.shape[-1] != 1:
#             background = 1 - mask.sum(axis=-1, keepdims=True)
#             mask = np.concatenate((mask, background), axis=-1)
#
#         # apply augmentations
#         if self.augmentation:
#             sample = self.augmentation(image=image, mask=mask)
#             image, mask = sample['image'], sample['mask']
#
#         # apply preprocessing
#         if self.preprocessing:
#             sample = self.preprocessing(image=image, mask=mask)
#             image, mask = sample['image'], sample['mask']
#
#         return image, mask
#
#     def __len__(self):
#         return len(self.ids)
#
# class GoogleMapsDataset(Dataset):
#     """CamVid Dataset. Read images, apply augmentation and preprocessing transformations.
#
#     Args:
#         images_dir (str): path to images folder
#         augmentation (albumentations.Compose): data transfromation pipeline
#             (e.g. flip, scale, etc.)
#         preprocessing (albumentations.Compose): data preprocessing
#             (e.g. noralization, shape manipulation, etc.)
#     """
#
#     def __init__(
#             self,
#             images_dir,
#             augmentation=None,
#             preprocessing=None,
#     ):
#         self.ids = os.listdir(images_dir)
#         self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
#
#         self.augmentation = augmentation
#         self.preprocessing = preprocessing
#
#     def __getitem__(self, i):
#
#         # read data
#         image = cv2.imread(self.images_fps[i])
#         image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
#
#         # apply augmentations
#         if self.augmentation:
#             sample = self.augmentation(image=image)
#             image = sample['image']
#
#         # apply preprocessing
#         if self.preprocessing:
#             sample = self.preprocessing(image=image)
#             image = sample['image']
#
#         return image
#
#     def __len__(self):
#         return len(self.ids)

### Augmentations

Data augmentation is a powerful technique to increase the amount of your data and prevent model overfitting.  
If you are not familiar with this trick, read some of these articles:
 - [The Effectiveness of Data Augmentation in Image Classification using Deep
Learning](http://cs231n.stanford.edu/reports/2017/pdfs/300.pdf)
 - [Data Augmentation | How to use Deep Learning when you have Limited Data](https://medium.com/nanonets/how-to-use-deep-learning-when-you-have-limited-data-part-2-data-augmentation-c26971dc8ced)
 - [Data Augmentation Experimentation](https://towardsdatascience.com/data-augmentation-experimentation-3e274504f04b)

Since our dataset is very small we will apply a large number of different augmentations:
 - horizontal flip
 - affine transforms
 - perspective transforms
 - brightness/contrast/colors manipulations
 - image blurring and sharpening
 - gaussian noise
 - random crops

All these transforms can be easily applied with [**Albumentations**](https://github.com/albu/albumentations/) - a fast augmentation library.
For a detailed explanation of image transformations you can look at the [kaggle salt segmentation example](https://github.com/albu/albumentations/blob/master/notebooks/example_kaggle_salt.ipynb) provided by the [**Albumentations**](https://github.com/albu/albumentations/) authors.


In [None]:
# define heavy augmentations
def get_training_augmentation():
    """Build the heavy augmentation pipeline used for training samples.

    The pipeline flips, shifts/scales (no rotation), pads to at least 256x256,
    takes a random 256x256 crop and then applies noise plus three randomly
    chosen photometric / sharpness / colour perturbations.

    Returns:
        albumentations.Compose: the composed training transform.
    """
    train_transform = [

        A.HorizontalFlip(p=0.5),

        # border_mode=0 fills the exposed border with a constant value.
        A.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),

        # `p=1` replaces the deprecated `always_apply=True`; both mean the
        # transform runs on every sample, but `p=1` works on all versions.
        A.PadIfNeeded(min_height=256, min_width=256, p=1, border_mode=0),
        A.RandomCrop(height=256, width=256, p=1),

        A.GaussNoise(p=0.2),
        #A.Perspective(p=0.5),

        # One of: contrast / brightness / gamma change.
        A.OneOf(
            [
                A.CLAHE(p=1),
                A.RandomBrightnessContrast(p=1),
                A.RandomGamma(p=1),
            ],
            p=0.9,
        ),

        # One of: sharpening or a light blur.
        A.OneOf(
            [
                A.Sharpen(p=1),
                A.Blur(blur_limit=3, p=1),
                A.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),

        # One of: brightness/contrast or hue/saturation shift.
        A.OneOf(
            [
                A.RandomBrightnessContrast(p=1),
                A.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
    return A.Compose(train_transform)


def get_validation_augmentation():
    """Build the validation/test transform: pad images up to at least 256x256.

    NOTE(review): the padding only enforces a minimum size; it does not make
    larger images divisible by 32 — confirm that all evaluation images already
    have compatible dimensions.

    Returns:
        albumentations.Compose: the composed validation transform.
    """
    test_transform = [
        A.PadIfNeeded(256, 256)
    ]
    return A.Compose(test_transform)


def to_tensor(x, **kwargs):
    """Reorder an (H, W, C) array to (C, H, W) and cast it to float32.

    Extra keyword arguments are accepted and ignored so the function can be
    used as an albumentations ``Lambda`` callback.
    """
    channels_first = x.transpose((2, 0, 1))
    return channels_first.astype('float32')


def get_preprocessing(preprocessing_fn):
    """Build the preprocessing transform applied after augmentation.

    Args:
        preprocessing_fn (callable): data normalization function applied to
            the image only (can be specific to each pretrained encoder).

    Return:
        albumentations.Compose: normalization followed by conversion of both
        image and mask to channel-first float32 arrays.
    """
    normalize = A.Lambda(image=preprocessing_fn)
    channels_first = A.Lambda(image=to_tensor, mask=to_tensor)
    return A.Compose([normalize, channels_first])

# Segmentation model training

In [None]:
def get_sp_dataset(folder, augmentation, params):
    """Create a ``SolarPanelsDataset`` for one split of the labelled data.

    Args:
        folder (str): split sub-folder under ``params['data_dir']``; images are
            read from ``<folder>/images`` and masks from ``<folder>/masks``.
        augmentation (callable): zero-argument factory returning the
            augmentation pipeline.
        params (dict): must provide ``'data_dir'``, ``'classes'`` and ``'encoder'``.

    Returns:
        SolarPanelsDataset: dataset with augmentation and encoder-specific
        preprocessing attached.
    """
    root = params['data_dir']
    images_dir = os.path.join(root, f'{folder}/images')
    masks_dir = os.path.join(root, f'{folder}/masks')

    normalize_fn = smp.encoders.get_preprocessing_fn(params['encoder'])
    dataset = SolarPanelsDataset(
        images_dir,
        masks_dir,
        classes=params['classes'],
        augmentation=augmentation(),
        preprocessing=get_preprocessing(normalize_fn),
    )
    return dataset

def get_gm_dataset(folder, augmentation, params):
    """Create a ``GoogleMapsDataset`` (images only, no masks) for inference.

    Args:
        folder (str): image folder relative to ``params['data_dir']``.
        augmentation (callable): zero-argument factory returning the
            augmentation pipeline.
        params (dict): must provide ``'data_dir'`` and ``'encoder'``.

    Returns:
        GoogleMapsDataset: dataset with augmentation and encoder-specific
        preprocessing attached.
    """
    images_dir = os.path.join(params['data_dir'], folder)
    normalize_fn = smp.encoders.get_preprocessing_fn(params['encoder'])
    dataset = GoogleMapsDataset(
        images_dir,
        augmentation=augmentation(),
        preprocessing=get_preprocessing(normalize_fn),
    )
    return dataset

def get_model(model, encoder, n_classes, activation):
    """Instantiate a segmentation network.

    Args:
        model (callable): architecture constructor, called as
            ``model(encoder, classes=..., activation=...)``.
        encoder: encoder identifier passed as the first positional argument.
        n_classes: forwarded as ``classes``.
        activation: forwarded as ``activation``.

    Returns:
        Whatever the constructor returns.
    """
    build_kwargs = {'classes': n_classes, 'activation': activation}
    return model(encoder, **build_kwargs)

def get_model_info(model_name):
    """Parse a checkpoint name shaped like ``<arch>_<encoder>_<tag>_<epochs>.<ext>``.

    Only the file name is parsed, so ``model_name`` may be a bare file name or
    a path; directories (including ones containing ``_`` or ``.``) no longer
    leak into the parsed fields.

    Args:
        model_name (str or os.PathLike): checkpoint file name or path, e.g.
            ``'models_pytorch/unetplusplus_se_resnext101_32x4d_model_50.pth'``.

    Returns:
        tuple: ``(raw_name, encoder, epochs)`` where ``raw_name`` is
        ``'<arch>_<encoder>'`` and ``epochs`` is an ``int``.

    Raises:
        ValueError: if the last ``_``-separated token is not an integer or the
            name has fewer than two tokens.
    """
    # Strip directory and extension before tokenising.
    stem, _ext = os.path.splitext(os.path.basename(model_name))
    arch, *enc, epochs = stem.split('_')

    # The token right before the epoch count is a tag (e.g. 'model'); drop it.
    enc = '_'.join(enc[:-1])
    raw_name = arch + '_' + enc
    return raw_name, enc, int(epochs)

def model_exists(model_name):
    """Find an already saved checkpoint for the same architecture/encoder.

    Looks in the directory of ``model_name`` for a file whose name starts with
    ``'<arch>_<encoder>_'`` (any epoch count).

    Args:
        model_name (str): intended checkpoint path, e.g.
            ``'models_pytorch/unet_resnet34_model_50.pth'``.

    Returns:
        str or None: path of the first matching file (in sorted order), or
        ``None`` when the directory is missing or holds no match.
    """
    target = Path(model_name)
    parent = target.parent
    # Nothing can exist yet if the checkpoint directory has not been created.
    if not parent.is_dir():
        return None

    # Parse the bare file name: the directory part must not become part of
    # the prefix, because os.listdir() yields bare file names.
    name, _, _ = get_model_info(target.name)
    # Trailing '_' avoids e.g. 'unet_resnet50' matching 'unet_resnet50d_...'.
    prefix = name + '_'
    for model in sorted(os.listdir(parent)):
        if model.startswith(prefix):
            return os.path.join(parent, model)
    return None

def get_optimizer(model, optimizer, lr):
    """Instantiate ``optimizer`` over the model's parameters.

    Args:
        model: object exposing ``parameters()``.
        optimizer (callable): optimizer constructor accepting ``params`` and ``lr``.
        lr: learning rate forwarded unchanged.

    Returns:
        Whatever the optimizer constructor returns.
    """
    trainable = model.parameters()
    return optimizer(params=trainable, lr=lr)

In [None]:
def train(train_params, device, verbose=True):
    """Train (or resume training of) a segmentation model.

    If a checkpoint for the same architecture/encoder already exists, it is
    loaded and only the remaining epochs are trained; when it already covers
    the requested number of epochs nothing is done.

    Args:
        train_params (dict): reads ``'model_name'``, ``'classes'``,
            ``'epochs'``, ``'batch_size'``, and — when building a new model —
            ``'architecture'`` and ``'encoder'``; the dict is also forwarded
            to ``get_sp_dataset`` and ``trainloop``. It is not modified.
        device: device passed through to the training loop.
        verbose (bool): passed through to the training loop.

    Returns:
        The result of ``trainloop`` (``None`` when nothing had to be trained).
    """
    # Shallow copy: adjusting 'epochs' for a resumed run must not leak into
    # the caller's dict, otherwise re-running the cell changes the outcome.
    train_params = dict(train_params)

    model_name = model_exists(train_params['model_name'])
    n_classes = 1 if len(train_params['classes']) == 1 else (len(train_params['classes']) + 1)  # case for binary and multiclass segmentation

    if model_name is not None:
        _, _, prev_epochs = get_model_info(model_name)
        remaining = train_params['epochs'] - prev_epochs

        # The existing checkpoint already covers the requested epochs.
        if remaining <= 0:
            print(f'There already exists a model: {model_name}')
            return

        model = torch.load(model_name)
        train_params['epochs'] = remaining

    else:
        model = get_model(
            model=train_params['architecture'],
            encoder=train_params['encoder'],
            activation='sigmoid' if n_classes == 1 else 'softmax',
            n_classes=n_classes,
        )

    train_dataset = get_sp_dataset('train', get_training_augmentation, train_params)  # Dataset for training images
    valid_dataset = get_sp_dataset('val', get_validation_augmentation, train_params)  # Dataset for validation images

    train_loader = DataLoader(
        train_dataset,
        batch_size=train_params['batch_size'],
        shuffle=True,
        num_workers=2
    )
    valid_loader = DataLoader(
        valid_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=2
    )
    return trainloop(model, train_loader, valid_loader, train_params, device, verbose)


def trainloop(model, train_loader, valid_loader, train_params, device, verbose):
    """Run the train/validation epochs and save the final model.

    Args:
        model: network to optimise.
        train_loader: loader over training batches.
        valid_loader: loader over validation batches.
        train_params (dict): reads ``'optimizer'``, ``'lr'``, ``'loss'``,
            ``'metrics'``, ``'epochs'`` and ``'model_name'``.
        device: device handed to the epoch runners.
        verbose (bool): verbosity flag handed to the epoch runners.

    Returns:
        None. The model is written to ``train_params['model_name']`` unless a
        file already exists at that path.
    """
    save_path = train_params['model_name']

    # Create the checkpoint directory up front so a long run cannot fail at
    # the very end just because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    optimizer = get_optimizer(model, train_params['optimizer'], train_params['lr'])

    train_epoch = smp.train.TrainEpoch(
        model,
        loss=train_params['loss'],
        metrics=train_params['metrics'],
        optimizer=optimizer,
        device=device,
        verbose=verbose,
    )
    valid_epoch = smp.train.ValidEpoch(
        model,
        loss=train_params['loss'],
        metrics=train_params['metrics'],
        device=device,
        verbose=verbose,
    )

    print(save_path)
    for epoch in range(train_params['epochs']):
        print(f'\nEpoch: {epoch + 1}')
        train_epoch.run(train_loader)
        valid_epoch.run(valid_loader)

    # Never overwrite an existing checkpoint, but say so instead of silently
    # discarding the freshly trained weights.
    if not os.path.exists(save_path):
        torch.save(model, save_path)
    else:
        print(f'Checkpoint already exists, not overwriting: {save_path}')

In [None]:
# Global experiment configuration.
CLASSES = ['solar_panel']
BATCH_SIZE = 16
LR = 0.0001
# The binary models in this notebook are built with activation='sigmoid', so
# they already output probabilities; from_logits=False stops the loss from
# applying a second sigmoid on top of them.
LOSS = smp.losses.DiceLoss(smp.losses.BINARY_MODE, from_logits=False)

DEVICE = 'cuda'
DATA_DIR = 'data'

In [None]:
def gen_params(arch, encoder, epochs):
    """Assemble the training configuration for one architecture/encoder pair.

    Args:
        arch: architecture constructor; its lower-cased ``__name__`` becomes
            part of the checkpoint file name.
        encoder (str): encoder name.
        epochs (int): number of epochs, also encoded in the checkpoint name.

    Returns:
        dict: configuration with model, data, optimisation, loss and metric
        settings (global constants supply the shared values).
    """
    checkpoint = f'models_pytorch/{arch.__name__.lower()}_{encoder}_model_{epochs}.pth'
    metrics = [smp.metrics.IoU(threshold=0.5),
               smp.metrics.Fscore(threshold=0.5)]

    return dict(
        architecture=arch,
        encoder=encoder,
        model_name=checkpoint,
        data_dir=DATA_DIR,
        classes=CLASSES,
        lr=LR,
        epochs=epochs,
        batch_size=BATCH_SIZE,
        loss=LOSS,
        metrics=metrics,
        optimizer=torch.optim.Adam,
    )

def gen_test_params(model_name):
    """Assemble the evaluation configuration for a saved checkpoint.

    Args:
        model_name (str): checkpoint file name; the encoder is parsed from it
            with ``get_model_info``.

    Returns:
        dict: encoder, data directory, classes, loss and metrics.
    """
    encoder = get_model_info(model_name)[1]
    metrics = [smp.metrics.IoU(threshold=0.5),
               smp.metrics.Fscore(threshold=0.5)]

    return dict(
        encoder=encoder,
        data_dir=DATA_DIR,
        classes=CLASSES,
        loss=LOSS,
        metrics=metrics,
    )

In [None]:
def test(model, test_params, device):
    """Evaluate ``model`` on the ``'test'`` split.

    Args:
        model: network to evaluate.
        test_params (dict): reads ``'loss'`` and ``'metrics'``; also forwarded
            to ``get_sp_dataset``.
        device: device handed to the epoch runner.

    Returns:
        tuple: ``(test_dataset, logs)`` — the dataset that was evaluated and
        the value returned by the epoch runner.
    """
    # Dataset for test images
    test_dataset = get_sp_dataset('test', get_validation_augmentation, test_params)
    loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=2)

    runner = smp.train.ValidEpoch(
        model=model,
        loss=test_params['loss'],
        metrics=test_params['metrics'],
        device=device,
    )
    return test_dataset, runner.run(loader)

def inference(model, params, device, img_folder='gmaps'):
    """Predict a mask for every image in an unlabelled image folder.

    Args:
        model: trained network exposing ``eval()`` and ``predict()``.
        params (dict): forwarded to ``get_gm_dataset``.
        device: device the image batches are moved to.
        img_folder (str): image folder relative to the data directory.

    Returns:
        list: one rounded, squeezed NumPy mask per image, in dataset order.
    """
    dataset = get_gm_dataset(img_folder, get_validation_augmentation, params)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=2)

    masks = []
    model.eval()
    with torch.no_grad():
        for image in dataloader:
            image = image.to(device)
            # A single forward pass per image; the former extra `model(image)`
            # call produced an unused result and doubled the compute.
            pr_mask = model.predict(image).cpu()
            masks.append(pr_mask.squeeze().numpy().round())

    return masks

In [None]:
# from itertools import product

# epochs = 25
# architectures = [smp.Unet, smp.UnetPlusPlus, smp.MAnet, smp.Linknet, smp.FPN, smp.PSPNet, smp.PAN, smp.DeepLabV3, smp.DeepLabV3Plus]

# encoders = [
#     # 'resnet50',
#     # 'resnext50_32x4d',
#     # 'timm-resnest50d_4s2x40d',
#     # 'timm-res2next50', 'timm-regnetx_064', 'timm-gernet_m',
#     # 'se_resnext101_32x4d',
#     # 'densenet201',
#     # 'xception',
#     'efficientnet-b2',
#     # 'timm-efficientnet-b3',
#     'timm-mobilenetv3_large_100',
#     'vgg16_bn', 'vgg19_bn'
# ]
# count = 0
# for arch, encoder in product(architectures, encoders):
#     train_params = gen_params(arch, encoder, epochs)
#     print('ARCH:', train_params['architecture'].__name__)
#     print('ENCODER:', train_params['encoder'])

#     train(train_params, DEVICE, verbose=False)

#     best_model = torch.load(train_params['model_name'])
#     _, logs = test(best_model, train_params, DEVICE)

#     if logs.get('fscore') > 0.92:
#         break

#     print()

In [None]:
# Architecture / encoder / epoch budget for the single training run below.
ARCHITECTURE = smp.UnetPlusPlus
ENCODER = 'se_resnext101_32x4d'
EPOCHS = 50

# Full training configuration derived from the choices above.
train_params = gen_params(ARCHITECTURE, ENCODER, EPOCHS)

In [None]:
# Launch training with the configuration built in the previous cell.
train(train_params, DEVICE)

# Model Evaluation

In [None]:
import matplotlib.patches as patches
from PIL import Image, ImageDraw
from skimage.filters import threshold_otsu
import skimage.measure as km
from scipy import ndimage as nd
import skimage.morphology as morph


def post_process(image):
    """Clean up a predicted binary mask.

    Applies, in order: binary closing, hole filling, then an erosion followed
    by a dilation with a disk of radius 5.

    Args:
        image: binary mask array.

    Returns:
        The cleaned mask.
    """
    image = nd.binary_closing(image)
    image = nd.binary_fill_holes(image)
    # Pass the structuring element positionally: the keyword was renamed from
    # `selem` to `footprint` in scikit-image, so neither name works on every
    # version, while the positional form does.
    disk = morph.disk(5)
    image = morph.erosion(image, disk)
    image = morph.dilation(image, disk)
    return image

def overlap(image, mask):
    """Tint the masked region of ``image`` towards red.

    Args:
        image: colour image array.
        mask: 2-D mask; truthy entries mark the pixels to tint.

    Returns:
        The blended image produced by ``cv2.addWeighted``.
    """
    fill = np.array([255, 0, 0], dtype='uint8')  # tint colour

    # Solid `fill` colour wherever the mask is set, original pixels elsewhere.
    painted = np.where(mask[..., None], fill, image)

    # Blend the original with the painted copy so masked pixels shift
    # towards `fill` while the scene stays visible.
    return cv2.addWeighted(image, 0.7, painted, 0.2, 0)

def visualize(**images):
    """Show the given images side by side in a single row.

    Each keyword becomes a panel title (underscores turned into spaces,
    title-cased); each value is drawn with ``plt.imshow``.
    """
    n_panels = len(images)
    plt.figure(figsize=(16, 5))
    for position, (name, image) in enumerate(images.items(), start=1):
        plt.subplot(1, n_panels, position)
        plt.xticks([])
        plt.yticks([])
        plt.title(name.replace('_', ' ').title())
        plt.imshow(image)
    plt.show()

In [None]:
# Alternative checkpoint, kept for quick switching:
# model_name = 'deeplabv3plus_timm-efficientnet-b3_model_25.pth' # !
model_name = 'unetplusplus_timm-resnest50d_4s2x40d_model_50.pth'
best_model = torch.load(os.path.join('models_pytorch', model_name))
# NOTE(review): the first assignment is immediately overwritten, so only
# 'plots' takes effect — confirm which folder is intended.
path = 'gmaps/images'
path = 'plots'

# Build the evaluation config from the checkpoint name and predict masks for
# every image in the selected folder.
params = gen_test_params(model_name)
masks = inference(best_model, params, DEVICE, img_folder=path)

data/plots


In [None]:
# Re-open the same folder without augmentation/preprocessing to get the
# images that the overlays are drawn on.
dataset = GoogleMapsDataset('data/' + path)
for idx, (img, mask) in enumerate(zip(dataset, masks)):
    mask = post_process(mask)
    # Only save an overlay when the cleaned mask contains at least one positive pixel.
    if 1 in mask:
        mask = overlap(img, mask)
        mask_img = Image.fromarray(mask)
        # NOTE(review): this writes 'data/plots<idx>.png' next to the folder,
        # not inside 'data/plots/' — confirm that is intended.
        mask_img.save(f'data/plots{idx}.png')
        # visualize(image=img, mask=mask)

In [None]:
# Visual comparison: run every saved checkpoint on the same image folder and
# show the overlay for each image where something was detected.
path = 'plots'
dataset = GoogleMapsDataset(os.path.join(DATA_DIR, path))

for model in os.listdir('models_pytorch'):
    print(model)

    best_model = torch.load(os.path.join('models_pytorch', model))
    masks = inference(best_model, gen_test_params(model), DEVICE, img_folder=path)

    for img, mask in zip(dataset, masks):
        mask = post_process(mask)
        # Skip images whose cleaned mask is empty.
        if 1 in mask:
            mask_img = Image.fromarray(overlap(img, mask))
            visualize(image=img, mask=mask_img)

Output hidden; open in https://colab.research.google.com to view.

In [None]:
def test_all(models_path, arch=None, enc=None, epochs=None, inference=False):
    results = {}
    arch = str(arch) if arch is not None else ''
    enc = str(enc) if enc is not None else ''
    epochs = f'{epochs}.pth' if epochs is not None else ''    

    for model in os.listdir(models_path):
        
        if model.startswith(arch) and enc in model and model.endswith(epochs):
            print(model)
            best_model = torch.load(f'{models_path}/{model}')

            params = gen_test_params(model)

            if inference:
                result = inference(best_model, params, DEVICE)
            else:
                _, result = test(best_model, params, DEVICE)

            # dataset = get_gm_dataset('test_gm', get_validation_augmentation, params)
            results[model.split('.')[0]] = result

    
    return results

In [None]:
from pprint import pprint

# Evaluate every saved checkpoint, grouped by architecture name prefix.
logs = {}
for arch in ['unetplusplus', 'fpn', 'pspnet', 'deeplabv3plus']:
    logs[arch] = test_all('models_pytorch', arch=arch)

# pprint(logs['unetplusplus'])

In [None]:
def compute_scores(results):
    """Average the IoU and F-score of all models within each architecture.

    Args:
        results (dict): architecture -> {model name -> log dict}, where each
            log dict provides ``'iou_score'`` and ``'fscore'``.

    Returns:
        dict: architecture -> ``{'iou': mean IoU, 'fscore': mean F-score}``.
    """
    def _mean_of(arch, key):
        return np.mean([entry.get(key) for entry in results[arch].values()])

    return {
        arch: {'iou': _mean_of(arch, 'iou_score'), 'fscore': _mean_of(arch, 'fscore')}
        for arch in results
    }

# Per-architecture mean IoU / F-score over all evaluated checkpoints.
scores = compute_scores(logs)
pprint(scores)

{'deeplabv3plus': {'fscore': 0.8635911337846935, 'iou': 0.8025295631626088},
 'fpn': {'fscore': 0.8770014081403256, 'iou': 0.8223624959882697},
 'pspnet': {'fscore': 0.8174557797752686, 'iou': 0.7396663873501653},
 'unetplusplus': {'fscore': 0.8874491387298703, 'iou': 0.8433931577938459}}
