# Task 4: Game Title Detection

Karena Thailand operates five distinct games but can't automatically identify which game is being played from screenshots. The customer support team receives 2,000 daily tickets with images and wastes 10+ minutes per ticket figuring out which game before routing to specialists. The "Play Multiple Games for Rewards" campaign can't verify 50,000+ screenshot submissions, forcing manual checks of only 3% (leading to fraud). Players post 100,000+ gameplay images weekly across platforms that can't be auto-tagged for contests or moderation. Marketing can't track which games are trending without manual review. An automated detection system would enable faster support, verified campaigns, scalable content moderation, and cross-game insights.

Since this task used up lots of the resources and wouldn't complete in just one run time so the code you will see onward might be lots of redundant statement just to decrease only my confusion during the working process, but i try to improve the readbility and code format so far. 

you can find all the model loaded through out this notebook in this gg drive:


https://drive.google.com/file/d/14cxNf-pA3Xe1jWeFH3qrFUhnYvqJEYbu/view?usp=sharing

# Step 1: File Prep

In [None]:
# Install Kaggle API client
! pip install -q kaggle

In [None]:
from google.colab import files

# Upload Kaggle API credentials file (kaggle.json)
uploaded = files.upload()

In [None]:
# Create a .kaggle directory if it doesn't exist
!mkdir -p ~/.kaggle

# Copy the uploaded kaggle.json to the .kaggle directory
! cp kaggle.json ~/.kaggle/

In [None]:
# Set appropriate permissions for the kaggle.json file
!chmod 600 /root/.kaggle/kaggle.json

In [None]:
# Download the competition dataset from Kaggle
!kaggle competitions download -c cpe342-karena

In [None]:
# Unzip the downloaded dataset into a new directory
!unzip cpe342-karena.zip -d /content/cpe342-karena

In [None]:
# List the contents of the 'task4' directory to verify the unzipped files
# Note: The output indicates 'task4' is not directly under cpe342-karena, but under public_dataset/task4.
!ls -R /content/cpe342-karena/public_dataset/task4

In [None]:
! pip install -q tensorflow_hub keras_cv keras-hub

# Step 2: EDA

In [None]:
import pandas as pd

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Load the training, testing, and validation dataframes
train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
val_df = pd.read_csv(f'{dataset_base_path}/val.csv')

# Print the shapes of the loaded dataframes to confirm successful loading
print("train_df loaded with shape:", train_df.shape)
print("test_df loaded with shape:", test_df.shape)
print("val_df loaded with shape:", val_df.shape)

In [None]:
print("First 5 rows of train_df:")
print(train_df.head())
print("\nColumn information for train_df:")
print(train_df.info())

In [None]:
print("First 5 rows of test_df:")
print(test_df.head())
print("\nColumn information for test_df:")
print(test_df.info())

In [None]:
print("First 5 rows of val_df:")
print(val_df.head())
print("\nColumn information for val_df:")
print(val_df.info())

In [None]:
print("Label counts for train_df:")
print(train_df['label'].value_counts().sort_index())

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Re-load train_df to ensure it's defined (redundant if previous cell ran, but good for standalone execution)
train_df = pd.read_csv(f'{dataset_base_path}/train.csv')

# Set the figure size for the plot
plt.figure(figsize=(8, 6))

# Create a bar plot of label distribution
sns.barplot(
    x=train_df['label'].value_counts().index,
    y=train_df['label'].value_counts().values,
    palette='viridis'
)

# Add title and labels for clarity
plt.title('Label Distribution in Training Set')
plt.xlabel('Label')
plt.ylabel('Count')

# Display the plot
plt.show()

In [None]:
print(train_df['label'].value_counts().values)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import random
import os

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Get the directory where the training images are stored
image_dir = f'{dataset_base_path}/train'

# Select a few random images from the training DataFrame
sample_images = train_df.sample(n=25, random_state=42) # Using random_state for reproducibility

# Set up a figure to display the images
plt.figure(figsize=(15, 10))

# Iterate through the sample images and display them
for i, row in enumerate(sample_images.iterrows()):
    file_name = row[1]['file_name']
    label = row[1]['label']
    image_path = os.path.join(image_dir, file_name)

    # Create a subplot for each image (5 rows, 5 columns)
    plt.subplot(5, 5, i + 1)

    # Read and display the image
    img = mpimg.imread(image_path)
    plt.imshow(img)

    # Add label as title and turn off axes
    plt.title(f"Label: {label}")
    plt.axis('off')

# Adjust layout to prevent titles/labels from overlapping
plt.tight_layout()
plt.show()

# VGG 16

# Step 3: Train Model

## Initial

In [None]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ModelCheckpoint
import numpy as np
import cv2
import os
import albumentations as A

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Define AlbumentationsSequence class for data loading and augmentation
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]
            label = int(row["label"])

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform:
                augmented = self.transform(image=img)
                img = augmented["image"]

            batch_images.append(img)
            batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")
        batch_y = np.array(batch_labels, dtype="int32")  # For sparse_categorical_crossentropy

        return batch_x, batch_y

# Define ImageNet mean and standard deviation for normalization
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

# Define training image transformations with data augmentation
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Affine(
    scale=(1-0.1, 1+0.1),
    translate_percent=(0.05, 0.05),
    rotate=(-15, 15),
    p=0.5),
    A.GaussNoise(p=0.3),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Define validation image transformations (resizing and normalization only)
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# 1. Reload the dataframes for consistency
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    print("DataFrames reloaded.")
except FileNotFoundError as e:
    print(f"Error reloading dataframes: {e}. Please ensure the dataset is unzipped and available at '{dataset_base_path}/'.")

# Define image directories using absolute paths
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val'

# Define batch size
BATCH_SIZE = 32

# 2. Calculate class weights to handle potential class imbalance
if 'train_df' in locals() and not train_df.empty:
    class_counts = train_df['label'].value_counts().sort_index().values.astype(np.float32)
    num_classes = len(class_counts)
    total = class_counts.sum()
    class_weights_array = total / (num_classes * class_counts)
    class_weight = {i: float(w) for i, w in enumerate(class_weights_array)}
    print("Class weights calculated:", class_weight)
else:
    print("train_df is not loaded, cannot calculate class weights. Using dummy weights.")
    class_weight = {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0}

# Re-instantiate data generators to use the reloaded dataframes and absolute paths
if 'train_df' in locals() and 'val_df' in locals() and not train_df.empty and not val_df.empty:
    train_gen = AlbumentationsSequence(
        train_df, train_image_dir,
        batch_size=BATCH_SIZE,
        transform=train_transform,
        shuffle=True
    )
    val_gen = AlbumentationsSequence(
        val_df, val_image_dir,
        batch_size=BATCH_SIZE,
        transform=val_transform,
        shuffle=False
    )
    print("Data generators re-initialized.")
else:
    print("Cannot re-initialize data generators due to missing dataframes.")

NUM_CLASSES = 5

# 3. Re-initialize the VGG16 base model, excluding the top classification layer
base_model = VGG16(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3)
)
base_model.trainable = False  # Freeze the base model to train only the new top layers initially
print("VGG16 base model re-initialized.")

# 4. Create the full model by adding a custom classification head
inputs = layers.Input(shape=(224, 224, 3))
x = base_model(inputs, training=False) # Pass inputs through the frozen base model
x = layers.GlobalAveragePooling2D()(x) # Add a Global Average Pooling layer
x = layers.Dropout(0.3)(x)             # Add a Dropout layer for regularization
outputs = layers.Dense(NUM_CLASSES, activation='softmax')(x) # Output Dense layer with softmax activation

model = models.Model(inputs, outputs)

# 5. Compile the model
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

print("Model rebuilt and compiled. Summary:")
model.summary()

# 6. Instantiate ModelCheckpoint to save the best model based on validation loss
checkpoint_callback = ModelCheckpoint(
    filepath='vgg16_initial_best.keras',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)

# 7. Train the model for an initial number of epochs
EPOCHS_INITIAL = 15

if 'train_gen' in locals() and 'val_gen' in locals():
    print(f"Training the initial model for {EPOCHS_INITIAL} epochs...")
    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=EPOCHS_INITIAL,
        class_weight=class_weight,
        callbacks=[checkpoint_callback]
    )
    print("Initial model training complete and best model saved to 'vgg16_initial_best.keras'.")
else:
    print("Skipping model training due to uninitialized data generators.")

## Fine Tune

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

# 1. Load the pre-trained model named 'vgg16_initial_best.keras'
model = tf.keras.models.load_model('vgg16_initial_best.keras')
print("Loaded model 'vgg16_initial_best.keras'.")

# Find the VGG16 base model layer within the loaded model
vgg16_base_model = None
for layer in model.layers:
    if 'vgg16' in layer.name.lower(): # Check if the layer name contains 'vgg16'
        vgg16_base_model = layer
        break

if vgg16_base_model is None:
    raise ValueError("VGG16 base model not found as a layer in the loaded model.")

print(f"Identified VGG16 base model layer: {vgg16_base_model.name}")

