In [None]:
!pip install gdown >> /dev/null

# Existing Data (already prepared)

---



## 256x256

In [None]:
# Download the prepared archives for this section from Google Drive by file id.
# The recorded output shows the second download being saved as /content/val.zip.
# !gdown 1UUCGWRSMxGNJn5AmwWd3EavpT39Qzxau # 600 peer samples
!gdown 1AqvKd52QWZt8abG9oZoGYDc6XlfPueCP # 1000 peer samples train
!gdown 1nF5y47OrLXnfq0lyXmwyU0IAQEdv5ZWu # 150 peer samples

object address  : 0x7c17d312bb80
object refcount : 2
object type     : 0x9d5ea0
object type name: KeyboardInterrupt
object repr     : KeyboardInterrupt()
lost sys.stderr
^C
Downloading...
From (original): https://drive.google.com/uc?id=1nF5y47OrLXnfq0lyXmwyU0IAQEdv5ZWu
From (redirected): https://drive.google.com/uc?id=1nF5y47OrLXnfq0lyXmwyU0IAQEdv5ZWu&confirm=t&uuid=574b030c-ab5c-43f1-983b-96b3d936580b
To: /content/val.zip
100% 186M/186M [00:04<00:00, 44.9MB/s]


In [None]:
!unzip train >> /dev/null
!unzip val >> /dev/null

