In [None]:
import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

# Number of bytes requested from the HTTP response per read while downloading.
CHUNK_SIZE = 40960
# Comma-separated list of "<directory>:<url-encoded download URL>" entries; each entry is
# downloaded and unpacked into KAGGLE_INPUT_PATH/<directory> by the loop below.
# NOTE(review): the URL is a signed link carrying X-Goog-Date=20240327 and
# X-Goog-Expires=259200, so it has presumably expired and needs regenerating -- confirm.
DATA_SOURCE_MAPPING = 'kittiroadsegmentation:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F1668350%2F2736560%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240327%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240327T140431Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D2c7687cb153e54eda187ac8454cc9616e5c0502cd4abd82c57630042740b5453469f82b35891e0590c586dc1ba96c79c9c7440af1dfe0662a1015626d2fe59bea676dc158850d82892a22d2261ec06c3c6536c68f186b95c265546e974acab6961f3c61ab024dbc63146fdfe432bb179c6c579bcbb06e45446e302d826f3dba836db2346e09ffab80462293d7f591c5fb91697efb2766d5e77717677eb9b796cb25321c22ca229a86d27974b4dc777732cfc71b328a1dbafbe1a0246c7e14289b4af599356c1874c9c889562ca57edd3650c7d02ae88223c2ec95af1b5051ef77d7c7a32f0039dd7e4092ab011c3b2a4f4dbc8fe35ac35f07432a47e94ece371'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every configured data source and unpack it under KAGGLE_INPUT_PATH.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Each entry is "<directory>:<url-encoded URL>"; split on the first ':' only so
    # an entry whose URL part contains a literal ':' is still parsed correctly.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            # Content-Length is optional; without it the progress bar stays empty
            # instead of crashing on int(None).
            content_length = fileres.headers['content-length']
            total_length = int(content_length) if content_length and content_length.isdigit() else 0
            print(f'Downloading {directory}, {content_length} bytes compressed')

            downloaded = 0
            # iter() with a sentinel keeps reading chunks until the stream returns b''.
            for data in iter(lambda: fileres.read(CHUNK_SIZE), b''):
                downloaded += len(data)
                tfile.write(data)
                done = min(50, int(50 * downloaded / total_length)) if total_length else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {downloaded} bytes downloaded")
                sys.stdout.flush()

            # The tar branch re-opens the temp file by name, so buffered bytes must
            # reach the disk before the archive is read.
            tfile.flush()

            # NOTE(review): extractall() trusts the member paths stored in the archive;
            # only use this with archives from a trusted source.
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name: reusing `tarfile` here would
                # replace the imported module and break the next iteration.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        # HTTPError is a subclass of OSError, so it has to be handled first.
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


# Fully Convolutional Network

## Import Libraries

In [None]:
import pandas as pd
import numpy as np
import os
import random
import tensorflow as tf
import cv2
from tqdm import tqdm
import datetime
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras.layers import Input, Add, Conv2DTranspose
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.applications import VGG16
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy, MeanSquaredError, BinaryCrossentropy
from tensorflow.keras.utils import plot_model
from tensorflow.keras import callbacks

from  matplotlib import pyplot as plt
import matplotlib.image as mpimg
from IPython.display import clear_output
%matplotlib inline

from IPython.display import HTML
from base64 import b64encode

## Load Dataset

### Source Dataset

In [None]:
# Dataset directories, relative to the notebook's working directory.
# Training images; the split sizes and the tf.data file listing are built from this directory.
train_data_dir = "../input/kittiroadsegmentation/training/image_2/"
# Ground-truth masks. Not referenced elsewhere in this notebook: parse_image derives
# each mask path from the image path instead.
train_gt_dir = "../input/kittiroadsegmentation/training/gt_image_2/"

# Directory the test videos are read from in the video sections.
test_data_dir = "../input/kittiroadsegmentation/testing/"

In [None]:
# Dataset splitting: 80% training, 10% validation, the remainder for testing.
# The directory is listed once, and only PNG files are counted because the input
# pipeline is built from the "*.png" glob; counting every directory entry could
# disagree with the number of elements the dataset actually yields.
NUM_IMAGES = sum(1 for entry in os.listdir(train_data_dir) if entry.endswith(".png"))

TRAINSET_SIZE = int(NUM_IMAGES * 0.8)
print(f"Number of Training Examples: {TRAINSET_SIZE}")

