In [None]:
import numpy as np
import time
import os
import matplotlib.pyplot as plt
import cv2
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Convolution2D, BatchNormalization, ReLU, LeakyReLU, Add, Activation
from tensorflow.keras.layers import GlobalAveragePooling2D, AveragePooling2D, UpSampling2D


In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab runtime so the dataset folders below are reachable.
drive.mount('/content/drive')
# List the Drive root as a quick visual check that the mount worked.
print(os.listdir('/content/drive/MyDrive'))


# Disabled alternative: loading Cityscapes through tensorflow_datasets.
# The notebook reads the image files directly from the folders below instead.
# # Step 1: Specify the directory containing the extracted Cityscapes data
# data_dir = '/content/drive/MyDrive/dataset'  # Replace with your actual path

# # Step 2: Manually register the Cityscapes data with tfds
# # Assuming tfds version >= 4.4.0 for register_from_path function
# tfds.register_from_path('cityscapes', data_dir)


# # Step 3: Now load the dataset
# dataset = tfds.load('cityscapes', split='train')
# color_map = dataset.info.features['label'].int2str
# print(color_map)


# Folders holding the combined image+mask files used for training and validation.
train_data_folder = '/content/drive/MyDrive/dataset/train'
valid_data_folder = '/content/drive/MyDrive/dataset/val'
# Uncomment to inspect the folder contents:
# print(os.listdir(train_data_folder))
# print(os.listdir(valid_data_folder))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
['geckodriver', 'geckodriver-v0.34.0-linux64.tar.gz', 'gittoken.txt', 'hackathon_coep', 'final_Project', 'CN', 'SE', 'Colab Notebooks', 'dataset']


In [None]:
# Load a subset of training and validation data
from sklearn.model_selection import train_test_split


num_train_samples = 2000  # Max files read from the training folder; more samples give a better model (original note: use about 500-700 later).
num_valid_samples = 70   # Validation set should be smaller than the training set; 50-100 samples is enough.

