In [None]:
import numpy as np 
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import numpy as np
import os
import PIL
import glob 
import matplotlib.pyplot as plt
from PIL import Image
import torch

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torchvision
import torch.nn.functional as F
from torch.autograd import Variable

from PIL import Image
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2


import time
import os
from tqdm.notebook import tqdm
import torchvision.transforms as transforms

!pip install segmentation-models-pytorch
!pip install torchsummary
from torchsummary import summary
import segmentation_models_pytorch as smp

# Import data
# Kaggle input directories for the training images, their segmentation masks
# and the test images. The trailing slash matters: later code builds file
# paths by plain string concatenation (directory + id + extension).
train_images = '../input/fdl21-fdl-dsba/train_images/train_images/'
train_masks = '../input/fdl21-fdl-dsba/train_masks/train_masks/'
test_images = '../input/fdl21-fdl-dsba/test_images/test_images/'

# Number of segmentation classes.
n_classes = 25
# Create a df to store image_id
def create_df(filepath):
    """Build a DataFrame of file ids found (recursively) under ``filepath``.

    The id of a file is its name without the final extension, so
    ``"1234.jpg"`` becomes ``"1234"`` and ``"a.v2.png"`` becomes ``"a.v2"``.

    Parameters
    ----------
    filepath : str
        Directory to walk.

    Returns
    -------
    pandas.DataFrame
        A single column ``'id'`` with one row per file, indexed ``0..n-1``.
        Ids are sorted so the result does not depend on the order in which
        the filesystem lists the files.
    """
    name = [
        # splitext only strips the last extension, so ids containing dots survive.
        os.path.splitext(filename)[0]
        for _dirname, _subdirs, filenames in os.walk(filepath)
        for filename in filenames
    ]
    # os.walk yields files in arbitrary order; sorting keeps any seeded
    # downstream split reproducible across machines.
    name.sort()

    return pd.DataFrame({'id': name}, index=np.arange(0, len(name)))

# Index the available training and test images by id
train_df = create_df(train_images)
test_df = create_df(test_images)

# Split data: hold out 15% of the training ids for validation (fixed seed)
X_train, X_val = train_test_split(
    train_df['id'].values,
    random_state=19,
    test_size=0.15,
)

class UAVDataset(Dataset):
    """Segmentation dataset pairing ``<id>.jpg`` images with ``<id>.png`` masks.

    Parameters
    ----------
    img_path : str
        Directory prefix for images; the id and ``'.jpg'`` are appended to it.
    mask_path : str
        Directory prefix for masks; the id and ``'.png'`` are appended to it.
    X : sequence of str
        Image ids served by this dataset.
    mean, std : sequence of float
        Per-channel statistics passed to ``T.Normalize``.
    transform : callable, optional
        Called as ``transform(image=img, mask=mask)`` and expected to return
        a mapping with ``'image'`` and ``'mask'`` entries.
    patch : bool
        When true, every sample is cut into tiles (see :meth:`tiles`).
    """

    def __init__(self, img_path, mask_path, X, mean, std, transform=None, patch=False):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.patches = patch
        self.mean = mean
        self.std = std

    def __len__(self):
        """Number of ids in the dataset."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(image_tensor, mask_tensor)`` for the id at position ``idx``.

        Raises
        ------
        FileNotFoundError
            If the image or the mask cannot be read from disk.
        """
        img_file = self.img_path + self.X[idx] + '.jpg'
        mask_file = self.mask_path + self.X[idx] + '.png'

        # cv2.imread signals failure by returning None rather than raising,
        # so check explicitly to get an actionable error message.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f"Could not read image file: {img_file}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_file}")

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img, mask = aug['image'], aug['mask']
        img = Image.fromarray(img)

        # Convert to a float tensor and normalise with the supplied statistics.
        t = T.Compose([T.ToTensor(), T.Normalize(self.mean, self.std)])
        img = t(img)
        mask = torch.from_numpy(mask).long()

        if self.patches:
            img, mask = self.tiles(img, mask)

        return img, mask

    def tiles(self, img, mask, tile_h=512, tile_w=768):
        """Cut an image/mask pair into non-overlapping tiles.

        Parameters
        ----------
        img : torch.Tensor
            Image tensor of shape ``(C, H, W)``.
        mask : torch.Tensor
            Mask tensor of shape ``(H, W)``.
        tile_h, tile_w : int
            Tile height and width (default 512 x 768).

        Returns
        -------
        (torch.Tensor, torch.Tensor)
            Image tiles of shape ``(n_tiles, C, tile_h, tile_w)`` and mask
            tiles of shape ``(n_tiles, tile_h, tile_w)``, in row-major order.
            Rows/columns that do not fill a whole tile are dropped by ``unfold``.
        """
        channels = img.shape[0]
        # unfold with step == size yields non-overlapping windows.
        img_patches = img.unfold(1, tile_h, tile_h).unfold(2, tile_w, tile_w)
        img_patches = img_patches.contiguous().view(channels, -1, tile_h, tile_w)
        img_patches = img_patches.permute(1, 0, 2, 3)

        mask_patches = mask.unfold(0, tile_h, tile_h).unfold(1, tile_w, tile_w)
        mask_patches = mask_patches.contiguous().view(-1, tile_h, tile_w)

        return img_patches, mask_patches


