# Mount Drive

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive so the dataset folders used below are reachable.
# force_remount=True asks Colab to redo the mount even when one is already present.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# Load JSON Annotations and Extract Image and Annotation Data

In [None]:
import json

# Drive folders holding the annotation JSON files and the JPEG images
path_folder_annotations = '/content/drive/My Drive/annotations'
path_folder_images = '/content/drive/My Drive/JPEGImages'

# Parse the train and test annotation files
with open(f'{path_folder_annotations}/train2017.json', 'r') as f:
    train_data = json.load(f)

with open(f'{path_folder_annotations}/test2017.json', 'r') as f:
    test_data = json.load(f)

# Index the 'images' records by their id (train_images_info[id] returns the record for
# that image) and keep the flat 'annotations' lists exactly as stored in the JSON.
train_images_info = dict((img['id'], img) for img in train_data['images'])
train_annotations = train_data['annotations']

test_images_info = dict((img['id'], img) for img in test_data['images'])
test_annotations = test_data['annotations']

# Create Mask Images and Save Them to Drive (run this cell only if the train and test masks are not already in Drive)

In [None]:
import numpy as np
from PIL import Image, ImageDraw
import os

# Output folders for the generated masks, located in Google Drive
drive_base_path = '/content/drive/My Drive'
train_masks_dir = os.path.join(drive_base_path, 'train_masks')
test_masks_dir = os.path.join(drive_base_path, 'test_masks')

# Make sure both folders exist; exist_ok=True makes this a no-op when they already do
for _masks_dir in (train_masks_dir, test_masks_dir):
    os.makedirs(_masks_dir, exist_ok=True)

# Function to create mask from annotations
def create_mask(image_info, annotations):
    """Rasterize the polygon annotations of one image into a binary mask.

    Args:
        image_info: the 'images' record of the target image. Its 'id' selects the
            matching annotations; 'width' and 'height', when present, size the mask.
        annotations: iterable of annotation records. Only records whose 'image_id'
            equals image_info['id'] are drawn. Each 'segmentation' is read as a list
            of flat [x0, y0, x1, y1, ...] polygons.

    Returns:
        A 2-D uint8 numpy array with 1 inside (and on the outline of) every polygon
        and 0 elsewhere.
    """
    # Polygon coordinates are expressed in image pixels, so the canvas should match the
    # image. Fall back to the previous fixed 800x800 canvas when the size is not recorded.
    image_size = (image_info.get('width', 800), image_info.get('height', 800))
    mask = Image.new('L', image_size, 0)
    draw = ImageDraw.Draw(mask)
    for annotation in annotations:
        if annotation['image_id'] != image_info['id']:
            continue
        for polygon in annotation['segmentation']:
            # Pair x/y values; zip drops a dangling trailing value instead of raising
            # IndexError on an odd-length coordinate list.
            points = list(zip(polygon[0::2], polygon[1::2]))
            if len(points) < 2:
                # Nothing drawable: PIL needs at least two coordinates.
                continue
            draw.polygon(points, outline=1, fill=1)
    return np.array(mask)

def save_grayscale_image(matrix, filename):
    """Save a 0/1 mask matrix as a 0/255 grayscale JPEG.

    Args:
        matrix: numpy array of mask values (0 or 1); it is scaled by 255 and cast to uint8.
        filename: destination path of the JPEG file.

    Note:
        JPEG is lossy, so pixels near mask edges may not come back as exactly 0 or 255.
        The file is written at maximum quality to keep that distortion minimal; readers
        should still threshold the decoded mask rather than rely on exact values.
    """
    # Scale the 0/1 labels to the 0-255 range expected for an 8-bit grayscale image
    matrix = (matrix * 255).astype(np.uint8)
    image = Image.fromarray(matrix)
    # The default JPEG quality visibly smears hard mask edges; quality=100 avoids most of it
    image.save(filename, 'JPEG', quality=100)

