In [None]:
#Importing Required Libraries and Modules
import os  # For interacting with the operating system
import tifffile  # For working with TIFF image files
import numpy as np  # For numerical computations and array operations
import pandas as pd  # For data manipulation and analysis
import tensorflow as tf  # For building and training neural networks
import warnings  # For filtering out warnings
warnings.filterwarnings('ignore')
import cv2  # For computer vision tasks and image processing
from keras.models import Model, load_model  # For creating and loading neural network models
import pickle  # For serializing and deserializing Python objects
import tensorflow.keras.backend as K  # For low-level TensorFlow operations
from matplotlib import pyplot as plt  # For creating plots and visualizations
from tqdm import tqdm_notebook  # For displaying progress bars in loops
import random  # For generating random numbers and operations
from skimage.io import imread, imshow, imread_collection, concatenate_images  # For image reading and manipulation
import h5py  # For working with HDF5 files (Hierarchical Data Format)

In [None]:
# Defining Dataset Paths
train_path = "/kaggle/input/massachusetts-roads-dataset/tiff/train"
train_mask_path = "/kaggle/input/massachusetts-roads-dataset/tiff/train_labels"
valid_path = "/kaggle/input/massachusetts-roads-dataset/tiff/val"
valid_mask_path = "/kaggle/input/massachusetts-roads-dataset/tiff/val_labels"
test_path = "/kaggle/input/massachusetts-roads-dataset/tiff/test"

In [None]:
#Reading metadata.csv file
meta = pd.read_csv("/kaggle/input/massachusetts-roads-dataset/metadata.csv")

In [None]:
#Reading label_class_dict.csv file
label = pd.read_csv('/kaggle/input/massachusetts-roads-dataset/label_class_dict.csv')

In [None]:
meta.head(2)

In [None]:
#Setting Root Directory
root_dir='/kaggle/input/massachusetts-roads-dataset'

#Displaying Random Images and Masks
for i in range(10):
  rand = random.randint(0,len(meta))
  img_path = os.path.join(root_dir, meta['tiff_image_path'][rand])
  mask_path = os.path.join(root_dir, meta['tif_label_path'][rand])  
  image = cv2.imread(img_path)
  mask = cv2.imread(mask_path)
  plt.figure(figsize=(10,10))
  plt.subplot(131)
  plt.title("image")
  plt.imshow(image)
  plt.subplot(132)
  plt.title("mask")
  plt.imshow(mask)
  plt.savefig("figure.png")

## Data Preprocessing for Images and Masks

In [None]:
#Function for saving images with a missing size of less than 10%.
def preprocess(meta, thereshold):
  complete_image_path=[]
  mask_path=[]
  for i in range(len(meta)):
    img_path = os.path.join(root_dir, meta['tiff_image_path'][i])
    msk_path = os.path.join(root_dir, meta['tif_label_path'][i])
    image = cv2.imread(img_path)
    #converting the image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    #counting number of white pixels to size of blank part of the image
    if (np.sum(gray_image==255)/(2250000)) <thereshold:
      complete_image_path.append(img_path)
      mask_path.append(msk_path)
  #print(mask_path)
  return(complete_image_path, mask_path)

In [None]:
image_path, mask_path = preprocess(meta, 0.1)

In [None]:
print('Total number of images remaining: ', len(image_path))

In [None]:
#plotting random images after filtering the dataset
for i in range(4):
  rand = random.randint(0,len(image_path)) 
  image = cv2.imread(image_path[rand])
  mask = cv2.imread(mask_path[rand])
  plt.figure(figsize=(10,15))
  plt.subplot(131)
  plt.title("image")
  plt.imshow(image)
  plt.subplot(132)
  plt.title("mask")
  plt.imshow(mask)