# Per-channel mean / standard deviation of the training images, computed on
# small thumbnails to keep the pass cheap. The results feed T.Normalize.
img_h, img_w = 40, 30
means, stdevs = [], []
img_list = []

imgs_path = '../input/fdl21-fdl-dsba/train_images/train_images'
imgs_path_list = os.listdir(imgs_path)

len_ = len(imgs_path_list)
for item in imgs_path_list:
    img = cv2.imread(os.path.join(imgs_path, item))
    if img is None:
        # cv2.imread returns None for files it cannot decode; skip them.
        continue
    # OpenCV loads BGR, but the dataset converts images to RGB before
    # normalising. Compute the statistics in RGB so that means[0] / stdevs[0]
    # really belong to the red channel.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (img_w, img_h))
    # Add a trailing axis so thumbnails can be stacked along axis 3.
    img_list.append(img[:, :, :, np.newaxis])

imgs = np.concatenate(img_list, axis=3)  # (img_h, img_w, 3, n_images)
imgs = imgs.astype(np.float32) / 255.

for i in range(3):
    pixels = imgs[:, :, i, :].ravel()  # flatten
    means.append(np.mean(pixels))
    stdevs.append(np.std(pixels))

# Training-time augmentations, applied in the listed order to image and mask.
train_augmentations = [
    # resize
    A.Resize(512, 512, interpolation=cv2.INTER_NEAREST),
    # flip
    A.HorizontalFlip(p=0.8),  # probability to be reviewed
    A.VerticalFlip(p=0.8),
    # rotate
    A.RandomRotate90(p=0.8),
    # transpose
    A.Transpose(p=0.8),
    # contrast
    A.RandomBrightnessContrast(
        brightness_limit=(-0.2, 0.5),
        contrast_limit=(-0.2, 0.5),
        p=0.8,
    ),
    # sharpen
    A.Sharpen(p=0.8),
    # crop (probability is tiny, so this step almost never fires)
    A.RandomCrop(height=512, width=512, p=0.00001),
]
transform_train = A.Compose(train_augmentations)

# Validation only resizes; no random augmentation.
transform_val = A.Compose([
    A.Resize(512, 512, interpolation=cv2.INTER_NEAREST),
])

# Datasets: same folders and normalisation statistics, different id subsets
# and transform pipelines.
train_set = UAVDataset(
    train_images, train_masks, X_train, means, stdevs, transform_train, patch=False
)
val_set = UAVDataset(
    train_images, train_masks, X_val, means, stdevs, transform_val, patch=False
)

# Dataloaders
batch_size = 15

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)

#### Descriptive Analysis of the Images

In [None]:
# Let's look at a few samples to see what exactly we need to train on.
# Use the built-in next(); Python 3 iterators have no .next() method.
images, masks = next(iter(train_loader))

n_preview = 3
fig, ax = plt.subplots(n_preview, 2, dpi=300)

# One row per sample: first channel of the normalised image next to its mask.
for row in range(n_preview):
    ax[row][0].imshow(images[row][0])
    ax[row][1].imshow(masks[row])
    ax[row][0].set_title('Image (Red Channel)')
    ax[row][1].set_title('Mask')