VALIDSET_SIZE = int(NUM_IMAGES * 0.1)
print(f"Number of Validation Examples: {VALIDSET_SIZE}")

# Whatever is left after the two truncated shares goes to the test split.
TESTSET_SIZE = NUM_IMAGES - TRAINSET_SIZE - VALIDSET_SIZE
print(f"Number of Testing Examples: {TESTSET_SIZE}")

In [None]:
# Constants for image processing and model training
IMG_SIZE = 128          # Side length (pixels) that images and masks are resized to
N_CHANNELS = 3          # Number of color channels in the input image (RGB)
N_CLASSES = 1           # Number of filters in the model's final sigmoid convolution
SEED = 123              # Seed passed to the dataset file listing and shuffling

In [None]:
# Function to load an image and its corresponding segmentation mask
def parse_image(img_path: str) -> dict:
    """Load one road image and build its binary road mask.

    The ground-truth path is derived from the image path by replacing the
    ``image_2`` directory with ``gt_image_2`` and inserting ``road_`` after the
    ``um_`` / ``umm_`` / ``uu_`` file-name prefix.

    Args:
        img_path: Path of a PNG image inside an ``image_2`` directory.

    Returns:
        A dict with ``'image'`` (uint8, 3 channels) and ``'segmentation_mask'``
        (uint8, one channel, 1 where the ground truth is the road colour).
    """
    # The images are listed with a "*.png" glob, so use the PNG decoder by name.
    # It already yields uint8, so no dtype conversion is needed afterwards.
    image = tf.io.read_file(img_path)
    image = tf.image.decode_png(image, channels=3)

    # Derive the path of the ground-truth mask from the image path
    mask_path = tf.strings.regex_replace(img_path, "image_2", "gt_image_2")
    mask_path = tf.strings.regex_replace(mask_path, "um_", "um_road_")
    mask_path = tf.strings.regex_replace(mask_path, "umm_", "umm_road_")
    mask_path = tf.strings.regex_replace(mask_path, "uu_", "uu_road_")

    # Read and decode the ground-truth mask
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)

    # Colour that marks road pixels in the ground truth; every other colour is
    # treated as background.
    road_label = np.array([255, 0, 255])

    # A pixel is road only when all three channels match the road colour
    mask = tf.experimental.numpy.all(mask == road_label, axis = 2)
    mask = tf.cast(mask, tf.uint8)
    mask = tf.expand_dims(mask, axis=-1)
    # Return a dictionary containing the image and its segmentation mask
    return {'image': image, 'segmentation_mask': mask}

In [None]:
# Generate dataset variables
# List the image files in a fixed order and shuffle them exactly once.
# list_files() shuffles by default and draws a NEW order on every iteration, so
# take()/skip() would select different files each epoch and validation/test images
# would leak into training. A single seeded shuffle with
# reshuffle_each_iteration=False keeps the three splits disjoint and stable.
all_dataset = tf.data.Dataset.list_files(train_data_dir + "*.png", shuffle=False)
all_dataset = all_dataset.shuffle(
    # Buffer covers the whole file list so the shuffle is uniform.
    buffer_size=max(1, TRAINSET_SIZE + VALIDSET_SIZE + TESTSET_SIZE),
    seed=SEED,
    reshuffle_each_iteration=False,
)
# Apply parse_image to each file path to load the image and its segmentation mask
all_dataset = all_dataset.map(parse_image)

# Split the dataset into training, validation, and test sets
# First TRAINSET_SIZE elements: training
train_dataset = all_dataset.take(TRAINSET_SIZE)
# Next VALIDSET_SIZE elements: validation
val_dataset = all_dataset.skip(TRAINSET_SIZE).take(VALIDSET_SIZE)
# Everything after the training and validation elements: test
test_dataset = all_dataset.skip(TRAINSET_SIZE + VALIDSET_SIZE)

### Apply Transformations

In [None]:
# TensorFlow function that rescales the input image to the range [0, 1]
@tf.function
def normalize(input_image: tf.Tensor, input_mask: tf.Tensor) -> tuple:
    """Scale the image to float32 in [0, 1]; the mask is passed through unchanged.

    Args:
        input_image: Image tensor; assumed to hold values in [0, 255].
        input_mask: Mask tensor, returned as received.

    Returns:
        Tuple ``(scaled_image, input_mask)``.
    """
    scaled_image = tf.cast(input_image, tf.float32) / 255.0
    return scaled_image, input_mask