In [None]:
#Cropping images of 1500x1500 t0 512X512
def crop_images(image_path, mask_path, directory, crop):
  #Creating new directory
  if os.path.exists(directory):
    print('Directory alredy exists')
  else:
    os.mkdir(directory)
    print("Directory '% s' created" % directory)

  #Creating new directory to store cropped images
  if os.path.exists(directory+'/images'):
    print('Directory alredy exists')
  else:
    os.mkdir(directory+'/images')
    print("Directory '% s'/images created" % directory)

  #Creating new directory to store cropped masks
  if os.path.exists(directory+'/lables'):
    print('Directory already exists')
  else:
    os.mkdir(directory+'/lables')
    print("Directory '% s'/lables created" % directory)

  cropped_image_paths = []
  cropped_mask_paths = []

  for i in tqdm(range(len(image_path))):
    image = cv2.imread(image_path[i])
    mask = cv2.imread(mask_path[i])
    a=0
    for j in [0,2,4]:
      for k in [0,2,4]:
        cropped_image_path = directory+'/images/'+str(a)+'_' + image_path[i].split('/')[-1]
        croppped_mask_path = directory+'/lables/'+str(a)+'_' + image_path[i].split('/')[-1]
        a+=1

        cv2.imwrite(cropped_image_path, image[crop[j]:crop[j+1], crop[k]:crop[k+1]])
        cv2.imwrite(croppped_mask_path, mask[crop[j]:crop[j+1], crop[k]:crop[k+1]])

        cropped_image_paths.append(cropped_image_path)
        cropped_mask_paths.append(croppped_mask_path)

  return(cropped_image_paths, cropped_mask_paths)

In [None]:
#cropping image size
crop = [0,512,500,1012,988,1500]

#creating new folder to save cropped images
directory = '/kaggle/working/cropped'

In [None]:
from tqdm import tqdm
cropped_images, cropped_masks = crop_images(image_path, mask_path, directory, crop)

In [None]:
#checking length of cropped images
len(cropped_masks)

In [None]:
#plotting original image with its cropped images

#Original image 1500*1500
image = cv2.imread(image_path[0])
mask = cv2.imread(mask_path[0])
plt.figure(figsize=(7,10))
plt.subplot(121)
plt.title('Original satellite image 1500*1500')
plt.imshow(image)
plt.subplot(122)
plt.title('Original mask 1500*1500')
plt.imshow(mask)

# cropped image 512*512
for i in range(9): 
  image = cv2.imread(cropped_images[i])
  mask = cv2.imread(cropped_masks[i])
  plt.figure(figsize=(5,7))
  plt.subplot(121)
  plt.title('cropped image 512*512')
  plt.imshow(image)
  plt.subplot(122)
  plt.title('cropped mask 512*512')
  plt.imshow(mask)

In [None]:
#Image Dimensions and Shape Constants.

IMG_HEIGHT = 512
IMG_WIDTH  = 512
IMG_CHANNELS = 3

input_shape = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)
image_shape = (IMG_HEIGHT, IMG_WIDTH)

In [None]:
#Image Preprocessing Function 
def xyz(cropped_images):
    x_batch = []

    for i in cropped_images: 
        img = cv2.imread(i)
        x_batch += [img]
        
    x_batch = np.array(x_batch) /255.

    return x_batch
    

In [None]:
def preprocess_mask_image2(image, class_num, color_limit):
  pic = np.array(image)
  img = np.zeros((pic.shape[0], pic.shape[1], 1))
  np.place(img[ :, :, 0], pic[ :, :, 0] >= color_limit, 1)
  return img

In [None]:
#Mask Preprocessing Function 
def xyz_01(cropped_masks):

    y_batch = []
    
    for i in cropped_masks: 
        mask = cv2.imread(i)
        mask = preprocess_mask_image2(mask, 2, 50)
        y_batch += [mask]

    
  
    y_batch = np.array(y_batch)
    
    return y_batch

In [None]:
images=xyz(cropped_images)
images.shape

In [None]:
masks=xyz_01(cropped_masks)
masks.shape

In [None]:
 plt.figure(figsize=(20,16))
x, y = 5,4
for i in range(y):  
    for j in range(x):
        plt.subplot(y*2, x, i*2*x+j+1)
        pos = i*120 + j*10
        plt.imshow(images[pos])
        plt.title('Sat img #{}'.format(pos))
        plt.axis('off')
        plt.subplot(y*2, x, (i*2+1)*x+j+1)
           
        #We display the associated mask we just generated above with the training image
        plt.imshow(masks[pos])
        plt.title('Mask #{}'.format(pos))
        plt.axis('off')
        
plt.show()

