In [1]:
import os
import pickle
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import Sequence
# Report the GPU TensorFlow found; an empty device name means none is available
device_name = tf.test.gpu_device_name()
device_label = device_name or "Using CPU"
print("GPU Device:", device_label)

GPU Device: Using CPU


In [None]:
# Helper functions to save and load Python objects (e.g. dictionaries) with pickle
def save_pickle(dic, path):
    """Serialize ``dic`` to the file at ``path`` with pickle.

    Args:
        dic: Any picklable Python object (typically a dictionary).
        path: Destination file; anything ``open()`` accepts (str, bytes or
            path-like). An existing file is overwritten.
    """
    # Pass the path straight to open(): wrapping it in an f-string turned
    # bytes paths into the literal text "b'...'" and pointed at the wrong file.
    with open(path, 'wb') as f:
        pickle.dump(dic, f, pickle.HIGHEST_PROTOCOL)

def load_pickle(path):
    """Load and return the object stored in the pickle file at ``path``.

    Args:
        path: File to read; anything ``open()`` accepts (str, bytes or
            path-like).

    Returns:
        The unpickled Python object.
    """
    # SECURITY: unpickling can execute arbitrary code. Only load files that
    # were produced by a trusted source (e.g. save_pickle in this notebook).
    with open(path, 'rb') as f:
        return pickle.load(f)

def create_df(image_dir=None):
    """Build a DataFrame of image ids (file names without their extension).

    The directory is walked recursively and every file found contributes one
    id. Only the final extension is stripped, so ``"tile.v2.jpg"`` becomes
    ``"tile.v2"`` rather than being cut at the first dot.

    Args:
        image_dir: Directory to scan. When omitted, the module-level
            ``IMAGE_PATH`` is used, so existing ``create_df()`` calls keep
            working.

    Returns:
        pandas.DataFrame with a single ``id`` column sorted ascending and a
        fresh ``0..n-1`` index.
    """
    if image_dir is None:
        image_dir = IMAGE_PATH

    names = [
        os.path.splitext(filename)[0]
        for _, _, filenames in os.walk(image_dir)
        for filename in filenames
    ]
    return pd.DataFrame({'id': names}).sort_values('id').reset_index(drop=True)

In [12]:
# Dataset locations.
# The root directory can be overridden with the DRONE_DATASET_ROOT environment
# variable so the notebook is not tied to one machine; the default keeps the
# original local path.
DATASET_ROOT = os.environ.get(
    "DRONE_DATASET_ROOT",
    r"C:\Users\Rudra Thakar\Downloads\archive (5)\classes_dataset\classes_dataset",
)
IMAGE_PATH = os.path.join(DATASET_ROOT, "original_images")
MASK_PATH = os.path.join(DATASET_ROOT, "label_images_semantic")

# Ids (file names without extension) of every image found under IMAGE_PATH
X = create_df()['id'].values

In [13]:
# Split the ids: 75% train, then the 25% holdout is divided 60/40 into test/validation
X_train, X_holdout = train_test_split(X, test_size=0.25, random_state=123)
X_test, X_val = train_test_split(X_holdout, test_size=0.4, random_state=123)  # arrays of image ids

for label, subset in (
    ('Train Size   : ', X_train),
    ('Val Size     : ', X_val),
    ('Test Size    : ', X_test),
):
    print(label, len(subset))

Train Size   :  300
Val Size     :  40
Test Size    :  60


