In [None]:
from os import listdir
from os.path import join, splitext

from typing import Tuple

import numpy as np
from PIL import Image
import pandas as pd
import torch
from torchvision import transforms
from torchvision.transforms import ToTensor, RandomHorizontalFlip, RandomVerticalFlip, Normalize

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
import os.path

DATA_FOLDER = '/content/gdrive/My Drive/'
TRAINING_SET_FOLDER = os.path.join(DATA_FOLDER, 'semantic_drone_dataset')

#Resized version of the images stored on the disk
INPUT_IMAGES_FOLDER = os.path.join(TRAINING_SET_FOLDER, 'resized_original250')
LABEL_IMAGES_FOLDER = os.path.join(TRAINING_SET_FOLDER, 'resized_label250')

TRAIN_CSV = os.path.join(DATA_FOLDER, 'train.csv')
TEST_CSV = os.path.join(DATA_FOLDER, 'test.csv')
VALIDATION_CSV = os.path.join(DATA_FOLDER, 'validation.csv')

In [None]:
"dataset: utility to access the data"

import random

class CSVDataset(torch.utils.data.Dataset):
    "dataset for a subset of the raw dataset given by a csv file"

    def __init__(self, source, transform=None):
        "Initializes a dataset from a csv file created by split"
        super().__init__()

        self.transform = transform
        if transform is None:
            self.transform = lambda e: e

        # get pd.Series containing all filename
        self.imgs = pd.read_csv(source, names=('img',)).img

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, index: int):
        "returns the index-th data item of the dataset."
        fname = self.imgs[index]
        inpt = Image.open(join(INPUT_IMAGES_FOLDER, fname))
        lbel = Image.open(join(LABEL_IMAGES_FOLDER, splitext(fname)[0]+'.png'))

        return self.transform(inpt), self.transform(lbel), fname


class SegmentationDataset(torch.utils.data.Dataset):
    "dataset holding images and their segmentation masks (greyscale)"

    def __init__(self, source: str, crop_size: Tuple[int, int], train: bool):
        "source: file path, crop_size: (width, height), train: true if training set"
        super().__init__()
        self.images_transform = transforms.Compose(
            [transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ])
        self.crop_h = crop_size[0]
        self.crop_w = crop_size[1]

        # get pd.Series containing all filename
        self.imgs = pd.read_csv(source, names=('img',)).img
        self.train = train

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, index: int):
        
        fname = self.imgs[index]
        inpt = Image.open(join(INPUT_IMAGES_FOLDER, fname))
        lbel = Image.open(join(LABEL_IMAGES_FOLDER, splitext(fname)[0] + '.png'))

        if self.train:
            crop_list = []
            for _ in range(3):

                try:
                    #Random crop
                    i, j, h, w = transforms.RandomCrop.get_params(
                        inpt, output_size=(self.crop_h, self.crop_w) )
                    image = transforms.functional.crop(inpt, i, j, h, w)
                    mask = transforms.functional.crop(lbel, i, j, h, w)
                    #Random horizontal flip
                    r = random.random()
                    if r > 0.5:
                        image = transforms.functional.hflip(image)
                        mask = transforms.functional.hflip(mask)
                    
                    crop_list.append( (self.images_transform(image), np.array(mask, dtype=np.int64)) )
            
                except ValueError:
                    print("Could not crop the image")
                    print("Image is discarded from training set")
                    break
            return crop_list
        else:
          inpt: torch.Tensor = self.images_transform(inpt)  # C, H, W
          return inpt, np.array(lbel, dtype=np.int64), fname


def trainset(size: Tuple[int, int]) -> CSVDataset:
    "trainset: returns the training set"
    return SegmentationDataset(TRAIN_CSV, size, train=True)


def testset(size: Tuple[int, int]) -> CSVDataset:
    "trainset: returns the testing set"
    return SegmentationDataset(TEST_CSV, size, train=False)

def validationset(size: Tuple[int, int] = (6000, 4000)) -> CSVDataset:
    "trainset: returns the validation set"
    return SegmentationDataset(VALIDATION_CSV, size)

def split(test=0.1, validation=0.1):
    df = pd.DataFrame(data=[f for f in listdir(
        INPUT_IMAGES_FOLDER) if f.endswith("jpg")])
    df = df.sample(frac=1, random_state=42)

    test_size = int(test * len(df))
    validation_size = int(validation * len(df))
    train_size = len(df) - test_size - validation_size

    test_end = train_size + test_size
    valid_end = test_end + validation_size

    df.iloc[0:train_size].to_csv(TRAIN_CSV, index=False, header=False)
    df.iloc[train_size:test_end].to_csv(TEST_CSV, index=False, header=False)
    df.iloc[test_end:valid_end].to_csv(
        VALIDATION_CSV, index=False, header=False)


if __name__ == "__main__":
    split()

In [None]:
torch.cuda.empty_cache()

In [None]:
import torch.nn as nn
import matplotlib.pyplot as plt


def conv(in_channels: int, out_channels: int) -> nn.Conv2d:
    return nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=1), nn.BatchNorm2d(out_channels), nn.ReLU())

def deconv(in_channels: int, out_channels: int) -> nn.Conv2d:
    return nn.Sequential(nn.ConvTranspose2d(in_channels, out_channels, kernel_size=3, padding=1, stride=1), nn.BatchNorm2d(out_channels), nn.ReLU())

