In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, LearningRateScheduler
import numpy as np
import math

# Helper functions for individual components

def apply_clahe(inputs):
    """Simulate CLAHE operation as a differentiable layer"""
    # In practice, you'd want to use tf.py_function for actual CLAHE
    # This is a simplified approximation using local normalization
    avg_pool = layers.AveragePooling2D(pool_size=(7, 7), strides=(1, 1), padding='same')(inputs)
    normalized = layers.Lambda(lambda x: (x[0] - x[1]) * 0.5 + 0.5)([inputs, avg_pool])
    return normalized

def custom_instance_normalization(inputs):
    """Custom implementation of instance normalization without tensorflow_addons"""
    # Create a custom layer for instance normalization
    class InstanceNormalization(layers.Layer):
        def __init__(self, **kwargs):
            super(InstanceNormalization, self).__init__(**kwargs)

        def build(self, input_shape):
            # Create learnable parameters
            channels = input_shape[-1]
            self.gamma = self.add_weight(
                shape=(channels,),
                initializer='random_uniform',
                trainable=True,
                name='gamma'
            )
            self.beta = self.add_weight(
                shape=(channels,),
                initializer='zeros',
                trainable=True,
                name='beta'
            )
            super(InstanceNormalization, self).build(input_shape)

        def call(self, inputs):
            # Calculate mean and variance for each instance across spatial dimensions
            mean, variance = tf.nn.moments(inputs, axes=[1, 2], keepdims=True)

            # Normalize and scale
            epsilon = 1e-5
            normalized = (inputs - mean) / tf.sqrt(variance + epsilon)

            # Reshape gamma and beta for broadcasting
            gamma_broadcasted = tf.reshape(self.gamma, [1, 1, 1, -1])
            beta_broadcasted = tf.reshape(self.beta, [1, 1, 1, -1])

            # Apply scaling and shifting
            return normalized * gamma_broadcasted + beta_broadcasted

    # Create and apply the custom layer
    norm_layer = InstanceNormalization()
    return norm_layer(inputs)

def adaptive_input_normalization(inputs):
    """Adaptive input normalization layer using custom implementation"""
    # Replace tfa.layers.InstanceNormalization with custom implementation
    x = custom_instance_normalization(inputs)
    return x