# TensorFlow function to preprocess a datapoint for training
@tf.function
def load_image_train(datapoint: dict) -> tuple:
    """Resize, randomly flip and normalize one training datapoint.

    Args:
        datapoint: Dict with ``'image'`` and ``'segmentation_mask'`` tensors.

    Returns:
        Tuple ``(image, mask)``: float32 image in [0, 1] and float32 mask holding
        only 0 and 1, both of size ``IMG_SIZE`` x ``IMG_SIZE``.
    """
    # Resize the input image to the specified dimensions
    input_image = tf.image.resize(datapoint['image'], (IMG_SIZE, IMG_SIZE))
    # Masks hold class labels, so resize them with nearest-neighbour: the default
    # bilinear interpolation would blend 0 and 1 into fractional labels at edges.
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (IMG_SIZE, IMG_SIZE),
                                 method='nearest')
    # Nearest-neighbour keeps the input dtype; cast so the mask stays float32.
    input_mask = tf.cast(input_mask, tf.float32)

    # Randomly flip the image and mask horizontally with a probability of 50%
    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)

    # Rescale the image to [0, 1]
    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

# TensorFlow function to preprocess a datapoint for validation/testing
@tf.function
def load_image_test(datapoint: dict) -> tuple:
    """Resize and normalize one validation/test datapoint (no augmentation).

    Args:
        datapoint: Dict with ``'image'`` and ``'segmentation_mask'`` tensors.

    Returns:
        Tuple ``(image, mask)``: float32 image in [0, 1] and float32 mask holding
        only 0 and 1, both of size ``IMG_SIZE`` x ``IMG_SIZE``.
    """
    # Resize the input image to the specified dimensions
    input_image = tf.image.resize(datapoint['image'], (IMG_SIZE, IMG_SIZE))
    # Masks hold class labels, so resize them with nearest-neighbour: the default
    # bilinear interpolation would blend 0 and 1 into fractional labels at edges.
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (IMG_SIZE, IMG_SIZE),
                                 method='nearest')
    # Nearest-neighbour keeps the input dtype; cast so the mask stays float32.
    input_mask = tf.cast(input_mask, tf.float32)

    # Rescale the image to [0, 1]
    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

In [None]:
BATCH_SIZE = 32     # Number of samples per batch
BUFFER_SIZE = 1000  # Buffer size used when shuffling the training set

