In [2]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from tensorflow.keras.datasets import cifar10, cifar100
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import seaborn as sns
import cv2
import time
import pandas as pd
import os

# Fix the NumPy and TensorFlow seeds so repeated runs draw the same random numbers
np.random.seed(42)
tf.random.set_seed(42)

# Report the TensorFlow version and the GPUs it can see
print("TensorFlow version:", tf.__version__)
physical_devices = tf.config.list_physical_devices('GPU')
print("GPU Available:", physical_devices)

# Enable memory growth on every visible GPU (intended to avoid OOM errors)
for device in physical_devices:
    tf.config.experimental.set_memory_growth(device, True)
if physical_devices:
    print("Memory growth set to True")

# Load a CIFAR dataset, split off validation data and standardize the images
def load_and_preprocess_data(dataset='cifar10', validation_split=0.1):
    """
    Loads CIFAR-10 or CIFAR-100 and prepares it for training.

    A random fraction (`validation_split`) of the training images is held out
    as a validation set. Pixels are scaled by 1/255 and then standardized per
    channel with the mean and standard deviation of the remaining training
    images. Labels are one-hot encoded.

    Args:
        dataset: 'cifar10' selects CIFAR-10; any other value selects
            CIFAR-100 with fine labels.
        validation_split: Fraction of the training set moved to validation.

    Returns:
        Tuple (x_train, y_train, x_val, y_val, x_test, y_test, num_classes).
    """
    # Pick the dataset and its class count
    if dataset == 'cifar10':
        num_classes = 10
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()
    else:  # cifar100
        num_classes = 100
        (x_train, y_train), (x_test, y_test) = cifar100.load_data(label_mode='fine')

    # Shuffle the sample indices; the first val_size of them become validation
    n_samples = len(x_train)
    val_size = int(n_samples * validation_split)
    shuffled = np.random.permutation(n_samples)
    val_idx = shuffled[:val_size]
    train_idx = shuffled[val_size:]

    x_val, y_val = x_train[val_idx], y_train[val_idx]
    x_train, y_train = x_train[train_idx], y_train[train_idx]

    # Scale the pixels to [0, 1] as float32
    x_train, x_val, x_test = (
        images.astype('float32') / 255.0 for images in (x_train, x_val, x_test)
    )

    # Per-channel statistics come from the training split only, so the
    # validation and test images do not influence the normalization
    mean = np.mean(x_train, axis=(0, 1, 2))
    std = np.std(x_train, axis=(0, 1, 2))

    # Standardize every split with those statistics (epsilon avoids a zero divisor)
    x_train, x_val, x_test = (
        (images - mean) / (std + 1e-7) for images in (x_train, x_val, x_test)
    )

    # Print data ranges to verify normalization
    for split_name, images in (("Training", x_train), ("Validation", x_val), ("Test", x_test)):
        print(f"{split_name} data range: {images.min():.4f} to {images.max():.4f}")

    # One-hot encode the labels
    y_train, y_val, y_test = (
        to_categorical(labels, num_classes) for labels in (y_train, y_val, y_test)
    )

    print(f'Dataset: {dataset}')
    print(f'Training set shape: {x_train.shape}, {y_train.shape}')
    print(f'Validation set shape: {x_val.shape}, {y_val.shape}')
    print(f'Test set shape: {x_test.shape}, {y_test.shape}')

    return x_train, y_train, x_val, y_val, x_test, y_test, num_classes

# Create a data augmentation generator with geometric augmentations
def create_data_generator(brightness_range=None):
    """
    Returns an ImageDataGenerator with moderate geometric augmentations
    (rotation, shifts, horizontal flip, zoom, shear).

    Args:
        brightness_range: Optional [low, high] pair forwarded to
            ImageDataGenerator. Disabled (None) by default: Keras implements
            brightness shifts by round-tripping each image through an 8-bit
            PIL image, which is meant for pixel values in [0, 255]. The images
            produced by load_and_preprocess_data are standardized floats
            (roughly -2 to 2), so on that data the option quantizes the image
            and does not behave like a real brightness change. Only pass a
            range when feeding raw 0-255 images.

    Returns:
        A configured ImageDataGenerator.
    """
    return ImageDataGenerator(
        rotation_range=10,        # Reduced from 15 to avoid too extreme transformations
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True,
        zoom_range=0.1,
        shear_range=0.05,         # Reduced from 0.1
        brightness_range=brightness_range,  # Off unless the caller opts in (see docstring)
        fill_mode='nearest'
    )