In [14]:
class DroneDatasetTF(Sequence):
    """Keras ``Sequence`` yielding batches of normalized images and label masks.

    Each sample is read from ``<img_path>/<id><img_ext>`` and
    ``<mask_path>/<id><mask_ext>``, optionally augmented, and the image is
    normalized with ``albumentations.Normalize()`` (library defaults).

    Requires ``albumentations`` to be imported as ``A`` before a batch is
    requested.

    Args:
        img_path: Directory containing the input images.
        mask_path: Directory containing the label masks.
        X: Sequence of sample ids (file names without extension).
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
        batch_size: Number of samples per batch; the last batch may be smaller.
        img_ext: File extension appended to an id to locate its image.
        mask_ext: File extension appended to an id to locate its mask.
        channels_first: When True (default, original behaviour) images are
            returned as (C, H, W) and masks as (1, H, W). Pass False to get
            (H, W, C) images and (H, W, 1) masks.
            NOTE(review): Keras models built with ``input_shape=(H, W, 3)``
            presumably need ``channels_first=False`` - confirm against the
            model being trained.
    """

    def __init__(self, img_path, mask_path, X, transform=None, batch_size=8,
                 img_ext='.png', mask_ext='.png', channels_first=True):
        super().__init__()
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.batch_size = batch_size
        self.img_ext = img_ext
        self.mask_ext = mask_ext
        self.channels_first = channels_first

    def __len__(self):
        """Return the number of batches, counting a trailing partial batch."""
        return int(np.ceil(len(self.X) / self.batch_size))

    @staticmethod
    def _read_array(path):
        """Load the image file at ``path`` into a NumPy array and close the file."""
        with Image.open(path) as img:
            return np.array(img)

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``(images, masks)`` float32 arrays."""
        start = idx * self.batch_size
        batch_ids = self.X[start:start + self.batch_size]

        # Built once per batch instead of once per sample.
        normalize = A.Normalize()

        images = []
        masks = []
        for sample_id in batch_ids:
            # os.path.join inserts the separator that plain string
            # concatenation dropped when the directory had no trailing slash.
            image = self._read_array(os.path.join(self.img_path, sample_id + self.img_ext))
            mask = self._read_array(os.path.join(self.mask_path, sample_id + self.mask_ext))

            if self.transform is not None:
                aug = self.transform(image=image, mask=mask)
                image = aug['image']
                mask = aug['mask']

            # A.Normalize() already rescales by its max_pixel_value (255 by
            # default) before applying mean/std, so there is no second
            # division by 255 here.
            image = normalize(image=image)['image'].astype(np.float32)

            if self.channels_first:
                image = image.transpose(2, 0, 1)  # (H, W, C) -> (C, H, W)
                mask = np.expand_dims(mask, 0)    # (H, W) -> (1, H, W)
            else:
                mask = np.expand_dims(mask, -1)   # (H, W) -> (H, W, 1)

            images.append(image)
            masks.append(mask.astype(np.float32))

        return np.array(images), np.array(masks)


In [16]:
import albumentations as A

# Augmentations applied to training samples only
train_augmentations = [
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.GaussNoise(),
    A.GridDistortion(p=0.2),
    A.RandomBrightnessContrast(brightness_limit=0.5, contrast_limit=0.5, p=0.4),
]
train_transforms = A.Compose(train_augmentations)

# Validation and test samples are left un-augmented
valid_transforms = None

batch_size = 3


def _make_dataset(ids, transform):
    """Build a DroneDatasetTF over ``ids`` using the shared paths and batch size."""
    return DroneDatasetTF(IMAGE_PATH, MASK_PATH, ids, transform=transform, batch_size=batch_size)


train_dataset = _make_dataset(X_train, train_transforms)
valid_dataset = _make_dataset(X_val, valid_transforms)
test_dataset = _make_dataset(X_test, valid_transforms)


ModuleNotFoundError: No module named 'albumentations'

Visualization

In [None]:
# Let's look at one sample from each split
def show_first_sample(dataset, title):
    """Plot the first image/mask pair of the first batch of ``dataset``.

    Indexing the dataset returns a whole batch, so the first sample is picked
    out before plotting (transposing the 4-D batch directly raises an error).

    Args:
        dataset: Indexable object whose item 0 is an ``(images, masks)`` pair
            of arrays with the batch on the first axis.
        title: Label used in the subplot titles.
    """
    images, masks = dataset[0]
    image = np.asarray(images[0])
    mask = np.asarray(masks[0]).squeeze()  # drop the singleton channel axis

    # imshow needs (H, W, C); move a leading channel axis to the end.
    if image.ndim == 3 and image.shape[0] in (3, 4) and image.shape[-1] not in (3, 4):
        image = image.transpose(1, 2, 0)

    # The image is mean/std normalized, so stretch it back into [0, 1];
    # otherwise imshow clips everything outside that range.
    lo, hi = float(image.min()), float(image.max())
    if hi > lo:
        image = (image - lo) / (hi - lo)

    fig, (ax_image, ax_mask) = plt.subplots(1, 2, figsize=(14, 10))
    ax_image.imshow(image)
    ax_image.set_title(f"{title}: image")
    ax_mask.imshow(mask)
    ax_mask.set_title(f"{title}: mask")
    plt.show()


for split_name, split_dataset in (
    ("Train", train_dataset),
    ("Validation", valid_dataset),
    ("Test", test_dataset),
):
    show_first_sample(split_dataset, split_name)

In [None]:
import tensorflow as tf
from tensorflow.keras import Model

class DroneModelTF(Model):
    """Keras ``Model`` wrapper that adds mean-IoU tracking to train/test steps.

    Args:
        model: Wrapped network; called as ``model(inputs, training=...)`` and
            expected to return per-class scores on the last axis.
        criterion: Loss callable invoked as ``criterion(masks, predictions)``.
    """

    def __init__(self, model, criterion):
        super().__init__()
        self.model = model
        self.criterion = criterion
        # Running averages of the per-batch mean IoU.
        self.train_iou = tf.keras.metrics.Mean(name='train_iou')
        self.val_iou = tf.keras.metrics.Mean(name='val_iou')

    def call(self, inputs, training=False):
        """Forward ``inputs`` through the wrapped model."""
        return self.model(inputs, training=training)

    def compute_iou(self, y_true, y_pred, num_classes=5):
        """Return the mean intersection-over-union across ``num_classes``.

        Args:
            y_true: Integer-valued labels shaped (batch, h, w, 1) or
                (batch, h, w).
            y_pred: Per-class scores shaped (batch, h, w, num_classes).
            num_classes: Number of class ids (0..num_classes-1) to average over.

        Returns:
            Scalar float tensor. A class absent from both ``y_true`` and the
            prediction contributes an IoU of 1.0.
        """
        y_pred_labels = tf.argmax(y_pred, axis=-1, output_type=tf.int32)  # (batch, h, w)

        y_true = tf.cast(y_true, tf.int32)
        # Drop a trailing singleton axis so both tensors are (batch, h, w).
        # Comparing a rank-3 y_true against rank-4 labels would broadcast
        # instead of matching pixel by pixel.
        if y_true.shape.rank == 4:
            y_true = tf.squeeze(y_true, axis=-1)

        ious = []
        for class_id in range(num_classes):
            true_class = tf.equal(y_true, class_id)
            pred_class = tf.equal(y_pred_labels, class_id)
            intersection = tf.reduce_sum(tf.cast(tf.logical_and(true_class, pred_class), tf.float32))
            union = tf.reduce_sum(tf.cast(tf.logical_or(true_class, pred_class), tf.float32))
            # Empty union means the class appears nowhere: count it as a perfect match.
            iou = tf.cond(tf.equal(union, 0), lambda: tf.constant(1.0), lambda: intersection / union)
            ious.append(iou)
        return tf.reduce_mean(ious)

    def train_step(self, data):
        """Run one optimization step and return the updated metric values."""
        images, masks = data
        with tf.GradientTape() as tape:
            predictions = self(images, training=True)
            loss = self.criterion(masks, predictions)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        iou = self.compute_iou(masks, predictions)

        # NOTE(review): `compiled_metrics` is a TF2-era Keras API - confirm it
        # exists in the installed Keras version.
        self.compiled_metrics.update_state(masks, predictions)
        self.train_iou.update_state(iou)

        results = {m.name: m.result() for m in self.metrics}
        results['train_iou'] = self.train_iou.result()
        results['loss'] = loss
        return results

    def test_step(self, data):
        """Evaluate one batch and return the updated metric values."""
        images, masks = data
        predictions = self(images, training=False)
        loss = self.criterion(masks, predictions)

        iou = self.compute_iou(masks, predictions)

        self.compiled_metrics.update_state(masks, predictions)
        self.val_iou.update_state(iou)

        results = {m.name: m.result() for m in self.metrics}
        # NOTE(review): Keras presumably prefixes validation logs with
        # "val_" itself, which would make this key "val_val_iou" - verify.
        results['val_iou'] = self.val_iou.result()
        results['loss'] = loss
        return results

    def reset_metrics(self):
        """Reset every metric: the compiled ones and both IoU trackers."""
        # Without the super() call the compiled metrics were never reset
        # and kept accumulating across epochs.
        super().reset_metrics()
        for tracker in (self.train_iou, self.val_iou):
            # `reset_state` is the current name; `reset_states` only exists
            # on older Keras versions.
            reset = getattr(tracker, 'reset_state', None) or tracker.reset_states
            reset()


In [None]:
# The notebook uses tf.keras optimizers and callbacks, so make
# segmentation_models build on the same backend. This must be set before the
# import; setdefault keeps any value the user already exported.
os.environ.setdefault('SM_FRAMEWORK', 'tf.keras')
import segmentation_models as sm

arch = 'unet'
enc_name = 'efficientnetb0'
classes = 5
input_shape = (None, None, 3)  # Change None if fixed size (height, width)

# Create model.
# The head uses softmax because the segmentation_models Dice loss and
# IOUScore(threshold=0.5) below both work on probabilities, not raw logits.
model = sm.Unet(
    backbone_name=enc_name,
    input_shape=input_shape,
    classes=classes,
    activation='softmax',
    encoder_weights='imagenet'
)

# Optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# Dice loss (multiclass).
# The Keras segmentation_models DiceLoss has no `from_logits` or `mode`
# arguments (those belong to the PyTorch package), so passing them raised a
# TypeError.
# NOTE(review): this loss compares y_true and y_pred element-wise, so the masks
# presumably need to be one-hot encoded as (batch, h, w, classes) - confirm
# against what the dataset yields.
loss = sm.losses.DiceLoss(class_weights=None)

# Compile model
model.compile(optimizer=optimizer, loss=loss, metrics=[sm.metrics.IOUScore(threshold=0.5)])

# Keep only the weights of the epoch with the lowest validation loss
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath=f'./checkpoints_{arch}/{arch}_best.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1
)


In [None]:
# Train on the training batches, validating and checkpointing after every epoch
fit_options = dict(
    validation_data=valid_dataset,
    epochs=2,
    callbacks=[checkpoint_cb],
    verbose=1,
)
history = model.fit(train_dataset, **fit_options)