In [None]:
!git clone https://github.com/ucef-b/agri-data.git
!cp -r agri-data/* .

fatal: destination path 'agri-data' already exists and is not an empty directory.


# Train the models

## Train on 256x256 (Resized from 512x512 Patches)

In [None]:

import tensorflow as tf

# Ask TensorFlow which GPU devices it can see; the list is empty when there are none.
gpu_available = tf.config.list_physical_devices('GPU')

print("GPU is available" if gpu_available else "GPU not available")

GPU is available


In [None]:
import numpy as np
from pathlib import Path
from typing import Tuple, Dict, List, Optional, Union
import random
from keras import layers, Model
import tensorflow.keras.applications as keras_applications

Test the data loader (first batch from the `val` folder)

In [None]:
from DataLoader import DatasetLoader

# Smoke test: build a loader and print the shapes of its first batch only.
# NOTE(review): the variable is named train_datasets but working_path points at "val".
train_datasets = DatasetLoader(
    working_path="val",
    batch_size=16,
    export_type="NDVI",
    outputs_type="both",
    shuffle=True
)

# Each item is (inputs, targets) where targets is a dict with one entry per model output.
for x_batch, y_batch in train_datasets:
    print("Input batch shape:", x_batch.shape)
    print("Segmentation output batch shape:", y_batch['segmentation_output'].shape)
    print("Classification output batch shape:", y_batch['classification_output'].shape)
    # Only the first batch is inspected.
    break

Input batch shape: (16, 512, 512, 1)
Segmentation output batch shape: (16, 512, 512, 1)
Classification output batch shape: (16, 6)


Generate datasets for training

In [None]:
def create_datasets(
    working_path: str,
    batch_size: int,
    export_type: str,
    input_shape: Optional[Tuple[int, int, int]] = None,
) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
    """Build the training and validation ``tf.data`` pipelines.

    Two ``DatasetLoader`` instances are created, one reading ``{working_path}/train``
    (shuffled) and one reading ``{working_path}/val`` (not shuffled). Each is wrapped
    in a generator-backed dataset that scales the input batch by 1/255 and passes the
    target dict through unchanged.

    Args:
        working_path: Directory containing the ``train`` and ``val`` sub-folders.
        batch_size: Batch size handed to each ``DatasetLoader``.
        export_type: Loader export mode. When ``input_shape`` is omitted it must be
            one of ``"NDVI"`` (1 channel), ``"RGB"`` (3) or ``"RGBN"`` (4).
        input_shape: ``(height, width, channels)`` of one input sample. When ``None``
            a 256x256 shape is derived from ``export_type``.

    Returns:
        ``(train_dataset, val_dataset)``, each yielding ``(inputs, targets)`` where
        ``targets`` has the keys ``'segmentation_output'`` and ``'classification_output'``.

    Raises:
        ValueError: If ``input_shape`` is ``None`` and ``export_type`` is not supported.
    """
    num_classes = 6

    if input_shape is None:
        # Channel count implied by each export mode at the default 256x256 resolution.
        channels_by_export_type = {"NDVI": 1, "RGB": 3, "RGBN": 4}
        if export_type not in channels_by_export_type:
            raise ValueError(f"Unsupported export_type: {export_type}. Choose from 'NDVI', 'RGB', 'RGBN'.")
        input_shape = (256, 256, channels_by_export_type[export_type])

    # The mask shares the input's spatial size and has a single channel.
    segmentation_output_shape = (input_shape[0], input_shape[1], 1)

    output_signature = (
        tf.TensorSpec((None, *input_shape), tf.float32),
        {
            'segmentation_output':    tf.TensorSpec((None, *segmentation_output_shape), tf.float32),
            'classification_output':  tf.TensorSpec((None, num_classes), tf.float32),
        }
    )

    def make_loader(split: str, shuffle: bool):
        """Create the DatasetLoader for one split sub-folder."""
        return DatasetLoader(
            working_path=f"{working_path}/{split}",
            batch_size=batch_size,
            export_type=export_type,
            outputs_type="both",
            shuffle=shuffle
        )

    def make_dataset(loader) -> tf.data.Dataset:
        """Wrap a loader in a prefetching generator dataset."""
        def generator_fn():
            for batch_x, batch_y in loader:
                # scale pixel values to [0, 1]
                yield batch_x.astype(np.float32) / 255.0, batch_y

        return tf.data.Dataset.from_generator(
            generator_fn,
            output_signature=output_signature,
        ).prefetch(tf.data.AUTOTUNE)

    train_loader = make_loader("train", shuffle=True)
    val_loader = make_loader("val", shuffle=False)

    return make_dataset(train_loader), make_dataset(val_loader)

# Build the train/val pipelines for single-channel 256x256 inputs read from ./train and ./val.
train_dataset, val_dataset = create_datasets(working_path=".", batch_size=32, export_type="NDVI", input_shape=(256, 256, 1))


Define the loss functions and the Dice metric

In [None]:
def binary_dice_coefficient(y_true, y_pred, smooth=1e-6):
    """Soft Dice coefficient computed over all elements of both tensors.

    Both tensors are flattened to 1-D, so the score is a single scalar for the
    whole batch. ``smooth`` is added to numerator and denominator to keep the
    ratio defined when both sums are zero.
    """
    flat_true = tf.reshape(y_true, [-1])
    flat_pred = tf.reshape(y_pred, [-1])

    overlap = tf.reduce_sum(flat_true * flat_pred)
    numerator = 2. * overlap + smooth
    denominator = tf.reduce_sum(flat_true) + tf.reduce_sum(flat_pred) + smooth
    return numerator / denominator

def binary_dice_loss(y_true, y_pred):
    """Dice loss: one minus the soft Dice coefficient of the two tensors."""
    dice = binary_dice_coefficient(y_true, y_pred)
    return 1 - dice

def weighted_binary_crossentropy(pos_weight=1.0):
    """Return a binary cross-entropy loss whose positive term is scaled by ``pos_weight``.

    The returned callable takes ``(y_true, y_pred)`` and yields the mean, over all
    elements, of the negated weighted log-likelihood.
    """
    eps = 1e-7

    def loss(y_true, y_pred):
        # Keep predictions strictly inside (0, 1) so both logarithms stay finite.
        probs = tf.clip_by_value(y_pred, eps, 1 - eps)

        log_likelihood = (
            pos_weight * y_true * tf.math.log(probs)
            + (1 - y_true) * tf.math.log(1 - probs)
        )
        return -tf.reduce_mean(log_likelihood)

    return loss
def combined_segmentation_loss(pos_weight=2.0):
    """Return a loss that sums weighted binary cross-entropy and Dice loss.

    ``pos_weight`` is forwarded to ``weighted_binary_crossentropy``.
    """
    weighted_bce = weighted_binary_crossentropy(pos_weight)

    def loss(y_true, y_pred):
        bce_term = weighted_bce(y_true, y_pred)
        dice_term = binary_dice_loss(y_true, y_pred)
        return bce_term + dice_term

    return loss

Multitask model (lightweight)

In [None]:
def separable_residual_block(input_tensor, num_filters, kernel_size=3):
    """Residual unit made of two separable convolutions.

    Main path: SeparableConv2D -> BatchNorm -> ReLU -> SeparableConv2D -> BatchNorm.
    The shortcut is the input itself, projected by a 1x1 convolution when its
    channel count differs from ``num_filters``. The sum is passed through ReLU.
    """
    kernel = (kernel_size, kernel_size)
    shortcut = input_tensor

    out = layers.SeparableConv2D(num_filters, kernel, padding='same')(input_tensor)
    out = layers.BatchNormalization()(out)
    out = layers.Activation('relu')(out)
    out = layers.SeparableConv2D(num_filters, kernel, padding='same')(out)
    out = layers.BatchNormalization()(out)

    # Match channel counts before the element-wise addition.
    needs_projection = shortcut.shape[-1] != num_filters
    if needs_projection:
        shortcut = layers.Conv2D(num_filters, (1, 1))(shortcut)

    merged = layers.add([out, shortcut])
    return layers.Activation('relu')(merged)

def lite_encoder_block(input_tensor, num_filters, dropout_rate=0.1):
    """One encoder stage: separable conv, batch norm, ReLU, optional dropout, 2x2 max-pool.

    Returns ``(features, pooled)``: the pre-pooling tensor (used as a skip
    connection by the callers) and its max-pooled version.
    """
    features = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(input_tensor)
    features = layers.BatchNormalization()(features)
    features = layers.Activation('relu')(features)

    # Dropout is skipped entirely for a non-positive rate.
    if dropout_rate > 0:
        features = layers.SpatialDropout2D(dropout_rate)(features)

    pooled = layers.MaxPooling2D((2, 2))(features)
    return features, pooled

def lite_decoder_block(input_tensor, skip_features, num_filters, dropout_rate=0.1):
    """One decoder stage: stride-2 transposed conv, skip concatenation, separable conv.

    The upsampled tensor is resized to the skip tensor's height/width when they
    differ, concatenated with it on the channel axis, then refined by
    SeparableConv2D -> BatchNorm -> ReLU and optional spatial dropout.
    """
    upsampled = layers.Conv2DTranspose(num_filters, (3, 3), strides=(2, 2), padding='same')(input_tensor)

    # Align spatial dimensions with the skip tensor before concatenating.
    target_h, target_w = skip_features.shape[1], skip_features.shape[2]
    if (upsampled.shape[1], upsampled.shape[2]) != (target_h, target_w):
        upsampled = layers.Resizing(target_h, target_w)(upsampled)

    merged = layers.concatenate([upsampled, skip_features], axis=-1)

    refined = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(merged)
    refined = layers.BatchNormalization()(refined)
    refined = layers.Activation('relu')(refined)

    if dropout_rate > 0:
        refined = layers.SpatialDropout2D(dropout_rate)(refined)
    return refined

def build_optimized_multi_task_unet(
    input_shape,
    num_classes,
    filters_base=24,
    dropout_rate=0.1,
    final_activation='sigmoid'
):
    """Build the four-level lightweight U-Net with a segmentation and a classification head.

    Args:
        input_shape: Shape of one input sample, passed to ``layers.Input``.
        num_classes: Width of the classification output.
        filters_base: Filter count of the first stage; deeper stages use multiples of it.
        dropout_rate: Spatial dropout rate in encoder/decoder stages (doubled at the bottleneck).
        final_activation: Activation applied to the single-channel segmentation map.

    Returns:
        A Keras ``Model`` whose outputs are ``[segmentation_output, classification_output]``.
    """
    inputs = layers.Input(shape=input_shape)

    # Stem: a standard (non-separable) convolution on the raw input.
    x = layers.Conv2D(filters_base, (3, 3), padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # Encoder: filters grow 1x, 2x, 4x, 8x; keep each pre-pooling tensor as a skip.
    skips = []
    for multiplier in (1, 2, 4, 8):
        skip, x = lite_encoder_block(x, filters_base * multiplier, dropout_rate)
        skips.append(skip)

    # Bottleneck at the lowest resolution.
    bottleneck = separable_residual_block(x, filters_base * 16)
    bottleneck = layers.SpatialDropout2D(dropout_rate * 2)(bottleneck)

    # Decoder: mirror the encoder, consuming skips from deepest to shallowest.
    decoded = bottleneck
    for multiplier, skip in zip((8, 4, 2, 1), reversed(skips)):
        decoded = lite_decoder_block(decoded, skip, filters_base * multiplier, dropout_rate)

    # Segmentation head: 1x1 conv to one channel, then the requested activation.
    seg_logits = layers.Conv2D(1, (1, 1))(decoded)
    segmentation_output = layers.Activation(final_activation, name='segmentation_output')(seg_logits)

    # Classification head: global average and max pooling of the bottleneck, concatenated.
    avg_pool = layers.GlobalAveragePooling2D()(bottleneck)
    max_pool = layers.GlobalMaxPooling2D()(bottleneck)
    pooled_features = layers.concatenate([avg_pool, max_pool])

    cls = layers.Dense(128)(pooled_features)
    cls = layers.BatchNormalization()(cls)
    cls = layers.Activation('relu')(cls)
    cls = layers.Dropout(0.2)(cls)

    classification_output = layers.Dense(num_classes, activation='softmax', name='classification_output')(cls)

    return Model(
        inputs=inputs,
        outputs=[segmentation_output, classification_output],
        name='Optimized_MultiTask_UNet',
    )

def compile_model(model):
    """Compile ``model`` for joint segmentation/classification training and return it.

    Both heads use binary cross-entropy; the segmentation loss has weight 1.0 and
    the classification loss 0.5. Optimizer is Adam with learning rate 0.001.
    """
    # NOTE(review): the classification head ends in softmax but is trained with
    # binary cross-entropy — confirm this pairing is intended for the label format.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss={
            'segmentation_output': 'binary_crossentropy',
            'classification_output': 'binary_crossentropy',
        },
        loss_weights={
            'segmentation_output': 1.0,
            'classification_output': 0.5,
        },
        metrics={
            'segmentation_output': ['accuracy', binary_dice_coefficient],
            'classification_output': ['accuracy'],
        },
    )
    return model

In [None]:
# Define the input geometry first so the same value drives both the data pipeline and the model.
input_shape = (256, 256, 3)  # RGB: 3 channels (NDVI would be 1, RGBN 4)
num_classes = 6

# input_shape is passed explicitly: create_datasets declares it as a parameter,
# so omitting it would raise a TypeError.
train_dataset, val_dataset = create_datasets(working_path=".", batch_size=32, export_type="RGB", input_shape=input_shape)

optimized_model = build_optimized_multi_task_unet(input_shape, num_classes)
optimized_model = compile_model(optimized_model)
optimized_model.summary()

In [None]:
# Train for 20 epochs on the training pipeline only (no validation data is passed).
h = optimized_model.fit(train_dataset, epochs=20)

Epoch 1/20
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m134s[0m 347ms/step - classification_output_accuracy: 0.2163 - classification_output_loss: 0.5734 - loss: 0.9181 - segmentation_output_accuracy: 0.6599 - segmentation_output_binary_dice_coefficient: 0.2869 - segmentation_output_loss: 0.6314
Epoch 2/20
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 265ms/step - classification_output_accuracy: 0.4513 - classification_output_loss: 0.3814 - loss: 0.7310 - segmentation_output_accuracy: 0.7596 - segmentation_output_binary_dice_coefficient: 0.2650 - segmentation_output_loss: 0.5403
Epoch 3/20
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 265ms/step - classification_output_accuracy: 0.5327 - classification_output_loss: 0.3343 - loss: 0.6830 - segmentation_output_accuracy: 0.7706 - segmentation_output_binary_dice_coefficient: 0.2775 - segmentation_output_loss: 0.5159
Epoch 4/20
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1

In [None]:
# Call fit again on the same model object for a further 20 epochs; history is kept separately in h1.
h1 = optimized_model.fit(train_dataset, epochs=20)

In [None]:
# Evaluate on the held-out validation pipeline; the returned list of loss/metric values is the cell output.
optimized_model.evaluate(val_dataset)

[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 431ms/step - classification_output_accuracy: 0.6541 - classification_output_loss: 0.2634 - loss: 0.6572 - segmentation_output_accuracy: 0.7480 - segmentation_output_binary_dice_coefficient: 0.3725 - segmentation_output_loss: 0.5253




[0.6339068412780762,
 0.4951084554195404,
 0.2711845338344574,
 0.6313363909721375,
 0.7683383226394653,
 0.3487536311149597]

In [None]:

def separable_residual_block(input_tensor, num_filters, kernel_size=3):
    """Lightweight residual block: two separable convs plus an (optionally projected) shortcut."""
    ksize = (kernel_size, kernel_size)

    residual = input_tensor
    for is_second_conv in (False, True):
        residual = layers.SeparableConv2D(num_filters, ksize, padding='same')(residual)
        residual = layers.BatchNormalization()(residual)
        if not is_second_conv:
            # Only the first conv gets its own ReLU; the second is activated after the add.
            residual = layers.Activation('relu')(residual)

    # 1x1 projection of the identity path when channel counts differ.
    identity = input_tensor
    if identity.shape[-1] != num_filters:
        identity = layers.Conv2D(num_filters, (1, 1))(identity)

    summed = layers.add([residual, identity])
    summed = layers.Activation('relu')(summed)
    return summed

def lite_encoder_block(input_tensor, num_filters, dropout_rate=0.1):
    """Encoder stage returning ``(skip, downsampled)``.

    ``skip`` is the SeparableConv2D -> BatchNorm -> ReLU (-> SpatialDropout2D) output;
    ``downsampled`` is ``skip`` after 2x2 max pooling.
    """
    skip = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(input_tensor)
    skip = layers.BatchNormalization()(skip)
    skip = layers.Activation('relu')(skip)

    use_dropout = dropout_rate > 0
    if use_dropout:
        skip = layers.SpatialDropout2D(dropout_rate)(skip)

    downsampled = layers.MaxPooling2D((2, 2))(skip)
    return skip, downsampled

def lite_decoder_block(input_tensor, skip_features, num_filters, dropout_rate=0.1):
    """Decoder stage: upsample by 2, fuse with the skip tensor, refine with a separable conv."""
    up = layers.Conv2DTranspose(num_filters, (3, 3), strides=(2, 2), padding='same')(input_tensor)

    # Resize only when the upsampled height or width does not match the skip tensor.
    same_height = up.shape[1] == skip_features.shape[1]
    same_width = up.shape[2] == skip_features.shape[2]
    if not (same_height and same_width):
        up = layers.Resizing(skip_features.shape[1], skip_features.shape[2])(up)

    fused = layers.concatenate([up, skip_features], axis=-1)

    # Separable convolution keeps the parameter count low.
    fused = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(fused)
    fused = layers.BatchNormalization()(fused)
    fused = layers.Activation('relu')(fused)

    if dropout_rate > 0:
        fused = layers.SpatialDropout2D(dropout_rate)(fused)
    return fused

def binary_dice_coefficient(y_true, y_pred):
    """Soft Dice coefficient over all elements, with a smoothing term of 1.0.

    Both tensors are flattened, so one scalar is produced per call.
    """
    K = tf.keras.backend
    smooth = 1.0

    truth = K.flatten(y_true)
    guess = K.flatten(y_pred)

    overlap = K.sum(truth * guess)
    denominator = K.sum(truth) + K.sum(guess) + smooth
    return (2. * overlap + smooth) / denominator


def get_backbone(backbone_name, input_tensor, weights='imagenet', trainable=False):
    """Instantiate a headless backbone and collect its skip-connection tensors.

    Args:
        backbone_name: Case-insensitive key into the supported backbones
            (currently only ``'mobilenetv2'``).
        input_tensor: Keras tensor to build the backbone on, or ``None`` to let
            the backbone create its own input.
        weights: Forwarded to the backbone constructor.
        trainable: Value assigned to ``backbone.trainable``.

    Returns:
        ``(backbone, skip_connections, initial_features)`` where
        ``skip_connections`` are the outputs of the configured layers, ordered
        from deeper to shallower, and ``initial_features`` is the tensor the
        backbone consumes.

    Raises:
        ValueError: If ``backbone_name`` is not a supported backbone.
    """
    backbone_dict = {
        'mobilenetv2': {
            'model': keras_applications.MobileNetV2,
            'skip_layers': ['block_13_expand_relu', 'block_6_expand_relu',
                           'block_3_expand_relu', 'block_1_expand_relu'] # Ordered from deeper to shallower
        }
    }

    key = backbone_name.lower()
    if key not in backbone_dict:
        raise ValueError(f"Backbone {backbone_name} not supported. Choose from: {list(backbone_dict.keys())}")

    spec = backbone_dict[key]

    # The input geometry comes from input_tensor when one is supplied; with no tensor
    # the constructor falls back to its own default. The previous conditional
    # dereferenced input_tensor.shape exactly when input_tensor was None.
    backbone = spec['model'](
        include_top=False,
        weights=weights,
        input_tensor=input_tensor,
        input_shape=None
    )

    backbone.trainable = trainable

    skip_connections = [backbone.get_layer(layer_name).output for layer_name in spec['skip_layers']]

    # Full-resolution features for the last decoder stage: the caller's tensor,
    # or the backbone's own input when none was provided.
    initial_features = input_tensor if input_tensor is not None else backbone.input

    return backbone, skip_connections, initial_features

def build_backbone_multi_task_unet(
    input_shape,
    num_classes,
    backbone_name='mobilenetv2',
    backbone_trainable=False,
    filters_base=24,
    dropout_rate=0.1,
    final_activation='sigmoid' # Changed to sigmoid for binary crossentropy
):
    """Build a multi-task U-Net whose encoder is a pretrained backbone.

    The backbone output goes through a separable residual bottleneck, then five
    decoder stages: four fused with the backbone's skip tensors (deepest first)
    and a final one fused with the model input itself. Classification is read
    from the bottleneck.

    Args:
        input_shape: Shape of one input sample.
        num_classes: Width of the classification output.
        backbone_name: Backbone key understood by ``get_backbone``.
        backbone_trainable: Whether the backbone weights are trainable.
        filters_base: Base filter count for the decoder stages.
        dropout_rate: Spatial dropout rate (doubled at the bottleneck).
        final_activation: Activation of the segmentation map.

    Returns:
        A Keras ``Model`` with outputs ``[segmentation_output, classification_output]``.
    """
    inputs = layers.Input(shape=input_shape)

    backbone, skip_features, initial_features = get_backbone(
        backbone_name=backbone_name,
        input_tensor=inputs,
        trainable=backbone_trainable
    )

    bottleneck = separable_residual_block(backbone.output, filters_base * 16)
    bottleneck = layers.SpatialDropout2D(dropout_rate * 2)(bottleneck)

    # Decoder stages paired with skip_features[0..3]; per the original annotations
    # these are the 16x16, 32x32, 64x64 and 128x128 skips for a 256x256 input.
    decoded = bottleneck
    for index, multiplier in enumerate((8, 4, 2, 1)):
        decoded = lite_decoder_block(decoded, skip_features[index], filters_base * multiplier, dropout_rate)

    # Final stage upsamples to the input resolution and concatenates the raw input.
    decoded = lite_decoder_block(decoded, initial_features, filters_base, dropout_rate)

    # Segmentation head on the last decoder output.
    seg_logits = layers.Conv2D(1, (1, 1))(decoded)
    segmentation_output = layers.Activation(final_activation, name='segmentation_output')(seg_logits)

    # Classification head on pooled bottleneck features.
    avg_pool = layers.GlobalAveragePooling2D()(bottleneck)
    max_pool = layers.GlobalMaxPooling2D()(bottleneck)
    pooled_features = layers.concatenate([avg_pool, max_pool])

    cls = layers.Dense(128)(pooled_features)
    cls = layers.BatchNormalization()(cls)
    cls = layers.Activation('relu')(cls)
    cls = layers.Dropout(0.2)(cls)

    classification_output = layers.Dense(num_classes, activation='softmax', name='classification_output')(cls)

    return Model(
        inputs=inputs,
        outputs=[segmentation_output, classification_output],
        name=f'Backbone_MultiTask_UNet_{backbone_name}',
    )

def compile_model(model):
    """Attach optimizer, per-output losses, loss weights and metrics; return the same model.

    Outputs are addressed by name: ``'segmentation_output'`` and
    ``'classification_output'``. Both use binary cross-entropy, weighted 1.0 and
    0.5 respectively, optimised with Adam at learning rate 0.001.
    """
    seg_key = 'segmentation_output'
    cls_key = 'classification_output'

    compile_kwargs = dict(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss={seg_key: 'binary_crossentropy', cls_key: 'binary_crossentropy'},
        loss_weights={seg_key: 1.0, cls_key: 0.5},
        metrics={
            seg_key: ['accuracy', binary_dice_coefficient],
            cls_key: ['accuracy'],
        },
    )
    model.compile(**compile_kwargs)

    return model

# Build, compile and summarise the MobileNetV2-backed multi-task model for 256x256x3 inputs.
input_shape = (256, 256, 3)
num_classes = 6

backbone_model = build_backbone_multi_task_unet(
    input_shape,
    num_classes,
    backbone_name='mobilenetv2',
    # Forwarded to get_backbone, which assigns it to backbone.trainable.
    backbone_trainable=False,
    final_activation='sigmoid'
)
backbone_model = compile_model(backbone_model)
backbone_model.summary()

  backbone = backbone_fn(


In [None]:
input_shape = (256, 256, 3)  # RGB: 3 channels (NDVI would be 1, RGBN 4)
num_classes = 6
batch_size = 32

# Pass the settings defined above instead of repeating literals; input_shape is
# given explicitly because create_datasets declares it as a parameter.
train_dataset, val_dataset = create_datasets(working_path=".", batch_size=batch_size, export_type="RGB", input_shape=input_shape)

In [None]:
# First training run: 10 epochs on the training pipeline; the History object is the cell output.
backbone_model.fit(train_dataset, epochs=10)

Epoch 1/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 304ms/step - classification_output_accuracy: 0.3586 - classification_output_loss: 0.4952 - loss: 0.9462 - segmentation_output_accuracy: 0.5583 - segmentation_output_binary_dice_coefficient: 0.3117 - segmentation_output_loss: 0.6986
Epoch 2/10




[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 215ms/step - classification_output_accuracy: 0.6382 - classification_output_loss: 0.2731 - loss: 0.6206 - segmentation_output_accuracy: 0.7786 - segmentation_output_binary_dice_coefficient: 0.3032 - segmentation_output_loss: 0.4841
Epoch 3/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 205ms/step - classification_output_accuracy: 0.7280 - classification_output_loss: 0.2187 - loss: 0.5751 - segmentation_output_accuracy: 0.7782 - segmentation_output_binary_dice_coefficient: 0.3591 - segmentation_output_loss: 0.4657
Epoch 4/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 217ms/step - classification_output_accuracy: 0.7994 - classification_output_loss: 0.1734 - loss: 0.5194 - segmentation_output_accuracy: 0.7954 - segmentation_output_binary_dice_coefficient: 0.3886 - segmentation_output_loss: 0.4327
Epoch 5/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 207

<keras.src.callbacks.history.History at 0x7a59abacdbd0>

In [None]:
# Call fit again on the same model for up to 40 more epochs (the recorded run ended with KeyboardInterrupt).
h_with_bn= backbone_model.fit(train_dataset, epochs=40)

Epoch 1/40
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 205ms/step - classification_output_accuracy: 0.9706 - classification_output_loss: 0.0337 - loss: 0.3467 - segmentation_output_accuracy: 0.8417 - segmentation_output_binary_dice_coefficient: 0.5515 - segmentation_output_loss: 0.3298
Epoch 2/40




[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 205ms/step - classification_output_accuracy: 0.9770 - classification_output_loss: 0.0299 - loss: 0.3276 - segmentation_output_accuracy: 0.8535 - segmentation_output_binary_dice_coefficient: 0.5629 - segmentation_output_loss: 0.3126
Epoch 3/40
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 206ms/step - classification_output_accuracy: 0.9665 - classification_output_loss: 0.0358 - loss: 0.3225 - segmentation_output_accuracy: 0.8580 - segmentation_output_binary_dice_coefficient: 0.5749 - segmentation_output_loss: 0.3046
Epoch 4/40
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 209ms/step - classification_output_accuracy: 0.9866 - classification_output_loss: 0.0195 - loss: 0.2867 - segmentation_output_accuracy: 0.8719 - segmentation_output_binary_dice_coefficient: 0.6031 - segmentation_output_loss: 0.2770
Epoch 5/40
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 205

KeyboardInterrupt: 

In [None]:
# Evaluate on the validation pipeline; the returned list of loss/metric values is the cell output.
backbone_model.evaluate(val_dataset)

[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 378ms/step - classification_output_accuracy: 0.6692 - classification_output_loss: 0.4497 - loss: 1.1448 - segmentation_output_accuracy: 0.7726 - segmentation_output_binary_dice_coefficient: 0.4422 - segmentation_output_loss: 0.9199




[1.1564478874206543,
 0.8964262008666992,
 0.516542911529541,
 0.6555299758911133,
 0.7758695483207703,
 0.4052949845790863]

In [None]:
# Save the trained model to a .keras file in the working directory.
# NOTE(review): the file name says "fine_tuned" but the model was built with backbone_trainable=False.
backbone_model.save("fine_tuned_mobilenetv2.keras")

In [None]:
# Convert the in-memory Keras model to TFLite and write the result to disk.
converter = tf.lite.TFLiteConverter.from_keras_model(backbone_model)
tflite_model = converter.convert()
# The converted model is written in binary mode.
with open('fine_tuned_mobilenetv2.tflite', 'wb') as f:
    f.write(tflite_model)

## Keeping same size

## 512x512

In [None]:
# Download the archives for the 512x512 section.
# The recorded output shows them saved as /content/val.zip and /content/train.zip,
# i.e. the same file names the earlier 256x256 downloads used.
!gdown 1aWNF8y4pBFBok6Pbug5aszC9ox_WVKMo # 150 peer samples val
!gdown 14RBBexZP7OqMD6z7YjsmliHe155q4eTC # 1000 peer samples train

Downloading...
From (original): https://drive.google.com/uc?id=1aWNF8y4pBFBok6Pbug5aszC9ox_WVKMo
From (redirected): https://drive.google.com/uc?id=1aWNF8y4pBFBok6Pbug5aszC9ox_WVKMo&confirm=t&uuid=8789b2f1-30eb-4fb6-8976-2be8bc194aa0
To: /content/val.zip
100% 732M/732M [00:10<00:00, 71.4MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=14RBBexZP7OqMD6z7YjsmliHe155q4eTC
From (redirected): https://drive.google.com/uc?id=14RBBexZP7OqMD6z7YjsmliHe155q4eTC&confirm=t&uuid=6b7e1d67-4a71-4d9c-8b5f-62701b23a18b
To: /content/train.zip
100% 4.97G/4.97G [01:58<00:00, 42.1MB/s]


In [None]:
!unzip train >> /dev/null
!unzip val >> /dev/null

In [None]:
def separable_residual_block(input_tensor, num_filters, kernel_size=3):
    """Two separable convolutions with batch norm, added to a shortcut and passed through ReLU.

    The shortcut is ``input_tensor``; a 1x1 convolution adapts it when its last
    dimension differs from ``num_filters``.
    """
    window = (kernel_size, kernel_size)

    branch = layers.SeparableConv2D(num_filters, window, padding='same')(input_tensor)
    branch = layers.BatchNormalization()(branch)
    branch = layers.Activation('relu')(branch)
    branch = layers.SeparableConv2D(num_filters, window, padding='same')(branch)
    branch = layers.BatchNormalization()(branch)

    channels_match = input_tensor.shape[-1] == num_filters
    skip = input_tensor if channels_match else layers.Conv2D(num_filters, (1, 1))(input_tensor)

    total = layers.add([branch, skip])
    return layers.Activation('relu')(total)

def lite_encoder_block(input_tensor, num_filters, dropout_rate=0.1):
    """Encoder stage: SeparableConv2D -> BatchNorm -> ReLU -> optional SpatialDropout2D.

    Returns the stage output together with its 2x2 max-pooled version.
    """
    stage = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(input_tensor)
    stage = layers.BatchNormalization()(stage)
    stage = layers.Activation('relu')(stage)

    if dropout_rate > 0:
        stage = layers.SpatialDropout2D(dropout_rate)(stage)

    halved = layers.MaxPooling2D((2, 2))(stage)
    return stage, halved

def lite_decoder_block(input_tensor, skip_features, num_filters, dropout_rate=0.1):
    """Decoder stage: transposed-conv upsampling, skip fusion and a separable refinement conv."""
    grown = layers.Conv2DTranspose(num_filters, (3, 3), strides=(2, 2), padding='same')(input_tensor)

    # Ensure compatible shapes for concatenation
    skip_h = skip_features.shape[1]
    skip_w = skip_features.shape[2]
    if grown.shape[1] != skip_h or grown.shape[2] != skip_w:
        grown = layers.Resizing(skip_h, skip_w)(grown)

    # Concatenate skip features along the channel axis
    joined = layers.concatenate([grown, skip_features], axis=-1)

    # Reduce parameter count with separable convolution
    result = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(joined)
    result = layers.BatchNormalization()(result)
    result = layers.Activation('relu')(result)

    if dropout_rate > 0:
        result = layers.SpatialDropout2D(dropout_rate)(result)
    return result

def build_optimized_multi_task_unet_512(
    input_shape,
    num_classes,
    filters_base=16,  # Reduced base filters to manage memory for 512x512
    dropout_rate=0.1,
    final_activation='sigmoid'
):
    """Build the five-level lightweight multi-task U-Net intended for 512x512 inputs.

    Five encoder stages (filters 1x..16x ``filters_base``) feed a separable
    residual bottleneck with 32x ``filters_base`` filters; five decoder stages
    mirror the encoder. Classification is read from the bottleneck.

    Returns:
        A Keras ``Model`` whose outputs are ordered
        ``[classification_output, segmentation_output]``.
    """
    inputs = layers.Input(shape=input_shape)

    # Stem: standard convolution for feature extraction on the raw input.
    x = layers.Conv2D(filters_base, (3, 3), padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # Encoder: each stage halves the resolution (512 -> 256 -> 128 -> 64 -> 32 -> 16
    # for a 512x512 input) and keeps its pre-pooling tensor as a skip connection.
    skips = []
    for multiplier in (1, 2, 4, 8, 16):
        skip, x = lite_encoder_block(x, filters_base * multiplier, dropout_rate)
        skips.append(skip)

    # Bottleneck at the lowest resolution.
    bottleneck = separable_residual_block(x, filters_base * 32)
    bottleneck = layers.SpatialDropout2D(dropout_rate * 2)(bottleneck)

    # Decoder: five stages consuming the skips from deepest to shallowest.
    decoded = bottleneck
    for multiplier, skip in zip((16, 8, 4, 2, 1), reversed(skips)):
        decoded = lite_decoder_block(decoded, skip, filters_base * multiplier, dropout_rate)

    # Segmentation head at full resolution.
    seg_logits = layers.Conv2D(1, (1, 1))(decoded)
    segmentation_output = layers.Activation(final_activation, name='segmentation_output')(seg_logits)

    # Classification head on pooled bottleneck features.
    avg_pool = layers.GlobalAveragePooling2D()(bottleneck)
    max_pool = layers.GlobalMaxPooling2D()(bottleneck)
    pooled_features = layers.concatenate([avg_pool, max_pool])

    cls = layers.Dense(128)(pooled_features)
    cls = layers.BatchNormalization()(cls)
    cls = layers.Activation('relu')(cls)
    cls = layers.Dropout(0.2)(cls)

    classification_output = layers.Dense(num_classes, activation='softmax', name='classification_output')(cls)

    # The classification output is listed first here; losses and metrics are keyed by output name.
    return Model(
        inputs=inputs,
        outputs=[classification_output, segmentation_output],
        name='Optimized_MultiTask_UNet_512',
    )

def compile_model(model):
    """Compile ``model`` for joint segmentation/classification training and return it.

    Both outputs use binary cross-entropy (segmentation weighted 1.0,
    classification 0.5) with Adam at learning rate 0.001. Segmentation is
    monitored with the soft Dice coefficient and a thresholded IoU.
    """
    losses = {
        'segmentation_output': 'binary_crossentropy',
        'classification_output': 'binary_crossentropy'
    }

    loss_weights = {
        'segmentation_output': 1.0,
        'classification_output': 0.5
    }

    metrics = {
        'segmentation_output': [
            binary_dice_coefficient,
            # The segmentation head emits probabilities, so use BinaryIoU, which
            # thresholds them at 0.5. MeanIoU expects integer class ids and would
            # treat nearly every probability as class 0. target_class_ids=[0, 1]
            # averages the IoU of both classes; the name matches MeanIoU's default
            # so log keys stay the same.
            tf.keras.metrics.BinaryIoU(target_class_ids=[0, 1], threshold=0.5, name='mean_io_u'),
        ],
        'classification_output': ['accuracy']
    }

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=losses,
        loss_weights=loss_weights,
        metrics=metrics
    )

    return model

# For 512x512 single-channel input (like NDVI)

# Build, compile and summarise the five-level model for 512x512 single-channel inputs.
model = build_optimized_multi_task_unet_512(
    input_shape=(512, 512, 1),
    num_classes=6,  # Your 6 anomaly classes
    filters_base=16,  # Reduced to manage memory
    dropout_rate=0.1
)
model = compile_model(model)
model.summary()

In [None]:
# Rebuild the pipelines for 512x512 single-channel inputs.
train_dataset, val_dataset = create_datasets(working_path=".", batch_size=32, export_type="NDVI", input_shape=(512, 512, 1))

# validation_split can only slice array/tensor inputs; for tf.data pipelines the
# validation dataset has to be passed explicitly.
model.fit(train_dataset, epochs=10, validation_data=val_dataset)

Epoch 1/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m316s[0m 1s/step - classification_output_accuracy: 0.2206 - classification_output_loss: 0.5531 - loss: 0.8449 - segmentation_output_accuracy: 0.7418 - segmentation_output_binary_dice_coefficient: 0.2647 - segmentation_output_loss: 0.5684
Epoch 2/10




[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m203s[0m 1s/step - classification_output_accuracy: 0.3881 - classification_output_loss: 0.4073 - loss: 0.7294 - segmentation_output_accuracy: 0.7735 - segmentation_output_binary_dice_coefficient: 0.2551 - segmentation_output_loss: 0.5258
Epoch 3/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m200s[0m 1s/step - classification_output_accuracy: 0.4342 - classification_output_loss: 0.3854 - loss: 0.7186 - segmentation_output_accuracy: 0.7717 - segmentation_output_binary_dice_coefficient: 0.2571 - segmentation_output_loss: 0.5259
Epoch 4/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m200s[0m 1s/step - classification_output_accuracy: 0.4746 - classification_output_loss: 0.3615 - loss: 0.7014 - segmentation_output_accuracy: 0.7727 - segmentation_output_binary_dice_coefficient: 0.2592 - segmentation_output_loss: 0.5206
Epoch 5/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m204s[0m 1s/step 

<keras.src.callbacks.history.History at 0x7d3b5a9d0e50>

In [None]:
import tensorflow.keras.applications as keras_applications

def separable_residual_block(input_tensor, num_filters, kernel_size=3):
    """Lightweight residual block built from separable convolutions.

    Applies SeparableConv2D/BatchNorm twice (ReLU in between), adds the input
    (projected with a 1x1 conv if its channel count is not ``num_filters``) and
    finishes with ReLU.
    """
    def sep_conv_bn(tensor):
        """SeparableConv2D followed by batch normalisation."""
        tensor = layers.SeparableConv2D(num_filters, (kernel_size, kernel_size), padding='same')(tensor)
        return layers.BatchNormalization()(tensor)

    main = sep_conv_bn(input_tensor)
    main = layers.Activation('relu')(main)
    main = sep_conv_bn(main)

    bypass = input_tensor
    if bypass.shape[-1] != num_filters:
        bypass = layers.Conv2D(num_filters, (1, 1))(bypass)

    combined = layers.add([main, bypass])
    combined = layers.Activation('relu')(combined)
    return combined

def lite_encoder_block(input_tensor, num_filters, dropout_rate=0.1):
    """Lightweight encoder block.

    Returns a pair: the convolved features before pooling and the same features
    after 2x2 max pooling.
    """
    conv_out = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(input_tensor)
    conv_out = layers.BatchNormalization()(conv_out)
    conv_out = layers.Activation('relu')(conv_out)

    # A non-positive rate disables dropout for this stage.
    if dropout_rate > 0:
        conv_out = layers.SpatialDropout2D(dropout_rate)(conv_out)

    pool_out = layers.MaxPooling2D((2, 2))(conv_out)
    return conv_out, pool_out

def lite_decoder_block(input_tensor, skip_features, num_filters, dropout_rate=0.1):
    """Upsample, merge with a skip connection, then refine with a separable conv.

    Args:
        input_tensor: Feature map coming from the previous (coarser) stage.
        skip_features: Tensor to concatenate with the upsampled features on
            the last axis.
        num_filters: Output channels of the transposed and separable convs.
        dropout_rate: Spatial dropout rate; no dropout layer is added when
            this is not greater than zero.

    Returns:
        The refined feature map after conv, batch-norm, ReLU and optional dropout.
    """
    upsampled = layers.Conv2DTranspose(num_filters, (3, 3), strides=(2, 2), padding='same')(input_tensor)

    # Concatenation needs identical height/width, so the upsampled map is
    # resized to the skip tensor's spatial size whenever the two disagree.
    target_hw = (skip_features.shape[1], skip_features.shape[2])
    if (upsampled.shape[1], upsampled.shape[2]) != target_hw:
        upsampled = layers.Resizing(*target_hw)(upsampled)

    merged = layers.concatenate([upsampled, skip_features], axis=-1)

    refined = layers.SeparableConv2D(num_filters, (3, 3), padding='same')(merged)
    refined = layers.BatchNormalization()(refined)
    refined = layers.Activation('relu')(refined)

    if dropout_rate > 0:
        refined = layers.SpatialDropout2D(dropout_rate)(refined)
    return refined

def binary_dice_coefficient(y_true, y_pred):
    """Smoothed Dice overlap between a target mask and a prediction.

    Both inputs are flattened, so the score is computed over every element
    passed in at once. A smoothing constant of 1.0 is added to the numerator
    and denominator, which keeps the ratio defined when both sums are zero.

    Args:
        y_true: Target values.
        y_pred: Predicted values.

    Returns:
        ``(2 * sum(y_true * y_pred) + 1) / (sum(y_true) + sum(y_pred) + 1)``.
    """
    K = tf.keras.backend
    smooth = 1.0

    truth_flat = K.flatten(y_true)
    pred_flat = K.flatten(y_pred)

    overlap = K.sum(truth_flat * pred_flat)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth_flat) + K.sum(pred_flat) + smooth
    return numerator / denominator

def get_backbone(backbone_name, input_tensor, weights='imagenet', trainable=False):
    """Instantiate a headless backbone and collect its skip-connection tensors.

    Args:
        backbone_name: Case-insensitive key of a supported backbone
            (currently only 'mobilenetv2').
        input_tensor: Tensor the backbone is built on. May be None, in which
            case the backbone creates its own input and that input is
            returned as ``initial_features``.
        weights: Forwarded unchanged to the backbone constructor.
        trainable: Value assigned to ``backbone.trainable``.

    Returns:
        A ``(backbone, skip_connections, initial_features)`` tuple, where
        ``skip_connections`` holds the outputs of the configured skip layers
        in the order they are listed, and ``initial_features`` is the tensor
        the backbone was built on.

    Raises:
        ValueError: If ``backbone_name`` is not a supported backbone.
    """
    backbone_dict = {
        'mobilenetv2': {
            'model': keras_applications.MobileNetV2,

            'skip_layers': ['block_16_expand_relu',
                           'block_13_expand_relu',
                           'block_6_expand_relu',
                           'block_3_expand_relu',
                           'block_1_expand_relu']
        }
    }

    backbone_key = backbone_name.lower()
    if backbone_key not in backbone_dict:
        raise ValueError(f"Backbone {backbone_name} not supported. Choose from: {list(backbone_dict.keys())}")

    spec = backbone_dict[backbone_key]

    # The previous `input_shape=None if input_tensor is not None else
    # input_tensor.shape[1:]` could only reach its else-branch when
    # input_tensor was None, where it raised AttributeError. The shape is
    # already carried by input_tensor, so input_shape is left at its default.
    backbone = spec['model'](
        include_top=False,
        weights=weights,
        input_tensor=input_tensor
    )

    backbone.trainable = trainable

    skip_connections = [backbone.get_layer(layer_name).output for layer_name in spec['skip_layers']]

    # Without a caller-supplied tensor, fall back to the input the backbone
    # created for itself so downstream code still receives a usable tensor.
    initial_features = input_tensor if input_tensor is not None else backbone.input

    return backbone, skip_connections, initial_features

def build_backbone_multi_task_unet_512(
    input_shape,
    num_classes,
    backbone_name='mobilenetv2',
    backbone_trainable=False,
    filters_base=16,
    dropout_rate=0.1,
    final_activation='sigmoid'
):
    """Assemble a two-headed U-Net on top of a pretrained backbone.

    The backbone output is refined by a separable residual block and then
    decoded through six stages: one per backbone skip connection, plus a final
    stage that merges with the raw model input. The classification head reads
    the same refined bottleneck through global average and max pooling.

    Args:
        input_shape: Shape passed to ``layers.Input``.
        num_classes: Units of the softmax classification output.
        backbone_name: Backbone key forwarded to ``get_backbone``.
        backbone_trainable: Whether the backbone weights are trainable.
        filters_base: Base width; decoder stages use multiples of it
            (16x, 8x, 4x, 2x, 1x, 1x).
        dropout_rate: Decoder spatial dropout rate; the bottleneck uses twice
            this value.
        final_activation: Activation applied to the single-channel
            segmentation output.

    Returns:
        A Keras ``Model`` whose outputs are, in order,
        ``classification_output`` and ``segmentation_output``.
    """
    inputs = layers.Input(shape=input_shape)

    backbone, skip_features, initial_features = get_backbone(
        backbone_name=backbone_name,
        input_tensor=inputs,
        trainable=backbone_trainable
    )

    # Bottleneck: refine the deepest backbone features, then regularise.
    bottleneck = separable_residual_block(backbone.output, filters_base*16)
    bottleneck = layers.SpatialDropout2D(dropout_rate*2)(bottleneck)

    # Decoder: each stage pairs a skip tensor with its channel width. The
    # last stage merges with the raw input tensor.
    decoder_stages = [
        (skip_features[0], filters_base*16),
        (skip_features[1], filters_base*8),
        (skip_features[2], filters_base*4),
        (skip_features[3], filters_base*2),
        (skip_features[4], filters_base),
        (initial_features, filters_base),
    ]
    decoded = bottleneck
    for skip_tensor, stage_filters in decoder_stages:
        decoded = lite_decoder_block(decoded, skip_tensor, stage_filters, dropout_rate)

    # Segmentation head: 1x1 conv to a single channel, then the final activation.
    seg_logits = layers.Conv2D(1, (1, 1))(decoded)
    segmentation_output = layers.Activation(final_activation, name='segmentation_output')(seg_logits)

    # Classification head: pooled bottleneck statistics -> dense -> softmax.
    pooled_features = layers.concatenate([
        layers.GlobalAveragePooling2D()(bottleneck),
        layers.GlobalMaxPooling2D()(bottleneck),
    ])

    class_hidden = layers.Dense(128)(pooled_features)
    class_hidden = layers.BatchNormalization()(class_hidden)
    class_hidden = layers.Activation('relu')(class_hidden)
    class_hidden = layers.Dropout(0.2)(class_hidden)

    classification_output = layers.Dense(
        num_classes, activation='softmax', name='classification_output'
    )(class_hidden)

    return Model(
        inputs=inputs,
        outputs=[classification_output, segmentation_output],
        name=f'Backbone_MultiTask_UNet_512_{backbone_name}'
    )

def compile_model(model):
    """Compile the two-headed model with per-output losses, weights and metrics.

    Args:
        model: Keras model with outputs named 'segmentation_output' and
            'classification_output'.

    Returns:
        The same model instance, compiled in place (Adam, learning rate 0.001).
    """
    # NOTE(review): the classification head built in this notebook ends in a
    # softmax, while 'binary_crossentropy' is normally paired with sigmoid
    # (multi-label) outputs. Left unchanged because the label encoding is not
    # visible here. Confirm and switch to a categorical loss if labels are
    # single-class.
    losses = {
        'segmentation_output': 'binary_crossentropy',
        'classification_output': 'binary_crossentropy'
    }

    # The total loss weights segmentation twice as heavily as classification.
    loss_weights = {
        'segmentation_output': 1.0,
        'classification_output': 0.5
    }

    metrics = {
        'segmentation_output': [
            binary_dice_coefficient,
            # MeanIoU expects integer class labels as predictions, so feeding
            # it probabilities from the segmentation head yields a
            # meaningless score. BinaryIoU thresholds the probabilities first.
            # target_class_ids=[0, 1] averages the IoU of background and
            # foreground, i.e. the binary mean IoU.
            tf.keras.metrics.BinaryIoU(
                target_class_ids=[0, 1],
                threshold=0.5,
                name='binary_iou'
            ),
        ],
        'classification_output': ['accuracy']
    }

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=losses,
        loss_weights=loss_weights,
        metrics=metrics
    )

    return model

# Configuration for the full-resolution run: 512x512 three-channel inputs
# and a six-way classification head.
input_shape = (512, 512, 3)
num_classes = 6

# Build the MobileNetV2-backed multi-task U-Net with a frozen backbone.
backbone_model_512 = build_backbone_multi_task_unet_512(
    input_shape=input_shape,
    num_classes=num_classes,
    backbone_name='mobilenetv2',
    backbone_trainable=False,
    filters_base=16,
    final_activation='sigmoid',
)

# Attach optimizer, losses and metrics; compile_model returns the same model.
backbone_model_512 = compile_model(backbone_model_512)

  backbone = backbone_fn(


In [None]:
# Build the RGB 512x512 training and validation pipelines from the current
# working directory.
train_dataset, val_dataset = create_datasets(working_path=".", batch_size=32, export_type="RGB", input_shape=(512, 512, 3))

# val_dataset used to be created but never consumed, so training reported
# no held-out metrics. Passing it as validation_data evaluates the model at
# the end of every epoch.
backbone_model_512.fit(train_dataset, validation_data=val_dataset, epochs=10)

Epoch 1/10
    182/Unknown [1m260s[0m 1s/step - classification_output_accuracy: 0.3219 - classification_output_loss: 0.4978 - loss: 0.9272 - segmentation_output_binary_dice_coefficient: 0.2954 - segmentation_output_loss: 0.6783 - segmentation_output_mean_io_u_1: 0.3846



[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m262s[0m 1s/step - classification_output_accuracy: 0.3222 - classification_output_loss: 0.4974 - loss: 0.9266 - segmentation_output_binary_dice_coefficient: 0.2953 - segmentation_output_loss: 0.6779 - segmentation_output_mean_io_u_1: 0.3846
Epoch 2/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m253s[0m 1s/step - classification_output_accuracy: 0.6034 - classification_output_loss: 0.2959 - loss: 0.6493 - segmentation_output_binary_dice_coefficient: 0.2844 - segmentation_output_loss: 0.5013 - segmentation_output_mean_io_u_1: 0.3887
Epoch 3/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m239s[0m 1s/step - classification_output_accuracy: 0.7090 - classi

<keras.src.callbacks.history.History at 0x7d3b4cb6b810>

In [None]:
# Continue training for ten more epochs. initial_epoch keeps the epoch
# numbering continuous with the previous 10-epoch run (logs show 11/20 ...
# 20/20 instead of restarting at 1/10), and validation_data reports held-out
# metrics after each epoch.
# NOTE(review): assumes the previous fit cell completed all 10 epochs.
# Adjust initial_epoch if it was interrupted.
backbone_model_512.fit(
    train_dataset,
    validation_data=val_dataset,
    initial_epoch=10,
    epochs=20,
)

Epoch 1/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m237s[0m 1s/step - classification_output_accuracy: 0.9560 - classification_output_loss: 0.0454 - loss: 0.4077 - segmentation_output_binary_dice_coefficient: 0.4481 - segmentation_output_loss: 0.3850 - segmentation_output_mean_io_u_1: 0.3880
Epoch 2/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m233s[0m 1s/step - classification_output_accuracy: 0.9663 - classification_output_loss: 0.0364 - loss: 0.3863 - segmentation_output_binary_dice_coefficient: 0.4688 - segmentation_output_loss: 0.3681 - segmentation_output_mean_io_u_1: 0.3869
Epoch 3/10
[1m182/182[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m231s[0m 1s/step - classification_output_accuracy: 0.9663 - classification_output_loss: 0.0351 - loss: 0.3823 - segmentation_output_binary_dice_coefficient: 0.4756 - segmentation_output_loss: 0.3648 - segmentation_output_mean_io_u_1: 0.3880
Epoch 4/10
[1m 79/182[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m 