# Pre-activation residual block: (BN -> ReLU -> Conv) twice, plus a skip connection
def residual_block(x, filters, kernel_size=3, strides=1, block_name='res_block'):
    """
    Builds a residual block in pre-activation order (BN -> ReLU -> Conv),
    following ResNet v2 design principles.

    Args:
        x: Input tensor of the block.
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Stride of the first convolution and of the projection
            shortcut; the second convolution uses the layer default.
        block_name: Prefix used for the names of all layers in the block.

    Returns:
        The sum of the shortcut branch and the convolutional branch.
    """
    block_input = x
    input_filters = block_input.shape[-1]

    # Settings shared by both main-path convolutions
    conv_options = dict(
        kernel_size=kernel_size,
        padding='same',
        kernel_initializer='he_normal',
    )

    # First BN -> ReLU -> Conv stage (carries the stride)
    branch = layers.BatchNormalization(name=f'{block_name}_bn1')(block_input)
    branch = layers.Activation('relu', name=f'{block_name}_relu1')(branch)
    branch = layers.Conv2D(
        filters, strides=strides, name=f'{block_name}_conv1', **conv_options
    )(branch)

    # Second BN -> ReLU -> Conv stage
    branch = layers.BatchNormalization(name=f'{block_name}_bn2')(branch)
    branch = layers.Activation('relu', name=f'{block_name}_relu2')(branch)
    branch = layers.Conv2D(
        filters, name=f'{block_name}_conv2', **conv_options
    )(branch)

    # The raw input can only be added when resolution and channel count are
    # unchanged; otherwise a strided 1x1 convolution projects it to match
    if strides > 1 or input_filters != filters:
        skip = layers.Conv2D(
            filters,
            kernel_size=1,
            strides=strides,
            padding='same',
            kernel_initializer='he_normal',
            name=f'{block_name}_shortcut_conv'
        )(block_input)
    else:
        skip = block_input

    # Merge the shortcut (identity or projection) with the main path
    return layers.Add(name=f'{block_name}_add')([skip, branch])

# Assemble the 5-block ResNet classifier
def build_improved_resnet(input_shape, num_classes):
    """
    Builds the 5-block ResNet: a convolutional stem, five residual blocks each
    followed by dropout, global average pooling, one regularized dense layer
    and a softmax output.

    Args:
        input_shape: Shape of a single input image, without the batch axis.
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled Keras Model named 'cifar_resnet_5block_improved'.
    """
    inputs = layers.Input(shape=input_shape, name='input_layer')

    # Stem: Conv -> BN -> ReLU
    net = layers.Conv2D(
        64, kernel_size=3, padding='same',
        kernel_initializer='he_normal',
        name='initial_conv'
    )(inputs)
    net = layers.BatchNormalization(name='initial_bn')(net)
    net = layers.Activation('relu', name='initial_relu')(net)

    # One row per residual stage: (filters, strides, dropout rate).
    # Stride 2 downsamples; the dropout rate grows with depth.
    stage_plan = (
        (64, 1, 0.05),
        (128, 2, 0.1),
        (256, 2, 0.2),
        (512, 2, 0.25),
        (512, 1, 0.3),
    )
    for stage_no, (width, stride, drop_rate) in enumerate(stage_plan, start=1):
        net = residual_block(
            net, filters=width, kernel_size=3, strides=stride,
            block_name=f'block{stage_no}_res1'
        )
        net = layers.Dropout(drop_rate, name=f'block{stage_no}_dropout')(net)

    # Collapse the spatial dimensions
    net = layers.GlobalAveragePooling2D(name='global_pool')(net)

    # Classification head: L2-regularized Dense -> BN -> ReLU -> Dropout
    net = layers.Dense(
        512,
        kernel_initializer='he_normal',
        kernel_regularizer=tf.keras.regularizers.l2(1e-4),
        name='dense1'
    )(net)
    net = layers.BatchNormalization(name='bn_dense')(net)
    net = layers.Activation('relu', name='relu_dense')(net)
    net = layers.Dropout(0.5, name='dropout_dense')(net)

    # Softmax over the classes
    outputs = layers.Dense(
        num_classes,
        activation='softmax',
        kernel_initializer='glorot_uniform',
        name='output'
    )(net)

    return models.Model(inputs=inputs, outputs=outputs, name='cifar_resnet_5block_improved')

