In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -U segmentation-models

In [None]:
!pip install keras==2.3.1

In [None]:
!pip install tensorflow --upgrade

In [None]:
!pip install keras

In [None]:
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"

from tensorflow import keras
import segmentation_models as sm

In [None]:
!pip install -U albumentations>=0.3.0 --user
!pip install -U --pre segmentation-models --user

In [None]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

import os
import cv2
import keras
import numpy as np
import matplotlib.pyplot as plt
import segmentation_models as sm
import albumentations as A

# Data directory on Google Drive
DATA_DIR = '/content/drive/My Drive/'

# Update paths for training, validation, and test sets
x_train_dir = os.path.join(DATA_DIR, 'train/images')
y_train_dir = os.path.join(DATA_DIR, 'train/masks')

x_valid_dir = os.path.join(DATA_DIR, 'val/images')
y_valid_dir = os.path.join(DATA_DIR, 'val/masks')

x_test_dir = os.path.join(DATA_DIR, 'test/images')
y_test_dir = os.path.join(DATA_DIR, 'test/masks')


# Our helper function used for visualizing images
def visualize(**images):
    """Plot images in one row."""
    n = len(images)
    plt.figure(figsize=(16, 5)) #A plot figure is created using the Matplotlib library. The figsize parameter determines the drawing size.
    for i, (name, image) in enumerate(images.items()):#Each image is shown in a separate subplot.
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()

# Dataset class
# It is used to process the dataset and provide the necessary data samples for training or evaluation of the model.
class Dataset:
    def __init__(self, images_dir, masks_dir, classes=None, augmentation=None, preprocessing=None):
        self.ids = os.listdir(images_dir)  #ids: List containing the names of image and mask files.
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids] #images_fps: List containing the full path of image files.
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids] #masks_fps: List containing the full path to mask files.
        self.class_values = {cls.lower(): idx for idx, cls in enumerate(classes)} #A dictionary where classes are named with lowercase letters and paired with indices.
        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i): #This method returns a specific element from the dataset. The i parameter represents the index of the item to retrieve.
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (512, 512))

        mask = np.zeros((image.shape[0], image.shape[1], len(self.class_values)), dtype=np.uint8)

        for cls, idx in self.class_values.items():
            if cls.lower() == 'minor':
                cls = 'no damage'
            elif cls.lower() == 'major':
                cls = 'damaged'

            cls_mask = cv2.imread(self.masks_fps[i], cv2.IMREAD_COLOR)
            cls_mask = cv2.resize(cls_mask, (512, 512))
            mask[..., idx] = (cls_mask == np.array(eval(cls))[None, None, :]).all(axis=-1).astype('uint8')

        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask.astype('float32')

    def __len__(self): #This method returns the total number of elements of the dataset. This method runs when len(dataset) is called.
        return len(self.ids)

# Dataloader
# Dataloader class is used to retrieve mini-batches from the dataset and organize these mini-groups during training or evaluation of the model.

class Dataloader(keras.utils.Sequence):
    def __init__(self, dataset, batch_size=1, shuffle=False):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(dataset))

        self.on_epoch_end()

    def __getitem__(self, i):
        start = i * self.batch_size
        stop = (i + 1) * self.batch_size
        data = [self.dataset[j] for j in range(start, stop)]

        images = [item[0] for item in data]
        masks = [item[1] for item in data]

        batch = [np.stack(images, axis=0), np.stack(masks, axis=0)]
        return batch

    def __len__(self):
        return len(self.indexes) // self.batch_size

    def on_epoch_end(self):
        if self.shuffle:
            self.indexes = np.random.permutation(self.indexes)

# Input dimensions
INPUT_HEIGHT = 512
INPUT_WIDTH = 512
CHANNELS = 3 #It is stated that since the images are in color, each pixel contains 3 color channels (RGB).

# Specify data preprocessing functions
def get_preprocessing(preprocessing_fn=None):
    if preprocessing_fn:
        _transform = [
            A.Lambda(image=preprocessing_fn),
        ]
    else:
        _transform = []

    return A.Compose(_transform)