def load_image_mask_subset(path, num_samples=None):
    """Load combined image/mask files from ``path`` and split them in two.

    Each file is treated as a side-by-side picture: the left 256 columns are
    the input photo and the remaining columns are its colour-coded mask
    (so a 256x512 picture yields a 256x256 image and a 256x256 mask).

    Args:
        path: Directory containing the combined image/mask files.
        num_samples: Maximum number of directory entries to consider, taken
            in sorted file-name order. ``None`` means every entry.

    Returns:
        A tuple ``(image_list, mask_list)``. Each image is the left half as a
        float32 RGB array scaled to [0, 1]; each mask is the right half
        flattened to a vector of length 256 * 256 * 3.

    Raises:
        ValueError: Raised by ``np.reshape`` when the right half of a file
            does not contain exactly 256 * 256 * 3 values.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the
    # ``[:num_samples]`` subset reproducible between runs and machines.
    file_names = sorted(os.listdir(path))[:num_samples]
    image_list, mask_list = [], []
    for file_name in file_names:
        image = cv2.imread(os.path.join(path, file_name))
        if image is None:
            # cv2.imread reports unreadable / non-image entries by returning
            # None; skip them instead of crashing inside cv2.normalize.
            print('Skipping unreadable file:', file_name)
            continue

        # Min-max scaling is computed over the whole combined picture
        # (photo and mask together), producing float32 values in [0, 1].
        image = cv2.normalize(image, None, 0, 1, cv2.NORM_MINMAX, cv2.CV_32F)

        # OpenCV loads images as BGR; reverse the channel axis to get RGB.
        image = image[:, :, ::-1]

        # Left 256 columns: the network input.
        image_list.append(image[:, :256])

        # Remaining columns: the target mask, flattened to match the
        # flattened network output.
        mask_list.append(np.reshape(image[:, 256:], (256 * 256 * 3)))
    return image_list, mask_list

# Read at most num_train_samples / num_valid_samples files from each folder.
train_images, train_masks = load_image_mask_subset(train_data_folder, num_samples=num_train_samples)
valid_images, valid_masks = load_image_mask_subset(valid_data_folder, num_samples=num_valid_samples)

# Hold out 20% of the loaded training samples as the test split; the fixed
# random_state keeps the split identical between runs.
train_images, test_images, train_masks, test_masks = train_test_split(train_images, train_masks, test_size=0.2, random_state=42)


In [None]:
def convolutional_block(input_tensor, filters, block_identifier):
    """Build one dilated residual block and return its output tensor.

    The main path is 1x1 conv -> 3x3 conv (dilation 2) -> 1x1 conv, each
    followed by batch normalisation, with LeakyReLU(0.2) after the first two
    stages only. The shortcut path is a 3x3 conv plus batch normalisation
    applied to the block input. Both paths are summed and passed through ReLU.

    Args:
        input_tensor: Tensor fed to both the main and the shortcut path.
        filters: Three filter counts, one per main-path convolution; the
            third is also used for the shortcut convolution.
        block_identifier: Value embedded in every layer name as
            ``block_<identifier>_...``.

    Returns:
        The tensor produced by the final ReLU.
    """
    prefix = ''.join(('block_', str(block_identifier), '_'))
    reduce_filters, dilated_filters, expand_filters = filters

    # (suffix, filters, kernel size, dilation rate, apply LeakyReLU?)
    main_stages = (
        ('a', reduce_filters, (1, 1), (1, 1), True),
        ('b', dilated_filters, (3, 3), (2, 2), True),
        ('c', expand_filters, (1, 1), (1, 1), False),
    )

    main_path = input_tensor
    for suffix, stage_filters, kernel, dilation, with_activation in main_stages:
        main_path = Convolution2D(
            filters=stage_filters,
            kernel_size=kernel,
            dilation_rate=dilation,
            padding='same',
            kernel_initializer='he_normal',
            name=prefix + suffix,
        )(main_path)
        main_path = BatchNormalization(name=prefix + 'batch_norm_' + suffix)(main_path)
        if with_activation:
            main_path = LeakyReLU(alpha=0.2, name=prefix + 'leakyrelu_' + suffix)(main_path)

    # Shortcut: project the block input to the same channel count as stage C
    # so the two paths can be added element-wise.
    shortcut = Convolution2D(
        filters=expand_filters,
        kernel_size=(3, 3),
        padding='same',
        name=prefix + 'skip_conv',
    )(input_tensor)
    shortcut = BatchNormalization(name=prefix + 'batch_norm_skip_conv')(shortcut)

    merged = Add(name=prefix + 'add')([main_path, shortcut])
    return ReLU(name=prefix + 'relu')(merged)



def base_convolutional_block(input_layer):
    """Extract feature maps by chaining three dilated residual blocks.

    The blocks are named '1', '2' and '3' and end with 64, 128 and 256
    filters respectively.

    Args:
        input_layer: Tensor fed into the first block.

    Returns:
        The output tensor of the third block.
    """
    # One entry per block: (filter triple, block identifier).
    block_plan = (
        ([32, 32, 64], '1'),
        ([64, 64, 128], '2'),
        ([128, 128, 256], '3'),
    )

    features = input_layer
    for block_filters, block_id in block_plan:
        features = convolutional_block(features, block_filters, block_id)
    return features

def pyramid_pooling_module(input_layer):
    """Build the pyramid pooling module on top of the base feature extractor.

    The base feature map is pooled at four scales (global, 2x2, 4x4, 8x8).
    Each pooled map goes through a 1x1 convolution with 64 filters and is
    bilinearly upsampled back, then everything is concatenated with the base
    feature map. The branch names (red/yellow/blue/green) are kept from the
    original code; they presumably mirror the colours of the PSPNet paper
    figure.

    The global branch is sized from the base feature map's static shape
    instead of a hard-coded 256, so inputs other than 256x256 are supported.
    NOTE(review): the pooled branches pool and upsample by 2, 4 and 8, so the
    spatial dimensions presumably have to be divisible by 8 for the final
    concatenation to line up.

    Args:
        input_layer: Input tensor passed to ``base_convolutional_block``.

    Returns:
        The concatenation of the base feature map with the four branches, in
        the order base, red, yellow, blue, green.
    """
    base_result = base_convolutional_block(input_layer)

    # Static shape of the feature map: (batch, height, width, channels).
    feature_height, feature_width = base_result.shape[1], base_result.shape[2]
    channel_count = int(base_result.shape[-1])
    if feature_height is None or feature_width is None:
        # Spatial size unknown at graph-construction time: keep the historical
        # 256x256 assumption.
        red_size = 256
    else:
        red_size = (int(feature_height), int(feature_width))

    # Red branch: global average pooling, restored to the full feature-map size.
    red_result = GlobalAveragePooling2D(name='red_pool')(base_result)
    red_result = tf.keras.layers.Reshape((1, 1, channel_count))(red_result)
    red_result = Convolution2D(filters=64, kernel_size=(1, 1), name='red_1_by_1')(red_result)
    red_result = UpSampling2D(size=red_size, interpolation='bilinear', name='red_upsampling')(red_result)

    # Yellow / blue / green branches: average-pool by a factor, 1x1 conv,
    # then upsample by the same factor.
    pooled_results = []
    for colour, factor in (('yellow', 2), ('blue', 4), ('green', 8)):
        branch = AveragePooling2D(pool_size=(factor, factor), name=colour + '_pool')(base_result)
        branch = Convolution2D(filters=64, kernel_size=(1, 1), name=colour + '_1_by_1')(branch)
        branch = UpSampling2D(size=factor, interpolation='bilinear', name=colour + '_upsampling')(branch)
        pooled_results.append(branch)

    # Final pyramid: base features followed by every pooled branch.
    return tf.keras.layers.concatenate([base_result, red_result] + pooled_results)


def pyramid_based_conv(input_layer):
    """Assemble the full network: pyramid features plus the prediction head.

    The head is a 3x3 convolution with 3 filters, batch normalisation, a
    sigmoid activation and a flatten, so the output is one flat vector per
    sample. Note that the activation layer is *named* 'last_conv_relu'
    although it applies a sigmoid; the name is kept unchanged.

    Args:
        input_layer: Network input tensor.

    Returns:
        The flattened output tensor of the head.
    """
    output = pyramid_pooling_module(input_layer)

    head_layers = (
        Convolution2D(filters=3, kernel_size=3, padding='same', name='last_conv_3_by_3'),
        BatchNormalization(name='last_conv_3_by_3_batch_norm'),
        Activation('sigmoid', name='last_conv_relu'),
        tf.keras.layers.Flatten(name='last_conv_flatten'),
    )
    for head_layer in head_layers:
        output = head_layer(output)
    return output

# Size the symbolic model input from the first training image (np.squeeze
# drops any singleton dimensions), then wire it through the whole network.
input_layer = tf.keras.Input(shape=np.squeeze(train_images[0]).shape, name='input')
output_layer = pyramid_based_conv(input_layer)





In [None]:
# NOTE(review): this cell repeats the definition of pyramid_based_conv and the
# input/output construction from the end of the previous cell verbatim.
# Running it rebuilds the graph and rebinds input_layer / output_layer.
def pyramid_based_conv(input_layer):
    """Return the flattened prediction tensor for ``input_layer``.

    Applies the pyramid pooling module, then a 3x3 convolution with 3 filters,
    batch normalisation, a sigmoid activation (the layer is named
    'last_conv_relu' despite being a sigmoid) and a flatten.
    """
    result = pyramid_pooling_module(input_layer)
    result = Convolution2D(filters=3, kernel_size=3, padding='same', name='last_conv_3_by_3')(result)
    result = BatchNormalization(name='last_conv_3_by_3_batch_norm')(result)
    result = Activation('sigmoid', name='last_conv_relu')(result)
    result = tf.keras.layers.Flatten(name='last_conv_flatten')(result)
    return result

# Input shape is taken from the first training image.
input_layer = tf.keras.Input(shape=np.squeeze(train_images[0]).shape, name='input')
output_layer = pyramid_based_conv(input_layer)

In [None]:
def _to_float16_array(samples, split_name):
    """Stack ``samples`` into one float16 array, rejecting an empty split.

    Keras only reports "Expected input data to be non-empty." without saying
    which array is empty; failing here names the offending split instead.

    Raises:
        ValueError: If ``samples`` contains no data.
    """
    stacked = np.array(samples, dtype='float16')
    if stacked.size == 0:
        raise ValueError(
            "No samples available for '%s'; check that the data-loading cell ran "
            "and that its source folder contains readable images." % split_name
        )
    return stacked


model = tf.keras.Model(inputs=input_layer, outputs=output_layer)

# A learning rate of 0.01 has given the best results so far.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss='mse')

# float16 halves the memory needed for the stacked images and masks.
# For a better model, raise epochs to at least 50 and steps_per_epoch to at
# least 40 if the resources are available.
history = model.fit(
    _to_float16_array(train_images, 'train_images'),
    _to_float16_array(train_masks, 'train_masks'),
    validation_data=(
        _to_float16_array(valid_images, 'valid_images'),
        _to_float16_array(valid_masks, 'valid_masks'),
    ),
    epochs=50, steps_per_epoch=5, verbose=1, batch_size=8,
)

Epoch 1/50

ValueError: Expected input data to be non-empty.

In [None]:
# After training: persist the model to Google Drive as an HDF5 (.h5) file
# so it survives the end of the Colab session.
model.save('/content/drive/MyDrive/my_model.h5')


In [None]:
# Later sessions can restore the saved model instead of retraining.
# This rebinds `model`; `history` is only defined if the fit cell ran in this
# session, so the loss plot further down needs a training run.
from tensorflow.keras.models import load_model
model = load_model('/content/drive/MyDrive/my_model.h5')

In [None]:
# Plot the training and validation loss curves.
# (The model is compiled with a loss only, so there is no accuracy to plot.)
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 4))

# Training vs. validation loss, drawn in the left slot of a 1x2 grid;
# the right slot is left empty.
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training vs. Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()


In [None]:
# model evaluation based on comparison
def plot_imgs(img,mask,pred):
    """Show an input image, its true mask and the predicted mask side by side.

    Args:
        img: Image array passed to ``imshow`` as-is.
        mask: Ground-truth mask; reshaped to (256, 256, 3) before display.
        pred: Predicted mask; reshaped to (256, 256, 3) before display.
    """
    # Masks arrive flattened, so restore the image layout first.
    panels = (img, np.reshape(mask, (256, 256, 3)), np.reshape(pred, (256, 256, 3)))

    fig, axes = plt.subplots(1, 3, figsize=(15, 10))
    for axis, picture in zip(axes, panels):
        axis.imshow(picture)
        axis.axis('off')
# Predict masks for the whole validation set, then show one random example.
pred_masks = model.predict(np.array(valid_images,dtype='float16'))
print('-------------Input---------------Actual mask--------------Predicted mask-------')
for i in range(1):
    # Draw the index from the actual validation-set size. The previous
    # hard-coded upper bound of 10 raised IndexError for smaller sets and
    # never showed samples beyond the first ten.
    x = np.random.randint(0,len(valid_images),size=1)[0]
    plot_imgs(valid_images[x],valid_masks[x],pred_masks[x])


In [None]:
# Folder with dedicated test files. NOTE(review): no live code reads this
# variable; the loader that used it is commented out in the next cell, and
# test_images / test_masks come from the train_test_split call instead.
test_data_folder = '/content/drive/MyDrive/dataset/test'


In [None]:
# def load_test_data(path, num_samples=None):
#     file_names = os.listdir(path)[:num_samples]
#     image_list, mask_list = [], []
#     for file_name in file_names:
#         image = cv2.imread(os.path.join(path, file_name))
#         image = cv2.normalize(image, None, 0, 1, cv2.NORM_MINMAX, cv2.CV_32F)
#         image = image[:, :, ::-1]

#         image_list.append(image[:, :256])  # All image masks are in 256*256 format
#         mask_list.append(np.reshape(image[:, 256:], (256 * 256 * 3)))  # All image masks are in 256*256 format

#         del image
#     return image_list, mask_list

# test_images, test_masks = load_test_data(test_data_folder, num_samples=40)  # Adjust number of test samples


In [None]:
# Predict masks for the held-out test split (the 20% carved out of the
# training data by train_test_split).
test_pred_masks = model.predict(np.array(test_images, dtype='float16'))


In [None]:
# Test the model on test data
def plot_test_imgs(img, mask, pred):
    """Display a test image next to its true mask and its predicted mask.

    Args:
        img: Image array drawn in the first panel unchanged.
        mask: Ground-truth mask, reshaped to (256, 256, 3) for display.
        pred: Predicted mask, reshaped to (256, 256, 3) for display.
    """
    display_shape = (256, 256, 3)
    true_mask = np.reshape(mask, display_shape)
    predicted_mask = np.reshape(pred, display_shape)

    _, axes = plt.subplots(1, 3, figsize=(15, 10))
    pictures = (img, true_mask, predicted_mask)
    for position in range(3):
        panel = axes[position]
        panel.imshow(pictures[position])
        panel.axis('off')

# Visualize one randomly chosen test sample: input, true mask, predicted mask.
for i in range(1):
    # Index drawn uniformly from the whole test split.
    x = np.random.randint(0, len(test_images), size=1)[0]
    plot_test_imgs(test_images[x], test_masks[x], test_pred_masks[x])


In [None]:
# Evaluate on the held-out test split. The model is compiled in this notebook
# with loss='mse' and no extra metrics, so the result is the test MSE.
test_loss = model.evaluate(np.array(test_images, dtype='float16'), np.array(test_masks, dtype='float16'))
print("Test Loss:", test_loss)
