In [1]:
!pip cache purge

Files removed: 2 (38 kB)


In [2]:
!pip install --upgrade pip



In [5]:
import pickle
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
import datetime
import tensorflow as tf

**DATA LOAD**

In [6]:
data_path = "/kaggle/input/data-preprocessing/"
print(os.listdir(data_path))

['x_test.pkl', 'x_valid.pkl', 'y_train.pkl', '__results__.html', 'x_train.pkl', '__notebook__.ipynb', '__results___files', '__output__.json', 'y_test.pkl', 'label_names.csv', 'y_valid.pkl', 'custom.css']


In [7]:
with open(data_path + "x_train.pkl", "rb") as f:
    x_train = pickle.load(f)

with open(data_path + "y_train.pkl", "rb") as f:
    y_train = pickle.load(f)

with open(data_path + "x_valid.pkl", "rb") as f:
    x_valid = pickle.load(f)

with open(data_path + "y_valid.pkl", "rb") as f:
    y_valid = pickle.load(f)

with open(data_path + "x_test.pkl", "rb") as f:
    x_test = pickle.load(f)

with open(data_path + "y_test.pkl", "rb") as f:
    y_test = pickle.load(f)


print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

(782901, 32, 32, 1) (782901, 43)
(39690, 32, 32, 1) (39690, 43)
(113670, 32, 32, 1) (113670, 43)


In [8]:
print(f"Min value of x_train dataset: {x_train.min()}")
print(f"Max value of x_train dataset: {x_train.max()}")

print(f"Min value of x_valid dataset: {x_valid.min()}")
print(f"Max value of x_validdataset: {x_valid.max()}")

print(f"Min value of x_test dataset: {x_test.min()}")
print(f"Max value of x_test dataset: {x_test.max()}")

Min value of x_train dataset: -1.9063234329223633
Max value of x_train dataset: 255.0
Min value of x_valid dataset: -1.845461368560791
Max value of x_validdataset: 255.0
Min value of x_test dataset: -1.8758924007415771
Max value of x_test dataset: 255.0


In [9]:
# Read classes
label_path = os.path.join(data_path, 'label_names.csv')
labels = pd.read_csv(label_path)
num_classes = len(labels)

print(labels)

    ClassId                                           SignName
0         0                               Speed limit (20km/h)
1         1                               Speed limit (30km/h)
2         2                               Speed limit (50km/h)
3         3                               Speed limit (60km/h)
4         4                               Speed limit (70km/h)
5         5                               Speed limit (80km/h)
6         6                        End of speed limit (80km/h)
7         7                              Speed limit (100km/h)
8         8                              Speed limit (120km/h)
9         9                                         No passing
10       10       No passing for vehicles over 3.5 metric tons
11       11              Right-of-way at the next intersection
12       12                                      Priority road
13       13                                              Yield
14       14                                            

**MODEL DEFINITION AND HYPERPARAMETER OPTIMIZATION**