plt.tight_layout()

In [None]:
def to_categorical(y, num_classes):
    """One-hot encode integer labels.

    Each label in ``y`` selects the matching row of a ``num_classes`` x
    ``num_classes`` identity matrix, so the result has the shape of ``y``
    plus a trailing axis of length ``num_classes`` (dtype ``uint8``).
    """
    identity_rows = np.eye(num_classes, dtype='uint8')
    return identity_rows[y]

## Network Architecture: U-Net GAN 

The model we will be using is built on top of a GAN, whose networks are as follows:

1. Generator: U-Net 
2. Discriminator: Image GAN 

In [None]:
import torch.nn as nn
import torch.nn.functional as F

# Define double convolution structure
def double_conv(in_c, out_c):
    """Return two stacked Conv(3x3, 'same') -> BatchNorm -> ReLU stages.

    The first stage maps ``in_c`` channels to ``out_c``; the second keeps
    ``out_c`` channels. Spatial size is preserved by the 'same' padding.
    """
    stages = []
    for stage_in in (in_c, out_c):
        stages.extend([
            nn.Conv2d(stage_in, out_c, kernel_size=3, stride=1, padding='same'),
            # Batch normalization after every convolution
            nn.BatchNorm2d(out_c),
            nn.ReLU(),
        ])
    return nn.Sequential(*stages)