# Build and store the mask of one image, then report where image and mask live
def process_image_and_mask(image_info, annotations, images_path, masks_path):
    """Create and save the mask for one image and return both file paths.

    Args:
        image_info: 'images' record; its 'file_name' names both the image and the mask.
        annotations: annotation records handed to create_mask.
        images_path: folder containing the source images.
        masks_path: folder the mask file is written to.

    Returns:
        Tuple (image_path, mask_image_path).
    """
    file_name = image_info['file_name']
    image_path = os.path.join(images_path, file_name)
    mask_image_path = os.path.join(masks_path, str(file_name))

    # Rasterize the annotations and write the result under the image's own file name
    save_grayscale_image(create_mask(image_info, annotations), mask_image_path)

    return image_path, mask_image_path

# Generate masks and collect (image_path, mask_path) pairs for the train and test sets
def _annotations_by_image(annotations):
    """Group annotation records by their 'image_id'."""
    grouped = {}
    for annotation in annotations:
        grouped.setdefault(annotation['image_id'], []).append(annotation)
    return grouped

# create_mask scans every annotation it is given, so passing the full list for each image
# costs images x annotations comparisons. Grouping once up front lets each image receive
# only its own annotations; the masks produced are unchanged.
_train_annotations_by_image = _annotations_by_image(train_annotations)
_test_annotations_by_image = _annotations_by_image(test_annotations)

train_img_mask_paths = [process_image_and_mask(img_info,
                                               _train_annotations_by_image.get(img_info['id'], []),
                                               path_folder_images, train_masks_dir)
                        for img_info in train_images_info.values()]
test_img_mask_paths = [process_image_and_mask(img_info,
                                              _test_annotations_by_image.get(img_info['id'], []),
                                              path_folder_images, test_masks_dir)
                       for img_info in test_images_info.values()]

# Build Image and Mask Path Lists for the Train and Test Masks Already Saved in Google Drive

In [None]:
import numpy as np
from PIL import Image, ImageDraw
import os

# Paths in your Google Drive: the folders that already contain the saved train and test masks
# (same locations the mask-creation cell writes to)
drive_base_path = '/content/drive/My Drive'
train_masks_dir = os.path.join(drive_base_path, 'train_masks')
test_masks_dir = os.path.join(drive_base_path, 'test_masks')

# Resolve the image path and the matching (already saved) mask path for one image
def process_image_and_mask(image_info, annotations, images_path, masks_path):
    """Return (image_path, mask_image_path) for one image without touching any file.

    Args:
        image_info: 'images' record; only its 'file_name' is used.
        annotations: accepted for signature compatibility; not used here.
        images_path: folder containing the source images.
        masks_path: folder containing the mask files.

    Returns:
        Tuple of the image path and the mask path, both ending in the same file name.
    """
    file_name = image_info['file_name']
    return (
        os.path.join(images_path, file_name),
        os.path.join(masks_path, str(file_name)),
    )

# Collect one (image_path, mask_path) pair per image record, for train and test
train_img_mask_paths = [
    process_image_and_mask(info, train_annotations, path_folder_images, train_masks_dir)
    for info in train_images_info.values()
]
test_img_mask_paths = [
    process_image_and_mask(info, test_annotations, path_folder_images, test_masks_dir)
    for info in test_images_info.values()
]


# Create Tensorflow Datasets

In [None]:
import matplotlib.pyplot as plt
import tensorflow as tf
from PIL import ImageOps, Image
from keras.utils import load_img

# Function to load and preprocess images and masks
def load_img_masks(input_img_path, target_img_path, img_size=(800, 800)):
    """Read one image and its mask from disk and return them as uint8 tensors.

    Args:
        input_img_path: path of the RGB JPEG image.
        target_img_path: path of the grayscale JPEG mask (saved as 0/255 by
            save_grayscale_image).
        img_size: accepted for signature compatibility; no resizing happens here.

    Returns:
        Tuple (input_img, target_img): the image decoded with 3 channels, and the mask
        decoded with 1 channel and binarized to 0 (background) / 1 (foreground).
    """
    # decode_jpeg already yields uint8, so no dtype conversion is needed afterwards
    input_img = tf.image.decode_jpeg(tf.io.read_file(input_img_path), channels=3)
    target_img = tf.image.decode_jpeg(tf.io.read_file(target_img_path), channels=1)

    # The masks hold 0/255, not labels 1..3: subtracting 1 would wrap uint8 0 to 255 and
    # turn 255 into 254. Threshold at mid-gray instead, which also absorbs JPEG
    # compression noise, to obtain clean 0/1 labels.
    target_img = tf.cast(target_img > 127, tf.uint8)
    return input_img, target_img

