# MALIS PROJECT

## Importing libraries

In [None]:
# Importing all the required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
from glob import glob
import tensorflow as tf 
import keras
from tensorflow.keras.utils import Sequence, to_categorical, plot_model
from tensorflow.keras.layers import Conv2D, Dropout, MaxPooling2D, UpSampling2D, concatenate, Input, Dense,BatchNormalization,Activation,MaxPool2D
from tensorflow.keras import layers
from albumentations import HorizontalFlip, RandomRotate90, RandomBrightnessContrast
from tensorflow.keras.models import Model,Sequential
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, Callback

## Utils 

In [None]:
def create_dir(path):
    """Create the directory ``path``, including missing parents.

    The call is idempotent: an already existing directory is left untouched.
    ``exist_ok=True`` replaces the former check-then-create sequence, which
    could fail if the directory appeared between the check and the creation.

    Args:
        path: directory to create.
    """
    os.makedirs(path, exist_ok=True)
        
def preprocessing_with_augmentation(images, masks, save_path, augment=True):
    """Resize image/mask pairs to 256x256 and write them under ``save_path``.

    Every pair is written as ``<save_path>/images/<stem>_<idx><ext>`` and
    ``<save_path>/masks/<stem>_<idx><ext>``. Index 0 is always the resized
    original. When ``augment`` is true, three more variants follow:
    1 = RandomRotate90, 2 = horizontal flip, 3 = RandomBrightnessContrast.

    Args:
        images: paths of the input images.
        masks: paths of the label masks, aligned one-to-one with ``images``.
        save_path: output root; its ``images`` and ``masks`` sub-directories
            are expected to exist already.
        augment: whether to also write the augmented variants.

    Raises:
        FileNotFoundError: if OpenCV cannot read an image or a mask.
        OSError: if OpenCV reports that an output file was not written.
    """
    H = 256
    W = 256

    # The transforms are stateless between calls, so build them once.
    # NOTE(review): RandomRotate90 uses the library default probability and
    # RandomBrightnessContrast uses p=0.1, so variants 1 and 3 are often exact
    # copies of the original -- confirm this is intended.
    transforms = ()
    if augment:
        transforms = (
            RandomRotate90(),
            HorizontalFlip(p=1.0),
            RandomBrightnessContrast(p=0.1),
        )

    for image_file, mask_file in tqdm(zip(images, masks), total=len(images)):
        # splitext on the basename copes with dots inside the stem and with
        # platform-specific path separators.
        image_name, image_ext = os.path.splitext(os.path.basename(image_file))
        mask_name, mask_ext = os.path.splitext(os.path.basename(mask_file))

        x = cv2.imread(image_file, cv2.IMREAD_COLOR)
        if x is None:
            raise FileNotFoundError(f"Could not read image: {image_file}")
        y = cv2.imread(mask_file, cv2.IMREAD_COLOR)
        if y is None:
            raise FileNotFoundError(f"Could not read mask: {mask_file}")

        x = cv2.resize(x, (W, H))
        # Mask pixels are class ids: nearest-neighbour keeps them intact,
        # whereas the default bilinear filter would blend neighbouring ids
        # into values that belong to unrelated classes.
        y = cv2.resize(y, (W, H), interpolation=cv2.INTER_NEAREST)

        save_images = [x]
        save_masks = [y]
        for aug in transforms:
            # Each transform is applied to the resized original, not chained.
            augmented = aug(image=x, mask=y)
            save_images.append(augmented['image'])
            save_masks.append(augmented['mask'])

        for idx, (i, m) in enumerate(zip(save_images, save_masks)):
            new_image_name = f"{image_name}_{idx}{image_ext}"
            new_mask_name = f"{mask_name}_{idx}{mask_ext}"

            image_path = os.path.join(save_path, 'images', new_image_name)
            mask_path = os.path.join(save_path, 'masks', new_mask_name)

            # cv2.imwrite signals failure through its return value only.
            if not cv2.imwrite(image_path, i):
                raise OSError(f"Could not write image: {image_path}")
            if not cv2.imwrite(mask_path, m):
                raise OSError(f"Could not write mask: {mask_path}")

            
