In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import os

import numpy as np
import cv2
import matplotlib.pyplot as plt
import torchvision.transforms as tvtransforms
import time
import torch
import numpy as np
import segmentation_models_pytorch as smp
import metrics as smpmetrics
import albumentations as A
from tqdm import tqdm
from meter import AverageValueMeter
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
from PIL import Image
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset
import albumentations as A
from torch.utils.data import DataLoader
import segmentation_models_pytorch as smp


In [None]:
# --- Experiment configuration: everything a reader might want to tune ---------
MULTICLASS_MODE: str = "multiclass"  # loss mode name for integer class-id targets
ENCODER = "resnet18"
ENCODER_WEIGHTS = 'imagenet'
# NOTE(review): presumably position i in this list names the class id i produced
# by Dataset.mapping - confirm against the annotation colours.
CLASSES = ['background', 'ocean', 'wetsand', 'buildings', 'vegetation', 'drysand']
ACTIVATION = None  # forwarded as the `activation` argument of the model
BATCH_SIZE = 2
MODEL_NAME = 'deeplabv3'  # also used as the stem of checkpoint / figure file names
EPOCHS = 200
# Train on the GPU when one is visible to PyTorch, otherwise fall back to the
# CPU so the notebook still runs on machines without CUDA.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
TRAIN_IMG_DIR = "../../CoastSat/data/blackpool/images/train/"
TRAIN_MASK_DIR = "../../CoastSat/data/blackpool/images/trainannot/"
VAL_IMG_DIR = "../../CoastSat/data/blackpool/images/test/"
VAL_MASK_DIR = "../../CoastSat/data/blackpool/images/testannot/"

In [None]:
# Report which CUDA devices (if any) are available for training.
avail = torch.cuda.is_available()
devCnt = torch.cuda.device_count()
# get_device_name() raises when no CUDA device exists, so only query it when
# there is something to query.
devName = torch.cuda.get_device_name(0) if avail and devCnt > 0 else "n/a"
print("Available: " + str(avail) + ", Count: " + str(devCnt) + ", Name: " + str(devName))

