In [2]:
# Shell escape: run `nvidia-smi` to show the GPU/driver status of this runtime.
!nvidia-smi

In [3]:
import tensorflow as tf
# Query the visible GPUs once and reuse the result for both report lines.
gpu_devices = tf.config.list_physical_devices('GPU')

print("TensorFlow version:", tf.__version__)
print("Num GPUs Available: ", len(gpu_devices))
print("GPU Details:", gpu_devices)

In [None]:
# NOTE: per the original author, this cell only executes successfully once
# "compression is complete" — presumably the dataset archive must be fully
# prepared on Kaggle before it can be downloaded; TODO confirm.

import kagglehub

# Download the latest version of the dataset; `path` is whatever
# kagglehub.dataset_download returns (printed below as the dataset location).
# NOTE(review): `path` is not passed to load_dataset() later in this notebook —
# confirm that load_dataset locates the data on its own.
path = kagglehub.dataset_download("kumarkushagra7/merge-dataset")

print("Path to dataset files:", path)

In [4]:
import sys
# Add the parent directory to the import search path; this must happen before
# the `load_dataset` import below. The entry is relative, so it is resolved
# against the kernel's current working directory.
# NOTE(review): presumably load_dataset.py lives in the parent directory — confirm.
sys.path.append("..")

from load_dataset import load_dataset

# Build the training and validation generators plus the class-index mapping.
# NOTE(review): the exact return types are defined in load_dataset.py (not
# shown here); later cells pass the third value as `class_indices`.
train_gen, val_gen, train_gen_class_indc = load_dataset(batch_size=256)

# Model: ResNet

In [5]:
from tensorflow.keras import layers, models

# ----- Residual blocks -----

def basic_block(x, filters, stride=1):
    """Two-convolution residual block (3x3 -> 3x3) with an additive shortcut.

    Args:
        x: Input feature-map tensor (channels last).
        filters: Number of output channels of both 3x3 convolutions.
        stride: Stride of the first convolution; values other than 1
            downsample the feature map.

    Returns:
        Tensor after the residual addition and the final ReLU.
    """
    identity = x

    # Main path: conv-BN-ReLU followed by conv-BN (no activation before the add).
    out = layers.Conv2D(filters, kernel_size=3, strides=stride,
                        padding="same", use_bias=False)(x)
    out = layers.BatchNormalization()(out)
    out = layers.ReLU()(out)

    out = layers.Conv2D(filters, kernel_size=3, strides=1,
                        padding="same", use_bias=False)(out)
    out = layers.BatchNormalization()(out)

    # The identity branch can be added directly only when neither the spatial
    # size nor the channel count changes; otherwise project it with a 1x1 conv.
    shapes_match = stride == 1 and identity.shape[-1] == filters
    if not shapes_match:
        identity = layers.Conv2D(filters, kernel_size=1, strides=stride,
                                 use_bias=False)(identity)
        identity = layers.BatchNormalization()(identity)

    merged = layers.Add()([out, identity])
    return layers.ReLU()(merged)


def bottleneck_block(x, filters, stride=1):
    """Three-convolution bottleneck residual block (1x1 -> 3x3 -> 1x1).

    Args:
        x: Input feature-map tensor (channels last).
        filters: Channel count of the first 1x1 and the 3x3 convolution; the
            block itself outputs ``filters * 4`` channels.
        stride: Stride of the 3x3 convolution; values other than 1 downsample
            the feature map.

    Returns:
        Tensor after the residual addition and the final ReLU.
    """
    identity = x
    expanded = filters * 4

    # 1x1 reduce
    out = layers.Conv2D(filters, kernel_size=1, use_bias=False)(x)
    out = layers.BatchNormalization()(out)
    out = layers.ReLU()(out)

    # 3x3 spatial convolution (carries the stride)
    out = layers.Conv2D(filters, kernel_size=3, strides=stride,
                        padding="same", use_bias=False)(out)
    out = layers.BatchNormalization()(out)
    out = layers.ReLU()(out)

    # 1x1 expand (no activation before the add)
    out = layers.Conv2D(expanded, kernel_size=1, use_bias=False)(out)
    out = layers.BatchNormalization()(out)

    # Project the shortcut whenever its shape would not match the main path.
    shapes_match = stride == 1 and identity.shape[-1] == expanded
    if not shapes_match:
        identity = layers.Conv2D(expanded, kernel_size=1, strides=stride,
                                 use_bias=False)(identity)
        identity = layers.BatchNormalization()(identity)

    merged = layers.Add()([out, identity])
    return layers.ReLU()(merged)


# ----- Builder -----