In [None]:
from keras import backend as K

#calculates the Intersection over Union (IoU) coefficient between two sets of binary masks
def iou_coef(y_true, y_pred, smooth=1):
    intersection = K.sum(K.abs(y_true * y_pred), axis=[1,2,3])
    union = K.sum(y_true,[1,2,3])+K.sum(y_pred,[1,2,3])-intersection
    iou = K.mean((intersection + smooth) / (union + smooth), axis=0)
    return iou

#calculates the Dice coefficient between two sets of binary masks
def dice_coef(y_true, y_pred, smooth = 1):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

#Computes the soft Dice loss by subtracting the Dice coefficient from 1.
def soft_dice_loss(y_true, y_pred):
    return 1-dice_coef(y_true, y_pred)

In [None]:
print(masks.shape)
print(images.shape)

## Splitting data


In [None]:

#Splitting dataset in 80/20 ratio, where Train=80%, Test=20%
from sklearn.model_selection import train_test_split
train_images, test_images, train_masks, test_masks = train_test_split(images, masks, test_size=0.2, random_state=56)
print("TRAIN SET")
print(train_images.shape)
print(train_masks.shape)
print("TEST SET")
print(test_images.shape)
print(test_masks.shape)

In [None]:
# Print the sizes of the resulting sets
print("Training set size:", len(train_images))
#print("Validation set size:", len(valid_images))
print("Testing set size:", len(test_images))

## Define The Model

In [None]:
# Import required libraries
from keras.models import Model, load_model
import tensorflow as tf
from keras.layers import Input
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from tensorflow.keras.layers import concatenate
from keras import optimizers
from keras.layers import BatchNormalization
from tensorflow.keras.metrics import MeanIoU
import keras

In [None]:
# Define the input shape for the model
IMAGE_HEIGHT = IMAGE_WIDTH = 512
NUM_CHANNELS = 3

In [None]:
# Define the inputs to the model
inputs = Input((IMAGE_HEIGHT, IMAGE_WIDTH, 3))
s = Lambda(lambda x: x / 255) (inputs)


# Define the U-Net architecture
# Encoder
conv1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (inputs)
conv1 = BatchNormalization() (conv1)
conv1 = Dropout(0.1) (conv1)
conv1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv1)
conv1 = BatchNormalization() (conv1)
pooling1 = MaxPooling2D((2, 2)) (conv1)

conv2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (pooling1)
conv2 = BatchNormalization() (conv2)
conv2 = Dropout(0.1) (conv2)
conv2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv2)
conv2 = BatchNormalization() (conv2)
pooling2 = MaxPooling2D((2, 2)) (conv2)

conv3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (pooling2)
conv3 = BatchNormalization() (conv3)
conv3 = Dropout(0.2) (conv3)
conv3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv3)
conv3 = BatchNormalization() (conv3)
pooling3 = MaxPooling2D((2, 2)) (conv3)

conv4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (pooling3)
conv4 = BatchNormalization() (conv4)
conv4 = Dropout(0.2) (conv4)
conv4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv4)
conv4 = BatchNormalization() (conv4)
pooling4 = MaxPooling2D(pool_size=(2, 2)) (conv4)

conv5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (pooling4)
conv5 = BatchNormalization() (conv5)
conv5 = Dropout(0.3) (conv5)
conv5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv5)
conv5 = BatchNormalization() (conv5)

# Decoder
# Upsampling and concatenation
upsample6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same') (conv5)
upsample6 = concatenate([upsample6, conv4])

# Convolutional layers in the decoder
conv6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (upsample6)
conv6 = BatchNormalization() (conv6)
conv6 = Dropout(0.2) (conv6)
conv6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv6)
conv6 = BatchNormalization() (conv6)

upsample7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same') (conv6)
upsample7 = concatenate([upsample7, conv3])
conv7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (upsample7)
conv7 = BatchNormalization() (conv7)
conv7 = Dropout(0.2) (conv7)
conv7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv7)
conv7 = BatchNormalization() (conv7)

upsample8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same') (conv7)
upsample8 = concatenate([upsample8, conv2])
conv8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (upsample8)
conv8 = BatchNormalization() (conv8)
conv8 = Dropout(0.1) (conv8)
conv8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv8)
conv8 = BatchNormalization() (conv8)