# Learning rate schedule: linear warmup, then a single cosine decay that never rebounds
def lr_scheduler(epoch, initial_lr=0.001, warmup_epochs=5, total_epochs=50):
    """
    Returns the learning rate for an epoch: linear warmup followed by one
    half-cosine decay.

    Args:
        epoch: Zero-based epoch index.
        initial_lr: Peak learning rate, reached on the last warmup epoch.
        warmup_epochs: Number of epochs over which the rate ramps up linearly.
        total_epochs: Epoch index at which the cosine decay reaches zero.
            Defaults to 50, the horizon this schedule previously hard-coded.

    Returns:
        The learning rate as a Python float.
    """
    # Warmup phase: ramp from initial_lr / warmup_epochs up to initial_lr
    if epoch < warmup_epochs:
        return float(initial_lr * ((epoch + 1) / warmup_epochs))

    # Decay phase. Progress is clamped to 1.0 because the cosine is periodic:
    # without the clamp, every epoch past total_epochs pushes the rate back up
    # towards initial_lr instead of leaving it at the end of the decay.
    decay_span = max(1, total_epochs - warmup_epochs)  # also avoids a zero divisor
    progress = min(1.0, (epoch - warmup_epochs) / decay_span)
    decay_rate = 0.5 * (1.0 + np.cos(np.pi * progress))
    return float(initial_lr * decay_rate)

# Improved training function
def train_model(dataset='cifar10', batch_size=128, epochs=100, fine_tune=False, model_path=None, save_dir='.'):
    """
    Trains or fine-tunes the improved ResNet model and evaluates it on the
    test set.

    Args:
        dataset: 'cifar10' for CIFAR-10; any other value is treated as CIFAR-100.
        batch_size: Batch size used by the augmentation generator.
        epochs: Maximum number of epochs; also the horizon of the cosine
            learning-rate decay. Early stopping may end training sooner.
        fine_tune: If True and `model_path` exists, start from that saved model
            instead of a freshly built one.
        model_path: Path of the saved model used for fine-tuning.
        save_dir: Directory for the best checkpoint and the summary file;
            created if missing.

    Returns:
        Tuple (model, history) with the trained model and the History object
        returned by `model.fit`.
    """
    start_time = time.time()

    # Make sure save directory exists (no error if it is already there)
    os.makedirs(save_dir, exist_ok=True)

    # Load and preprocess data
    x_train, y_train, x_val, y_val, x_test, y_test, num_classes = load_and_preprocess_data(dataset)

    print("Data Statistics:")
    print(f"Training samples: {x_train.shape[0]}")
    print(f"Validation samples: {x_val.shape[0]}")
    print(f"Test samples: {x_test.shape[0]}")
    print(f"Data range: {x_train.min():.4f} to {x_train.max():.4f}")

    # Get class names
    if dataset == 'cifar10':
        class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                       'dog', 'frog', 'horse', 'ship', 'truck']
    else:  # cifar100
        class_names = get_cifar100_class_names()

    # Build or load model
    if fine_tune and model_path and os.path.exists(model_path):
        print(f"Loading model from {model_path} for fine-tuning")
        try:
            model = models.load_model(model_path, compile=False)

            # Swap the classification head whenever the saved model predicts a
            # different number of classes than the target dataset, whichever
            # direction the transfer goes. Training would otherwise fail on the
            # label shape mismatch.
            if model.output_shape[-1] != num_classes:
                print(f"Replacing output layer to match {num_classes} classes")
                x = model.layers[-2].output  # output of the layer feeding the old head
                new_output = layers.Dense(
                    num_classes,
                    activation='softmax',
                    kernel_initializer='glorot_uniform',
                    name='new_output'
                )(x)
                model = models.Model(inputs=model.input, outputs=new_output)

        except Exception as e:
            # Best effort: an unreadable checkpoint should not abort the run
            print(f"Error during model loading: {e}")
            print("Building a new model instead...")
            model = build_improved_resnet(x_train.shape[1:], num_classes)
    else:
        print("Building a new improved ResNet model...")
        model = build_improved_resnet(x_train.shape[1:], num_classes)

    # Higher initial learning rate for better exploration
    initial_learning_rate = 0.01

    # Learning rate schedule function - use partial to pass the settings.
    # The decay horizon follows `epochs` so the rate decreases over the whole
    # run instead of hitting zero at a fixed epoch unrelated to the run length.
    from functools import partial
    lr_schedule = partial(
        lr_scheduler,
        initial_lr=initial_learning_rate,
        warmup_epochs=5,
        total_epochs=epochs
    )

    # Compile model with label smoothing
    loss_fn = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1)

    # Use SGD with momentum instead of Adam for better generalization
    optimizer = optimizers.SGD(
        learning_rate=initial_learning_rate,
        momentum=0.9,
        nesterov=True
    )

    model.compile(
        optimizer=optimizer,
        loss=loss_fn,
        metrics=['accuracy']
    )

    model.summary()

    print(f"\nTraining {dataset} with improved 5-block ResNet")
    print(f"Batch size: {batch_size}, Epochs: {epochs}")
    print(f"Initial learning rate: {initial_learning_rate}")

    # Callbacks. ReduceLROnPlateau is deliberately not used: the
    # LearningRateScheduler sets the rate again at the start of every epoch,
    # so a plateau reduction would be overwritten before it could take effect.
    callbacks_list = [
        callbacks.LearningRateScheduler(lr_schedule, verbose=1),
        callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=15,  # Increased patience
            restore_best_weights=True,
            verbose=1
        ),
        callbacks.ModelCheckpoint(
            filepath=os.path.join(save_dir, f'best_model_{dataset}_improved.keras'),
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        )
    ]

    # Data generator with augmentation. datagen.fit() is not called: it only
    # computes statistics for feature-wise normalization / ZCA whitening, and
    # none of those options is enabled on this generator.
    datagen = create_data_generator()

    # Training
    print("\nStarting training...")
    history = model.fit(
        datagen.flow(x_train, y_train, batch_size=batch_size),
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=callbacks_list,
        verbose=1
    )

    training_time = time.time() - start_time
    print(f"\nTraining completed in {training_time/60:.2f} minutes")

    # Evaluate on test set
    print("\nEvaluating model on test set...")
    test_loss, test_acc = model.evaluate(x_test, y_test, verbose=1)
    print(f"Test accuracy: {test_acc:.4f}")

    # Generate predictions and turn probabilities / one-hot rows into class ids
    y_pred = model.predict(x_test, verbose=1)
    y_true_classes = np.argmax(y_test, axis=1)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Classification report
    print("\nClassification Report:")
    if dataset == 'cifar100':
        # A 100-row report is hard to read, so only the 20 classes with the
        # most test samples are reported
        from collections import Counter
        most_common_classes = [cls for cls, _ in Counter(y_true_classes).most_common(20)]
        selected_class_names = [class_names[i] for i in most_common_classes]
        mask = np.isin(y_true_classes, most_common_classes)
        if np.any(mask):
            y_true_filtered = y_true_classes[mask]
            y_pred_filtered = y_pred_classes[mask]
            print(classification_report(
                y_true_filtered,
                y_pred_filtered,
                labels=most_common_classes,
                target_names=selected_class_names
            ))
    else:
        print(classification_report(y_true_classes, y_pred_classes, target_names=class_names))

    # Save model summary and metrics. UTF-8 is requested explicitly so the
    # file can hold any character the summary contains, independent of the
    # platform's default encoding.
    summary_path = os.path.join(save_dir, f'model_summary_{dataset}_improved.txt')
    with open(summary_path, 'w', encoding='utf-8') as f:
        model.summary(print_fn=lambda x: f.write(x + '\n'))

        f.write(f"\nDataset: {dataset}\n")
        f.write("Model: Improved 5-Block ResNet\n")
        f.write(f"Training time: {training_time/60:.2f} minutes\n\n")
        f.write(f"Test accuracy: {test_acc:.4f}\n")

    return model, history