In [10]:
def create_cnn_model(learning_rate=0.001):
    """
    Creates the CNN model structure for tuning purposes.
    """
    model = Sequential([
        # Grayscale Input (32, 32, 1)
        Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 1), padding='same'),
        BatchNormalization(),
        Conv2D(32, (3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),

        Conv2D(64, (3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        Conv2D(64, (3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),

        Flatten(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])

    optimizer = Adam(learning_rate=learning_rate)
    
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model


In [11]:
custom_model = create_cnn_model(learning_rate=0.001)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
I0000 00:00:1758910931.146199     287 gpu_device.cc:2020] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 13942 MB memory:  -> device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5
I0000 00:00:1758910931.146938     287 gpu_device.cc:2020] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 13942 MB memory:  -> device: 1, name: Tesla T4, pci bus id: 0000:00:05.0, compute capability: 7.5


In [None]:
# Parameters to test
learning_rates = [0.0005, 0.0001]
batch_sizes = [32, 64] 
test_epoch_no = 2

results = []

print("\n" + "="*60)
print("STARTING SEQUENTIAL HYPERPARAMETER SWEEP (Resource Safe)")
print("="*60)

# Data Augmentation setup (needed for the sweep)
train_datagen = ImageDataGenerator(rotation_range=15, width_shift_range=0.1, height_shift_range=0.1, zoom_range=0.1)
valid_datagen = ImageDataGenerator()

# Loop through all combinations (runs sequentially)
for lr in learning_rates:
    for bs in batch_sizes:
        print(f"\n--- Trial: LR={lr}, Batch Size={bs} ---")

        # Create a FRESH model instance for each trial
        trial_model = create_cnn_model(learning_rate=lr) 
        
        # Train the model
        history_sweep = trial_model.fit(
            train_datagen.flow(x_train, y_train, batch_size=bs),
            validation_data=valid_datagen.flow(x_valid, y_valid, batch_size=bs),
            epochs=test_epoch_no,
            callbacks=[EarlyStopping(monitor='val_loss', patience=2, verbose=0)], 
            verbose=0
        )

        # Record results
        best_val_acc = max(history_sweep.history['val_accuracy'])
        
        results.append({
            'Learning Rate': lr,
            'Batch Size': bs,
            f'Max Val Acc (in {EPOCHS_TO_TEST} epochs)': best_val_acc
        })
        print(f"  -> Trial complete. Max Validation Accuracy: {best_val_acc:.4f}")


STARTING SEQUENTIAL HYPERPARAMETER SWEEP (Resource Safe)

--- Trial: LR=0.0005, Batch Size=32 ---


  self._warn_if_super_not_called()
I0000 00:00:1758910940.559468     353 device_compiler.h:196] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


In [None]:
results_df = pd.DataFrame(results)
results_df_sorted = results_df.sort_values(by=f'Max Val Acc (in {test_epoch_no} epochs)', ascending=False)

# Extract best parameters
best_params = results_df_sorted.iloc[0]
best_lr = best_params['Learning Rate']
best_batch_size = int(best_params['Batch Size'])

print("\n" + "*"*60)
print(f"Optimal LR found: {best_lr}")
print(f"Optimal Batch Size found: {best_batch_size}")
print("*"*60)

**MODEL TRAINING**

In [None]:
#!rm -rf /kaggle/working

In [None]:
# Use the optimal LR to create the new Adam optimizer instance

optimal_optimizer = Adam(learning_rate=best_lr)

# Re-compile your main model ('custom_model') with the optimized learning rate
custom_model.compile(
    optimizer=optimal_optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

custom_model.summary()

In [None]:
# Data augmentation
train_datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    zoom_range=0.1
)

valid_datagen = ImageDataGenerator()
test_datagen = ImageDataGenerator()

In [None]:
checkpoint_filepath = "best_model.keras"

checkpoint = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor="val_accuracy",
    save_best_only=True,
    save_weights_only=False,
    mode="max",
    verbose=1
)

In [None]:
# TensorBoard log directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
    write_graph=True,
    write_images=True
)

callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6, verbose=1),
    checkpoint,
    tensorboard_callback
]

In [None]:
# Check if a checkpoint exists
if os.path.exists(checkpoint_filepath):
    print(f"✅ Checkpoint found at {checkpoint_filepath}. Loading model and continuing training...")
    try:
        # Load the entire model, including optimizer state
        custom_model = load_model(checkpoint_filepath)
        print("Model loaded successfully. Summary:")
        custom_model.summary()
    except Exception as e:
        print(f"❌ Error loading model: {e}. Re-creating model from scratch.")
        # If loading fails (e.g., due to compatibility issues), re-create the model
        custom_model = create_cnn_model(learning_rate=0.0005) 
        custom_model.compile(optimizer=Adam(learning_rate=0.0005), loss='categorical_crossentropy', metrics=['accuracy'])