def read_image(x):
    """Load an image file as a float32 array scaled to the range [0, 1].

    The file is read with OpenCV in colour mode (channel order as returned by
    ``cv2.imread``) and resized to the module-level ``(W, H)``.

    Args:
        x: path of the image file.

    Returns:
        The resized image as ``np.float32`` with values divided by 255.

    Raises:
        FileNotFoundError: if OpenCV cannot read the file.
    """
    image = cv2.imread(x, cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imread returns None instead of raising; fail with a clear error.
        raise FileNotFoundError(f"Could not read image: {x}")
    image = cv2.resize(image, (W, H))
    image = image / 255.0
    return image.astype(np.float32)


def read_mask(x):
    """Load a label mask as an int32 array of class ids.

    The file is read with OpenCV as a single-channel image and resized to the
    module-level ``(W, H)``.

    Args:
        x: path of the mask file.

    Returns:
        The resized mask as ``np.int32``.

    Raises:
        FileNotFoundError: if OpenCV cannot read the file.
    """
    mask = cv2.imread(x, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # cv2.imread returns None instead of raising; fail with a clear error.
        raise FileNotFoundError(f"Could not read mask: {x}")
    # Nearest-neighbour keeps class ids intact; interpolating between ids
    # would invent labels that are not present in the mask.
    mask = cv2.resize(mask, (W, H), interpolation=cv2.INTER_NEAREST)
    return mask.astype(np.int32)


def tf_dataset(x, y, batch=4):
    """Build an endlessly repeating ``tf.data`` pipeline over (image, mask) paths.

    Args:
        x: image file paths.
        y: mask file paths, aligned with ``x``.
        batch: number of samples per batch.

    Returns:
        A dataset that shuffles the path pairs (buffer of 500), loads them
        with ``preprocess``, batches, repeats and prefetches 2 batches.
    """
    return (
        tf.data.Dataset.from_tensor_slices((x, y))
        .shuffle(buffer_size=500)
        .map(preprocess)
        .batch(batch)
        .repeat()
        .prefetch(2)
    )
    

def preprocess(x, y):
    """Turn a pair of path tensors into an (image, one-hot mask) tensor pair.

    Args:
        x: string tensor holding the image path.
        y: string tensor holding the mask path.

    Returns:
        ``(image, mask)`` where ``image`` is float32 with static shape
        ``[H, W, 3]`` and ``mask`` is int32 one-hot with static shape
        ``[H, W, num_classes]``.
    """
    def _load_pair(image_file, mask_file):
        # numpy_function hands the paths over as bytes, hence the decode().
        return read_image(image_file.decode()), read_mask(mask_file.decode())

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.int32])
    mask = tf.one_hot(mask, num_classes, dtype=tf.int32)

    # numpy_function drops static shape information; restore it explicitly.
    image.set_shape([H, W, 3])               # images carry 3 channels
    mask.set_shape([H, W, num_classes])      # one channel per class
    return image, mask

## Visualization of a sample

In [None]:
# Location of the dataset and of its image / label sub-folders
path = '/kaggle/input/semantic-drone-dataset/dataset/semantic_drone_dataset/'
image_path = path + "original_images/"
label_path = path + "label_images_semantic/"

# Show sample 004 next to its label mask (OpenCV loads BGR, matplotlib expects RGB)
img = cv2.cvtColor(cv2.imread(image_path + '004.jpg'), cv2.COLOR_BGR2RGB)
mask = cv2.imread(label_path + '004.png', cv2.IMREAD_GRAYSCALE)
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
for axis, picture in zip(axs, (img, mask)):
    axis.imshow(picture)

## Data preparation pipeline

In [None]:
## Split data
list_images = sorted(glob(os.path.join(image_path, "*")))
list_labels = sorted(glob(os.path.join(label_path, "*")))