# TO TEST load_img_masks:
# -----------------

# def display_img_and_mask(input_img, target_img): # made this function to test
#     plt.figure(figsize=(10, 5))

#     plt.subplot(1, 2, 1)
#     plt.title("Input Image")
#     plt.imshow(input_img)
#     plt.axis('off')

#     plt.subplot(1, 2, 2)
#     plt.title("Target Mask")
#     plt.imshow(target_img)
#     plt.axis('off')

#     plt.show()

# # Load and preprocess one pair of image and mask
# input_img_path = train_img_paths[1]  # Replace with actual image path
# target_img_path = train_mask_paths[1]  # Replace with actual mask path

# input_img, target_img = load_img_masks(input_img_path, target_img_path)

# # Convert tensors to numpy arrays for display
# input_img_np = input_img.numpy()
# target_img_np = target_img.numpy()

# # Display the input image and target mask
# display_img_and_mask(input_img_np, target_img_np)

# # Actual Display
# img = Image.open(input_img_path)
# display(img)
# img = ImageOps.autocontrast(load_img(target_img_path))
# display(img)

# -----------------

# Turn a list of (image_path, mask_path) pairs into a dataset of decoded tensors
def create_dataset(img_target_paths, batch_size=32, img_size=(800, 800)):
    """Build a tf.data pipeline that loads every image/mask pair.

    Args:
        img_target_paths: sequence of (image_path, mask_path) pairs.
        batch_size: accepted but not used; the returned dataset is unbatched.
        img_size: accepted but not used.

    Returns:
        A dataset whose elements are the (image, mask) tensors from load_img_masks.
    """
    def _load_pair(path_pair):
        # Each element is a length-2 string tensor: [image_path, mask_path]
        return load_img_masks(path_pair[0], path_pair[1])

    path_pairs = tf.data.Dataset.from_tensor_slices(img_target_paths)
    return path_pairs.map(_load_pair, num_parallel_calls=tf.data.AUTOTUNE)


# Create train and test datasets from the (image_path, mask_path) lists built earlier.
# Elements are loaded lazily by the pipeline, so these calls only define the datasets.
train_dataset = create_dataset(train_img_mask_paths)
print("Train dataset created")
test_dataset = create_dataset(test_img_mask_paths)
print("Test dataset created")

# Show the element spec (shapes/dtypes) of the raw, not yet resized, dataset
print(train_dataset)


def preprocess(image, label):
    """Resize an image/mask pair to 800x800 and pin their static shapes and dtypes.

    Args:
        image: image tensor with 3 channels.
        label: mask tensor with 1 channel holding integer class ids.

    Returns:
        Tuple (image, label) as uint8 tensors of shape (800, 800, 3) and (800, 800, 1).
    """
    target_size = (800, 800)

    # Images can be interpolated (default bilinear), but a label mask must not be: blending
    # neighbouring class ids produces values that are not valid labels. Nearest-neighbour
    # keeps every output pixel equal to one of the input labels.
    image = tf.image.resize(image, target_size)
    label = tf.image.resize(label, target_size, method="nearest")

    # Ensure the shapes are statically known for downstream Keras layers
    image.set_shape([800, 800, 3])
    label.set_shape([800, 800, 1])

    # Bilinear resize returns float32; bring both tensors back to uint8
    image = tf.cast(image, tf.uint8)
    label = tf.cast(label, tf.uint8)

    return image, label

# Apply preprocess to every element so both datasets have fixed 800x800 shapes
processed_train_dataset = train_dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
processed_test_dataset = test_dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