# 2. Set the base model's `trainable` attribute to `True` to allow unfreezing of its sub-layers
vgg16_base_model.trainable = True

# 3. Iterate through the layers of the VGG16 base model and selectively unfreeze
#    Unfreezing blocks 3, 4, and 5 for fine-tuning
for layer in vgg16_base_model.layers:
    if any(block_name in layer.name for block_name in ['block3', 'block4', 'block5']):
        layer.trainable = True
    else:
        layer.trainable = False

print("Selective unfreezing applied to VGG16 convolutional blocks 3, 4, and 5.")

# 4. Recompile the entire model with a lower learning rate for fine-tuning
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), # Use a smaller LR for fine-tuning
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

print("Model recompiled with Adam optimizer (LR=1e-5).")
model.summary()

# 5. Create a ModelCheckpoint callback for saving the best fine-tuned model
fine_tune_checkpoint_callback = ModelCheckpoint(
    filepath='vgg16_fine_tuned_best.keras',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)
print("ModelCheckpoint for fine-tuning initialized ('vgg16_fine_tuned_best.keras').")

# 6. Continue training the model for additional epochs (fine-tuning phase)
EPOCHS_FINE_TUNE = 10

# Check if data generators are initialized before starting training
if 'train_gen' in locals() and 'val_gen' in locals():
    print(f"Continuing training the fine-tuned model for {EPOCHS_FINE_TUNE} epochs...")
    fine_history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=EPOCHS_FINE_TUNE,
        class_weight=class_weight,
        callbacks=[fine_tune_checkpoint_callback]
    )
    print("Fine-tuned model training complete and best model saved to 'vgg16_fine_tuned_best.keras'.")
else:
    print("Skipping fine-tuning due to uninitialized data generators.")

## Cross-Validation

In [None]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, classification_report
import numpy as np
import os
import cv2
import albumentations as A

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Define AlbumentationsSequence class for data loading and augmentation
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]
            label = int(row["label"])

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform:
                augmented = self.transform(image=img)
                img = augmented["image"]

            batch_images.append(img)
            batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")
        batch_y = np.array(batch_labels, dtype="int32")  # For sparse_categorical_crossentropy

        return batch_x, batch_y

# Define ImageNet mean and standard deviation for normalization
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

# Define training image transformations with data augmentation
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Affine(
    scale=(1-0.1, 1+0.1),
    translate_percent=(0.05, 0.05),
    rotate=(-15, 15),
    p=0.5),
    A.GaussNoise(p=0.3),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Define validation image transformations (resizing and normalization only)
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Reload the dataframes to ensure they are available
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    print("DataFrames reloaded for cross-validation.")
except FileNotFoundError as e:
    print(f"Error reloading dataframes: {e}. Please ensure the dataset is unzipped and available at '{dataset_base_path}/'.")
    # Exit or handle error appropriately if files are missing
    # In a notebook context, we'll just print and continue, assuming previous steps were successful.

# Define image directories
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val' # For final ensemble validation

BATCH_SIZE = 32
NUM_CLASSES = 5

# Main validation generator for ensemble prediction later
main_val_gen = AlbumentationsSequence(
    val_df, val_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False
)
# True labels for the main validation set
main_y_true = val_df['label'].values
print("Main validation data generator and true labels prepared for ensemble prediction.")

### Initial Cross Validation

In [None]:
# --- Cross-Validation Setup ---
# Number of epochs for each fold during initial CV training
EPOCHS_CV = 5
# Number of splits for StratifiedKFold
K = 3
# Initialize StratifiedKFold for maintaining class distribution across folds
skf = StratifiedKFold(n_splits=K, shuffle=True, random_state=42)

# Prepare data for K-Fold splitting
X_train_cv = train_df['file_name'].values
y_train_cv = train_df['label'].values

# Lists to store results and model paths for later use
fold_f1_scores = []
cv_models_paths = [] # To store paths of best models from each fold

print("\n--- Starting Cross-Validation ---")

# Iterate through each fold
for fold_idx, (train_idx, valid_idx) in enumerate(skf.split(X_train_cv, y_train_cv)):
    print(f"\n=== Fold {fold_idx+1}/{K} ===")

    # a. Create dataframes for the current fold's training and validation sets
    fold_train_df = train_df.iloc[train_idx].reset_index(drop=True)
    fold_valid_df = train_df.iloc[valid_idx].reset_index(drop=True)

    # b. Instantiate data generators for the current fold
    fold_train_gen = AlbumentationsSequence(
        fold_train_df, train_image_dir,
        batch_size=BATCH_SIZE,
        transform=train_transform,
        shuffle=True
    )

    fold_valid_gen = AlbumentationsSequence(
        fold_valid_df, train_image_dir, # Validation images for CV folds are also from 'train' folder
        batch_size=BATCH_SIZE,
        transform=val_transform,
        shuffle=False
    )

    # c. Calculate class weights for the current fold's training data to address imbalance
    fold_counts = fold_train_df['label'].value_counts().sort_index().values.astype('float32')
    total_fold = fold_counts.sum()
    fold_class_weights_array = total_fold / (NUM_CLASSES * fold_counts)
    fold_class_weight = {i: float(w) for i, w in enumerate(fold_class_weights_array)}
    print("Fold class weights:", fold_class_weight)

    # d. Initialize a fresh VGG16 model for this fold
    base_model_cv = VGG16(
        include_top=False,
        weights='imagenet',
        input_shape=(224, 224, 3)
    )
    base_model_cv.trainable = False # Start by training only the new top layers

    # e. Build a new classification head on top of the VGG16 base
    inputs_cv = layers.Input(shape=(224, 224, 3))
    x_cv = base_model_cv(inputs_cv, training=False) # Important: set training=False when using frozen base_model
    x_cv = layers.GlobalAveragePooling2D()(x_cv)
    x_cv = layers.Dropout(0.3)(x_cv)
    outputs_cv = layers.Dense(NUM_CLASSES, activation='softmax')(x_cv)

    cv_model = models.Model(inputs_cv, outputs_cv)

    # f. Compile the cross-validation model
    cv_model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    print("Fresh VGG16 model initialized and compiled for the current fold.")

    # g. ModelCheckpoint for the current fold to save the best model
    checkpoint_filepath = f'vgg16_cv_fold_{fold_idx+1}_best.keras'
    cv_checkpoint_callback = ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_loss', # Monitor validation loss for best model
        mode='min',
        save_best_only=True,
        save_weights_only=False,
        verbose=1
    )
    cv_models_paths.append(checkpoint_filepath) # Store path for ensembling later

    # h. Train the cross-validation model for a specified number of epochs
    print(f"Training Fold {fold_idx+1} model for {EPOCHS_CV} epochs...")
    cv_model.fit(
        fold_train_gen,
        validation_data=fold_valid_gen,
        epochs=EPOCHS_CV,
        class_weight=fold_class_weight,
        callbacks=[cv_checkpoint_callback],
        verbose=1
    )

    # i. Load the best model from this fold for evaluation
    best_fold_model = tf.keras.models.load_model(checkpoint_filepath)

    # j. Predict on the fold's validation set and convert probabilities to class labels
    fold_probs = best_fold_model.predict(fold_valid_gen)
    fold_pred = np.argmax(fold_probs, axis=1)
    fold_y_true = fold_valid_df['label'].values

    # k. Calculate the macro F1-score for the fold
    fold_f1 = f1_score(fold_y_true, fold_pred, average='macro')
    print(f"Fold {fold_idx+1} F1 (macro) on its validation set: {fold_f1}")
    fold_f1_scores.append(fold_f1)