# Images and masks are paired purely by position after sorting, so make sure
# both folders really contain the same file stems before splitting; a stray or
# missing file would otherwise silently shift every following pair.
image_stems = [os.path.splitext(os.path.basename(p))[0] for p in list_images]
label_stems = [os.path.splitext(os.path.basename(p))[0] for p in list_labels]
if image_stems != label_stems:
    raise ValueError("Images and label masks do not match one-to-one by file name")

# 80 % of the pairs for training/validation, 20 % held out for testing
img_train, img_test, mask_train, mask_test = train_test_split(
    list_images, list_labels, test_size=0.2, random_state=19
)

In [None]:
# Augment the training data only; it is split into training and validation later

# Training/validation pairs: resized and augmented
for directory in ("./training_validation/images/", "./training_validation/masks/"):
    create_dir(directory)

save_path = "./training_validation/"
preprocessing_with_augmentation(img_train, mask_train, save_path, augment=True)

# Test pairs: resized only, no augmentation
for directory in ("./testing/images/", "./testing/masks/"):
    create_dir(directory)

save_path = "./testing/"
preprocessing_with_augmentation(img_test, mask_test, save_path, augment=False)

In [None]:
# Display every saved variant whose file name starts with "013" beside its mask
viz_images = sorted(glob(os.path.join('./training_validation/images/', "013*")))
viz_labels = sorted(glob(os.path.join('./training_validation/masks/', "013*")))
for i, image_file in enumerate(viz_images):
    # OpenCV loads BGR; matplotlib expects RGB
    img = cv2.cvtColor(cv2.imread(image_file), cv2.COLOR_BGR2RGB)
    mask = cv2.imread(viz_labels[i], cv2.IMREAD_GRAYSCALE)
    fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
    axs[0].imshow(img)
    axs[1].imshow(mask)

In [None]:
## Split data
list_images = sorted(glob(os.path.join('./training_validation/images/', "*")))
list_labels = sorted(glob(os.path.join('./training_validation/masks/', "*")))
img_test = sorted(glob(os.path.join('./testing/images/', "*")))
mask_test = sorted(glob(os.path.join('./testing/masks/', "*")))


def _source_id(file_path):
    """Return the source image id of a saved '<id>_<idx>.<ext>' file."""
    return os.path.basename(file_path).rsplit('_', 1)[0]


# Split by source image rather than by file: the folder holds several
# augmented variants of each photo, and a per-file split would put variants of
# the same photo in both sets, inflating the validation scores.
source_ids = sorted({_source_id(p) for p in list_images})
train_ids, val_ids = train_test_split(source_ids, test_size=0.1, random_state=19)
val_ids = set(val_ids)

img_train, mask_train, img_val, mask_val = [], [], [], []
for image_file, mask_file in zip(list_images, list_labels):
    if _source_id(image_file) in val_ids:
        img_val.append(image_file)
        mask_val.append(mask_file)
    else:
        img_train.append(image_file)
        mask_train.append(mask_file)

# Keep the training pairs in a reproducible random order, as a plain
# train_test_split would have produced, instead of sorted by file name.
order = np.random.RandomState(19).permutation(len(img_train))
img_train = [img_train[k] for k in order]
mask_train = [mask_train[k] for k in order]

In [None]:
# Report how many samples ended up in each subset
for label, samples in (
    ('training samples : ', img_train),
    ('validation samples : ', img_val),
    ('testing samples : ', img_test),
):
    print(label, len(samples))

## Define the model : U-NET

In [None]:
# Input resolution, number of segmentation classes and batch size
H = W = 256
img_size = (H, W)
num_classes = 23
batch_size = 8

# One tf.data pipeline per subset
train_dataset, valid_dataset, test_dataset = (
    tf_dataset(paths, labels, batch=batch_size)
    for paths, labels in (
        (img_train, mask_train),
        (img_val, mask_val),
        (img_test, mask_test),
    )
)

# The pipelines repeat forever, so the number of batches per pass is computed
# here (incomplete final batches are not counted)
train_steps, valid_steps, test_steps = (
    len(paths) // batch_size for paths in (img_train, img_val, img_test)
)