# Show the element spec after preprocessing (shapes should now be fully defined)
print(processed_train_dataset)

Train dataset created
Test dataset created
<_ParallelMapDataset element_spec=(TensorSpec(shape=(None, None, 3), dtype=tf.uint8, name=None), TensorSpec(shape=(None, None, 1), dtype=tf.uint8, name=None))>
<_ParallelMapDataset element_spec=(TensorSpec(shape=(800, 800, 3), dtype=tf.uint8, name=None), TensorSpec(shape=(800, 800, 1), dtype=tf.uint8, name=None))>


# Split Dataset

In [None]:


BATCH_SIZE = 4
BUFFER_SIZE = 1000

# Training pipeline: cache decoded samples, shuffle, batch, repeat indefinitely (the number
# of steps per epoch is set at fit time), and prefetch to overlap loading with training.
# NOTE(review): cache() with no filename keeps every decoded sample in memory - confirm the
# full training set of 800x800 images fits in RAM.
train_batches = processed_train_dataset.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train_batches = train_batches.prefetch(buffer_size=tf.data.AUTOTUNE)

# Validation and test are both carved out of the *preprocessed* test dataset so that every
# batch has the fixed 800x800 shape the model expects (validation previously read the
# unprocessed dataset).
# NOTE(review): the training cell sets TEST_LENTH = 1962; if the test set really has fewer
# than 3000 samples, skip(3000) leaves test_batches empty - confirm these split sizes.
validation_batches = processed_test_dataset.take(3000).batch(BATCH_SIZE)
test_batches = processed_test_dataset.skip(3000).take(669).batch(BATCH_SIZE)

print(train_batches)



<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 800, 800, 3), dtype=tf.uint8, name=None), TensorSpec(shape=(None, 800, 800, 1), dtype=tf.uint8, name=None))>


### Data Visualization

In [None]:
def display(display_list):
    """Plot the given arrays side by side in a single row.

    Args:
        display_list: sequence of image-like arrays, in the order input image,
            true mask, predicted mask; each panel takes the title at its position.
    """
    plt.figure(figsize=(15, 15))

    title = ["Input Image", "True Mask", "Predicted Mask"]
    n_panels = len(display_list)

    for idx, item in enumerate(display_list):
        plt.subplot(1, n_panels, idx + 1)
        plt.title(title[idx])
        plt.imshow(tf.keras.utils.array_to_img(item))
        plt.axis("off")
    plt.show()

In [None]:
# Take the first batch of the test split and show one randomly chosen image/mask pair
sample_batch = next(iter(test_batches))
batch_images, batch_masks = sample_batch[0], sample_batch[1]
random_index = np.random.choice(batch_images.shape[0])
sample_image, sample_mask = batch_images[random_index], batch_masks[random_index]
display([sample_image, sample_mask])


# Prepare U-Net model

In [None]:
from keras import layers
import tensorflow as tf

def double_conv_block(x, n_filters):
    """Apply two consecutive 3x3 same-padded Conv2D layers with ReLU activation.

    Args:
        x: input tensor.
        n_filters: number of filters used by both convolutions.

    Returns:
        The tensor produced by the second convolution.
    """
    for _ in range(2):
        # Conv2D followed by ReLU, He-normal initialised
        x = layers.Conv2D(
            n_filters,
            3,
            padding="same",
            activation="relu",
            kernel_initializer="he_normal",
        )(x)
    return x

def downsample_block(x, n_filters):
    """One encoder stage: double convolution, then 2x2 max-pooling and dropout.

    Args:
        x: input tensor.
        n_filters: number of filters for the double convolution.

    Returns:
        Tuple (features, pooled): the pre-pooling features (used as the skip connection)
        and the pooled tensor after 20 percent dropout.
    """
    skip = double_conv_block(x, n_filters)
    pooled = layers.MaxPool2D(2)(skip)
    return skip, layers.Dropout(0.2)(pooled)  # 20 percent dropout rate