In [None]:
# Inherit from the `BaseDataset` alias rather than from `Dataset`: this class
# rebinds the name `Dataset`, so re-running the cell would otherwise subclass
# the previous definition of itself.
class Dataset(BaseDataset):
    """Segmentation dataset pairing RGB images with colour-coded RGB masks.

    Every entry of ``image_dir`` is one sample; its mask is looked up under
    the *same file name* in ``mask_dir``. Mask colours are translated to
    integer class ids through ``self.mapping``.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        """Store the directories and index the available images.

        :image_dir: directory containing the input images
        :mask_dir: directory containing one mask per image, under the same file name
        :transform: optional callable invoked as ``transform(image=..., mask=...)``
            whose result is indexed with ``"image"`` and ``"mask"``

        """
        self._image_dir = image_dir
        self._mask_dir = mask_dir
        self._transform = transform
        # os.listdir() returns entries in arbitrary order; sorting makes a
        # given index refer to the same file on every run and machine.
        self.images = sorted(os.listdir(image_dir))

        # RGB colour of a mask pixel -> integer class id.
        # NOTE(review): two colours use 225 where the others use 255
        # ((225, 0, 225) and (255, 225, 225)). If the annotations actually use
        # 255, those pixels silently fall back to class 0 - confirm against
        # the mask files.
        self.mapping = {(0, 0, 0): 0,         # background class (black)
                        (0, 0, 255): 1,
                        (225, 0, 225): 2,
                        (255, 0, 0): 3,
                        (255, 225, 225): 4,
                        (255, 255, 0): 5}

    def __len__(self):
        """returns the number of indexed images
        :returns: sample count

        """
        return len(self.images)

    def mask_to_class_rgb(self, mask):
        """Convert an RGB colour mask into a 2-D map of class ids.

        :mask: numpy array of shape (H, W, 3) holding the mask colours
        :returns: ``torch.long`` tensor of shape (H, W); pixels whose colour is
            not a key of ``self.mapping`` stay 0 (background)

        """
        mask = torch.from_numpy(mask)
        if mask.dim() != 3:
            # Only squeeze when there really are extra singleton axes. An
            # unconditional squeeze would also drop H or W when it equals 1.
            mask = torch.squeeze(mask)

        height, width = mask.shape[0], mask.shape[1]
        mask_out = torch.zeros(height, width, dtype=torch.long)

        for colour, class_id in self.mapping.items():
            colour_t = torch.tensor(colour, dtype=torch.uint8)
            # A pixel belongs to the class when all three channels match.
            matches = (mask == colour_t).all(dim=-1)
            mask_out[matches] = class_id

        return mask_out

    def __getitem__(self, index):
        """Load the image / mask pair at ``index``.

        :index: position in ``self.images``
        :returns: ``(image, mask)``; without a transform these are an
            (H, W, 3) RGB numpy array and an (H, W) numpy array of class ids,
            otherwise whatever the transform returns for them

        """
        file_name = self.images[index]
        image = self._load_rgb(os.path.join(self._image_dir, file_name))
        colour_mask = self._load_rgb(os.path.join(self._mask_dir, file_name))
        mask = self.mask_to_class_rgb(colour_mask).numpy()

        if self._transform is not None:
            augmentations = self._transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

    @staticmethod
    def _load_rgb(path):
        """Read ``path`` as an RGB numpy array, closing the file afterwards."""
        with Image.open(path) as handle:
            return np.array(handle.convert("RGB"))

In [None]:
def get_loaders(
    train_dir,
    train_mask_dir,
    val_dir,
    val_mask_dir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True,
):
    """
    Build the dataloader objects used by the training loops

    :train_dir: directory of training images
    :train_mask_dir: directory of training masks
    :val_dir: validation image directory
    :val_mask_dir: validation mask directory
    :batch_size: number of samples per batch, for both loaders
    :train_transform: transform handed to the training dataset
    :val_transform: transform handed to the validation dataset
    :num_workers: worker count forwarded to both DataLoaders
    :pin_memory: pin_memory flag forwarded to both DataLoaders

    :returns: training and validation dataloaders, in that order; only the
        training loader shuffles
    """
    # Options that are identical for both loaders.
    loader_options = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )

    training_set = Dataset(
        image_dir=train_dir, mask_dir=train_mask_dir, transform=train_transform
    )
    training_loader = DataLoader(training_set, shuffle=True, **loader_options)

    validation_set = Dataset(
        image_dir=val_dir, mask_dir=val_mask_dir, transform=val_transform
    )
    validation_loader = DataLoader(validation_set, shuffle=False, **loader_options)

    return training_loader, validation_loader

In [None]:
# Training and testing image transforms built with the albumentations library.
# Both pipelines pad and resize every sample to 32 x 512 pixels.
# Evaluation pipeline: only pad and resize, no random augmentation.
test_transform = A.Compose(
    [
        # cv2.BORDER_REFLECT_101 is the named form of the value 4.
        A.PadIfNeeded(min_height=32, min_width=512, border_mode=cv2.BORDER_REFLECT_101),
        A.Resize(32, 512),
    ]
)

# Training pipeline: same pad/resize followed by random augmentations.
train_transform = A.Compose(
    [
        A.PadIfNeeded(min_height=32, min_width=512, border_mode=cv2.BORDER_REFLECT_101),
        A.Resize(32, 512),
        A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        # `always_apply=False` was only restating the default, so it is omitted.
        A.MedianBlur(blur_limit=3, p=0.1),
    ]
)

In [None]:
# Build the training and testing dataloaders from the configured directories.
trainDL, testDL = get_loaders(
    train_dir=TRAIN_IMG_DIR,
    train_mask_dir=TRAIN_MASK_DIR,
    val_dir=VAL_IMG_DIR,
    val_mask_dir=VAL_MASK_DIR,
    batch_size=BATCH_SIZE,
    train_transform=train_transform,
    val_transform=test_transform,
)


In [None]:


def visualize(**images):
    """Show the given images side by side in a single row.

    Each keyword argument becomes one panel. The keyword name, with
    underscores turned into spaces and title-cased, is used as the panel title.
    """
    panel_count = len(images)
    plt.figure(figsize=(16,5))

    for position, name in enumerate(images, start=1):
        plt.subplot(1, panel_count, position)
        # Hide the axis ticks; only the picture and its title matter here.
        plt.xticks([])
        plt.yticks([])
        plt.title(name.replace('_', ' ').title())
        plt.imshow(images[name])
    plt.show()

In [None]:
# Sanity check: push one training sample through a minimal pipeline and
# inspect its shapes, dtypes, value ranges and class distribution.
transforms_delete = A.Compose([
        A.PadIfNeeded(32,512),
        A.Resize(32, 512),
        ToTensorV2(),
    ], )
dataset = Dataset(TRAIN_IMG_DIR, TRAIN_MASK_DIR, transform=transforms_delete)

# Sample 10 is the one originally inspected; clamp the index so the cell also
# works when the dataset holds fewer than 11 images.
sample_index = min(10, len(dataset) - 1)
image, mask = dataset[sample_index]

print(mask.shape)
print(image.shape)

# How many pixels of the mask fall into each class id.
unique, counts = np.unique(mask, return_counts=True)
print(dict(zip(unique, counts)))

# Shape / dtype / type / value range of both tensors. The earlier version
# printed mask.min() in the slot meant for image.min() and emitted this same
# summary a second time after the plots.
print(image.shape, image.dtype, type(image), mask.shape,
      mask.dtype, type(mask), image.min(),
      image.max(), mask.min(), mask.max())

# The image tensor is channels-first; matplotlib wants channels-last.
plt.imshow(image.permute(1,2,0))
plt.show()

plt.imshow(mask)
plt.show()

print(len(CLASSES))


In [None]:
# Build the DeepLabV3 network: 3-channel input, one output channel per entry
# of CLASSES, encoder and activation taken from the configuration cell.
model = smp.DeepLabV3(
    in_channels=3,
    classes=len(CLASSES),
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
    activation=ACTIVATION,
)

In [None]:


# Dice loss for integer class-id targets. The mode string comes from the
# configuration cell instead of being repeated here as a literal.
loss = smp.losses.DiceLoss(mode=MULTICLASS_MODE)
# NOTE(review): presumably the smp.utils epoch runners read `__name__` when
# logging the loss - confirm before removing.
loss.__name__ = 'Dice_loss'

# Metrics are computed inside the custom accuracy function instead; passing
# them as a list object did not work, so the list stays empty.
metrics = []

# define optimizer and learning rate
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

In [None]:
# Epoch runners from smp.utils, sharing the model, loss, metrics and device.
# NOTE(review): the cells shown in this notebook train with the custom
# train_fn loop and never call these two objects - confirm they are still
# needed.

# runner that optimises the model
train_epoch = smp.utils.train.TrainEpoch(
    model,
    device=DEVICE,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    verbose=True,
)

# runner that only evaluates the model
test_epoch = smp.utils.train.ValidEpoch(
    model,
    device=DEVICE,
    loss=loss,
    metrics=metrics,
    verbose=True,
)

In [None]:
def check_accuracy(metrics, loader, model, device='cpu'):
    """ Custom method to calculate accuracy of testing data

    Runs ``model`` over every batch of ``loader``, computes macro-reduced IoU,
    F1, precision and recall per batch and averages them over all samples
    (each batch weighted by its size). The model is switched to eval mode for
    the pass and put back into train mode afterwards.

    :metrics: unused; kept so existing callers keep working
    :loader: dataloader object yielding ``(image, mask)`` batches; images are
        permuted from channels-last to channels-first before the forward pass
    :model: model to test
    :device: cpu or gpu
    :returns: dict with keys ``iou``, ``f1``, ``precision`` and ``recall``
        holding the averaged scores (also printed); all 0.0 for an empty loader
    """
    scorers = {
        'iou': smpmetrics.iou_score,
        'f1': smpmetrics.f1_score,
        'precision': smpmetrics.precision,
        'recall': smpmetrics.recall,
    }
    totals = {name: 0.0 for name in scorers}
    # Count the samples actually seen in `loader` rather than reading the size
    # of a global dataloader.
    samples_seen = 0

    model.eval() # set model for evaluation
    with torch.no_grad():  # do not calculate gradients
        for x, y in loader:
            x = x.to(device).permute(0, 3, 1, 2)
            y = y.to(device).unsqueeze(1).to(torch.int64)

            logits = model(x.float())  # per-class scores for every pixel
            num_classes = logits.shape[1]  # one channel per class
            preds = torch.argmax(logits, dim=1).unsqueeze(1).int()  # class id per pixel

            tp, fp, fn, tn = smpmetrics.get_stats(
                preds, y, mode='multiclass', num_classes=num_classes
            )

            # Weight each per-batch score by the batch size, so that dividing
            # by the sample count below yields a per-image average even when
            # the last batch is smaller.
            batch_size = x.shape[0]
            for name, scorer in scorers.items():
                totals[name] += float(scorer(tp, fp, fn, tn, reduction='macro')) * batch_size
            samples_seen += batch_size

    denominator = max(samples_seen, 1)  # avoid dividing by zero on an empty loader
    scores = {name: total / denominator for name, total in totals.items()}

    print('IOU Score: {} | F1 Score: {} | Precision Score: {} | Recall Score: {}'.format(
        scores['iou'], scores['f1'], scores['precision'], scores['recall']))
    model.train()
    return scores


In [None]:
# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler()

def train_fn(loader, model, optimizer, loss_fn, scaler):
    """ Custom training loop for models: runs one pass over ``loader``

    :loader: dataloader object yielding ``(image, mask)`` batches; images are
        permuted from channels-last to channels-first before the forward pass
    :model: model to train
    :optimizer: training optimizer
    :loss_fn: loss function, called as ``loss_fn(predictions, targets)``
    :scaler: scaler object used to scale the loss before the backward pass
    :returns: mean loss over the batches of this pass (0.0 for an empty loader)

    """
    model.train()  # make the training mode explicit
    loop = tqdm(loader)  # progress bar over the batches

    running_loss = 0.0
    batches_seen = 0

    for data, targets in loop:  # iterate through dataset
        data = data.to(device=DEVICE).float().permute(0, 3, 1, 2)  # correct shape for image
        targets = targets.to(device=DEVICE).unsqueeze(1).to(torch.int64)

        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        # backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        batch_loss = loss.item()
        running_loss += batch_loss
        batches_seen += 1

        # update loop
        loop.set_postfix(loss=batch_loss)

    return running_loss / batches_seen if batches_seen else 0.0

# train_fn moves every batch to DEVICE, so put the model's parameters there
# explicitly instead of relying on an earlier cell having moved the model.
model.to(DEVICE)

for epoch in range(EPOCHS):  # train, evaluate, then checkpoint after every epoch
    train_fn(trainDL, model, optimizer, loss, scaler)
    check_accuracy(metrics, testDL, model, DEVICE)
    # The same file is overwritten each epoch, so it always holds the model as
    # of the most recently finished epoch.
    torch.save(model, './{}.pth'.format(MODEL_NAME))

In [None]:
# Predict using saved model

# Transform applied to validation tiles: resize to 32 x 512 and convert the
# array to a tensor.
val_transform = A.Compose(  # validation image transforms
    [A.Resize(32, 512),ToTensorV2()]
)

# Load one sample tile. Force three RGB channels, as the training Dataset
# does, so a palette / RGBA / greyscale PNG cannot reach the model with a
# different channel count. The `with` block closes the file handle.
with Image.open('../../CoastSat/data/blackpool/images/val/tile_0-2960.png') as tile:
    val_image = tile.convert("RGB")

transformed = val_transform(image=np.array(val_image))
image_transformed = transformed['image']
batch_tensor = torch.unsqueeze(image_transformed, 0)  # add the leading batch axis





In [None]:
# Load the checkpoint written by the training loop. The file is a fully
# pickled model object (torch.save(model, ...)), and unpickling can execute
# arbitrary code - only load checkpoints you created yourself.
checkpoint_path = './{}.pth'.format(MODEL_NAME)
try:
    # Recent PyTorch releases refuse full-model pickles unless weights_only is
    # disabled explicitly; map_location places the model on DEVICE even when
    # it was saved from a different device.
    model = torch.load(checkpoint_path, map_location=DEVICE, weights_only=False)
except TypeError:
    # Older PyTorch releases do not know the weights_only argument.
    model = torch.load(checkpoint_path, map_location=DEVICE)
model.eval()  # the cells below only run inference

Stack the predictions for all tiled validation images into one array and save the result as an image.

In [None]:
# Predict every validation tile and stack the class-id maps vertically.
directory = '../../CoastSat/data/blackpool/images/val/'
tile_predictions = []  # one (H, W) array of class ids per tile

model.eval()  # put layers that behave differently during training into inference mode
with torch.no_grad():  # no gradients are needed for prediction
    # Tiles are stacked in sorted file-name order.
    for filename in sorted(os.listdir(directory)):
        f = os.path.join(directory, filename)
        # skip anything that is not a regular file
        if not os.path.isfile(f):
            continue
        # Same 3-channel RGB input as the training Dataset; the `with` block
        # closes the file handle.
        with Image.open(f) as tile:
            val_image = tile.convert("RGB")
        transformed = val_transform(image=np.array(val_image))
        image_transformed = transformed['image']
        batch_tensor = torch.unsqueeze(image_transformed, 0)
        out = model(batch_tensor.to(device=DEVICE).float())

        # argmax over the class axis, then drop the batch axis of size 1
        preds = torch.argmax(out, dim=1)[0]
        tile_predictions.append(preds.cpu().numpy())

if not tile_predictions:
    raise FileNotFoundError('no validation tiles found in ' + directory)

# Stack once at the end instead of growing the array tile by tile from a
# dummy all-zero seed that had to be sliced off afterwards.
image_matrix = np.vstack(tile_predictions)
plt.imshow(image_matrix)
plt.savefig('blackpool_preds_{}.png'.format(MODEL_NAME))
plt.show()


        
        
        

Now do the same for the RGB images.

Each tile has shape (32, 512, 3) with pixel values from 0 to 255, so the stacked array is divided by 255 before plotting: matplotlib expects floating-point RGB values in the range [0, 1].

In [None]:
# Stack the (resized) RGB validation tiles vertically into one picture.
directory = '../../CoastSat/data/blackpool/images/val/'
rgb_tiles = []  # one (H, W, 3) array per tile

# Tiles are stacked in sorted file-name order.
for filename in sorted(os.listdir(directory)):
    f = os.path.join(directory, filename)
    # skip anything that is not a regular file
    if not os.path.isfile(f):
        continue
    # Force three channels so every tile has the same depth; the `with` block
    # closes the file handle.
    with Image.open(f) as tile:
        val_image = tile.convert("RGB")
    transformed = val_transform(image=np.array(val_image))
    image_transformed = transformed['image']
    # channels-first tensor -> channels-last array for matplotlib
    img3 = image_transformed.permute(1,2,0)
    rgb_tiles.append(img3.numpy())

if not rgb_tiles:
    raise FileNotFoundError('no validation tiles found in ' + directory)

# Stack once, then scale 0-255 values to the 0-1 floats imshow expects.
image_matrix1 = np.vstack(rgb_tiles) / 255
plt.imshow(image_matrix1)
plt.savefig('blackpool_gt_rgb_{}.png'.format(MODEL_NAME))
plt.show()

Finally, stack the ground-truth masks in essentially the same way as above.

In [None]:
# Stack the (resized) ground-truth colour masks vertically into one picture.
directory3 = '../../CoastSat/data/blackpool/images/valannot/'
mask_tiles = []  # one (H, W, 3) array per mask tile

# Masks are stacked in sorted file-name order.
for filename in sorted(os.listdir(directory3)):
    f = os.path.join(directory3, filename)
    # skip anything that is not a regular file
    if not os.path.isfile(f):
        continue
    # Force three channels so every tile has the same depth; the `with` block
    # closes the file handle.
    with Image.open(f) as tile:
        val_image = tile.convert("RGB")
    # NOTE(review): the mask goes through the transform under the `image`
    # key; if a tile is not already 32 x 512 the resize presumably blends
    # label colours along class borders - confirm the tile size.
    transformed = val_transform(image=np.array(val_image))
    image_transformed = transformed['image']
    # channels-first tensor -> channels-last array for matplotlib
    img3 = image_transformed.permute(1,2,0)
    mask_tiles.append(img3.numpy())

if not mask_tiles:
    raise FileNotFoundError('no mask tiles found in ' + directory3)

# Stacking the tiles directly keeps their integer 0-255 values. Seeding the
# stack with a float array made the result float, and imshow clips float RGB
# data to the range 0-1.
image_matrix3 = np.vstack(mask_tiles)
plt.imshow(image_matrix3)
plt.savefig('blackpool_gt_mask_{}.png'.format(MODEL_NAME))
plt.show()