print("\n--- Cross-Validation Complete ---")
print("CV F1 (macro) per fold:", fold_f1_scores)
print("Mean CV F1 (macro):",

### Cross Validation + Finetune

In [None]:
from google.colab import files

# Upload the saved Keras models from the initial cross-validation stage
uploaded = files.upload()
print("Uploaded files:", list(uploaded.keys()))

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score

# Define paths to the best models from the first cross-validation stage
cv_stage1_paths = [
    'vgg16_cv_fold_1_best.keras',
    'vgg16_cv_fold_2_best.keras',
    'vgg16_cv_fold_3_best.keras',
]

K = len(cv_stage1_paths)  # Number of folds / models
# Re-initialize StratifiedKFold to ensure consistent splits with initial CV
skf = StratifiedKFold(n_splits=K, shuffle=True, random_state=42)

# Prepare training data for K-Fold splitting
X_train_cv = train_df['file_name'].values
y_train_cv = train_df['label'].values

In [None]:
EPOCHS_FINE_TUNE = 3   # Number of epochs for fine-tuning each fold (adjust based on GPU resources and performance)
cv_finetuned_paths = []
cv_finetuned_f1_scores = []

print("\n--- Fine-tuning existing CV models (starting from saved .keras) ---")

# Iterate through each fold's split and its corresponding saved model path
for fold_idx, ((train_idx, valid_idx), ckpt_path) in enumerate(zip(skf.split(X_train_cv, y_train_cv), cv_stage1_paths), start=1):
    print(f"\n=== Fold {fold_idx}/{K} ===")
    print(f"Using checkpoint: {ckpt_path}")

    # a. Recreate fold train/val splits based on the stored indices
    fold_train_df = train_df.iloc[train_idx].reset_index(drop=True)
    fold_valid_df = train_df.iloc[valid_idx].reset_index(drop=True)

    # Instantiate data generators for the current fold's fine-tuning
    fold_train_gen = AlbumentationsSequence(
        fold_train_df, train_image_dir,
        batch_size=BATCH_SIZE,
        transform=train_transform,
        shuffle=True
    )

    fold_valid_gen = AlbumentationsSequence(
        fold_valid_df, train_image_dir,
        batch_size=BATCH_SIZE,
        transform=val_transform,
        shuffle=False
    )

    # b. Calculate class weights for this fold's training data
    fold_counts = fold_train_df['label'].value_counts().sort_index().values.astype('float32')
    total_fold = fold_counts.sum()
    fold_class_weights_array = total_fold / (NUM_CLASSES * fold_counts)
    fold_class_weight = {i: float(w) for i, w in enumerate(fold_class_weights_array)}
    print("Fold class weights:", fold_class_weight)

    # c. Load the existing (frozen-head trained) model for this fold
    model_fold = tf.keras.models.load_model(ckpt_path)
    print("Loaded fold model from checkpoint.")

    # d. Find the VGG16 base model within the loaded model
    vgg16_base = None
    for layer in model_fold.layers:
        if 'vgg16' in layer.name.lower():
            vgg16_base = layer
            break

    if vgg16_base is None:
        raise ValueError("VGG16 base model not found in loaded fold model.")

    # e. Unfreeze only blocks 3, 4, and 5 of the VGG16 base for fine-tuning
    vgg16_base.trainable = True
    for layer in vgg16_base.layers:
        if any(block in layer.name for block in ['block3', 'block4', 'block5']):
            layer.trainable = True
        else:
            layer.trainable = False

    # f. Recompile the model for fine-tuning with a small learning rate
    model_fold.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    print("Model recompiled for fine-tuning (blocks 3â€“5 trainable, lr=1e-5).")

    # g. Create a ModelCheckpoint for saving the best fine-tuned version of this fold
    finetuned_ckpt_path = f'vgg16_cv_fold_{fold_idx}_finetuned_best.keras'
    fine_ckpt = tf.keras.callbacks.ModelCheckpoint(
        filepath=finetuned_ckpt_path,
        monitor='val_loss',
        mode='min',
        save_best_only=True,
        save_weights_only=False,
        verbose=1
    )

    # h. Fine-tune the model on this fold's data
    print(f"Fine-tuning Fold {fold_idx} for {EPOCHS_FINE_TUNE} epochs...")
    model_fold.fit(
        fold_train_gen,
        validation_data=fold_valid_gen,
        epochs=EPOCHS_FINE_TUNE,
        class_weight=fold_class_weight,
        callbacks=[fine_ckpt],
        verbose=1
    )

    # i. Load the best fine-tuned model and evaluate its F1 macro score on the fold's validation set
    best_fold_model = tf.keras.models.load_model(finetuned_ckpt_path)
    cv_finetuned_paths.append(finetuned_ckpt_path)

    fold_probs = best_fold_model.predict(fold_valid_gen, verbose=0)
    fold_pred = np.argmax(fold_probs, axis=1)
    fold_y_true = fold_valid_df['label'].values

    fold_f1 = f1_score(fold_y_true, fold_pred, average='macro')
    cv_finetuned_f1_scores.append(fold_f1)
    print(f"Fold {fold_idx} fine-tuned F1 (macro) on its validation set: {fold_f1:.4f}")

print("\n--- Fine-tuning of CV folds complete ---")
print("Fine-tuned CV F1 (macro) per fold:", cv_finetuned_f1_scores)
print("Mean fine-tuned CV F1 (macro):", np.mean(cv_finetuned_f1_scores))

## Ensemble

In [None]:
import tensorflow as tf
import numpy as np

# Function to perform ensemble prediction across multiple models
def ensemble_predict(model_paths, generator):
    """
    Performs ensemble prediction by loading models and averaging their probabilities.

    Args:
        model_paths (list): A list of file paths to the Keras models (.keras format).
        generator (tf.keras.utils.Sequence): A data generator for predictions.

    Returns:
        numpy.ndarray: Averaged probabilities from all successfully loaded models.
    """
    all_probs = None
    loaded_models = []
    print(f"Loading {len(model_paths)} models for ensembling...")
    for path in model_paths:
        try:
            m = tf.keras.models.load_model(path)
            loaded_models.append(m)
            print(f"Loaded model: {path}")
        except Exception as e:
            print(f"Error loading model {path}: {e}. Skipping this model.")

    if not loaded_models:
        raise ValueError("No models were successfully loaded for ensembling.")

    print(f"Making predictions with {len(loaded_models)} loaded models...")
    for i, m in enumerate(loaded_models):
        print(f"Predicting with model {i+1}/{len(loaded_models)}...")
        # Use verbose=0 to suppress prediction progress bars for cleaner output
        probs = m.predict(generator, verbose=0)
        if all_probs is None:
            all_probs = probs
        else:
            all_probs += probs
    all_probs /= len(loaded_models)
    return all_probs

In [None]:
# Assign the list of fine-tuned CV model paths to cv_models_paths
cv_models_paths = cv_finetuned_paths

In [None]:
from sklearn.metrics import f1_score
import numpy as np

print("\n--- Starting Ensemble Prediction (fine-tuned + CV models) ---")

# Path to previously trained fine-tuned model (if used in ensemble)
# fine_tuned_model_path = 'vgg16_fine_tuned_best.keras'

# Combine fine-tuned model + all CV models (if fine_tuned_model_path was included)
# ensemble_model_paths = [fine_tuned_model_path] + cv_models_paths

# Use only the fine-tuned CV models for this ensemble
ensemble_model_paths = cv_models_paths

print("Models used in ensemble:")
for p in ensemble_model_paths:
    print(" -", p)

# Perform ensemble prediction on the main validation set
print("\nEnsembling fine-tuned + CV models on main validation set...")
ens_probs = ensemble_predict(ensemble_model_paths, main_val_gen)
ens_pred = np.argmax(ens_probs, axis=1)

# Calculate and print the macro F1-score for the ensemble on the validation set
f1_macro_ens = f1_score(main_y_true, ens_pred, average='macro')
print(f"\nEnsemble Validation F1 (macro): {f1_macro_ens:.4f}")

print("\n--- Ensemble Prediction Complete ---")

# Step 4: Test

## FineTuned

In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
import os
import cv2
import albumentations as A
from sklearn.metrics import f1_score, classification_report

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Re-define AlbumentationsSequence class to ensure it's available and robust for test set
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test # Flag to indicate if this is a test generator (no labels needed)
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform:
                augmented = self.transform(image=img)
                img = augmented["image"]

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

# Define ImageNet mean and standard deviation for normalization
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

# Define validation/test image transformations (resizing and normalization only)
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Reload test_df to ensure it's available, using the refined test_refined.csv
try:
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    print("test_df reloaded.")
except FileNotFoundError as e:
    print(f"Error reloading test_df: {e}. Please ensure the dataset is unzipped and available at '{dataset_base_path}/'.")
    # Exit or handle error appropriately if files are missing
    exit()

# Define image directory for the test set
test_image_dir = f'{dataset_base_path}/test'
BATCH_SIZE = 32

# 1. Load the best fine-tuned model saved from previous steps
fine_tuned_model = tf.keras.models.load_model('vgg16_fine_tuned_best.keras')
print("Successfully loaded the best fine-tuned model: vgg16_fine_tuned_best.keras")

# 2. Prepare a data generator for the test_df using val_transform
test_gen = AlbumentationsSequence(
    test_df,
    test_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False, # Important for maintaining order for submission file
    is_test=True # Indicate this is a test generator without labels
)
print("Test data generator prepared.")

# 3. Generate predictions on the test set using the fine-tuned model
print("Generating predictions on the test set...")
test_probabilities = fine_tuned_model.predict(test_gen)

# 4. Convert predicted probabilities to class labels (0-4)
test_predictions = np.argmax(test_probabilities, axis=1)
print("Predictions generated and converted to class labels.")

# 5. Create the submission DataFrame in the required format
submission_df = pd.DataFrame({
    'id': test_df['id'],
    'label': test_predictions
})

# 6. Save the predictions to a CSV file for submission
submission_file_path = 'submission.csv'
submission_df.to_csv(submission_file_path, index=False)

print(f"\nPredictions saved to {submission_file_path}")
print("Submission file head:")
print(submission_df.head())
print("\nPrediction on test set completed.")

## Ensemble

In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
import os
import cv2
import albumentations as A
from sklearn.metrics import f1_score, classification_report

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Re-define AlbumentationsSequence class to ensure it's available and robust for test set
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test # Flag to indicate if this is a test generator (no labels needed)
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform:
                augmented = self.transform(image=img)
                img = augmented["image"]

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

# Define ImageNet mean and standard deviation for normalization
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

# Define validation/test image transformations (resizing and normalization only)
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Reload test_df to ensure it's available
try:
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    print("test_df reloaded.")
except FileNotFoundError as e:
    print(f"Error reloading test_df: {e}. Please ensure the dataset is unzipped and available at '{dataset_base_path}/'.")
    # Exit or handle error appropriately if files are missing
    exit()

# Define image directory for the test set
test_image_dir = f'{dataset_base_path}/test'
BATCH_SIZE = 32

# 2. Prepare a data generator for the test_df using val_transform
test_gen = AlbumentationsSequence(
    test_df,
    test_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False, # Important for maintaining order for submission
    is_test=True # Indicate this is a test generator without labels
)
print("Test data generator prepared.")

# Re-initialize `ensemble_model_paths` to the correct list from previous cells.
# This ensures the `ensemble_predict` function receives a list of model paths,
# not a single string or an improperly formatted variable.
ensemble_model_paths = cv_finetuned_paths # Assuming cv_finetuned_paths holds the correct list of fine-tuned CV models

# 3. Generate ensemble predictions on the test set
print("\nGenerating ensemble predictions on the test set...")
try:
    test_probabilities = ensemble_predict(ensemble_model_paths, test_gen)
except ValueError as e:
    print(f"Ensemble prediction failed: {e}. Please check 'ensemble_model_paths'.")
    # If ensemble prediction fails, create a dummy submission to avoid blocking further execution
    submission_df = pd.DataFrame({'id': test_df['id'], 'label': -1}) # -1 can indicate an error or unknown label
    submission_file_path = 'submission_ensemble_error.csv'
    submission_df.to_csv(submission_file_path, index=False)
    print(f"Generated error submission file: {submission_file_path}")
    exit() # Stop execution after error

# 4. Convert ensemble probabilities to class labels
test_predictions = np.argmax(test_probabilities, axis=1)
print("Ensemble predictions generated and converted to class labels.")

# 5. Create the submission DataFrame
submission_df = pd.DataFrame({
    'id': test_df['id'],
    'label': test_predictions
})

# 6. Save the predictions to a CSV file
submission_file_path = 'submission_ensemble.csv'
submission_df.to_csv(submission_file_path, index=False)

print(f"\nEnsemble predictions saved to {submission_file_path}")
print("Submission file head:")
print(submission_df.head())
print("\nEnsemble prediction on test set completed.")

# VIT

# Step 5: ViT Model Setup and Utilities

In [None]:
import keras_hub
import tensorflow as tf
from tensorflow import keras
from keras import layers, models
import keras_cv
import pandas as pd
import numpy as np
import cv2
import os
import albumentations as A
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
from tensorflow.keras.applications import VGG16
import random

# Set seeds for reproducibility
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# --- Albumentations Sequence for data loading and augmentation ---
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform:
                augmented = self.transform(image=img)
                # Image should now be float32 in [0, 1] if ToFloat is used in transform
                img = augmented["image"]

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

In [None]:
# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# --- Image transformations ---
# - Remove manual ImageNet Normalize
# - Use ToFloat so models see [0, 1]
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Affine(
        scale=(1 - 0.1, 1 + 0.1),
        translate_percent=(0.05, 0.05),
        rotate=(-15, 15),
        p=0.5
    ),
    A.GaussNoise(p=0.3),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0, 1]
])