def upsample_block(x, conv_features, n_filters):
    """One decoder stage: upsample, merge the skip connection, dropout, double convolution.

    Args:
        x: tensor coming from the previous (coarser) stage.
        conv_features: encoder features concatenated with the upsampled tensor.
        n_filters: number of filters for the transposed and the regular convolutions.

    Returns:
        The tensor produced by the final double convolution.
    """
    # Stride-2 transposed convolution doubles the spatial resolution
    upsampled = layers.Conv2DTranspose(n_filters, kernel_size=3, strides=2, padding="same")(x)
    merged = layers.concatenate([upsampled, conv_features])
    merged = layers.Dropout(0.2)(merged)  # 20 percent dropout rate
    return double_conv_block(merged, n_filters)

def build_unet_model():
    """Build a three-level U-Net for 800x800 RGB inputs with a one-channel mask output.

    The encoder runs three downsample blocks (64, 128, 256 filters), followed by a
    512-filter bottleneck and three upsample blocks (256, 128, 64 filters) that each
    consume the encoder features of the matching resolution.

    Returns:
        A tf.keras.Model named "U-Net" mapping (800, 800, 3) inputs to (800, 800, 1)
        per-pixel foreground probabilities.
    """
    # inputs
    inputs = layers.Input(shape=(800, 800, 3))

    # encoder: contracting path - downsample, remembering each stage's features
    skip_features = []
    x = inputs
    for n_filters in (64, 128, 256):
        features, x = downsample_block(x, n_filters)
        skip_features.append(features)

    # bottleneck
    x = double_conv_block(x, 512)

    # decoder: expanding path - upsample, pairing each stage with the encoder features of
    # the same resolution (deepest first)
    for n_filters, features in zip((256, 128, 64), reversed(skip_features)):
        x = upsample_block(x, features, n_filters)

    # outputs: softmax normalises across channels, so with a single channel it would
    # output 1.0 for every pixel. Sigmoid yields an independent per-pixel probability.
    outputs = layers.Conv2D(1, 1, padding="same", activation="sigmoid")(x)

    # unet model with Keras Functional API
    unet_model = tf.keras.Model(inputs, outputs, name="U-Net")

    return unet_model

# Instantiate the network and print its layer-by-layer summary (output shapes, parameters)
unet_model = build_unet_model()
unet_model.summary()


Model: "U-Net"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 800, 800, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 800, 800, 64)         1792      ['input_1[0][0]']             
                                                                                                  
 conv2d_1 (Conv2D)           (None, 800, 800, 64)         36928     ['conv2d[0][0]']              
                                                                                                  
 max_pooling2d (MaxPooling2  (None, 400, 400, 64)         0         ['conv2d_1[0][0]']            
 D)                                                                                           

# Train and Save the Model

In [None]:
import tensorflow as tf
import cv2


# The network emits a single channel per pixel, i.e. this is binary segmentation, so the
# matching loss is binary cross-entropy on per-pixel probabilities with 0/1 labels.
# Sparse categorical cross-entropy would need one output channel per class.
unet_model.compile(optimizer=tf.keras.optimizers.Adam(),
                   loss="binary_crossentropy",
                   metrics=[tf.keras.metrics.Precision()])
NUM_EPOCHS = 20

# train_batches repeats forever, so the epoch length must be given explicitly
TRAIN_LENGTH = 3642
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

# Validate on a fraction (1 / VAL_SUBSPLITS) of the test-set batches each epoch
VAL_SUBSPLITS = 5
TEST_LENGTH = 1962
TEST_LENTH = TEST_LENGTH  # misspelled original name kept so existing references still work
VALIDATION_STEPS = TEST_LENGTH // BATCH_SIZE // VAL_SUBSPLITS

# Train the model. Model.fit has no `resume` argument (passing one raises TypeError);
# to continue an earlier run, reload the saved model and use `initial_epoch` instead.
model_history = unet_model.fit(train_batches,
                               epochs=NUM_EPOCHS,
                               steps_per_epoch=STEPS_PER_EPOCH,
                               validation_steps=VALIDATION_STEPS,
                               validation_data=validation_batches)
# Save the model
unet_model.save("unet_model.h5")

# Predict and Visualise