# CIFAR-100 fine-label names, indexed by class id
def get_cifar100_class_names():
    """
    Returns the 100 CIFAR-100 class names as a new list of strings, in the
    order used to look names up by class index.
    """
    # Kept as one whitespace-separated block; split() rebuilds the list on
    # every call, so callers may modify the result freely
    names = """
        apple aquarium_fish baby bear beaver bed bee beetle
        bicycle bottle bowl boy bridge bus butterfly camel
        can castle caterpillar cattle chair chimpanzee clock
        cloud cockroach couch crab crocodile cup dinosaur
        dolphin elephant flatfish forest fox girl hamster
        house kangaroo keyboard lamp lawn_mower leopard lion
        lizard lobster man maple_tree motorcycle mountain mouse
        mushroom oak_tree orange orchid otter palm_tree pear
        pickup_truck pine_tree plain plate poppy porcupine
        possum rabbit raccoon ray road rocket rose
        sea seal shark shrew skunk skyscraper snail snake
        spider squirrel streetcar sunflower sweet_pepper table
        tank telephone television tiger tractor train trout
        tulip turtle wardrobe whale willow_tree wolf woman
        worm
    """
    return names.split()

# Script entry point: train on CIFAR-10, then continue on CIFAR-100
if __name__ == "__main__":
    import os

    # Directory that receives checkpoints and summary files
    save_dir = './CIFAR_CNN_Improved'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # Stage 1: CIFAR-10 from scratch
    print("=== Training improved ResNet on CIFAR-10 ===")
    cifar10_model, _ = train_model(
        dataset='cifar10',
        batch_size=128,
        epochs=100,
        save_dir=save_dir
    )

    # Stage 2: CIFAR-100, starting from the best CIFAR-10 checkpoint if it exists
    print("\n=== Fine-tuning on CIFAR-100 ===")
    best_model_path = os.path.join(save_dir, 'best_model_cifar10_improved.keras')

    # Arguments shared by the fine-tuning path and the from-scratch path
    cifar100_kwargs = dict(
        dataset='cifar100',
        batch_size=128,
        epochs=100,
        save_dir=save_dir
    )
    if os.path.exists(best_model_path):
        print(f"Using best model from {best_model_path} for fine-tuning")
        cifar100_kwargs.update(fine_tune=True, model_path=best_model_path)
    else:
        print(f"Best model file {best_model_path} not found. Training CIFAR-100 from scratch.")

    cifar100_model, _ = train_model(**cifar100_kwargs)

    print("Training and evaluation completed!")