val_transform = A.Compose([
    A.Resize(224, 224),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0, 1]
])

# --- Load dataframes ---
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    print("DataFrames loaded successfully.")
except FileNotFoundError as e:
    print(f"Error loading dataframes: {e}. Please ensure the dataset is unzipped and available.")
    raise  # Stop execution if essential files are missing

# Define image directories
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val'
test_image_dir = f'{dataset_base_path}/test'

# --- Global constants ---
BATCH_SIZE = 32
NUM_CLASSES = 5
EPOCHS_CV_VIT_SWIN = 5  # Number of epochs for initial training of ViT/Swin heads
K_FOLDS = 3             # Number of splits for StratifiedKFold

# --- Ensemble predict function ---
def ensemble_predict(model_paths, generator):
    all_probs = None
    loaded_models = []
    for path in model_paths:
        try:
            m = tf.keras.models.load_model(path)
            loaded_models.append(m)
        except Exception as e:
            print(f"Error loading model {path}: {e}. Skipping this model.")

    if not loaded_models:
        raise ValueError("No models were successfully loaded for ensembling.")

    print(f"Making predictions with {len(loaded_models)} loaded models...")
    for i, m in enumerate(loaded_models):
        probs = m.predict(generator, verbose=0)
        if all_probs is None:
            all_probs = probs
        else:
            all_probs += probs
    all_probs /= len(loaded_models)
    return all_probs

# --- Main validation generator and cross-validation setup ---
main_val_gen = AlbumentationsSequence(
    val_df,
    val_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False
)
main_y_true = val_df['label'].values
print("Main validation data generator and true labels prepared for overall ensemble evaluation.")

# Setup for cross-validation
X_train_cv = train_df['file_name'].values
y_train_cv = train_df['label'].values
skf = StratifiedKFold(n_splits=K_FOLDS, shuffle=True, random_state=42)

# Cache splits so all models (VGG, ViT, Swin) use the same folds for consistency
fold_splits = list(skf.split(X_train_cv, y_train_cv))

# Step 6: ViT Cross-Validation Training

In [None]:
# --- ViT Cross-Validation Training (Initial Head Training) ---
vit_cv_finetuned_paths = []
vit_cv_f1_scores = []
print("\n--- Starting ViT Cross-Validation Training ---")

# Iterate through each pre-defined fold split
for fold_idx, (train_idx, valid_idx) in enumerate(fold_splits):
    print(f"\n=== ViT Fold {fold_idx+1}/{K_FOLDS} ===")

    # Create dataframes for the current fold's training and validation sets
    fold_train_df = train_df.iloc[train_idx].reset_index(drop=True)
    fold_valid_df = train_df.iloc[valid_idx].reset_index(drop=True)

    # Instantiate data generators for the current fold
    fold_train_gen = AlbumentationsSequence(
        fold_train_df,
        train_image_dir,
        batch_size=BATCH_SIZE,
        transform=train_transform,
        shuffle=True
    )
    fold_valid_gen = AlbumentationsSequence(
        fold_valid_df,
        train_image_dir,
        batch_size=BATCH_SIZE,
        transform=val_transform,
        shuffle=False
    )

    # Calculate class weights for the current fold's training data
    fold_counts = fold_train_df['label'].value_counts().sort_index().values.astype('float32')
    total_fold = fold_counts.sum()
    fold_class_weights_array = total_fold / (NUM_CLASSES * fold_counts)
    fold_class_weight = {i: float(w) for i, w in enumerate(fold_class_weights_array)}
    print("Fold class weights:", fold_class_weight)

    # Build a fresh ViT model (backbone frozen) for this fold
    vit_model = build_vit_model(NUM_CLASSES)
    print("Fresh ViT model initialized and compiled for the current fold.")

    # Define checkpoint filepath for this fold
    checkpoint_filepath = f'vit_cv_fold_{fold_idx+1}_best.keras'
    cv_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_loss',
        mode='min',
        save_best_only=True,
        save_weights_only=False,
        verbose=1
    )

    # Train the ViT model's classification head
    print(f"Training ViT Fold {fold_idx+1} model for {EPOCHS_CV_VIT_SWIN} epochs...")
    vit_model.fit(
        fold_train_gen,
        validation_data=fold_valid_gen,
        epochs=EPOCHS_CV_VIT_SWIN,
        class_weight=fold_class_weight,
        callbacks=[cv_checkpoint_callback],
        verbose=1
    )

    # Evaluate the best model from this fold
    try:
        best_fold_model = tf.keras.models.load_model(checkpoint_filepath)
        vit_cv_finetuned_paths.append(checkpoint_filepath) # Store path for ensembling

        fold_probs = best_fold_model.predict(fold_valid_gen, verbose=0)
        fold_pred = np.argmax(fold_probs, axis=1)
        fold_y_true = fold_valid_df['label'].values

        fold_f1 = f1_score(fold_y_true, fold_pred, average='macro')
        vit_cv_f1_scores.append(fold_f1)
        print(f"ViT Fold {fold_idx+1} F1 (macro) on its validation set: {fold_f1:.4f}")
    except Exception as e:
        print(f"Could not load or evaluate best ViT model for fold {fold_idx+1}: {e}. Appending current checkpoint path and F1 0.0.")
        vit_cv_finetuned_paths.append(checkpoint_filepath) # Still append path even if evaluation fails
        vit_cv_f1_scores.append(0.0)

print("\n--- ViT Cross-Validation Training Complete ---")
print("ViT CV F1 (macro) per fold:", vit_cv_f1_scores)
print("Mean ViT CV F1 (macro):")
if vit_cv_f1_scores:
    print(np.mean(vit_cv_f1_scores))
else:
    print("N/A (no scores to average)")

# Step 7: Ensemble Setup for All CV Models

In [None]:
# --- Combine all model checkpoints for the final ensemble ---