class Generator(nn.Module):
    """U-Net style encoder/decoder producing per-pixel class scores.

    Five ``double_conv`` encoder stages (16 -> 256 channels) are separated by
    2x2 max-pooling and dropout; four transposed-convolution decoder stages
    upsample back to the input resolution, each concatenating the matching
    encoder feature map (skip connection).

    Parameters
    ----------
    in_channels : int
        Number of input image channels (default 3).
    n_classes : int
        Number of output classes (default 25).

    Notes
    -----
    ``forward`` returns raw, unnormalised logits of shape
    ``(N, n_classes, H, W)``. ``nn.CrossEntropyLoss`` applies log-softmax
    itself, so feeding it already-softmaxed values flattens the gradients.
    ``argmax`` over dim 1 is the same for logits and probabilities; apply
    ``self.softmax`` to the output when probabilities are needed.
    ``H`` and ``W`` must be divisible by 16 for the skip connections to align.
    """

    def __init__(self, in_channels=3, n_classes=25):
        super(Generator, self).__init__()

        # Downsampling
        self.downlayer1 = double_conv(in_channels, 16)
        self.maxpool = nn.MaxPool2d(2)

        # Dropout applied after every pooling step in the encoder
        self.dropout = nn.Dropout(p=0.1)

        self.downlayer2 = double_conv(16, 32)
        self.downlayer3 = double_conv(32, 64)
        self.downlayer4 = double_conv(64, 128)
        self.downlayer5 = double_conv(128, 256)

        # Upsampling: each transposed conv halves the channels and doubles
        # the resolution; the following double_conv consumes the concatenation
        # with the skip connection (hence twice the channels on input).
        self.up_trans1 = nn.ConvTranspose2d(256, 128, 2, 2)
        self.up_conv1 = double_conv(256, 128)

        self.up_trans2 = nn.ConvTranspose2d(128, 64, 2, 2)
        self.up_conv2 = double_conv(128, 64)

        self.up_trans3 = nn.ConvTranspose2d(64, 32, 2, 2)
        self.up_conv3 = double_conv(64, 32)

        self.up_trans4 = nn.ConvTranspose2d(32, 16, 2, 2)
        self.up_conv4 = double_conv(32, 16)

        # 1x1 convolution mapping features to per-class scores
        self.output = nn.Conv2d(16, n_classes, kernel_size=1)

        # Kept for callers that want probabilities; not applied in forward().
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map an image batch ``(N, C, H, W)`` to logits ``(N, n_classes, H, W)``."""
        # encoder (x1, x4, x7, x10 are reused as skip connections)
        x1 = self.downlayer1(x)
        x3 = self.dropout(self.maxpool(x1))

        x4 = self.downlayer2(x3)
        x6 = self.dropout(self.maxpool(x4))

        x7 = self.downlayer3(x6)
        x9 = self.dropout(self.maxpool(x7))

        x10 = self.downlayer4(x9)
        x12 = self.dropout(self.maxpool(x10))

        x13 = self.downlayer5(x12)

        # decoder
        x = self.up_trans1(x13)
        x = self.up_conv1(torch.cat([x, x10], 1))

        x = self.up_trans2(x)
        x = self.up_conv2(torch.cat([x, x7], 1))

        x = self.up_trans3(x)
        x = self.up_conv3(torch.cat([x, x4], 1))

        x = self.up_trans4(x)
        x = self.up_conv4(torch.cat([x, x1], 1))

        # Raw logits: the loss function is responsible for the softmax.
        return self.output(x)

In [None]:
import torch.optim as optim
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Learning rate handed to the Adam optimizer below.
lr = 0.001

# Build the network and move its parameters to the selected device.
model = Generator()
model.to(device)

optimizer = optim.Adam(model.parameters(), lr = lr)
# NOTE(review): nn.CrossEntropyLoss applies log-softmax internally and expects
# raw logits -- confirm the model's forward output is not already
# softmax-normalised.
criterion = nn.CrossEntropyLoss()
# Number of full passes over train_loader.
epochs = 25

In [None]:
# Average training loss of the most recently completed epoch.
train_loss = 0.0

for epoch in range(epochs):

    # Make sure dropout / batch-norm are in training mode, even if the model
    # was switched to eval mode by an earlier cell run.
    model.train()

    # Reset the accumulator every epoch; otherwise each epoch's sum would
    # start from the previous epoch's average and the report would be biased.
    running_loss = 0.0

    for images, labels in train_loader:

        images = images.to(device=device, dtype=torch.float)
        labels = labels.to(device=device, dtype=torch.int64)

        # Clear gradients
        optimizer.zero_grad()

        # Forward propagation
        outputs = model(images)

        # Calculate cross entropy loss
        loss = criterion(outputs, labels)

        # Calculating gradients
        loss.backward()

        # Update parameters
        optimizer.step()

        running_loss += loss.item()

    # compute average training loss over the batches of this epoch
    train_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch+1} Training loss: {train_loss}")
        

Predictions on testing data

In [None]:
class TestLoader(Dataset):
    """Dataset over unlabeled test images for inference.

    Parameters
    ----------
    images : sequence of str
        Paths of the image files.
    tgt_dim : int
        Each image is resized to ``tgt_dim`` x ``tgt_dim`` (nearest neighbour).
    mean, std : sequence of float, optional
        Per-channel normalisation statistics. Defaults to 0.5 for every
        channel. NOTE(review): for predictions consistent with training,
        these should presumably be the statistics used for the training set.
    """

    def __init__(self, images, tgt_dim, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
        self.images = images
        self.tgt_dim = tgt_dim
        self.mean = list(mean)
        self.std = list(std)

    def __len__(self):
        """Number of test images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the normalised ``(3, tgt_dim, tgt_dim)`` float tensor for image ``idx``."""
        idx_image = self.images[idx]
        # The context manager closes the file handle; convert('RGB') guarantees
        # three channels so the 3-channel Normalize below always applies.
        with Image.open(idx_image) as raw:
            image = raw.convert('RGB').resize((self.tgt_dim, self.tgt_dim),
                                              PIL.Image.NEAREST)

        # Keep the array as uint8: ToTensor only rescales to [0, 1] for uint8
        # input. Casting to float first would leave pixel values in 0..255,
        # a very different scale from what the network saw during training.
        img = np.array(image)

        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(self.mean, self.std)
        ])
        return transform(img)

In [None]:
# Load the test set
# Sorted list of every .jpg path in the test directory; the sort fixes the
# order in which predictions are produced.
test = sorted(glob.glob(test_images+'*.jpg'))
# 512 is the target side length: each image is resized to 512 x 512.
test_set = TestLoader(test,512)
# No shuffling, so batches follow the order of `test`.
test_dataloader = torch.utils.data.DataLoader(test_set,shuffle=False)

In [None]:
from itertools import islice

# Predict the test set maps.
# Switch to inference mode first: otherwise dropout stays active and
# batch-norm uses per-batch statistics, making predictions noisy.
model.eval()

test_predictions = []
with torch.no_grad():
    for image in test_dataloader:
        image = image.to(device)
        prediction = model(image.float())
        # Most likely class per pixel.
        prediction = torch.argmax(prediction, dim=1)
        # The loader yields one image per batch, so keep element 0.
        test_predictions.append(np.uint8(prediction.cpu().numpy()[0]))

# Saving images to output path
test_dir = './test_pre_4/'
# Create the output folder here so this cell works on a fresh kernel.
os.makedirs(test_dir, exist_ok=True)

# Derive the output names from the very list of paths the loader iterated
# over. Re-listing and re-sorting the directory can order names differently
# (sorting stems vs. sorting full file names) and misalign names and masks.
test_img_names = [os.path.splitext(os.path.basename(path))[0] for path in test]
d = dict(zip(test_img_names, test_predictions))
for name, pred in d.items():
    im = Image.fromarray(pred)
    im.save(test_dir+name+'.png')
    print("Image saved: ", test_dir + name + '.png')

In [None]:
# exist_ok=True keeps this cell idempotent: re-running it (or running it when
# the folder already exists) no longer raises FileExistsError.
os.makedirs('./test_pre_4', exist_ok=True)

Preparing data for submission

In [None]:
import os

from PIL import Image
import numpy as np

def rle_encode(img):
    '''
    Run-length encode a binary mask.

    img: numpy array, 1 - mask, 0 - background
    Returns the encoding as a space-separated string of
    "start length" pairs (1-based starts over the flattened array).
    '''
    flat = img.flatten()
    # Pad with background on both sides so runs touching the borders
    # still produce a start and an end transition.
    padded = np.concatenate([[0], flat, [0]])
    # 1-based positions where the value changes: run starts and run ends alternate.
    boundaries = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Turn every end position into a run length.
    boundaries[1::2] -= boundaries[::2]
    return ' '.join(map(str, boundaries))

def rle_decode(mask_rle, shape):
    '''
    Rebuild a binary mask from its run-length encoding.

    mask_rle: run-length as string formatted "start length start length ..."
              with 1-based starts over the flattened array
    shape: (height, width) of array to return
    Returns numpy array (uint8), 1 - mask, 0 - background
    '''
    tokens = mask_rle.split()
    starts = np.asarray(tokens[0::2], dtype=int)
    lengths = np.asarray(tokens[1::2], dtype=int)
    starts -= 1  # switch to 0-based indexing
    ends = starts + lengths
    flat_mask = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for begin, stop in zip(starts, ends):
        flat_mask[begin:stop] = 1
    return flat_mask.reshape(shape)


def create_rles(pred_dir='./test_pre_4/', out_csv='submission_file_4.csv',
                size=(1024, 1024), class_ids=range(1, 25)):
    """Used for Kaggle submission: encode all saved prediction masks.

    Reads every file in ``pred_dir`` as a class-index mask, resizes it with
    nearest-neighbour interpolation, and writes one run-length-encoded row
    per (image, class) pair to ``out_csv``.

    Parameters
    ----------
    pred_dir : str
        Folder holding the predicted mask images.
    out_csv : str
        Path of the CSV to write (overwritten if it exists).
    size : tuple of int
        Size the masks are resized to before encoding, as passed to
        ``Image.resize``.
    class_ids : iterable of int
        Class indices to encode. The default ``range(1, 25)`` skips class 0.
    """
    # Sort so the row order of the CSV is deterministic.
    mask_files = sorted(os.listdir(pred_dir))
    with open(out_csv, 'w') as f:
        f.write('ImageClassId,rle_mask\n')
        for mask_file in mask_files:
            # Close the image file as soon as its pixels are in memory.
            with Image.open(os.path.join(pred_dir, mask_file)) as mask_img:
                mask = np.array(mask_img.resize(size, resample=Image.NEAREST))

            # NOTE(review): the id is the text before the first underscore of
            # the file name. For names without an underscore this keeps the
            # extension (e.g. "123.png") -- confirm against the expected
            # submission format.
            image_id = mask_file.split('_')[0]

            for x in class_ids:
                enc = rle_encode(mask == x)
                f.write(f"{image_id}_{x},{enc}\n")

# Write the submission CSV from the prediction masks saved on disk.
create_rles()