def squeeze_excitation_block(inputs, ratio=16):
    """Squeeze and Excitation block for channel attention"""
    channel_axis = -1
    filters = inputs.shape[channel_axis]

    se = layers.GlobalAveragePooling2D()(inputs)
    se = layers.Reshape((1, 1, filters))(se)
    se = layers.Dense(filters // ratio, activation='relu', use_bias=False)(se)
    se = layers.Dense(filters, activation='sigmoid', use_bias=False)(se)

    return layers.multiply([inputs, se])

def dual_path_compression_stem(inputs, filters=64):
    """Dual-Path Compression Stem"""
    # Path A: Standard convolutions
    path_a = layers.Conv2D(filters, kernel_size=3, strides=2, padding='same')(inputs)
    path_a = layers.BatchNormalization()(path_a)
    path_a = layers.ReLU()(path_a)
    path_a = layers.Conv2D(filters, kernel_size=3, padding='same')(path_a)
    path_a = layers.BatchNormalization()(path_a)
    path_a = layers.ReLU()(path_a)

    # Path B: Dilated convolutions - FIX: Separate stride and dilation
    # First downsample with strided conv
    path_b = layers.Conv2D(filters, kernel_size=3, strides=2, padding='same')(inputs)
    path_b = layers.BatchNormalization()(path_b)
    path_b = layers.ReLU()(path_b)
    # Then apply dilated conv without stride
    path_b = layers.Conv2D(filters, kernel_size=3, padding='same', dilation_rate=2)(path_b)
    path_b = layers.BatchNormalization()(path_b)
    path_b = layers.ReLU()(path_b)

    # Feature fusion through channel attention
    concat = layers.Concatenate()([path_a, path_b])
    attention = squeeze_excitation_block(concat)

    return attention

def biforked_residual_block(inputs, filters, downsample=False):
    """BiForked Residual Block (BFRB)"""
    stride = 2 if downsample else 1
    channel_dim = inputs.shape[-1]

    # Shortcut connection
    if downsample or channel_dim != filters:
        shortcut = layers.Conv2D(filters, kernel_size=1, strides=stride, padding='same')(inputs)
        shortcut = layers.BatchNormalization()(shortcut)
    else:
        shortcut = inputs

    # Standard pathway
    std_path = layers.Conv2D(filters, kernel_size=3, strides=stride, padding='same')(inputs)
    std_path = layers.BatchNormalization()(std_path)
    std_path = layers.ReLU()(std_path)
    std_path = layers.Conv2D(filters, kernel_size=1, padding='same')(std_path)
    std_path = layers.BatchNormalization()(std_path)

    # Efficient pathway
    eff_path = layers.Conv2D(filters//2, kernel_size=1, strides=1, padding='same')(inputs)
    eff_path = layers.BatchNormalization()(eff_path)
    eff_path = layers.ReLU()(eff_path)
    eff_path = layers.DepthwiseConv2D(kernel_size=3, strides=stride, padding='same')(eff_path)
    eff_path = layers.BatchNormalization()(eff_path)
    eff_path = layers.Conv2D(filters, kernel_size=1, padding='same')(eff_path)
    eff_path = layers.BatchNormalization()(eff_path)

    # Apply SE to each path
    std_path = squeeze_excitation_block(std_path)
    eff_path = squeeze_excitation_block(eff_path)

    # Combine paths with learnable weights
    alpha = tf.Variable(0.5, trainable=True, dtype=tf.float32)
    beta = 1.0 - alpha
    weighted_paths = layers.Lambda(lambda x: alpha * x[0] + beta * x[1])([std_path, eff_path])

    # Add residual connection
    output = layers.Add()([weighted_paths, shortcut])
    output = layers.ReLU()(output)

    return output

def progressive_feature_resolution_module(inputs, filters):
    """Progressive Feature Resolution (PFR) Module"""
    # Max pooling branch
    pool_branch = layers.MaxPooling2D(pool_size=2, strides=2)(inputs)
    pool_branch = layers.Conv2D(filters, kernel_size=1, padding='same')(pool_branch)
    pool_branch = layers.BatchNormalization()(pool_branch)
    pool_branch = layers.ReLU()(pool_branch)

    # Strided convolution branch
    conv_branch = layers.Conv2D(filters, kernel_size=3, strides=2, padding='same')(inputs)
    conv_branch = layers.BatchNormalization()(conv_branch)
    conv_branch = layers.ReLU()(conv_branch)

    # Feature selection gate
    gate = layers.Concatenate()([pool_branch, conv_branch])
    gate = layers.Conv2D(2, kernel_size=1, padding='same')(gate)
    gate = layers.Softmax(axis=-1)(gate)

    # Split gate channels
    gate_0 = layers.Lambda(lambda x: x[..., 0:1])(gate)
    gate_1 = layers.Lambda(lambda x: x[..., 1:2])(gate)

    # Apply gates
    gated_pool = layers.Multiply()([pool_branch, gate_0])
    gated_conv = layers.Multiply()([conv_branch, gate_1])

    output = layers.Add()([gated_pool, gated_conv])
    return output

def multi_receptive_field_inception_block(inputs, filters):
    """Multi-Receptive Field Inception (MRFI) Block"""
    # 1x1 convolution branch
    branch_1x1 = layers.Conv2D(filters//4, kernel_size=1, padding='same')(inputs)
    branch_1x1 = layers.BatchNormalization()(branch_1x1)
    branch_1x1 = layers.ReLU()(branch_1x1)

    # 3x3 convolution branch
    branch_3x3 = layers.Conv2D(filters//4, kernel_size=1, padding='same')(inputs)
    branch_3x3 = layers.BatchNormalization()(branch_3x3)
    branch_3x3 = layers.ReLU()(branch_3x3)
    branch_3x3 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_3x3)
    branch_3x3 = layers.BatchNormalization()(branch_3x3)
    branch_3x3 = layers.ReLU()(branch_3x3)

    # 5x5 convolution branch (implemented as two 3x3 convs)
    branch_5x5 = layers.Conv2D(filters//4, kernel_size=1, padding='same')(inputs)
    branch_5x5 = layers.BatchNormalization()(branch_5x5)
    branch_5x5 = layers.ReLU()(branch_5x5)
    branch_5x5 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_5x5)
    branch_5x5 = layers.BatchNormalization()(branch_5x5)
    branch_5x5 = layers.ReLU()(branch_5x5)
    branch_5x5 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_5x5)
    branch_5x5 = layers.BatchNormalization()(branch_5x5)
    branch_5x5 = layers.ReLU()(branch_5x5)

    # 7x7 convolution branch (implemented as three 3x3 convs)
    branch_7x7 = layers.Conv2D(filters//4, kernel_size=1, padding='same')(inputs)
    branch_7x7 = layers.BatchNormalization()(branch_7x7)
    branch_7x7 = layers.ReLU()(branch_7x7)
    branch_7x7 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_7x7)
    branch_7x7 = layers.BatchNormalization()(branch_7x7)
    branch_7x7 = layers.ReLU()(branch_7x7)
    branch_7x7 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_7x7)
    branch_7x7 = layers.BatchNormalization()(branch_7x7)
    branch_7x7 = layers.ReLU()(branch_7x7)
    branch_7x7 = layers.Conv2D(filters//4, kernel_size=3, padding='same')(branch_7x7)
    branch_7x7 = layers.BatchNormalization()(branch_7x7)
    branch_7x7 = layers.ReLU()(branch_7x7)

    # Spatial pyramid pooling
    avg_pool = layers.AveragePooling2D(pool_size=(2, 2), strides=1, padding='same')(inputs)
    avg_pool = layers.Conv2D(filters//4, kernel_size=1, padding='same')(avg_pool)
    avg_pool = layers.BatchNormalization()(avg_pool)
    avg_pool = layers.ReLU()(avg_pool)

    # Concatenate all branches
    output = layers.Concatenate()([branch_1x1, branch_3x3, branch_5x5, branch_7x7, avg_pool])

    # Channel-wise dynamic weighting
    output = squeeze_excitation_block(output)

    return output

def cross_stitch_feature_fusion(feature_map_1, feature_map_2):
    """Cross-Stitch Feature Fusion between feature maps"""
    # Ensure same dimensions by properly adjusting both feature maps

    # Get dimensions for debugging
    h1, w1 = feature_map_1.shape[1:3]
    h2, w2 = feature_map_2.shape[1:3]

    # Determine which feature map needs resizing and adjust accordingly
    if h1 > h2:  # feature_map_1 is larger -> downsample it
        feature_map_1_adjusted = layers.AveragePooling2D(
            pool_size=(h1 // h2, w1 // w2)
        )(feature_map_1)
        feature_map_2_adjusted = feature_map_2
    elif h2 > h1:  # feature_map_2 is larger -> downsample it
        feature_map_2_adjusted = layers.AveragePooling2D(
            pool_size=(h2 // h1, w2 // w1)
        )(feature_map_2)
        feature_map_1_adjusted = feature_map_1
    else:  # Same size, no adjustment needed
        feature_map_1_adjusted = feature_map_1
        feature_map_2_adjusted = feature_map_2

    # Ensure channels are compatible
    c1 = feature_map_1_adjusted.shape[-1]
    c2 = feature_map_2_adjusted.shape[-1]

    if c1 != c2:
        # Make channels compatible via 1x1 convolutions
        if c1 > c2:
            feature_map_2_adjusted = layers.Conv2D(c1, kernel_size=1, padding='same')(feature_map_2_adjusted)
            feature_map_2_adjusted = layers.BatchNormalization()(feature_map_2_adjusted)
            target_channels = c1
        else:
            feature_map_1_adjusted = layers.Conv2D(c2, kernel_size=1, padding='same')(feature_map_1_adjusted)
            feature_map_1_adjusted = layers.BatchNormalization()(feature_map_1_adjusted)
            target_channels = c2
    else:
        target_channels = c1  # They're already the same

    # Cross-stitch connections with learnable weights
    alpha_11 = tf.Variable(0.9, trainable=True, dtype=tf.float32)
    alpha_12 = tf.Variable(0.1, trainable=True, dtype=tf.float32)
    alpha_21 = tf.Variable(0.1, trainable=True, dtype=tf.float32)
    alpha_22 = tf.Variable(0.9, trainable=True, dtype=tf.float32)

    # Apply cross-stitch
    output_1 = layers.Lambda(lambda x: alpha_11 * x[0] + alpha_12 * x[1])([feature_map_1_adjusted, feature_map_2_adjusted])
    output_2 = layers.Lambda(lambda x: alpha_21 * x[0] + alpha_22 * x[1])([feature_map_1_adjusted, feature_map_2_adjusted])

    return output_1, output_2

def self_calibrated_convolution(inputs, filters):
    """Self-Calibrated Convolutions"""
    # Create separate paths using Conv2D to split features instead of tf.split
    # First path - create half the filters
    part1 = layers.Conv2D(filters//2, kernel_size=1, padding='same')(inputs)
    part1 = layers.BatchNormalization()(part1)

    # First part for spatial context modeling
    context = layers.Conv2D(filters//2, kernel_size=1, padding='same')(part1)
    context = layers.BatchNormalization()(context)

    # Apply spatial transformation
    context_1 = layers.Conv2D(filters//2, kernel_size=3, padding='same', dilation_rate=1)(context)
    context_1 = layers.BatchNormalization()(context_1)

    context_2 = layers.Conv2D(filters//2, kernel_size=3, padding='same', dilation_rate=2)(context)
    context_2 = layers.BatchNormalization()(context_2)

    context = layers.Add()([context_1, context_2])
    context = layers.ReLU()(context)

    # Second path - create other half of features
    part2 = layers.Conv2D(filters//2, kernel_size=1, padding='same')(inputs)
    part2 = layers.BatchNormalization()(part2)

    # Apply feature transformation
    transform = layers.Conv2D(filters//2, kernel_size=3, padding='same')(part2)
    transform = layers.BatchNormalization()(transform)
    transform = layers.ReLU()(transform)

    # Calibration through context
    calibrated = layers.Multiply()([transform, context])
    calibrated = layers.Conv2D(filters//2, kernel_size=1, padding='same')(calibrated)
    calibrated = layers.BatchNormalization()(calibrated)

    # Combine transformed parts
    output = layers.Concatenate()([calibrated, context])
    output = layers.ReLU()(output)

    return output

def global_context_modeling(inputs, filters, reduction=16):
    """Global Context Modeling with efficient attention"""
    batch_size, h, w, c = inputs.shape

    # Ensure input channels match the specified filters by using a 1x1 conv if needed
    if c != filters:
        inputs = layers.Conv2D(filters, kernel_size=1, padding='same')(inputs)
        inputs = layers.BatchNormalization()(inputs)
        inputs = layers.ReLU()(inputs)
        c = filters  # Update c to reflect the new number of channels

    # Generate query, key, value projections
    queries = layers.Conv2D(filters // reduction, kernel_size=1, padding='same')(inputs)
    keys = layers.Conv2D(filters // reduction, kernel_size=1, padding='same')(inputs)
    values = layers.Conv2D(filters, kernel_size=1, padding='same')(inputs)

    # Reshape for matrix multiplication
    queries = layers.Reshape((-1, filters // reduction))(queries)  # (B, HW, C//R)
    keys = layers.Reshape((-1, filters // reduction))(keys)        # (B, HW, C//R)
    values = layers.Reshape((-1, filters))(values)                # (B, HW, C)

    # Transpose keys for attention matrix computation
    keys_transposed = layers.Permute((2, 1))(keys)  # (B, C//R, HW)

    # Compute attention scores (low-rank approximation)
    attn = layers.Lambda(lambda x: tf.matmul(x[0], x[1]) / tf.sqrt(tf.cast(filters // reduction, tf.float32)))([queries, keys_transposed])
    attn = layers.Softmax(axis=-1)(attn)  # (B, HW, HW)

    # Apply attention to values
    context = layers.Lambda(lambda x: tf.matmul(x[0], x[1]))([attn, values])  # (B, HW, C)
    context = layers.Reshape((h, w, filters))(context)  # (B, H, W, C)

    # Combine with input feature map
    output = layers.Add()([inputs, context])

    # Use custom layer normalization implemented as a proper Keras layer
    class CustomLayerNorm(layers.Layer):
        def __init__(self, **kwargs):
            super(CustomLayerNorm, self).__init__(**kwargs)

        def build(self, input_shape):
            # Create learnable parameters
            channels = input_shape[-1]
            self.gamma = self.add_weight(
                shape=(channels,),
                initializer='ones',
                trainable=True,
                name='gamma'
            )
            self.beta = self.add_weight(
                shape=(channels,),
                initializer='zeros',
                trainable=True,
                name='beta'
            )
            super(CustomLayerNorm, self).build(input_shape)

        def call(self, inputs):
            # Calculate mean and variance for each instance across channel dimension
            mean, variance = tf.nn.moments(inputs, axes=[-1], keepdims=True)

            # Normalize and scale
            epsilon = 1e-5
            normalized = (inputs - mean) / tf.sqrt(variance + epsilon)

            # Reshape gamma and beta for broadcasting
            gamma_broadcasted = tf.reshape(self.gamma, [1, 1, 1, -1])
            beta_broadcasted = tf.reshape(self.beta, [1, 1, 1, -1])

            # Apply scaling and shifting
            return normalized * gamma_broadcasted + beta_broadcasted

    # Apply layer normalization
    output = CustomLayerNorm()(output)

    return output

def stochastic_depth(x, training, drop_rate=0.2):
    """Apply stochastic depth to the input tensor during training"""
    if not training or drop_rate == 0:
        return x

    # Create binary tensor for random dropping
    batch_size = tf.shape(x)[0]
    random_tensor = tf.random.uniform(shape=[batch_size, 1, 1, 1], minval=0, maxval=1)
    binary_tensor = tf.floor(random_tensor + 1 - drop_rate)

    # Scale the kept features
    output = tf.math.divide(x, 1 - drop_rate) * binary_tensor
    return output

def build_brin_model(input_shape=(224, 224, 3), num_classes=4):
    """Build the complete BiForked Residual-Inception Network (BRIN)"""
    inputs = Input(shape=input_shape)

    # === Input Processing ===
    # Adaptive input normalization
    x = adaptive_input_normalization(inputs)

    # Multi-scale image enhancement with CLAHE
    clahe_branch = apply_clahe(x)
    x = layers.Concatenate()([x, clahe_branch])
    x = layers.Conv2D(64, kernel_size=1, padding='same')(x)

    # === Main Network ===
    # Dual-Path Compression Stem
    x = dual_path_compression_stem(x, filters=64)

    # Store intermediate features for cross-stitch connections
    features = []

    # Stage 1: BiForked Residual Blocks
    x = biforked_residual_block(x, filters=64)
    x = biforked_residual_block(x, filters=64)
    features.append(x)  # Store for cross-stitch

    # Progressive Feature Resolution - downsampling
    x = progressive_feature_resolution_module(x, filters=128)

    # Stage 2: BiForked Residual Blocks with MRFI
    x = biforked_residual_block(x, filters=128)
    x = multi_receptive_field_inception_block(x, filters=128)
    x = biforked_residual_block(x, filters=128)
    features.append(x)  # Store for cross-stitch

    # Progressive Feature Resolution - downsampling
    x = progressive_feature_resolution_module(x, filters=256)

    # Stage 3: More complex blocks with self-calibration
    x = biforked_residual_block(x, filters=256)
    x = self_calibrated_convolution(x, filters=256)
    x = multi_receptive_field_inception_block(x, filters=256)
    features.append(x)  # Store for cross-stitch

    # Apply cross-stitch feature fusion between feature maps
    # FIX: Only do cross-stitch between features with most similar spatial dimensions
    # and make sure to properly resize before attempting to add the result
    if len(features) >= 2:
        # For this specific case, we know cs_output2 and x will have compatible dimensions
        # because we're applying cross-stitch between the latest two feature maps
        cs_output1, cs_output2 = cross_stitch_feature_fusion(features[-2], features[-1])

        # Verify that dimensions match before adding
        if cs_output2.shape[1:3] == x.shape[1:3]:
            x = layers.Add()([x, cs_output2])
        else:
            # If they still don't match (just to be safe), resize cs_output2 to match x
            if cs_output2.shape[1] > x.shape[1]:
                # Downsample cs_output2
                cs_output2 = layers.AveragePooling2D(
                    pool_size=(cs_output2.shape[1] // x.shape[1], cs_output2.shape[2] // x.shape[2])
                )(cs_output2)
            else:
                # Upsample cs_output2
                cs_output2 = layers.UpSampling2D(
                    size=(x.shape[1] // cs_output2.shape[1], x.shape[2] // cs_output2.shape[2])
                )(cs_output2)

            # Now they should be compatible
            x = layers.Add()([x, cs_output2])

    # Progressive Feature Resolution - downsampling
    x = progressive_feature_resolution_module(x, filters=512)

    # Stage 4: Final stage with global context
    x = biforked_residual_block(x, filters=512)
    x = self_calibrated_convolution(x, filters=512)
    x = multi_receptive_field_inception_block(x, filters=512)
    x = global_context_modeling(x, filters=512)

    # === Classification Head ===
    # Adaptive pooling
    x = layers.GlobalAveragePooling2D()(x)

    # Advanced dropout with scheduled rate
    x = layers.Dropout(0.5)(x)

    # Classification layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    # Create model
    model = Model(inputs=inputs, outputs=outputs, name="BRIN")

    return model

def build_training_components(model, learning_rate=0.001):
    """Set up training components for the BRIN model"""
    # Model compilation with mixed precision
    optimizer = Adam(learning_rate=learning_rate)

    # Loss with label smoothing
    loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1)

    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=['accuracy', tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )

    # Learning rate scheduler with warmup and cosine annealing
    def lr_scheduler(epoch, lr):
        warmup_epochs = 5
        if epoch < warmup_epochs:
            return learning_rate * ((epoch + 1) / warmup_epochs)
        else:
            # Cosine annealing with restarts
            cycle_length = 30
            cycle = (epoch - warmup_epochs) // cycle_length
            cycle_epoch = (epoch - warmup_epochs) % cycle_length
            return learning_rate * 0.5 * (1 + math.cos(math.pi * cycle_epoch / cycle_length)) * (0.8 ** cycle)

    callbacks = [
        ModelCheckpoint('brin_model_best.h5', save_best_only=True, monitor='val_accuracy'),
        LearningRateScheduler(lr_scheduler),
        EarlyStopping(patience=15, restore_best_weights=True),
    ]

    return callbacks

def progressive_layer_activation_training(model, train_data, val_data, epochs=100, batch_size=32):
    """Train with progressive layer activation strategy"""
    # This is a conceptual implementation - actual implementation would require
    # modifying the model architecture to support layer freezing/unfreezing dynamically

    # Initial training with only early layers
    print("Phase 1: Training early layers...")
    # Freeze later layers (conceptual)

    # Gradually unfreeze and train deeper layers
    print("Phase 2: Progressive unfreezing and training...")
    # Unfreeze middle layers (conceptual)

    # Final phase with all layers
    print("Phase 3: Full network training...")
    # All layers unfrozen (conceptual)

    # In actual implementation, you would build callback logic to handle this
    model.fit(
        train_data,
        validation_data=val_data,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=build_training_components(model)
    )

    return model

# Example usage:

if __name__ == "__main__":
    # Create the model
    model = build_brin_model(input_shape=(224, 224, 3), num_classes=4)

    # Print model summary
    model.summary()

    # Example of setting up a training pipeline
    # Note: Replace with your actual data pipeline
    callbacks = build_training_components(model)

    # For visualization with TensorBoard
    # tf.keras.utils.plot_model(model, to_file='brin_model.png', show_shapes=True)

    print("Model built successfully!")

Model built successfully!


In [4]:

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, BatchNormalization,
                                     Activation, Dropout, GlobalAveragePooling2D,
                                     Dense, Reshape, GRU, Bidirectional, LSTM, Flatten)
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

# ------------------------------
# Dataset Path
# ------------------------------
dataset_path = '/content/drive/MyDrive/DL_dataser/Blood cell Cancer [ALL]'  # Replace with your dataset path

# ------------------------------
# Data Generators
# ------------------------------
datagen = ImageDataGenerator(
    rescale=1.0/255.0,
    rotation_range=20,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    fill_mode='nearest',
    validation_split=0.2
)

train_gen = datagen.flow_from_directory(
    dataset_path,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical',
    subset='training',
    shuffle=True,
    seed=42
)

val_gen = datagen.flow_from_directory(
    dataset_path,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical',
    subset='validation',
    shuffle=False,
    seed=42
)


Found 2595 images belonging to 4 classes.
Found 647 images belonging to 4 classes.


In [5]:
model = build_brin_model(input_shape=(224, 224, 3), num_classes=4)

# ------------------------------
# Train the model
# ------------------------------

checkpoint_path = "build_brin_model.keras"
checkpoint = ModelCheckpoint(checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=1, mode='max')

# Compile the model before training
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy']) # Add compile step

# 🚀 Train
history = model.fit(
    train_gen,
    epochs=31,
    validation_data=val_gen,
    callbacks=[checkpoint]
)



  self._warn_if_super_not_called()


Epoch 1/31
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24s/step - accuracy: 0.6963 - loss: 1.2037 
Epoch 1: val_accuracy improved from -inf to 0.30139, saving model to build_brin_model.keras
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2644s[0m 30s/step - accuracy: 0.6977 - loss: 1.1972 - val_accuracy: 0.3014 - val_loss: 6.5676
Epoch 2/31
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.8724 - loss: 0.3741
Epoch 2: val_accuracy did not improve from 0.30139
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m111s[0m 1s/step - accuracy: 0.8726 - loss: 0.3735 - val_accuracy: 0.2952 - val_loss: 8.1233
Epoch 3/31
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.9239 - loss: 0.2344
Epoch 3: val_accuracy improved from 0.30139 to 0.30603, saving model to build_brin_model.keras
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m108s[0m 1s/step - accuracy: 0.9239 - loss: 0.2346