# Paths to the fine-tuned VGG16 models from cross-validation
vgg16_cv_finetuned_paths = [
    'vgg16_cv_fold_1_finetuned_best.keras',
    'vgg16_cv_fold_2_finetuned_best.keras',
    'vgg16_cv_fold_3_finetuned_best.keras',
]

# Combine VGG16 fine-tuned CV model paths with ViT fine-tuned CV model paths
all_ensemble_model_paths = vgg16_cv_finetuned_paths + vit_cv_finetuned_paths

print(f"\nTotal models in ensemble: {len(all_ensemble_model_paths)}")
print("All ensemble model paths collected:")
for path in all_ensemble_model_paths:
    print(f" - {path}")

In [None]:
from sklearn.metrics import f1_score
import numpy as np

# --- Ensemble Prediction on Validation Set ---
print("\n--- Starting Ensemble Prediction on Validation Set ---")
try:
    # Perform ensemble prediction using all collected model paths on the main validation set
    ensemble_val_probs = ensemble_predict(all_ensemble_model_paths, main_val_gen)
    ensemble_val_pred = np.argmax(ensemble_val_probs, axis=1)

    # Calculate the macro F1-score for the ensemble predictions
    f1_macro_ensemble_val = f1_score(main_y_true, ensemble_val_pred, average='macro')
    print(f"\nFinal Ensemble Validation F1 (macro): {f1_macro_ensemble_val:.4f}")
except ValueError as e:
    print(f"Ensemble prediction on validation set failed: {e}")
    f1_macro_ensemble_val = 0.0 # Set F1 to 0.0 if ensemble fails

print("--- Ensemble Prediction on Validation Set Complete ---")

In [None]:
import pandas as pd
import numpy as np

# --- Ensemble Prediction on Test Set + Submission File Generation ---
print("\n--- Starting Ensemble Prediction on Test Set ---")

# Prepare a data generator specifically for the test set
test_gen_ensemble = AlbumentationsSequence(
    test_df,
    test_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False,
    is_test=True
)

try:
    # Perform ensemble prediction on the test set
    ensemble_test_probs = ensemble_predict(all_ensemble_model_paths, test_gen_ensemble)
    ensemble_test_predictions = np.argmax(ensemble_test_probs, axis=1)
    print("Ensemble predictions on test set generated.")

    # Create the final submission DataFrame
    submission_df_final = pd.DataFrame({
        'id': test_df['id'],
        'label': ensemble_test_predictions
    })

    # Define the submission file path and save the DataFrame to CSV
    submission_file_path_final = 'submission_final_ensemble.csv'
    submission_df_final.to_csv(submission_file_path_final, index=False)

    print(f"\nFinal Ensemble predictions saved to {submission_file_path_final}")
    print("Submission file head:")
    print(submission_df_final.head())
except ValueError as e:
    print(f"Ensemble prediction on test set failed: {e}")
    # If prediction fails, create an error submission file
    submission_df_final = pd.DataFrame({'id': test_df['id'], 'label': -1})
    submission_file_path_final = 'submission_final_ensemble_error.csv'
    submission_df_final.to_csv(submission_file_path_final, index=False)
    print(f"Generated error submission file: {submission_file_path_final}")

print("--- Ensemble Prediction on Test Set Complete ---")

In [None]:
from sklearn.metrics import f1_score
import numpy as np
from tensorflow import keras
import pandas as pd

# Define the base path for the dataset (for test_df)
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# Reload test_df to ensure it's available and consistent
try:
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    print("test_df reloaded for ensemble evaluation.")
except FileNotFoundError as e:
    print(f"Error reloading test_df: {e}. Please ensure the dataset is unzipped.")
    # If test_df is critical and missing, exit or handle appropriately
    exit()

# --- ViT alone: validate + test submission ---
print("\n=== ViT: evaluation on val + prediction on test ===")

# Make predictions with the best fine-tuned ViT model on the validation set
vit_val_probs = vit_best.predict(val_gen, verbose=0)
vit_val_pred = np.argmax(vit_val_probs, axis=1)
vit_val_f1 = f1_score(y_val_true, vit_val_pred, average="macro")
print(f"ViT Validation F1 (macro): {vit_val_f1:.4f}")

# Make predictions with the best fine-tuned ViT model on the test set
vit_test_probs = vit_best.predict(test_gen, verbose=0)
vit_test_pred = np.argmax(vit_test_probs, axis=1)

# Create and save ViT-only submission file
submission_vit = pd.DataFrame({
    "id": test_df["id"],
    "label": vit_test_pred
})
submission_vit_path = "submission_vit_only.csv"
submission_vit.to_csv(submission_vit_path, index=False)
print(f"Saved ViT-only submission to {submission_vit_path}")
print(submission_vit.head())


# --- VGG ensemble: validate + test ---
print("\n=== VGG (CV models) ensemble on val + test ===")

# Paths to the fine-tuned VGG models from cross-validation
vgg_paths = [
    "vgg16_cv_fold_1_finetuned_best.keras",
    "vgg16_cv_fold_2_finetuned_best.keras",
    "vgg16_cv_fold_3_finetuned_best.keras",
]

# Helper function to predict with an ensemble of models
def predict_ensemble(paths, generator):
    all_probs = None
    num_used = 0

    for path in paths:
        try:
            model = keras.models.load_model(path)
        except Exception as e:
            print(f"Could not load {path}: {e}. Skipping.")
            continue

        probs = model.predict(generator, verbose=0)
        if all_probs is None:
            all_probs = probs
        else:
            all_probs += probs

        num_used += 1

    if num_used == 0:
        raise ValueError("No models successfully loaded for ensemble.")

    all_probs /= num_used
    return all_probs

# Make predictions with the VGG ensemble on the validation set
vgg_val_probs = predict_ensemble(vgg_paths, val_gen)
vgg_val_pred = np.argmax(vgg_val_probs, axis=1)
vgg_val_f1 = f1_score(y_val_true, vgg_val_pred, average="macro")
print(f"VGG CV Ensemble Validation F1 (macro): {vgg_val_f1:.4f}")

# Make predictions with the VGG ensemble on the test set
vgg_test_probs = predict_ensemble(vgg_paths, test_gen)
vgg_test_pred = np.argmax(vgg_test_probs, axis=1)

# Create and save VGG-ensemble submission file
submission_vgg = pd.DataFrame({
    "id": test_df["id"],
    "label": vgg_test_pred
})
submission_vgg_path = "submission_vgg_ensemble.csv"
submission_vgg.to_csv(submission_vgg_path, index=False)
print(f"Saved VGG-ensemble submission to {submission_vgg_path}")
print(submission_vgg.head())


# --- ViT + VGG ensemble: validate + test ---
print("\n=== ViT + VGG ensemble on val + test ===")

# Average probabilities from ViT and VGG ensembles for validation
vit_vgg_val_probs = (vit_val_probs + vgg_val_probs) / 2.0
vit_vgg_val_pred = np.argmax(vit_vgg_val_probs, axis=1)
vit_vgg_val_f1 = f1_score(y_val_true, vit_vgg_val_pred, average="macro")
print(f"ViT + VGG Ensemble Validation F1 (macro): {vit_vgg_val_f1:.4f}")

# Average probabilities from ViT and VGG ensembles for test set
vit_vgg_test_probs = (vit_test_probs + vgg_test_probs) / 2.0
vit_vgg_test_pred = np.argmax(vit_vgg_test_probs, axis=1)

# Create and save combined ViT+VGG ensemble submission file
submission_vit_vgg = pd.DataFrame({
    "id": test_df["id"],
    "label": vit_vgg_test_pred
})
submission_vit_vgg_path = "submission_vit_vgg_ensemble.csv"
submission_vit_vgg.to_csv(submission_vit_vgg_path, index=False)
print(f"Saved ViT+VGG ensemble submission to {submission_vit_vgg_path}")
print(submission_vit_vgg.head())

# Step 8: Alternative ViT Training Path (Single Model)

In [None]:
import os
import random
import cv2
import numpy as np
import pandas as pd

import tensorflow as tf
import keras
from keras import layers, models
import keras_hub
import keras_cv

from sklearn.metrics import f1_score
import albumentations as A

# Print versions for environment tracking
print("TF version:", tf.__version__)
print("Keras version:", keras.__version__)
print("Using backend:", keras.config.backend())

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# --- Reproducibility ---
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# --- Albumentations Sequence for data loading and augmentation ---
class AlbumentationsSequence(keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform is not None:
                augmented = self.transform(image=img)
                img = augmented["image"]  # Should be float32 [0,1] after ToFloat

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

# --- Transforms for data augmentation and normalization ---
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Affine(
        scale=(1 - 0.1, 1 + 0.1),
        translate_percent=(0.05, 0.05),
        rotate=(-15, 15),
        p=0.5
    ),
    A.GaussNoise(p=0.3),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0,1]
])

val_transform = A.Compose([
    A.Resize(224, 224),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0,1]
])