upsample9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same') (conv8)
upsample9 = concatenate([upsample9, conv1], axis=3)
conv9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (upsample9)
conv9 = BatchNormalization() (conv9)
conv9 = Dropout(0.1) (conv9)
conv9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (conv9)
conv9 = BatchNormalization() (conv9)

# Final convolutional layer
outputs = Conv2D(1, (1, 1), activation='sigmoid') (conv9)

# Create the model
model = Model(inputs=[inputs], outputs=[outputs])

# Print the summary of the model
model.summary()

### Hyperparameters

In [None]:
EPOCHS = 100
LEARNING_RATE = 0.0001
BATCH_SIZE = 16

### Callbacks

In [None]:
#installing required libraries
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from datetime import datetime

In [None]:
#Model Callbacks and Checkpoints Configuration
model_path = "./Models/road_segmentation_2.h5"

checkpointer = ModelCheckpoint(model_path, monitor="val_loss", mode="min", save_best_only = True, verbose=1)

earlystopper = EarlyStopping(monitor = 'val_loss', min_delta = 0, patience = 5, verbose = 1, restore_best_weights = True)

lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4, verbose=1, epsilon=1e-4)

### Compiling the Model

In [None]:
model.compile(optimizer="adam", loss=soft_dice_loss, metrics=[iou_coef])

In [None]:
#Training the Segmentation Model
history = model.fit(train_images, train_masks, validation_split = 0.1, epochs=EPOCHS, batch_size = BATCH_SIZE, callbacks = [checkpointer, earlystopper, lr_reducer])

In [None]:
#saving model
model.save("./Models/road_segmentation_final.h5")

## Testing our Model

In [None]:
model = load_model("./Models/road_segmentation_final.h5", custom_objects={'soft_dice_loss': soft_dice_loss, 'iou_coef': iou_coef})

In [None]:
#Evaluating on test dataset
model.evaluate(test_images, test_masks)

In [None]:
#prediction
predictions = model.predict(test_images, verbose=1)

In [None]:
#Applying Prediction Threshold
thresh_val = 0.1
predicton_threshold = (predictions > thresh_val).astype(np.uint8)

In [None]:
#Comparison of Ground Truth and Predictions for Random Samples
ix = random.randint(0, len(predictions))
num_samples = 10

f = plt.figure(figsize = (15, 25))
for i in range(1, num_samples*4, 4):
  ix = random.randint(0, len(predictions))

  f.add_subplot(num_samples, 4, i)
  imshow(test_images[ix][:,:,0])
  plt.title("Image")
  plt.axis('off')

  f.add_subplot(num_samples, 4, i+1)
  imshow(np.squeeze(test_masks[ix][:,:,0]))
  plt.title("Groud Truth")
  plt.axis('off')

  f.add_subplot(num_samples, 4, i+2)
  imshow(np.squeeze(predictions[ix][:,:,0]))
  plt.title("Prediction")
  plt.axis('off')

  f.add_subplot(num_samples, 4, i+3)
  imshow(np.squeeze(predicton_threshold[ix][:,:,0]))
  plt.title("thresholded at {}".format(thresh_val))
  plt.axis('off')

plt.show()


In [None]:
# Example IoU coefficients for each epoch
iou_coefficients = history.history['iou_coef']
val_iou_coefficients = history.history['val_iou_coef']
epochs = range(1, len(iou_coefficients) + 1)

plt.figure(figsize=(8, 5))
plt.plot(epochs, iou_coefficients,val_iou_coefficients)
plt.xlabel('Epoch')
plt.ylabel('IoU Coefficient')
plt.title('IoU Coefficient vs. Epoch')
#plt.grid(True)
plt.xticks(epochs)
plt.ylim(0, 1)  # Adjust the y-axis range if needed
plt.show()


In [None]:

# Example Loss for each epoch 
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)

plt.figure(figsize=(8, 5))
plt.plot(epochs, loss,val_loss)
plt.xlabel('Epoch')
plt.ylabel('loss')
plt.title('loss vs. Epoch')
#plt.grid(True)
plt.xticks(epochs)
plt.ylim(0, 1) 
plt.show()