def build_spatial_core(num_classes=8, depth=18):
    """Build a fully-convolutional ResNet that outputs a per-location score map.

    The network accepts images of any height/width (3 channels) and ends in a
    1x1 convolution with ``num_classes`` channels and no activation, so the
    result is a spatial map of class scores rather than a single vector.

    Args:
        num_classes: Number of channels produced by the 1x1 head.
        depth: ResNet variant; one of 18, 34, 50 or 101. Any other value
            raises ``KeyError`` from the configuration lookup.

    Returns:
        A Keras Model named ``resnet{depth}_spatial_core``.
    """
    # depth -> (residual blocks per stage, block constructor)
    architectures = {
        18: ([2, 2, 2, 2], basic_block),
        34: ([3, 4, 6, 3], basic_block),
        50: ([3, 4, 6, 3], bottleneck_block),
        101: ([3, 4, 23, 3], bottleneck_block),
    }
    blocks_per_stage, make_block = architectures[depth]

    image = layers.Input(shape=(None, None, 3))

    # Stem: 7x7/2 convolution followed by 3x3/2 max pooling.
    x = layers.Conv2D(64, kernel_size=7, strides=2, padding="same", use_bias=False)(image)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling2D(pool_size=3, strides=2, padding="same")(x)

    stage_widths = (64, 128, 256, 512)
    for stage_idx, (num_blocks, width) in enumerate(zip(blocks_per_stage, stage_widths)):
        for block_idx in range(num_blocks):
            # Only the first block of stages after the first one downsamples.
            downsample = stage_idx > 0 and block_idx == 0
            x = make_block(x, width, 2 if downsample else 1)

    # FCN head: 1x1 convolution producing one score channel per class.
    scores = layers.Conv2D(num_classes, kernel_size=1)(x)

    return models.Model(image, scores, name=f"resnet{depth}_spatial_core")


In [6]:
def build_training_model(spatial_core, num_classes=8, input_shape=(224, 224, 3)):
    """Wrap a fully-convolutional core into a fixed-size image classifier.

    The core's spatial score map is averaged over height and width and passed
    through a softmax, giving one probability per output channel of the core.

    Args:
        spatial_core: Keras model mapping an image batch to a spatial score
            map (for example the model returned by ``build_spatial_core``).
        num_classes: Not used by this function; the number of outputs is
            determined by the channel count of ``spatial_core``. Kept so
            existing calls keep working.
        input_shape: ``(height, width, channels)`` of the training images.
            Defaults to ``(224, 224, 3)``, the size that was previously
            hard-coded.

    Returns:
        A Keras Model mapping ``input_shape`` images to softmax probabilities.
    """
    inp = layers.Input(shape=input_shape)
    score_map = spatial_core(inp)

    # Collapse the spatial dimensions so each class gets a single score.
    pooled = layers.GlobalAveragePooling2D()(score_map)

    out = layers.Activation("softmax")(pooled)

    return models.Model(inp, out)


In [12]:
# Take the class count from the dataset's class-index mapping instead of a
# hard-coded placeholder, so the model head always matches the labels.
num_classes = len(train_gen_class_indc)

# depth selects the ResNet variant: 18 / 34 / 50 / 101
spatial_core = build_spatial_core(num_classes=num_classes, depth=50)

# Classifier used for training; it wraps (and shares weights with) spatial_core.
training_model = build_training_model(spatial_core, num_classes=num_classes)

In [13]:
# Print the layer-by-layer summary of the fully-convolutional core.
spatial_core.summary()

In [14]:
# Print the summary of the classifier wrapper (core + global pooling + softmax).
training_model.summary()

In [15]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Training callbacks; all three watch the validation loss.
callbacks = [
    # Save the full model (not just weights) whenever val_loss improves.
    # The file name reflects the ResNet trained in this notebook; the previous
    # "best_AlexNet.keras" was a leftover that mislabelled the checkpoint.
    ModelCheckpoint(
        filepath="best_ResNet.keras",
        monitor="val_loss",
        save_best_only=True,
        save_weights_only=False,
        verbose=1
    ),
    # Stop after 10 epochs without improvement and roll back to the best weights.
    EarlyStopping(
        monitor="val_loss",
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    # Multiply the learning rate by 0.3 after 7 stagnant epochs (floor: 1e-7).
    ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.3,
        patience=7,
        min_lr=1e-7,
        verbose=1
    )
]


In [16]:
# Configure training: Adam starting at lr=2e-4, categorical cross-entropy loss.
# NOTE(review): this loss expects one-hot encoded labels — confirm that
# load_dataset yields them in that form.
training_model.compile(
    loss="categorical_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-4),
)


In [None]:
# Smoke test: a single epoch to make sure the model fits on the GPU.
# Note that this is a real training epoch on `training_model` with the same
# callbacks, so it updates the weights and may already write a checkpoint.
history = training_model.fit(
    x=train_gen,
    epochs=1,
    validation_data=val_gen,
    callbacks=callbacks,
)

In [None]:
# Full training run: up to 100 epochs, subject to the callbacks (checkpointing,
# early stopping, learning-rate reduction). `history` is overwritten here.
history = training_model.fit(
    x=train_gen,
    epochs=100,
    validation_data=val_gen,
    callbacks=callbacks,
)

In [None]:
# Print the core's summary again (same model object as summarised before training).
spatial_core.summary()

In [None]:
# Objects this cell relies on (created in earlier cells):
#   training_model       -> model used for training / classification
#   spatial_core         -> sliding FCN (same weights, wrapped by training_model)
#   history              -> returned by training_model.fit(...)
#   val_gen              -> validation ImageDataGenerator (per the original note)
#   train_gen_class_indc -> class indices (per the original note: train_gen.class_indices)

from test_model import ModelTester

tester = ModelTester(
    model=training_model,
    history=history,
    val_gen=val_gen,
    class_indices=train_gen_class_indc,
)

# training diagnostics
tester.plot_history()
tester.plot_confmat()

# spatial diagnostics (ONE image, all plots)
tester.show_all(spatial_core)


In [None]:
# Re-run the spatial diagnostics; same call as at the end of the previous cell.
tester.show_all(spatial_core)