# --- Load dataframes ---
# Reload the dataframes for consistency within this cell
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv') # Using refined test_refined.csv
    print("DataFrames reloaded successfully for VIT training.")
except FileNotFoundError as e:
    print(f"Error loading dataframes: {e}. Please ensure the dataset is unzipped and available.")
    raise  # Stop execution if essential files are missing

# Define image directories
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val'
test_image_dir = f'{dataset_base_path}/test'

# --- Global constants ---
BATCH_SIZE = 8 # Reduced batch size to mitigate OOM for ViT training
NUM_CLASSES = 5
EPOCHS_VIT = 8

# Calculate class weights on full training data
counts = train_df["label"].value_counts().sort_index().values.astype("float32")
total = counts.sum()
class_weights_array = total / (NUM_CLASSES * counts)
CLASS_WEIGHT = {i: float(w) for i, w in enumerate(class_weights_array)}
print("Class weights:", CLASS_WEIGHT)

# --- Build ViT model function ---
def build_vit_model(num_classes, input_shape=(224, 224, 3)):
    """
    ViT-Base model from KerasHub, with backbone frozen for initial training.
    """
    print("Building ViT-base model (KerasHub)...")

    # Load pretrained ViT backbone from KerasHub (ImageNet pretraining)
    backbone = keras_hub.models.ViTBackbone.from_preset(
        "vit_base_patch16_224_imagenet"
    )
    backbone.trainable = False  # Freeze backbone to train only the new head initially

    inputs = keras.Input(shape=input_shape)
    x = inputs
    x = backbone(x)          # Pass inputs through the backbone
    cls_token = x[:, 0]      # Extract the [CLS] token as the global representation
    x = layers.Dropout(0.3)(cls_token) # Add a dropout layer
    outputs = layers.Dense(num_classes, activation="softmax")(x) # Output Dense layer

    model = keras.Model(inputs, outputs, name="vit_base_classifier")

    # Compile the model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=3e-4),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

# --- Generators for train / val / test datasets ---
train_gen = AlbumentationsSequence(
    train_df, train_image_dir,
    batch_size=BATCH_SIZE,
    transform=train_transform,
    shuffle=True,
    is_test=False
)

val_gen = AlbumentationsSequence(
    val_df, val_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False,
    is_test=False
)

test_gen = AlbumentationsSequence(
    test_df, test_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False,
    is_test=True
)

y_val_true = val_df["label"].values

# --- Train ViT (Initial Head Training - Backbone Frozen) ---
vit_model = build_vit_model(NUM_CLASSES)

vit_ckpt_path = "vit_finetuned_best.keras" # Checkpoint path for the best model
vit_ckpt_cb = keras.callbacks.ModelCheckpoint(
    filepath=vit_ckpt_path,
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

print("\n=== Training ViT (Head Training with Frozen Backbone) ===")
vit_history = vit_model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS_VIT,
    class_weight=CLASS_WEIGHT,
    callbacks=[vit_ckpt_cb],
    verbose=1,
)

# Load the best model saved during initial training
vit_best = keras.models.load_model(vit_ckpt_path)
print("Loaded best ViT model from", vit_ckpt_path)

## Fine-tuning the Single ViT Model

In [None]:
from tensorflow import keras
import os
import random
import cv2
import numpy as np
import pandas as pd

import tensorflow as tf
import keras
from keras import layers, models
import keras_hub
import keras_cv

from sklearn.metrics import f1_score
import albumentations as A

# Print versions for environment tracking
print("TF version:", tf.__version__)
print("Keras version:", keras.__version__)
print("Using backend:", keras.config.backend())

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# --- Reproducibility ---
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# --- Albumentations Sequence for data loading and augmentation ---
class AlbumentationsSequence(keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        # Get indices for the current batch
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]

            # Construct image path and load image
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            # Apply transformations if provided
            if self.transform is not None:
                augmented = self.transform(image=img)
                img = augmented["image"]  # Should be float32 [0,1] after ToFloat

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        # Convert lists to numpy arrays
        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

# --- Transforms for data augmentation and normalization ---
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Affine(
        scale=(1 - 0.1, 1 + 0.1),
        translate_percent=(0.05, 0.05),
        rotate=(-15, 15),
        p=0.5
    ),
    A.GaussNoise(p=0.3),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0,1]
])

val_transform = A.Compose([
    A.Resize(224, 224),
    A.ToFloat(max_value=255.0),  # Normalize to float32 in [0,1]
])

# --- Load dataframes ---
# Reload the dataframes for consistency within this cell
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv') # Using refined test_refined.csv
    print("DataFrames reloaded successfully for VIT training.")
except FileNotFoundError as e:
    print(f"Error loading dataframes: {e}. Please ensure the dataset is unzipped and available.")
    raise  # Stop execution if essential files are missing

# Define image directories
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val'
test_image_dir = f'{dataset_base_path}/test'

# --- Global constants ---
BATCH_SIZE = 8 # Reduced batch size to mitigate OOM for ViT training
NUM_CLASSES = 5
EPOCHS_VIT = 8 # (Not directly used in this phase 2 fine-tuning, but kept for context)

# Class weights on full training data
counts = train_df["label"].value_counts().sort_index().values.astype("float32")
total = counts.sum()
class_weights_array = total / (NUM_CLASSES * counts)
CLASS_WEIGHT = {i: float(w) for i, w in enumerate(class_weights_array)}
print("Class weights:", CLASS_WEIGHT)

# --- Build ViT model function (re-definition for consistency, though not strictly building from scratch here) ---
def build_vit_model(num_classes, input_shape=(224, 224, 3)):
    """
    ViT-Base model from KerasHub. (This function is defined but the model
    is loaded from checkpoint for fine-tuning in this cell).
    """
    print("Building ViT-base model (KerasHub)...")

    # Load pretrained ViT backbone from KerasHub (ImageNet pretraining)
    backbone = keras_hub.models.ViTBackbone.from_preset(
        "vit_base_patch16_224_imagenet"
    )
    backbone.trainable = False  # Keep frozen for initial head training, then unfreeze for fine-tuning

    inputs = keras.Input(shape=input_shape)
    x = inputs
    x = backbone(x)          # Pass inputs through the backbone
    cls_token = x[:, 0]      # Extract the [CLS] token as the global representation
    x = layers.Dropout(0.3)(cls_token) # Add a dropout layer
    outputs = layers.Dense(num_classes, activation="softmax")(x) # Output Dense layer

    model = keras.Model(inputs, outputs, name="vit_base_classifier")

    # Compile the model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=3e-4),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

# --- Generators for train / val / test datasets ---
train_gen = AlbumentationsSequence(
    train_df, train_image_dir,
    batch_size=BATCH_SIZE,
    transform=train_transform,
    shuffle=True,
    is_test=False
)

val_gen = AlbumentationsSequence(
    val_df, val_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False,
    is_test=False
)

test_gen = AlbumentationsSequence(
    test_df, test_image_dir,
    batch_size=BATCH_SIZE,
    transform=val_transform,
    shuffle=False,
    is_test=True
)

y_val_true = val_df["label"].values

# --- Phase 2: Fine-tuning ViT Backbone ---

# 0) Path from previous normal training (Phase 1: head training)
vit_ckpt_path = "vit_initial_best.keras"  # Ensure this path is correct

# 1) Load the previously trained model (with frozen backbone)
vit_model = keras.models.load_model(vit_ckpt_path)
print("Loaded pre-trained ViT from", vit_ckpt_path)

# 2) Grab the backbone (ViTBackbone) layer and unfreeze it
# In our build, Input = layer 0, ViTBackbone = layer 1
vit_backbone = vit_model.layers[1]
vit_backbone.trainable = True # Now the backbone's weights can be updated

# 3) Recompile the entire model with a very small learning rate for fine-tuning
vit_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-5), # Use a significantly smaller LR
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# 4) New checkpoint for this fine-tuned phase
vit_ft_ckpt_path = "vit_finetuned_best.keras"
vit_ft_ckpt_cb = keras.callbacks.ModelCheckpoint(
    filepath=vit_ft_ckpt_path,
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

print("\n=== Phase 2: Fine-tuning ViT Backbone from saved checkpoint ===")
vit_ft_history = vit_model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=5,            # Adjust number of epochs for fine-tuning
    class_weight=CLASS_WEIGHT,
    callbacks=[vit_ft_ckpt_cb],
    verbose=1,
)

# 5) Load the best model from this fine-tuning phase
vit_best = keras.models.load_model(vit_ft_ckpt_path)
print("Loaded fine-tuned ViT model from", vit_ft_ckpt_path)

# Final Model Evaluation and Submission

In [None]:
from sklearn.metrics import f1_score
import numpy as np
from tensorflow import keras

# -----------------------------------------
# 1) ViT alone: validate + test submission
# -----------------------------------------

print("\n=== ViT: evaluation on val + prediction on test ===")

# ViT on validation
vit_val_probs = vit_best.predict(val_gen, verbose=0)
vit_val_pred = np.argmax(vit_val_probs, axis=1)
vit_val_f1 = f1_score(y_val_true, vit_val_pred, average="macro")
print(f"ViT Validation F1 (macro): {vit_val_f1:.4f}")