class net(nn.Module):
    "Fully convolution neural network with a encoder based on VGG-16 architecture. Inspired from paper available on https://arxiv.org/abs/1505.04366"

    def __init__(self):
        super(net, self).__init__()
        

        self.pool = nn.MaxPool2d((2, 2), stride=2, return_indices=True)
        self.unpool = nn.MaxUnpool2d((2, 2))
        self.relu = nn.ReLU()

        self.conv1 = conv(3, 64)#1
        self.conv2 = conv(64, 64)#1
        self.conv3 = conv(64, 128)#1
        self.conv4 = conv(128, 128)#1
        self.conv5 = conv(128, 256)#1
        self.conv6 = conv(256, 256)#2
        self.conv7 = conv(256, 512)#1
        self.conv8 = conv(512, 512)#5
        #--------------------Total=13

        self.deconv8 = deconv(512, 512)
        self.deconv7 = deconv(512, 256)
        self.deconv6 = deconv(256, 256)
        self.deconv5 = deconv(256, 128)
        self.deconv4 = deconv(128, 128)
        self.deconv3 = deconv(128, 64)
        self.deconv2 = deconv(64, 64)
        self.deconv1 = deconv(64, 26)
                                 

    def forward(self, x):

        size1 = x.size()
        x = self.conv2(self.conv1(x))
        x, indices1 = self.pool(x)

        size2 = x.size()
        x = self.conv4(self.conv3(x))
        x, indices2 = self.pool(x)

        size3 = x.size()
        x = self.conv6(self.conv6(self.conv5(x)))
        x, indices3 = self.pool(x)

        size4 = x.size()
        x = self.conv8(self.conv8(self.conv7(x)))
        x, indices4 = self.pool(x)

        size5 = x.size()
        x = self.conv8(self.conv8(self.conv8(x)))
        x, indices5 = self.pool(x)


        x = self.unpool(x, indices=indices5, output_size=size5)
        x = self.deconv8(self.deconv8(self.deconv8(x)))

        x = self.unpool(x, indices=indices4, output_size=size4)
        x = self.deconv7(self.deconv8(self.deconv8(x)))

        x = self.unpool(x, indices=indices3, output_size=size3)
        x = self.deconv5(self.deconv6(self.deconv6(x)))

        x = self.unpool(x, indices=indices2, output_size=size2)
        x = self.deconv3(self.conv4(x))

        x = self.unpool(x, indices=indices1, output_size=size1)
        x = self.deconv1(self.conv2(x))

        return x


train_set = trainset((224, 224))
test_set = testset((600, 300))

train_ldr = torch.utils.data.DataLoader(
    train_set, batch_size=4, shuffle=True, num_workers=2, pin_memory=True)
test_ldr = torch.utils.data.DataLoader(
    test_set, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)

device = 'cuda:0'
model = net().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def train(num_epochs):
    "num_epochs: number of epochs used to train and test the model"
    
    train_loss = torch.zeros((num_epochs), device=device)
    test_loss = torch.zeros((num_epochs), device=device)
    test_acc = torch.zeros((num_epochs), device=device)

    for i in range(num_epochs):
        print(f'epoch ({i+1:2}/{num_epochs:2})', end='\r')
        #Training the model
        model.train()
        print("Training")
        for m, l in enumerate(train_ldr):
            print("Image " + str(m+1))
            for k, (x, y) in enumerate(l):
              outputs = model(x.to(device))
              loss = criterion(outputs, y.to(device))

              train_loss[i] += loss.detach().sum()

              optimizer.zero_grad()
              loss.backward()
              optimizer.step()

        print('')
        print(f'epoch {i+1:2}/{num_epochs:2} - evaluation')
        with torch.no_grad():
            model.eval()
            #Testing the model
            accuracy = 0.0
            for x, y, _ in test_ldr:
                
                inputs = x.to(device)
                targets = y.to(device)

                outputs = model(inputs)

                loss = criterion(outputs, targets)
                test_loss[i] += loss.sum()

                # per - pixel accuracy
                predicted = outputs.argmax(1)
                accuracy += (predicted == targets).float().mean()
            test_acc[i] = accuracy / len(test_ldr)

    train_loss /= (num_epochs * train_ldr.batch_size * len(train_ldr))
    test_loss /= (num_epochs * test_ldr.batch_size * len(test_ldr))

    return train_loss.cpu(), test_loss.cpu(), test_acc.cpu()


num_epochs = 100
print(f'training for {num_epochs} epochs')
epochs_train_loss, epochs_test_loss, test_acc = train(num_epochs)
torch.save(model.state_dict(), "/content/gdrive/MyDrive/VGG-FCN.pth")

print(f'{epochs_train_loss}\n{epochs_test_loss}\n{test_acc}')

idx = range(0, num_epochs)

plt.figure()
plt.plot(idx, epochs_train_loss, label='training loss')
plt.plot(idx, epochs_test_loss, label='testing loss')
plt.xlabel("epoch")
plt.ylabel("Cross-entropy loss")
plt.legend()
plt.show()
plt.savefig('content/gdrive/VGG-FCN.png')

plt.figure()
plt.plot(idx, test_acc, 'k', label='testing accuracy')
plt.xlabel("epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()
plt.savefig('content/gdrive/VGG-FCN.png')