def get_model(img_size, num_classes):
    """Build the encoder/decoder segmentation network.

    The encoder is a stride-2 entry convolution followed by three blocks of
    separable convolutions, each ending in a stride-2 max-pooling. The decoder
    is four blocks of transposed convolutions, each ending in a 2x upsampling.
    Every block adds a projected residual of the previous block's output.

    Args:
        img_size: tuple giving the spatial size of the input; a channel
            dimension of 3 is appended to it.
        num_classes: number of output channels of the final softmax layer.

    Returns:
        An uncompiled ``keras.Model`` mapping the input image to a per-pixel
        softmax over ``num_classes`` channels.
    """
    inputs = keras.Input(shape=img_size + (3,))

    ### [First half of the network: downsampling inputs] ###

    # Entry block
    x = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    # Blocks 1, 2, 3 are identical apart from the feature depth.
    for filters in [64, 128, 256]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        # Stride 2 halves the spatial resolution of the main branch
        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        # (1x1 convolution with stride 2 so depth and resolution match x)
        residual = layers.Conv2D(filters, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    ### [Second half of the network: upsampling inputs] ###

    # Four 2x upsampling blocks mirror the four stride-2 steps of the encoder
    for filters in [256, 128, 64, 32]:
        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.UpSampling2D(2)(x)

        # Project residual
        # (upsampled first, then a 1x1 convolution adjusts the depth)
        residual = layers.UpSampling2D(2)(previous_block_activation)
        residual = layers.Conv2D(filters, 1, padding="same")(residual)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    # Add a per-pixel classification layer
    outputs = layers.Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    # Define the model
    model = keras.Model(inputs, outputs)
    return model


# Reset the Keras global state so that re-running this cell does not pile up
# stale graphs in memory
keras.backend.clear_session()

# Instantiate the network and print its layer table
model = get_model(img_size=img_size, num_classes=num_classes)
model.summary()

In [None]:
# Configure the model for training.
# The input pipeline one-hot encodes the masks, so the loss is the regular
# (non-sparse) categorical cross-entropy.
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss="categorical_crossentropy",
    metrics=[
        # MeanIoU expects class ids; with one-hot targets and softmax outputs
        # it would compare raw 0/1 values. OneHotMeanIoU takes the argmax of
        # both first. The name is pinned so the history keys stay
        # 'mean_io_u' / 'val_mean_io_u'.
        tf.keras.metrics.OneHotMeanIoU(num_classes=num_classes, name="mean_io_u"),
        'accuracy',
    ],
)

# Keep only the best model seen so far on disk
callbacks = [
    keras.callbacks.ModelCheckpoint("model.h5", save_best_only=True)
]

# Train the model, doing validation at the end of each epoch.
epochs = 50
history = model.fit(
    train_dataset,
    epochs=epochs,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    validation_data=valid_dataset,
    callbacks=callbacks,
)


In [None]:
# Reload the checkpoint written during training
model = keras.models.load_model(filepath='model.h5')

In [None]:
# Add a leading batch axis to the last displayed 256x256 image
img1 = img.reshape(1, 256, 256, 3)

In [None]:
# The network was trained on read_image() output: OpenCV channel order (BGR)
# and float32 values scaled to [0, 1]. img1 holds the RGB uint8 picture used
# for display, so feed the model a converted copy instead of img1 itself.
model_input = img1[..., ::-1].astype(np.float32) / 255.0
pred = model.predict(model_input)

In [None]:
# Collapse the class axis: keep the index of the highest score per pixel
pred = pred.argmax(axis=3)

In [None]:
# Input image, predicted class map and reference mask side by side
fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(20, 10))
t, t1, t2 = img1[0], pred[0], mask
for axis, panel in zip(axs, (t, t1, t2)):
    axis.imshow(panel)

In [None]:
# Summarize the training history: one figure per metric, comparing the
# training curve with the validation curve (the 'val_' entries come from the
# validation data passed to fit, not from the test set).
for metric in ('mean_io_u', 'loss'):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()