else:
    print(f"🛑 No checkpoint found at {checkpoint_filepath}. Starting training from scratch.")
    # Re-compile your main model ('custom_model') with the optimized learning rate (already done in your script)
    optimal_optimizer = Adam(learning_rate=0.0005)
    custom_model = create_cnn_model(learning_rate=0.0005) # Ensure it's a fresh model if starting new
    custom_model.compile(
        optimizer=optimal_optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    custom_model.summary()


In [None]:
epochs = 50 

history_final = custom_model.fit(
    train_datagen.flow(x_train, y_train, batch_size=best_batch_size),
    validation_data=valid_datagen.flow(x_valid, y_valid, batch_size=best_batch_size),
    epochs=epochs,
    callbacks=callbacks,
    verbose=1
)

**TRAINING RESULTS**

In [None]:
plt.figure(figsize=(12, 5))

# Accuracy Plot
plt.subplot(1, 2, 1)
plt.plot(history_final.history['accuracy'], label='Training Accuracy')
plt.plot(history_final.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy Graph')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

# Loss Plot (Used to evaluate overfitting/underfitting)
plt.subplot(1, 2, 2)
plt.plot(history_final.history['loss'], label='Training Loss')
plt.plot(history_final.history['val_loss'], label='Validation Loss')
plt.title('Loss Graph')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.show()

**MODEL EVALUATION**

In [None]:
# Final Performance on Test Set
test_loss, test_accuracy = custom_model.evaluate(x_test, y_test, verbose=0)
print(f"\nTest Set Accuracy: {test_accuracy:.4f}")
print(f"Test Set Loss: {test_loss:.4f}")

In [None]:
# Confusion Matrix & Classification Report

# Get predictions
y_pred_probs = custom_model.predict(x_test)
y_pred_classes = np.argmax(y_pred_probs, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

print("\n" + "="*50)
print("CLASSIFICATION REPORT")
print("="*50)
print(classification_report(y_true_classes, y_pred_classes, target_names=labels['SignName'].tolist()))

# Confusion Matrix Heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(16, 14))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=labels['SignName'].tolist(), 
            yticklabels=labels['SignName'].tolist())
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
# Grad-CAM Heatmap

def gradcam(img, model, layer_name):
    # Add batch dimension
    img_array = np.expand_dims(img, axis=0)

    # Build grad model
    grad_model = tf.keras.models.Model(
        [model.inputs], 
        [model.get_layer(layer_name).output, model.output]
    )

    # Forward + backward pass
    with tf.GradientTape() as tape:
        conv_out, preds = grad_model(img_array)
        class_idx = tf.argmax(preds[0])
        loss = preds[:, class_idx]

    grads = tape.gradient(loss, conv_out)[0]
    weights = tf.reduce_mean(grads, axis=(0, 1))
    cam = tf.reduce_sum(tf.multiply(weights, conv_out[0]), axis=-1)

    # Normalize
    cam = np.maximum(cam, 0) / (cam.max() + 1e-8)
    cam = cv2.resize(cam.numpy(), (img.shape[1], img.shape[0]))
    return cam

def show_gradcam(img, cam, label, alpha=0.4):
    heatmap = cm.jet(cam)[:, :, :3]
    if img.ndim == 2 or img.shape[-1] == 1:  
        img_rgb = np.repeat(img, 3, axis=-1)
    else:
        img_rgb = img
    overlay = np.clip(img_rgb + alpha * heatmap, 0, 1)

    plt.figure(figsize=(6,3))
    plt.subplot(1,2,1); plt.imshow(img_rgb); plt.title(f"True: {label}"); plt.axis("off")
    plt.subplot(1,2,2); plt.imshow(overlay); plt.title("Grad-CAM"); plt.axis("off")
    plt.show()

# Example usage
idx = np.random.randint(len(x_test))
img, label = x_test[idx], labels['SignName'].iloc[np.argmax(y_test[idx])]
cam = gradcam(img, custom_model, "conv2d_3")
show_gradcam(img, cam, label)