# Specify augmentation functions for training
def get_training_augmentation():
    train_transform = [
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),
        A.PadIfNeeded(min_height=INPUT_HEIGHT, min_width=INPUT_WIDTH, always_apply=True, border_mode=0),
        A.RandomCrop(height=INPUT_HEIGHT, width=INPUT_WIDTH, always_apply=True),
        A.IAAAdditiveGaussianNoise(p=0.2),
        A.IAAPerspective(p=0.5),
        A.OneOf(
            [
                A.CLAHE(p=1),
                A.RandomBrightness(p=1),
                A.RandomGamma(p=1),
            ],
            p=0.9,
        ),
        A.OneOf(
            [
                A.IAASharpen(p=1),
                A.Blur(blur_limit=3, p=1),
                A.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),
        A.OneOf(
            [
                A.RandomContrast(p=1),
                A.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
    return A.Compose(train_transform)

# Create an instance of the training dataset
damage_classes = ['(0, 255, 0)', '(0, 255, 255)', '(255, 0, 0)', '(0, 0, 255)']
train_dataset = Dataset(x_train_dir, y_train_dir, classes=damage_classes, augmentation=get_training_augmentation(),
                        preprocessing=get_preprocessing())train_dataset

# Create an instance of the validation dataset
valid_dataset = Dataset(x_valid_dir, y_valid_dir, classes=damage_classes, augmentation=get_training_augmentation(),
                        preprocessing=get_preprocessing())

# Define UNet model
BACKBONE = 'efficientnetb3'
CLASSES = ['No Damage', 'Minor', 'Major', 'Damaged']
LR = 0.0001
EPOCHS = 100
BATCH_SIZE = 4

# Create the UNet model
model = sm.Unet(BACKBONE, classes=len(CLASSES), activation='softmax', input_shape=(INPUT_HEIGHT, INPUT_WIDTH, CHANNELS))

# Define optimizer and loss functions
optim = keras.optimizers.Adam(LR)

# Loss functions
dice_loss = sm.losses.DiceLoss()
focal_loss = sm.losses.CategoricalFocalLoss()
total_loss = dice_loss + (1 * focal_loss)

# Metrics
metrics = [sm.metrics.IOUScore(name='iou_score', threshold=0.5), sm.metrics.FScore(name='f1-score', threshold=0.5)]

# Compile the model
model.compile(optim, total_loss, metrics)

# Create an instance of the training dataset without augmentation for DataLoader NO AUGMENTATION
#train_dataset = Dataset(x_train_dir, y_train_dir, classes=damage_classes, augmentation=None,
#                        preprocessing=get_preprocessing())

# Create an instance of the validation dataset without augmentation for DataLoader NO AUGMENTATION
#valid_dataset = Dataset(x_valid_dir, y_valid_dir, classes=damage_classes, augmentation=None,
#                        preprocessing=get_preprocessing())

# Dataloaders without augmentation
train_dataloader = Dataloader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = Dataloader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Model Checkpoint, save the best weights to Google Drive
model_checkpoint = keras.callbacks.ModelCheckpoint(
    '/content/drive/My Drive/best_model.h5',
    save_weights_only=True,
    save_best_only=True,
    monitor='val_iou_score',
    mode='max',
)

# Learning Rate Reduction
lr_reduce = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=5,
    min_lr=0.00001,
    verbose=1,
)

# Train the model
history = model.fit(
    train_dataloader,
    steps_per_epoch=len(train_dataloader),
    epochs=EPOCHS,
    callbacks=[model_checkpoint, lr_reduce],
    validation_data=valid_dataloader,
    validation_steps=len(valid_dataloader),
)

# Create an instance of the test dataset
test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    classes=damage_classes,
    augmentation=get_training_augmentation(),  # Use training augmentation for testing as well
    preprocessing=get_preprocessing()
)

# Create test dataloader
test_dataloader = Dataloader(test_dataset, batch_size=1, shuffle=False)

# Load the best weights
model.load_weights('/content/drive/My Drive/best_model.h5')

# Evaluate the model
scores = model.evaluate(test_dataloader)



In [None]:
# Plot training & validation iou_score values
plt.figure(figsize=(30, 5))
plt.subplot(121)
if 'iou_score' in history.history:
    plt.plot(history.history['iou_score'])
if 'val_iou_score' in history.history:
    plt.plot(history.history['val_iou_score'])
plt.title('Model IOU Score')
plt.ylabel('IOU Score')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')

# Plot training & validation loss values
plt.subplot(122)
if 'loss' in history.history:
    plt.plot(history.history['loss'])
if 'val_loss' in history.history:
    plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()


In [None]:
# Create an instance of the test dataset
test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    classes=damage_classes,
    augmentation=None,  # Use training augmentation for testing as well
    preprocessing=get_preprocessing()
)
# Number of random samples to visualize
n = 20
# Randomly select 'n' indices from the test dataset
ids = np.random.choice(np.arange(len(test_dataset)), size=n)

for i in ids:
    image, gt_mask = test_dataset[i]
    # Resize the image to the expected size of the model from the test dataset
    image = cv2.resize(image, (512, 512))
    # Add an extra dimension to match the model's input shape
    image = np.expand_dims(image, axis=0)
    # Predict the mask using the trained model
    pr_mask = model.predict(image)

    # Resize the ground truth mask
    gt_mask_resized = cv2.resize(gt_mask, (512, 512))

    # Specify class titles
    titles = ['No Damage', 'Minor', 'Major', 'Damaged']

    # Visualize for each class, skipping 'Minor' and 'Major'
    for class_idx in range(pr_mask.shape[-1]):
        if titles[class_idx] in ['Minor', 'Major']:
            continue  # Skip visualization for 'Minor' and 'Major' classes

        # Show the original image
        plt.figure(figsize=(16, 5))
        plt.subplot(1, 3, 1)
        plt.imshow(image.squeeze())
        plt.title('Original Image')

        # Show the ground truth mask
        plt.subplot(1, 3, 2)
        plt.imshow(gt_mask_resized[..., class_idx].squeeze(), cmap='gray')
        plt.title(f'Ground Truth {titles[class_idx]} Mask')

        # Show the predicted mask
        pr_mask_bw = (pr_mask[..., class_idx] > 0.5).astype(np.uint8)
        pr_mask_resized = cv2.resize(pr_mask_bw.squeeze(), (512, 512))
        plt.subplot(1, 3, 3)
        plt.imshow(pr_mask_resized, cmap='gray')
        plt.title(f'Predicted {titles[class_idx]} Mask (BW)')

        plt.show()


In [None]:
# Specify the directory to save the model
model_save_path = '/content/drive/MyDrive/model_v15/model2.h5'

# Save the model
model.save(model_save_path)