# ViT on test (no labels, just predictions)
vit_test_probs = vit_best.predict(test_gen, verbose=0)
vit_test_pred = np.argmax(vit_test_probs, axis=1)

submission_vit = pd.DataFrame({
    "id": test_df["id"],
    "label": vit_test_pred
})
submission_vit_path = "submission_vit_only.csv"
submission_vit.to_csv(submission_vit_path, index=False)
print(f"Saved ViT-only submission to {submission_vit_path}")
print(submission_vit.head())


# -----------------------------------------
# 2) VGG ensemble: validate + test
# -----------------------------------------

print("\n=== VGG (CV models) ensemble on val + test ===")

# Adjust these paths if filenames are different
vgg_paths = [
    "vgg16_fine_tuned_best.keras"
]

def predict_ensemble(paths, generator):
    all_probs = None
    num_used = 0

    for path in paths:
        try:
            model = keras.models.load_model(path)
        except Exception as e:
            print(f"Could not load {path}: {e}")
            continue

        probs = model.predict(generator, verbose=0)
        if all_probs is None:
            all_probs = probs
        else:
            all_probs += probs

        num_used += 1

    if num_used == 0:
        raise ValueError("No models successfully loaded for ensemble.")

    all_probs /= num_used
    return all_probs

# VGG ensemble on val
vgg_val_probs = predict_ensemble(vgg_paths, val_gen)
vgg_val_pred = np.argmax(vgg_val_probs, axis=1)
vgg_val_f1 = f1_score(y_val_true, vgg_val_pred, average="macro")
print(f"VGG CV Ensemble Validation F1 (macro): {vgg_val_f1:.4f}")

# VGG ensemble on test
vgg_test_probs = predict_ensemble(vgg_paths, test_gen)
vgg_test_pred = np.argmax(vgg_test_probs, axis=1)

submission_vgg = pd.DataFrame({
    "id": test_df["id"],
    "label": vgg_test_pred
})
submission_vgg_path = "submission_vgg_ensemble.csv"
submission_vgg.to_csv(submission_vgg_path, index=False)
print(f"Saved VGG-ensemble submission to {submission_vgg_path}")
print(submission_vgg.head())


# -----------------------------------------
# 3) ViT + VGG ensemble: validate + test
# -----------------------------------------

print("\n=== ViT + VGG ensemble on val + test ===")

# Ensemble probabilities on validation
vit_vgg_val_probs = (vit_val_probs + vgg_val_probs) / 2.0
vit_vgg_val_pred = np.argmax(vit_vgg_val_probs, axis=1)
vit_vgg_val_f1 = f1_score(y_val_true, vit_vgg_val_pred, average="macro")
print(f"ViT + VGG Ensemble Validation F1 (macro): {vit_vgg_val_f1:.4f}")

# Ensemble probabilities on test
vit_vgg_test_probs = (vit_test_probs + vgg_test_probs) / 2.0
vit_vgg_test_pred = np.argmax(vit_vgg_test_probs, axis=1)

submission_vit_vgg = pd.DataFrame({
    "id": test_df["id"],
    "label": vit_vgg_test_pred
})
submission_vit_vgg_path = "submission_vit_vgg_ensemble.csv"
submission_vit_vgg.to_csv(submission_vit_vgg_path, index=False)
print(f"Saved ViT+VGG ensemble submission to {submission_vit_vgg_path}")
print(submission_vit_vgg.head())

# Step 9: Model Evaluation on Validation Set

In [None]:
    from google.colab import drive
    drive.mount('/content/drive')

In [None]:
    %cd /content/drive/My Drive/loadedModel/

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import cv2
import os
import albumentations as A
from sklearn.metrics import f1_score, classification_report
from sklearn.model_selection import StratifiedKFold # Needed for fold_splits definition
import keras_hub # For ViT models
import keras_cv # For build_swin_model if any .keras depends on it (safer to include)
from tensorflow import keras # Alias for tf.keras

# Define the base path for the dataset
dataset_base_path = '/content/cpe342-karena/public_dataset/task4'