TensorFlow version: 2.18.0
GPU Available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Memory growth set to True
=== Training improved ResNet on CIFAR-10 ===
Training data range: -1.9896 to 2.1261
Validation data range: -1.9896 to 2.1261
Test data range: -1.9896 to 2.1261
Dataset: cifar10
Training set shape: (45000, 32, 32, 3), (45000, 10)
Validation set shape: (5000, 32, 32, 3), (5000, 10)
Test set shape: (10000, 32, 32, 3), (10000, 10)
Data Statistics:
Training samples: 45000
Validation samples: 5000
Test samples: 10000
Data range: -1.9896 to 2.1261
Building a new improved ResNet model...



Training cifar10 with improved 5-block ResNet
Batch size: 128, Epochs: 100
Initial learning rate: 0.01

Starting training...

Epoch 1: LearningRateScheduler setting learning rate to 0.002.
Epoch 1/100
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step - accuracy: 0.2064 - loss: 2.5283
Epoch 1: val_accuracy improved from -inf to 0.37200, saving model to ./CIFAR_CNN_Improved/best_model_cifar10_improved.keras
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m75s[0m 152ms/step - accuracy: 0.2065 - loss: 2.5278 - val_accuracy: 0.3720 - val_loss: 1.9169 - learning_rate: 0.0020

Epoch 2: LearningRateScheduler setting learning rate to 0.004.
Epoch 2/100
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 99ms/step - accuracy: 0.3233 - loss: 2.0567
Epoch 2: val_accuracy improved from 0.37200 to 0.43400, saving model to ./CIFAR_CNN_Improved/best_model_cifar10_improved.keras
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 102ms


=== Fine-tuning on CIFAR-100 ===
Using best model from ./CIFAR_CNN_Improved/best_model_cifar10_improved.keras for fine-tuning
Training data range: -1.8959 to 2.0253
Validation data range: -1.8959 to 2.0253
Test data range: -1.8959 to 2.0253
Dataset: cifar100
Training set shape: (45000, 32, 32, 3), (45000, 100)
Validation set shape: (5000, 32, 32, 3), (5000, 100)
Test set shape: (10000, 32, 32, 3), (10000, 100)
Data Statistics:
Training samples: 45000
Validation samples: 5000
Test samples: 10000
Data range: -1.8959 to 2.0253
Loading model from ./CIFAR_CNN_Improved/best_model_cifar10_improved.keras for fine-tuning
Replacing output layer to match 100 classes



Training cifar100 with improved 5-block ResNet
Batch size: 128, Epochs: 100
Initial learning rate: 0.01

Starting training...


  self._warn_if_super_not_called()



Epoch 1: LearningRateScheduler setting learning rate to 0.002.
Epoch 1/100
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 128ms/step - accuracy: 0.0375 - loss: 4.7374
Epoch 1: val_accuracy improved from -inf to 0.16200, saving model to ./CIFAR_CNN_Improved/best_model_cifar100_improved.keras
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m68s[0m 146ms/step - accuracy: 0.0376 - loss: 4.7366 - val_accuracy: 0.1620 - val_loss: 3.7821 - learning_rate: 0.0020

Epoch 2: LearningRateScheduler setting learning rate to 0.004.
Epoch 2/100
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 99ms/step - accuracy: 0.1281 - loss: 3.9302
Epoch 2: val_accuracy improved from 0.16200 to 0.23440, saving model to ./CIFAR_CNN_Improved/best_model_cifar100_improved.keras
[1m352/352[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 102ms/step - accuracy: 0.1281 - loss: 3.9299 - val_accuracy: 0.2344 - val_loss: 3.4519 - learning_rate: 0.0040

Epoch 3: Learning

Training and evaluation completed!