# Input pipelines for the three splits, keyed by split name.
dataset = {
    # Training: preprocess with augmentation, shuffle, batch, prefetch
    "train": (
        train_dataset
        .map(load_image_train, num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(buffer_size=BUFFER_SIZE, seed=SEED)
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    ),
    # Validation: preprocess, batch, prefetch
    "val": (
        val_dataset
        .map(load_image_test)
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    ),
    # Testing: preprocess, batch, prefetch
    "test": (
        test_dataset
        .map(load_image_test)
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    ),
}

# Display information about each dataset
for split_name in ("train", "val", "test"):
    print(dataset[split_name])

In [None]:
# Function to display an image next to its mask(s)
def display_sample(display_list):
    """Plot the entries of ``display_list`` side by side in one row.

    Args:
        display_list: Up to three arrays in the order input image, true mask,
            predicted mask; each must be accepted by ``array_to_img``.
    """
    # Panel captions, matched to the entries by position
    panel_titles = ['Input Image', 'True Mask', 'Predicted Mask']
    n_panels = len(display_list)

    plt.figure(figsize=(18, 18))
    for position, panel in enumerate(display_list):
        # One subplot per entry, without axis ticks
        plt.subplot(1, n_panels, position + 1)
        plt.title(panel_titles[position])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
        plt.axis('off')

    # Render the finished figure
    plt.show()

# Keep the first training batch around as the sample image/mask pair
first_train_batch = dataset["train"].take(1)
for image, mask in first_train_batch:
    sample_image = image
    sample_mask = mask
# Show the first example of that batch together with its mask
display_sample([sample_image[0], sample_mask[0]])

## Define Network

In [None]:
# Build the full VGG-16 architecture only to inspect its layers.
# weights=None skips downloading the ImageNet weights of the complete classifier:
# the summary does not depend on the weight values, and the pretrained backbone
# is loaded separately where the segmentation model is built.
vgg16_model = VGG16(weights=None)
# Display summary of the VGG-16 model
vgg16_model.summary()

In [None]:
# Define the input shape for the model: (height, width, channels) of one
# preprocessed image, i.e. IMG_SIZE x IMG_SIZE pixels with N_CHANNELS channels
input_shape = (IMG_SIZE, IMG_SIZE, N_CHANNELS)

In [None]:
# Build the segmentation model on top of a VGG-16 encoder

# Input layer
inputs = Input(input_shape)

# ImageNet-pretrained VGG-16 without its classifier head, fed from `inputs`
vgg16_model = VGG16(include_top = False, weights = 'imagenet', input_tensor = inputs)

# Encoder feature maps reused by the decoder, from shallowest to deepest
skip_block3, skip_block4, bottleneck = (
    vgg16_model.get_layer(layer_name).output
    for layer_name in ("block3_pool", "block4_pool", "block5_pool")
)

# Decoder: repeatedly upsample by 2 and concatenate with the next-shallower
# encoder feature map
decoder = bottleneck
for skip_features in (skip_block4, skip_block3):
    decoder = UpSampling2D((2, 2), interpolation = 'bilinear')(decoder)
    decoder = Concatenate()([decoder, skip_features])

# Output head: upsample by 8, then a 1x1 convolution with a sigmoid activation
decoder = UpSampling2D((8, 8), interpolation = 'bilinear')(decoder)
outputs = Conv2D(N_CLASSES, 1, activation = 'sigmoid')(decoder)

# Define the model
model = Model(inputs, outputs, name = "VGG_FCN8")

## Training

### Loss Function

In [None]:
# Compile the model with the Adam optimizer, binary cross-entropy loss, and a
# mean Intersection over Union (mIoU) metric.
# The model outputs sigmoid probabilities. MeanIoU interprets predictions as integer
# class ids, so probabilities would be truncated to class 0 and the metric would be
# meaningless. BinaryIoU thresholds the probabilities first; target_class_ids=[0, 1]
# averages the IoU of the background and road classes. The name matches MeanIoU's
# default so the keys in the training logs stay the same.
m_iou = tf.keras.metrics.BinaryIoU(target_class_ids=[0, 1], threshold=0.5, name="mean_io_u")
model.compile(optimizer=Adam(),
              loss=BinaryCrossentropy(),
              metrics=[m_iou])

### Check Model

In [None]:
# Function to create a binary mask from network prediction
def create_mask(pred_mask: tf.Tensor) -> tf.Tensor:
    """Turn predicted probabilities into a 0/1 mask that ends in a channel axis.

    Args:
        pred_mask: Predicted probabilities, with or without a trailing channel
            axis of size 1.

    Returns:
        The rounded mask. A trailing axis of size 1 is appended only when the
        input does not already end in one.
    """
    # Round predicted probabilities to the nearest integer (0 or 1)
    pred_mask = tf.math.round(pred_mask)

    # The model's sigmoid output already ends in a channel axis of size 1; adding
    # another one unconditionally produces a shape that cannot be displayed.
    if pred_mask.shape[-1] != 1:
        pred_mask = tf.expand_dims(pred_mask, axis=-1)
    return pred_mask

# Function to visualize predictions
def show_predictions(dataset=None, num=1):
    """Display model predictions next to the input image and the true mask.

    Args:
        dataset: Batched dataset of ``(image, mask)`` pairs. When given, the first
            example of each of the first ``num`` batches is shown. When ``None``,
            the module-level ``sample_image`` / ``sample_mask`` are used.
        num: Number of batches to take from ``dataset``.
    """
    # Explicit None check: the branch must not depend on dataset truthiness.
    if dataset is not None:
        # Predict and display images from the input dataset
        for image, mask in dataset.take(num):
            # Generate the predicted masks for the whole batch
            pred_mask = model.predict(image)
            # Show the first example only. The prediction is sliced to (height, width)
            # so create_mask returns a single-channel image that can be displayed.
            display_sample([image[0], mask[0], create_mask(pred_mask[0, ..., 0])])
    else:
        # Predict and display the sample image
        inference = model.predict(sample_image)
        # Display sample input image, true mask, and the predicted probabilities
        display_sample([sample_image[0], sample_mask[0],
                        inference[0]])

# Take the first training batch as the sample shown by show_predictions()
first_train_batch = dataset['train'].take(1)
for image, mask in first_train_batch:
    sample_image = image
    sample_mask = mask

# Show the model's prediction for the sample image
show_predictions()

### Train Model

In [None]:
# Custom callback for displaying sample predictions at the end of each epoch.
# The base class is referenced through tf.keras.callbacks because the name
# `callbacks` is rebound to a list later in the notebook; looking the class up on
# `callbacks` would fail whenever this cell is executed again after that point.
class DisplayCallback(tf.keras.callbacks.Callback):
    """Keras callback that redraws the sample prediction after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Clear the cell output, show the sample prediction and print the epoch.

        Args:
            epoch: Zero-based index of the epoch that just finished.
            logs: Metric values supplied by Keras; unused here.
        """
        # Clear the previous output
        clear_output(wait=True)
        # Show sample predictions
        show_predictions()
        # Report which epoch the displayed prediction belongs to (1-based)
        print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

# TensorBoard log directory: logs/<timestamp of this run>
run_timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", run_timestamp)

In [None]:
# List of callbacks for training the model.
# The callback classes are referenced through tf.keras.callbacks: this assignment
# rebinds the name `callbacks` (imported above as the Keras callbacks module) to a
# list, so looking the classes up on `callbacks` only works the first time the cell
# runs and raises AttributeError on every re-run.
callbacks = [
    # Custom callback to display sample predictions at the end of each epoch
    DisplayCallback(),
    # TensorBoard callback for logging training metrics and weight histograms.
    # histogram_freq is an epoch frequency; 1 logs histograms every epoch.
    tf.keras.callbacks.TensorBoard(logdir, histogram_freq = 1),
    # Early stopping callback to prevent overfitting by stopping training when the validation loss stops improving
    tf.keras.callbacks.EarlyStopping(patience = 20, verbose = 1),
    # Model checkpoint callback to save the best model based on validation loss
    tf.keras.callbacks.ModelCheckpoint('best_model.h5', verbose = 1, save_best_only = True)
]

In [None]:
# Training configuration
# Maximum number of epochs passed to model.fit
EPOCHS = 100
# Number of full batches in the training / validation split.
# NOTE(review): neither value is passed to model.fit in this notebook; they are
# informational only.
STEPS_PER_EPOCH = TRAINSET_SIZE // BATCH_SIZE
VALIDATION_STEPS = VALIDSET_SIZE // BATCH_SIZE
print(STEPS_PER_EPOCH)

In [None]:
# Show the number of full training batches per epoch (TRAINSET_SIZE // BATCH_SIZE)
print(STEPS_PER_EPOCH)

In [None]:
# Train on the training pipeline and evaluate on the validation pipeline after
# every epoch; the returned History object is kept in `model_history`.
model_history = model.fit(
    dataset['train'],
    validation_data=dataset["val"],
    epochs=EPOCHS,
    callbacks=callbacks,
)

## Testing (Test Dataset)

In [None]:
# Function to blend a mask image onto the image it belongs to
def weighted_img(img, initial_img, α=1., β=0.5, γ=0.):
    """Blend ``img`` onto ``initial_img`` with ``cv2.addWeighted``.

    Args:
        img: Overlay image (weighted by ``β``).
        initial_img: Base image (weighted by ``α``); same size and type as ``img``.
        α: Weight of ``initial_img``.
        β: Weight of ``img``.
        γ: Scalar passed as the last argument of ``cv2.addWeighted``.

    Returns:
        The blended image returned by ``cv2.addWeighted``.
    """
    blended = cv2.addWeighted(initial_img, α, img, β, γ)
    return blended

# Function to overlay a predicted mask on its image
def process_image_mask(image, mask):
    """Overlay a rounded mask on ``image``, with the mask in the first channel.

    Args:
        image: Image array or tensor with three channels.
        mask: Predicted mask with a single trailing channel.

    Returns:
        float32 array: ``image`` blended with the three-channel mask image.
    """
    # Binarize the prediction
    binary_mask = tf.math.round(mask)

    # Build a three-channel overlay: mask in channel 0, zeros in the other two
    empty_channel = np.zeros_like(binary_mask)
    overlay = np.asarray(np.dstack((binary_mask, empty_channel, empty_channel)), np.float32)

    # The base image has to share the overlay's dtype for blending
    base_image = np.asarray(image, np.float32)

    return weighted_img(overlay, base_image)

In [None]:
# Function to save predictions
def save_predictions(dataset):
    """Predict every image of ``dataset`` and save image/overlay plots.

    Args:
        dataset: Batched dataset of ``(image, mask)`` pairs; the true masks are
            not used.
    """
    index = 0
    for batch_image, _ in dataset:
        # One forward pass per batch instead of one per image; verbose=0 keeps the
        # notebook output free of a progress bar for every call.
        batch_pred = model.predict(batch_image, verbose=0)
        for image, pred_mask in zip(batch_image, batch_pred):
            print(f"Processing image : {index}")
            save_sample([image, process_image_mask(image, pred_mask)], index)
            index += 1

# Function to save the images as a plot
def save_sample(display_list, index):
    """Plot the entries of ``display_list`` in one row and save the figure.

    Args:
        display_list: Up to two arrays in the order input image, predicted mask.
        index: Number used for the file name ``outputs/<index>.png``.
    """
    # savefig does not create missing directories, so make sure the target exists.
    os.makedirs("outputs", exist_ok=True)

    fig = plt.figure(figsize=(18, 18))

    title = ['Input Image', 'Predicted Mask']

    for i, panel in enumerate(display_list):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
        plt.axis('off')

    plt.savefig(f"outputs/{index}.png")
    plt.show()
    # This runs once per test image; close the figure so figures do not pile up
    # in memory.
    plt.close(fig)

In [None]:
# The plots are written to "outputs/"; create it if needed (safe to re-run)
os.makedirs("outputs", exist_ok=True)
save_predictions(dataset['test'])

## Testing (Videos)

In [None]:
# Function to view video
def play(filename):
    """Return an HTML video player with the file embedded as a base64 data URI.

    Args:
        filename: Path of the video file to embed.

    Returns:
        ``IPython.display.HTML`` object containing a ``<video>`` element.
    """
    # Read through a context manager so the file handle is closed right away.
    with open(filename, 'rb') as video_file:
        video = video_file.read()
    src = 'data:video/mp4;base64,' + b64encode(video).decode()
    html = '<video width=1000 controls autoplay loop><source src="%s" type="video/mp4"></video>' % src
    return HTML(html)

In [None]:
# Function to process an individual video frame
def process_image(image):
    """Overlay the predicted road mask on one video frame.

    Args:
        image: uint8 frame as returned by ``cv2.VideoCapture.read()`` (BGR order).

    Returns:
        uint8 frame with the mask blended in, at the same width and height as
        ``image``.
    """
    # Remember the source size so the result fits the VideoWriter, which is
    # configured with the source video's frame size.
    src_height, src_width = image.shape[:2]

    # Preprocess image
    resized = cv2.resize(image, (IMG_SIZE, IMG_SIZE))
    # The model was trained on RGB images scaled to [0, 1]; OpenCV frames are BGR
    # uint8, so convert before predicting.
    model_input = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

    # Get the binary mask (verbose=0: no progress bar for every frame)
    pred_mask = model.predict(np.expand_dims(model_input, axis = 0), verbose = 0)
    # np.round replaces the deprecated alias np.round_
    mask = np.round(pred_mask[0])

    # Convert to a uint8 mask image with the mask in channel 0.
    # NOTE(review): the frame is BGR, so channel 0 renders as blue in the video.
    zero_image = np.zeros_like(mask)
    mask = np.dstack((mask, zero_image, zero_image)) * 255
    mask = np.asarray(mask, np.uint8)

    # Blend the mask onto the resized frame and scale back to the source size
    final_image = weighted_img(mask, resized)
    final_image = cv2.resize(final_image, (src_width, src_height))

    return final_image

In [None]:
# Directory for the processed videos; exist_ok keeps the cell re-runnable
os.makedirs("videos", exist_ok=True)

### Project Video

In [None]:
# Create a VideoCapture object to read the video
project_video = "project_video.mp4"
original_video = cv2.VideoCapture(test_data_dir + project_video)
frame_width = int(original_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(original_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create the VideoWriter object. The output is stored in
# "videos/<project_video>".
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
# Reuse the source frame rate so the result plays at the original speed; fall back
# to 60 when the container does not report one (get() then returns 0).
fps = original_video.get(cv2.CAP_PROP_FPS) or 60
output = cv2.VideoWriter("videos/" + project_video, fourcc, fps, (frame_width,frame_height))

# Process the video frame by frame until no more frames can be read
try:
    while original_video.isOpened():
        ret, frame = original_video.read()
        if not ret:
            break
        # Write the processed frame to the output file
        output.write(process_image(frame))
finally:
    # Release the capture and writer even if processing fails, so the output
    # file is finalized and the handles are freed
    original_video.release()
    output.release()

In [None]:
# Embed the processed video in the notebook output; `project_video` still holds
# the file name set by the processing cell above
play("videos/" + project_video)

### Challenge Video

In [None]:
# Create a VideoCapture object to read the video
project_video = "challenge.mp4"
original_video = cv2.VideoCapture(test_data_dir + project_video)
frame_width = int(original_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(original_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create the VideoWriter object. The output is stored in
# "videos/<project_video>".
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
# Reuse the source frame rate so the result plays at the original speed; fall back
# to 60 when the container does not report one (get() then returns 0).
fps = original_video.get(cv2.CAP_PROP_FPS) or 60
output = cv2.VideoWriter("videos/" + project_video, fourcc, fps, (frame_width,frame_height))

# Process the video frame by frame until no more frames can be read
try:
    while original_video.isOpened():
        ret, frame = original_video.read()
        if not ret:
            break
        # Write the processed frame to the output file
        output.write(process_image(frame))
finally:
    # Release the capture and writer even if processing fails, so the output
    # file is finalized and the handles are freed
    original_video.release()
    output.release()

In [None]:
# Embed the processed video in the notebook output; `project_video` still holds
# the file name set by the processing cell above
play("videos/" + project_video)

### Challenge Video 2

In [None]:
# Create a VideoCapture object to read the video
project_video = "challenge_video.mp4"
original_video = cv2.VideoCapture(test_data_dir + project_video)
frame_width = int(original_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(original_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create the VideoWriter object. The output is stored in
# "videos/<project_video>".
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
# Reuse the source frame rate so the result plays at the original speed; fall back
# to 60 when the container does not report one (get() then returns 0).
fps = original_video.get(cv2.CAP_PROP_FPS) or 60
output = cv2.VideoWriter("videos/" + project_video, fourcc, fps, (frame_width,frame_height))

# Process the video frame by frame until no more frames can be read
try:
    while original_video.isOpened():
        ret, frame = original_video.read()
        if not ret:
            break
        # Write the processed frame to the output file
        output.write(process_image(frame))
finally:
    # Release the capture and writer even if processing fails, so the output
    # file is finalized and the handles are freed
    original_video.release()
    output.release()

In [None]:
# Embed the processed video in the notebook output; `project_video` still holds
# the file name set by the processing cell above
play("videos/" + project_video)

### Harder Challenge Video

In [None]:
# Create a VideoCapture object to read the video
project_video = "harder_challenge_video.mp4"
original_video = cv2.VideoCapture(test_data_dir + project_video)
frame_width = int(original_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(original_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create the VideoWriter object. The output is stored in
# "videos/<project_video>".
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
# Reuse the source frame rate so the result plays at the original speed; fall back
# to 60 when the container does not report one (get() then returns 0).
fps = original_video.get(cv2.CAP_PROP_FPS) or 60
output = cv2.VideoWriter("videos/" + project_video, fourcc, fps, (frame_width,frame_height))

# Process the video frame by frame until no more frames can be read
try:
    while original_video.isOpened():
        ret, frame = original_video.read()
        if not ret:
            break
        # Write the processed frame to the output file
        output.write(process_image(frame))
finally:
    # Release the capture and writer even if processing fails, so the output
    # file is finalized and the handles are freed
    original_video.release()
    output.release()

In [None]:
# Embed the processed video in the notebook output; `project_video` still holds
# the file name set by the processing cell above
play("videos/" + project_video)

## References

- [Kitti Dataset Processing](http://ronny.rest/blog/post_2017_09_06_kitti_road_data/)
- [Image Segmentation on Keras](https://yann-leguilly.gitlab.io/post/2019-12-14-tensorflow-tfdata-segmentation/)