# --- Albumentations Sequence for data loading ---
class AlbumentationsSequence(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, transform, shuffle=True, is_test=False):
        self.df = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.transform = transform
        self.shuffle = shuffle
        self.is_test = is_test
        self.indices = np.arange(len(self.df))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_images = []
        batch_labels = []

        for i in batch_indices:
            row = self.df.iloc[i]
            file_name = row["file_name"]
            img_path = os.path.join(self.image_dir, file_name)
            img = cv2.imread(img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            if self.transform:
                augmented = self.transform(image=img)
                img = augmented["image"]

            batch_images.append(img)
            if not self.is_test:
                label = int(row["label"])
                batch_labels.append(label)

        batch_x = np.stack(batch_images, axis=0).astype("float32")

        if self.is_test:
            return batch_x
        else:
            batch_y = np.array(batch_labels, dtype="int32")
            return batch_x, batch_y

# --- Image transformations (consistent with previous cells) ---
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std  = (0.229, 0.224, 0.225)

# For VGG16 models
vgg_val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# For ViT models (assuming they expect [0,1] range as per their definition)
vit_val_transform = A.Compose([
    A.Resize(224, 224),
    A.ToFloat(max_value=255.0),
])

# --- Load dataframes ---
try:
    train_df = pd.read_csv(f'{dataset_base_path}/train.csv')
    val_df = pd.read_csv(f'{dataset_base_path}/val.csv')
    test_df = pd.read_csv(f'{dataset_base_path}/test_refined.csv')
    print("DataFrames loaded successfully.")
except FileNotFoundError as e:
    print(f"Error loading dataframes: {e}. Please ensure the dataset is unzipped and available.")
    raise

# Define image directories
train_image_dir = f'{dataset_base_path}/train'
val_image_dir = f'{dataset_base_path}/val'
test_image_dir = f'{dataset_base_path}/test'

# --- Global Constants ---
BATCH_SIZE = 32 # Can be adjusted for ViT evaluation if needed, but 32 is common.
NUM_CLASSES = 5

# --- Setup Main Validation Data Generator and True Labels ---
# Using val_df from the overall dataset for consistent evaluation across all models
main_val_gen_vgg = AlbumentationsSequence(
    val_df,
    val_image_dir,
    batch_size=BATCH_SIZE,
    transform=vgg_val_transform,
    shuffle=False
)
main_val_gen_vit = AlbumentationsSequence(
    val_df,
    val_image_dir,
    batch_size=BATCH_SIZE,
    transform=vit_val_transform,
    shuffle=False
)
main_y_true = val_df['label'].values
print("Main validation data generators (VGG-compatible and ViT-compatible) and true labels prepared.")

# --- Define evaluation function for single models ---
def evaluate_single_model(model_path, data_generator, true_labels, model_name="Model"):
    print(f"\n--- Evaluating {model_name} ({model_path}) ---")
    try:
        model = tf.keras.models.load_model(model_path)

        loss, accuracy = model.evaluate(data_generator, verbose=0)
        print(f"{model_name} Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")

        predictions_probs = model.predict(data_generator, verbose=0)
        predictions = np.argmax(predictions_probs, axis=1)

        f1_macro = f1_score(true_labels, predictions, average='macro', zero_division=0)
        print(f"{model_name} F1-Macro: {f1_macro:.4f}")

        print(f"\n{model_name} Classification Report:")
        print(classification_report(true_labels, predictions, zero_division=0))

    except Exception as e:
        print(f"Error evaluating {model_name} from {model_path}: {e}")

# --- Define ensemble prediction function ---
def ensemble_predict(model_paths, generator, model_type='vgg'):
    all_probs = None
    loaded_models = []
    print(f"Loading {len(model_paths)} models for ensembling (type: {model_type})...")

    # Determine which transform to use for the generator based on model_type
    current_generator = None
    if model_type == 'vgg':
        current_generator = AlbumentationsSequence(generator.df, generator.image_dir, generator.batch_size, vgg_val_transform, shuffle=False)
    elif model_type == 'vit':
        current_generator = AlbumentationsSequence(generator.df, generator.image_dir, generator.batch_size, vit_val_transform, shuffle=False)
    else: # For combined ensemble where individual model types might be mixed
        current_generator = generator # Assume generator is already correctly transformed

    for path in model_paths:
        try:
            m = tf.keras.models.load_model(path)
            loaded_models.append(m)
            print(f"Loaded model: {path}")
        except Exception as e:
            print(f"Error loading model {path}: {e}. Skipping this model.")

    if not loaded_models:
        raise ValueError("No models were successfully loaded for ensembling.")

    print(f"Making predictions with {len(loaded_models)} loaded models...")
    for i, m in enumerate(loaded_models):
        # Use current_generator for prediction
        probs = m.predict(current_generator, verbose=0)
        if all_probs is None:
            all_probs = probs
        else:
            all_probs += probs
    all_probs /= len(loaded_models)
    return all_probs

# --- Model Paths ---
single_vgg_initial = '/content/drive/My Drive/loadedModel/vgg16_initial_best.keras'
single_vgg_finetuned = '/content/drive/My Drive/loadedModel/vgg16_finetuned_best.keras'

cv_vgg_finetuned_paths = [
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_1_finetuned_best.keras',
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_2_finetuned_best.keras',
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_3_finetuned_best.keras',
]

single_vit_initial = '/content/drive/My Drive/loadedModel/vit_initial_best.keras' # Assuming this is from Phase 1 head training
single_vit_finetuned = '/content/drive/My Drive/loadedModel/vit_finetuned_best.keras' # Assuming this is from Phase 2 backbone fine-tuning

cv_vit_finetuned_paths = [
    '/content/drive/My Drive/loadedModel/vit_cv_fold_1_best.keras', # Assuming these are the best from CV folds, potentially already fine-tuned
    '/content/drive/My Drive/loadedModel/vit_cv_fold_2_best.keras',
    '/content/drive/My Drive/loadedModel/vit_cv_fold_3_best.keras',
]

# Aggregate all single model paths for iteration
model_paths_to_evaluate_single = [
    single_vgg_initial,
    single_vgg_finetuned,
    single_vit_initial,
    single_vit_finetuned
]

# Map friendly names to paths
model_names = {
    single_vgg_initial: 'VGG16 Initial (Frozen Head)',
    single_vgg_finetuned: 'VGG16 Fine-tuned (Single Model)',
    single_vit_initial: 'ViT Initial (Single Model - Head Trained)',
    single_vit_finetuned: 'ViT Fine-tuned (Single Model - Backbone Trained)',
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_1_finetuned_best.keras': 'VGG16 CV Fold 1 Fine-tuned',
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_2_finetuned_best.keras': 'VGG16 CV Fold 2 Fine-tuned',
    '/content/drive/My Drive/loadedModel/vgg16_cv_fold_3_finetuned_best.keras': 'VGG16 CV Fold 3 Fine-tuned',
    '/content/drive/My Drive/loadedModel/vit_cv_fold_1_best.keras': 'ViT CV Fold 1',
    '/content/drive/My Drive/loadedModel/vit_cv_fold_2_best.keras': 'ViT CV Fold 2',
    '/content/drive/My Drive/loadedModel/vit_cv_fold_3_best.keras': 'ViT CV Fold 3',
}

# --- Evaluate Single Models ---
print("\n===== Evaluating Individual Models =====")
for path in model_paths_to_evaluate_single:
    name = model_names.get(path, path)
    # Use the appropriate generator based on model type
    if 'vgg16' in name.lower():
        evaluate_single_model(path, main_val_gen_vgg, main_y_true, model_name=name)
    elif 'vit' in name.lower():
        evaluate_single_model(path, main_val_gen_vit, main_y_true, model_name=name)
    else:
        print(f"Warning: Could not determine generator type for {name}. Skipping.")

# --- Evaluate Ensemble Models ---
print("\n===== Evaluating Ensemble Models =====")

# VGG16 CV Ensemble
if cv_vgg_finetuned_paths:
    print("\n--- Evaluating VGG16 CV Ensemble ---")
    try:
        vgg_ensemble_probs = ensemble_predict(cv_vgg_finetuned_paths, main_val_gen_vgg, model_type='vgg')
        vgg_ensemble_pred = np.argmax(vgg_ensemble_probs, axis=1)
        vgg_ensemble_f1 = f1_score(main_y_true, vgg_ensemble_pred, average='macro', zero_division=0)
        print(f"VGG16 CV Ensemble F1-Macro: {vgg_ensemble_f1:.4f}")
        print("VGG16 CV Ensemble Classification Report:")
        print(classification_report(main_y_true, vgg_ensemble_pred, zero_division=0))
    except Exception as e:
        print(f"Error evaluating VGG16 CV Ensemble: {e}")
else:
    print("VGG16 CV Ensemble paths not found or empty. Skipping evaluation.")

# ViT CV Ensemble
if cv_vit_finetuned_paths:
    print("\n--- Evaluating ViT CV Ensemble ---")
    try:
        # For ViT ensemble, pass the vit-compatible generator
        vit_ensemble_probs = ensemble_predict(cv_vit_finetuned_paths, main_val_gen_vit, model_type='vit')
        vit_ensemble_pred = np.argmax(vit_ensemble_probs, axis=1)
        vit_ensemble_f1 = f1_score(main_y_true, vit_ensemble_pred, average='macro', zero_division=0)
        print(f"ViT CV Ensemble F1-Macro: {vit_ensemble_f1:.4f}")
        print("ViT CV Ensemble Classification Report:")
        print(classification_report(main_y_true, vit_ensemble_pred, zero_division=0))
    except Exception as e:
        print(f"Error evaluating ViT CV Ensemble: {e}")
else:
    print("ViT CV Ensemble paths not found or empty. Skipping evaluation.")

# --- Overall Ensemble (VGG16 CV Fine-tuned + ViT CV Fine-tuned) ---
# To combine, we need probabilities from both VGG and ViT models on the SAME validation set.
# We assume the VGG models are evaluated with vgg_val_transform and ViT models with vit_val_transform
# The final ensemble logic will need both sets of probabilities to be combined AFTER individual model predictions.

print("\n--- Evaluating Overall Ensemble (VGG16 CV + ViT CV) ---")
try:
    # Get probabilities from VGG ensemble
    vgg_combined_probs = None
    if cv_vgg_finetuned_paths:
        vgg_combined_probs = ensemble_predict(cv_vgg_finetuned_paths, main_val_gen_vgg, model_type='vgg')
    else:
        print("VGG16 CV Ensemble paths missing for combined ensemble.")

    # Get probabilities from ViT ensemble
    vit_combined_probs = None
    if cv_vit_finetuned_paths:
        vit_combined_probs = ensemble_predict(cv_vit_finetuned_paths, main_val_gen_vit, model_type='vit')
    else:
        print("ViT CV Ensemble paths missing for combined ensemble.")

    if vgg_combined_probs is not None and vit_combined_probs is not None:
        # Average the probabilities from the two ensembles
        overall_ensemble_probs = (vgg_combined_probs + vit_combined_probs) / 2.0
        overall_ensemble_pred = np.argmax(overall_ensemble_probs, axis=1)
        overall_ensemble_f1 = f1_score(main_y_true, overall_ensemble_pred, average='macro', zero_division=0)
        print(f"Overall Ensemble (VGG16 CV + ViT CV) F1-Macro: {overall_ensemble_f1:.4f}")
        print("Overall Ensemble (VGG16 CV + ViT CV) Classification Report:")
        print(classification_report(main_y_true, overall_ensemble_pred, zero_division=0))
    else:
        print("Skipping overall ensemble evaluation due to missing probabilities from sub-ensembles.")

except Exception as e:
    print(f"Error evaluating Overall Ensemble: {e}")


In [None]:
print("\n===== Evaluating Ensemble of Single Fine-tuned Models =====")
# --- Model Paths ---
single_vgg_finetuned_path = '/content/drive/My Drive/loadedModel/vgg16_fine_tuned_best.keras'
single_vit_finetuned_path = '/content/drive/My Drive/loadedModel/vit_finetuned_best.keras'

# --- Load Models ---
print("\nLoading individual models for ensemble evaluation...")
try:
    vgg_model = tf.keras.models.load_model(single_vgg_finetuned_path)
    print(f"Loaded VGG16 Fine-tuned model from {single_vgg_finetuned_path}")
except Exception as e:
    print(f"Error loading VGG16 Fine-tuned model: {e}")
    vgg_model = None

try:
    vit_model = tf.keras.models.load_model(single_vit_finetuned_path)
    print(f"Loaded ViT Fine-tuned model from {single_vit_finetuned_path}")
except Exception as e:
    print(f"Error loading ViT Fine-tuned model: {e}")
    vit_model = None

# --- Generate Predictions and Evaluate Ensemble ---
if vgg_model and vit_model:
    print("\nGenerating predictions from individual models...")
    vgg_val_probs = vgg_model.predict(main_val_gen_vgg, verbose=0)
    vit_val_probs = vit_model.predict(main_val_gen_vit, verbose=0)

    # Average probabilities for ensemble
    ensemble_val_probs = (vgg_val_probs + vit_val_probs) / 2.0
    ensemble_val_pred = np.argmax(ensemble_val_probs, axis=1)

    # Calculate ensemble metrics
    ensemble_f1_macro = f1_score(main_y_true, ensemble_val_pred, average='macro', zero_division=0)
    ensemble_accuracy = np.mean(ensemble_val_pred == main_y_true)

    print("\n--- Ensemble (VGG16 Fine-tuned + ViT Fine-tuned) Evaluation on Validation Set ---")
    print(f"Ensemble Accuracy: {ensemble_accuracy:.4f}")
    print(f"Ensemble F1-Macro: {ensemble_f1_macro:.4f}")
    print("\nEnsemble Classification Report:")
    print(classification_report(main_y_true, ensemble_val_pred, zero_division=0))
else:
    print("Skipping ensemble evaluation due to missing one or both models.")

print("\nEnsemble evaluation complete.")