In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory.
# Running this cell (click run or press Shift+Enter) summarises what is under
# the input directory: one line per folder that contains files, with the file
# count and one sample path. Image datasets can hold tens of thousands of
# files, so printing every single path would flood the notebook output.

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    if filenames:
        # sorted() makes the sample deterministic; os.walk order is OS-dependent
        sample_path = os.path.join(dirname, sorted(filenames)[0])
        print(f"{dirname}: {len(filenames)} files (e.g. {sample_path})")

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, regularizers
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input as vgg_preprocess
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os

# --- 1. Configuration ---
SAVE_DIR = './FER2013_Final_Optimized/'
os.makedirs(SAVE_DIR, exist_ok=True)

TRAIN_DIR = '/kaggle/input/fer2013/train'
TEST_DIR = '/kaggle/input/fer2013/test'
IMG_SIZE = (72, 72)  # Sweet spot between 48 and 96
BATCH_SIZE = 32
SEED = 42  # single seed shared by both dataset loaders
AUTOTUNE = tf.data.AUTOTUNE

print("="*70)
print("FER2013 OPTIMIZED ENSEMBLE - VGG16 + Multiple Custom CNNs")
print("="*70)

# --- 2. Load Data ---
# Images are resized to IMG_SIZE and loaded as 3-channel RGB with one-hot labels.
train_ds = tf.keras.utils.image_dataset_from_directory(
    TRAIN_DIR, labels='inferred', label_mode='categorical',
    image_size=IMG_SIZE, interpolation='bilinear',
    batch_size=BATCH_SIZE, shuffle=True, color_mode='rgb', seed=SEED
)

# NOTE(review): val_ds is read from TEST_DIR and is later used both for
# checkpoint selection / early stopping and for the final reported metrics, so
# those metrics are optimistically biased. Consider carving a validation split
# out of TRAIN_DIR and keeping TEST_DIR for the final evaluation only.
# shuffle=False keeps a stable order so predictions can be aligned with labels.
val_ds = tf.keras.utils.image_dataset_from_directory(
    TEST_DIR, labels='inferred', label_mode='categorical',
    image_size=IMG_SIZE, interpolation='bilinear',
    batch_size=BATCH_SIZE, shuffle=False, color_mode='rgb', seed=SEED
)

class_names = train_ds.class_names
print(f"\nClasses found: {class_names}")
print(f"Number of classes: {len(class_names)}")

# --- 3. Class Weights ---
# Prefer deriving the integer labels from the file paths (the class is the
# first directory below TRAIN_DIR): this avoids decoding every training image
# just to read its label. Fall back to iterating the dataset if the loader did
# not expose `file_paths`.
train_file_paths = getattr(train_ds, 'file_paths', None)
if train_file_paths:
    class_to_index = {name: index for index, name in enumerate(class_names)}
    y_integers = np.array([
        class_to_index[os.path.relpath(path, TRAIN_DIR).split(os.sep)[0]]
        for path in train_file_paths
    ])
else:
    labels_iterator = train_ds.unbatch().map(lambda x, y: y).as_numpy_iterator()
    all_labels = np.array([label for label in labels_iterator])
    y_integers = np.argmax(all_labels, axis=1)

present_classes = np.unique(y_integers)
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=y_integers
)
# Key each weight by its actual class index; enumerate() would shift the
# weights onto the wrong classes if any class index were missing from y.
class_weights_dict = {
    int(class_index): float(weight)
    for class_index, weight in zip(present_classes, class_weights)
}
print(f"\nClass weights computed: {class_weights_dict}")

# --- 4. Advanced Augmentation ---
def get_strong_augmentation():
    """Return a fresh Sequential named "strong_augmentation".

    The pipeline applies, in order: horizontal flip, rotation (0.2),
    zoom (0.2), contrast (0.25), brightness (0.2) and translation (0.15, 0.15).
    """
    pipeline = tf.keras.Sequential(name="strong_augmentation")
    pipeline.add(layers.RandomFlip("horizontal"))
    pipeline.add(layers.RandomRotation(0.2))
    pipeline.add(layers.RandomZoom(0.2))
    pipeline.add(layers.RandomContrast(0.25))
    pipeline.add(layers.RandomBrightness(0.2))
    pipeline.add(layers.RandomTranslation(0.15, 0.15))
    return pipeline

def get_medium_augmentation():
    """Return a fresh Sequential named "medium_augmentation".

    Lighter than the strong variant: horizontal flip, rotation (0.15),
    zoom (0.15) and contrast (0.15) only.
    """
    augmentation_steps = [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.15),
        layers.RandomZoom(0.15),
        layers.RandomContrast(0.15),
    ]
    return tf.keras.Sequential(augmentation_steps, name="medium_augmentation")

# Build one instance of each augmentation pipeline (strong first, then medium).
strong_aug, medium_aug = get_strong_augmentation(), get_medium_augmentation()

# --- 5. Dataset Preparation ---
def prepare_dataset(ds, preprocessor, augmentation=None):
    """Chain cast -> optional augmentation -> preprocessing onto a dataset.

    Args:
        ds: dataset of (images, labels) pairs exposing `map` and `prefetch`.
        preprocessor: callable applied to the float32 images as the last step.
        augmentation: optional callable invoked as `augmentation(x, training=True)`
            on the float32 images before `preprocessor`; skipped when None.

    Returns:
        The mapped dataset with prefetching enabled. Labels pass through
        unchanged at every stage.
    """
    def to_float(images, labels):
        return tf.cast(images, tf.float32), labels

    def augment(images, labels):
        return augmentation(images, training=True), labels

    def preprocess(images, labels):
        return preprocessor(images), labels

    if augmentation is None:
        stages = (to_float, preprocess)
    else:
        stages = (to_float, augment, preprocess)

    for stage in stages:
        ds = ds.map(stage, num_parallel_calls=AUTOTUNE)
    return ds.prefetch(buffer_size=AUTOTUNE)

def scale_to_unit(images):
    """Divide pixel values by 255.0 (preprocessing used by the custom CNNs)."""
    return images / 255.0

# VGG16 datasets: medium augmentation on train, VGG preprocessing on both
train_ds_vgg = prepare_dataset(train_ds, vgg_preprocess, augmentation=medium_aug)
val_ds_vgg = prepare_dataset(val_ds, vgg_preprocess)

# Custom CNN 1 datasets: strong augmentation on train, /255 scaling on both
train_ds_custom1 = prepare_dataset(train_ds, scale_to_unit, augmentation=strong_aug)
val_ds_custom1 = prepare_dataset(val_ds, scale_to_unit)

# Custom CNN 2 datasets: medium augmentation on train, /255 scaling on both
train_ds_custom2 = prepare_dataset(train_ds, scale_to_unit, augmentation=medium_aug)
val_ds_custom2 = prepare_dataset(val_ds, scale_to_unit)

# --- 6. Improved VGG16 Model ---
def create_improved_vgg16(input_shape=None, num_classes=None):
    """Build a frozen ImageNet VGG16 base with a regularised dense head.

    Args:
        input_shape: (height, width, channels) of the input images. Defaults to
            IMG_SIZE + (3,) so the model always matches what the dataset
            loaders produce instead of a separately hard-coded size.
        num_classes: width of the softmax output. Defaults to len(class_names).

    Returns:
        An uncompiled Model named 'VGG16_Improved'. The VGG16 base is the layer
        right after the input layer and starts with trainable=False.
    """
    if input_shape is None:
        input_shape = tuple(IMG_SIZE) + (3,)
    if num_classes is None:
        num_classes = len(class_names)

    base_model = VGG16(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet'
    )
    base_model.trainable = False  # Start frozen

    inputs = layers.Input(shape=input_shape)
    x = base_model(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)

    # Stronger regularization to prevent overfitting
    x = layers.Dense(1024, activation='relu', kernel_regularizer=regularizers.l2(0.002))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.6)(x)

    x = layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.002))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)

    # Last hidden layer: lighter L2 and no batch normalisation
    x = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.001))(x)
    x = layers.Dropout(0.4)(x)

    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return Model(inputs, outputs, name='VGG16_Improved')

# --- 7. Custom CNN Architecture 1: Deep with Attention ---
class ChannelAttention(layers.Layer):
    """Channel-attention gate computed from pooled feature descriptors.

    The input is reduced over axes 1 and 2 with both mean and max pooling.
    Each pooled descriptor goes through the same two Dense layers (a ReLU
    bottleneck of `channels // ratio` units, then a sigmoid layer of
    `channels` units); the two results are summed and multiplied into the
    input.

    Args:
        ratio: reduction factor for the bottleneck Dense layer (must be >= 1).
        **kwargs: forwarded to `layers.Layer`.
    """
    def __init__(self, ratio=8, **kwargs):
        super(ChannelAttention, self).__init__(**kwargs)
        if ratio < 1:
            raise ValueError(f"ratio must be >= 1, got {ratio}")
        self.ratio = ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        if channels is None:
            raise ValueError("ChannelAttention requires a known channel dimension")
        channels = int(channels)
        # Never let the bottleneck collapse to 0 units when channels < ratio.
        bottleneck = max(1, channels // self.ratio)

        self.shared_dense_one = layers.Dense(bottleneck, activation='relu')
        self.shared_dense_two = layers.Dense(channels, activation='sigmoid')

        # Create the sub-layer variables now instead of lazily on first call,
        # so they exist as soon as this layer is built (for example while a
        # saved model is being restored). The pooled descriptors keep the
        # reduced axes (keepdims=True), hence the (None, 1, 1, C) shapes.
        self.shared_dense_one.build((None, 1, 1, channels))
        self.shared_dense_two.build((None, 1, 1, bottleneck))
        super(ChannelAttention, self).build(input_shape)

    def call(self, inputs):
        avg_pool = tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)
        max_pool = tf.reduce_max(inputs, axis=[1, 2], keepdims=True)

        avg_out = self.shared_dense_two(self.shared_dense_one(avg_pool))
        max_out = self.shared_dense_two(self.shared_dense_one(max_pool))

        # NOTE(review): the sigmoid is applied to each branch before the sum,
        # so the gate lies in (0, 2) rather than (0, 1). Confirm this is
        # intended before changing it, as it alters trained-model numerics.
        attention = avg_out + max_out
        return inputs * attention

    def get_config(self):
        """Return the layer config including `ratio` so it survives save/load."""
        config = super(ChannelAttention, self).get_config()
        config.update({'ratio': self.ratio})
        return config

def create_custom_cnn_with_attention(input_shape=None, num_classes=None):
    """Custom CNN with channel attention - Architecture 1.

    Three double-convolution blocks (64, 128, 256 filters), each followed by
    ChannelAttention, 2x2 max pooling and dropout; a single 512-filter
    convolution with attention; then global average pooling and a two-layer
    dense head ending in softmax.

    Args:
        input_shape: (height, width, channels) of the input images. Defaults to
            IMG_SIZE + (3,) so the model follows the dataset configuration.
        num_classes: width of the softmax output. Defaults to len(class_names).

    Returns:
        An uncompiled Model named 'CustomCNN_Attention'.
    """
    if input_shape is None:
        input_shape = tuple(IMG_SIZE) + (3,)
    if num_classes is None:
        num_classes = len(class_names)

    def conv_bn_relu(tensor, filters):
        # 3x3 'same' convolution -> batch norm -> ReLU
        tensor = layers.Conv2D(
            filters, (3, 3), padding='same', kernel_initializer='he_normal'
        )(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Activation('relu')(tensor)

    inputs = layers.Input(shape=input_shape)
    x = inputs

    # Blocks 1-3: (filters, dropout rate)
    for filters, dropout_rate in ((64, 0.25), (128, 0.3), (256, 0.4)):
        x = conv_bn_relu(x, filters)
        x = conv_bn_relu(x, filters)
        x = ChannelAttention()(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.Dropout(dropout_rate)(x)

    # Block 4: single convolution, no pooling
    x = conv_bn_relu(x, 512)
    x = ChannelAttention()(x)
    x = layers.Dropout(0.5)(x)

    # Head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(1024, activation='relu', kernel_regularizer=regularizers.l2(0.002))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.6)(x)
    x = layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.001))(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return Model(inputs, outputs, name='CustomCNN_Attention')

# --- 8. Custom CNN Architecture 2: Wide and Deep ---
def create_custom_cnn_wide(input_shape=None, num_classes=None):
    """Wider custom CNN - Architecture 2.

    A strided 7x7 stem (96 filters) with 3x3 max pooling, followed by
    double-convolution blocks of 192 and 384 filters, a single 512-filter
    convolution, global average pooling and a 1536/768 dense head ending in
    softmax. Only the stem and the 192-filter block downsample.

    Args:
        input_shape: (height, width, channels) of the input images. Defaults to
            IMG_SIZE + (3,) so the model follows the dataset configuration.
        num_classes: width of the softmax output. Defaults to len(class_names).

    Returns:
        An uncompiled Model named 'CustomCNN_Wide'.
    """
    if input_shape is None:
        input_shape = tuple(IMG_SIZE) + (3,)
    if num_classes is None:
        num_classes = len(class_names)

    def conv_bn_relu(tensor, filters, kernel_size=(3, 3), strides=1):
        # 'same' convolution -> batch norm -> ReLU
        tensor = layers.Conv2D(
            filters, kernel_size, strides=strides,
            padding='same', kernel_initializer='he_normal'
        )(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Activation('relu')(tensor)

    inputs = layers.Input(shape=input_shape)

    # Initial block - wider filters
    x = conv_bn_relu(inputs, 96, kernel_size=(7, 7), strides=2)
    x = layers.MaxPooling2D((3, 3), strides=2, padding='same')(x)
    x = layers.Dropout(0.2)(x)

    # Block 1
    x = conv_bn_relu(x, 192)
    x = conv_bn_relu(x, 192)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Dropout(0.3)(x)

    # Block 2 (no pooling)
    x = conv_bn_relu(x, 384)
    x = conv_bn_relu(x, 384)
    x = layers.Dropout(0.4)(x)

    # Block 3 (no pooling)
    x = conv_bn_relu(x, 512)
    x = layers.Dropout(0.5)(x)

    # Head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(1536, activation='relu', kernel_regularizer=regularizers.l2(0.002))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.6)(x)
    x = layers.Dense(768, activation='relu', kernel_regularizer=regularizers.l2(0.001))(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return Model(inputs, outputs, name='CustomCNN_Wide')

# --- 9. Careful Training Function for VGG16 ---
def train_vgg16_carefully(model, train_ds, val_ds, phase1_epochs=50, phase2_epochs=30,
                          unfreeze_last=10, phase1_lr=1e-3, phase2_lr=5e-6):
    """Conservative two-phase training to prevent fine-tuning collapse.

    Phase 1 trains the model as passed in (base expected to be frozen).
    Phase 2 reloads the best checkpoint, unfreezes only the last
    `unfreeze_last` layers of the base model and continues at a much lower
    learning rate. The same callbacks are shared by both phases, so the
    checkpoint on disk is only overwritten when val_accuracy improves on the
    best value seen so far.

    Args:
        model: model whose layer at index 1 is the convolutional base (this is
            how create_improved_vgg16 lays it out).
        train_ds: training dataset passed to `fit`.
        val_ds: validation dataset passed to `fit`.
        phase1_epochs: maximum epochs for the frozen-base phase.
        phase2_epochs: maximum epochs for the fine-tuning phase.
        unfreeze_last: number of trailing base layers left trainable in phase 2.
        phase1_lr: Adam learning rate for phase 1.
        phase2_lr: Adam learning rate for phase 2.

    Returns:
        The same model with the best checkpointed weights loaded.
    """
    print(f"\n{'='*70}")
    print("TRAINING VGG16 WITH CAREFUL STRATEGY")
    print(f"{'='*70}")

    model_path = os.path.join(SAVE_DIR, 'best_VGG16_Careful.keras')

    callbacks = [
        ModelCheckpoint(
            model_path, monitor='val_accuracy',
            save_best_only=True, mode='max', verbose=1
        ),
        EarlyStopping(
            monitor='val_loss', patience=15,
            restore_best_weights=True, verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss', factor=0.5,
            patience=5, min_lr=1e-7, verbose=1
        ),
    ]

    def run_phase(learning_rate, epochs):
        # (Re)compile so changes to `trainable` take effect, then fit.
        model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        model.fit(
            train_ds, validation_data=val_ds,
            epochs=epochs, class_weight=class_weights_dict,
            callbacks=callbacks,
            verbose=1
        )

    # Phase 1: Feature extraction with frozen base
    print("\n--- Phase 1: Feature Extraction (Frozen Base) ---")
    run_phase(phase1_lr, phase1_epochs)

    # Phase 2: Very gentle fine-tuning of only top layers
    model.load_weights(model_path)
    print(f"\n--- Phase 2: Gentle Fine-Tuning (Top {unfreeze_last} Layers) ---")

    base_model = model.layers[1]
    base_model.trainable = True

    # Freeze all but the last `unfreeze_last` layers. Computing the count
    # explicitly (rather than slicing with [:-n]) keeps n == 0 meaning
    # "freeze everything" instead of "freeze nothing".
    frozen_count = max(len(base_model.layers) - unfreeze_last, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    print(f"Trainable layers: {sum([1 for layer in base_model.layers if layer.trainable])}")

    # Very low learning rate for fine-tuning
    run_phase(phase2_lr, phase2_epochs)

    model.load_weights(model_path)
    return model

# --- 10. Standard Training Function for Custom CNNs ---
def train_custom_cnn(model, train_ds, val_ds, model_name, epochs=100):
    """Compile and fit a custom CNN, then reload its best checkpoint.

    Args:
        model: the Keras model to train (compiled here).
        train_ds: training dataset passed to `fit`.
        val_ds: validation dataset passed to `fit`.
        model_name: used in the banner and in the checkpoint file name
            `best_<model_name>.keras` under SAVE_DIR.
        epochs: maximum number of epochs.

    Returns:
        The same model with the checkpointed (best val_accuracy) weights loaded.
    """
    rule = '=' * 70
    print(f"\n{rule}")
    print(f"TRAINING {model_name}")
    print(f"{rule}")

    best_path = os.path.join(SAVE_DIR, f'best_{model_name}.keras')

    # Checkpoint tracks val_accuracy; stopping and LR decay track val_loss.
    training_callbacks = [
        ModelCheckpoint(
            best_path, monitor='val_accuracy',
            save_best_only=True, mode='max', verbose=1
        ),
        EarlyStopping(
            monitor='val_loss', patience=20,
            restore_best_weights=True, verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss', factor=0.5,
            patience=7, min_lr=1e-7, verbose=1
        ),
    ]

    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        class_weight=class_weights_dict,
        callbacks=training_callbacks,
        verbose=1
    )

    model.load_weights(best_path)
    return model

# --- 11. TRAIN ALL MODELS ---
banner = "="*70
print("\n" + banner)
print("STARTING TRAINING PIPELINE")
print(banner)

# VGG16: two-phase schedule on the VGG-preprocessed datasets
vgg_model = train_vgg16_carefully(
    create_improved_vgg16(), train_ds_vgg, val_ds_vgg
)

# Attention CNN: strong-augmentation datasets
custom_model_1 = train_custom_cnn(
    create_custom_cnn_with_attention(),
    train_ds_custom1, val_ds_custom1,
    model_name='CustomCNN_Attention', epochs=120
)

# Wide CNN: medium-augmentation datasets
custom_model_2 = train_custom_cnn(
    create_custom_cnn_wide(),
    train_ds_custom2, val_ds_custom2,
    model_name='CustomCNN_Wide', epochs=120
)

# --- 12. CREATE FINAL ENSEMBLE ---
print("\n" + "="*70)
print("CREATING FINAL ENSEMBLE MODEL")
print("="*70)

# Reload the best checkpoint of each model from disk
vgg_model = tf.keras.models.load_model(
    os.path.join(SAVE_DIR, 'best_VGG16_Careful.keras')
)
custom_model_1 = tf.keras.models.load_model(
    os.path.join(SAVE_DIR, 'best_CustomCNN_Attention.keras'),
    custom_objects={'ChannelAttention': ChannelAttention}
)
custom_model_2 = tf.keras.models.load_model(
    os.path.join(SAVE_DIR, 'best_CustomCNN_Wide.keras')
)

# Build ensemble: one raw-pixel input, each branch applies its own preprocessing
input_layer = layers.Input(shape=tuple(IMG_SIZE) + (3,))

# VGG16 branch (40% weight - best performer)
vgg_preprocessed = vgg_preprocess(input_layer)
vgg_output = vgg_model(vgg_preprocessed)

# Custom CNN 1 branch (30% weight)
custom1_preprocessed = input_layer / 255.0
custom1_output = custom_model_1(custom1_preprocessed)

# Custom CNN 2 branch (30% weight)
custom2_preprocessed = input_layer / 255.0
custom2_output = custom_model_2(custom2_preprocessed)

# Weighted ensemble.
# The branch weights already sum to 1, so the scaled softmax outputs must be
# ADDED. Averaging them would divide by 3 again and yield "probabilities" that
# sum to 1/3, which distorts any loss computed on the ensemble output.
# Rescaling layers are used instead of Python-lambda Lambda layers so the saved
# .keras file does not embed arbitrary Python code.
ensemble_output = layers.Add(name='weighted_sum')([
    layers.Rescaling(0.40, name='vgg16_weight')(vgg_output),
    layers.Rescaling(0.30, name='custom_cnn_1_weight')(custom1_output),
    layers.Rescaling(0.30, name='custom_cnn_2_weight')(custom2_output)
])

final_ensemble = Model(
    inputs=input_layer,
    outputs=ensemble_output,
    name='Final_Optimized_Ensemble'
)
# Compiled only so the ensemble can report loss/accuracy; it is not trained here.
final_ensemble.compile(loss='categorical_crossentropy', metrics=['accuracy'])

# Save ensemble
ensemble_path = os.path.join(SAVE_DIR, 'Final_Ensemble_VGG_Custom.keras')
final_ensemble.save(ensemble_path)
print(f"\n✅ Ensemble saved: {ensemble_path}")

# --- 13. COMPREHENSIVE EVALUATION ---
print("\n" + "="*70)
print("FINAL EVALUATION")
print("="*70)

# The ensemble takes raw pixel values (each branch preprocesses internally),
# so only a float cast is applied here.
val_ds_eval = val_ds.map(lambda x, y: (tf.cast(x, tf.float32), y))

# Individual model predictions: each model gets the preprocessing it was trained with
print("\n📊 Evaluating individual models...")
vgg_val_ds = val_ds.map(lambda x, y: (vgg_preprocess(tf.cast(x, tf.float32)), y))
custom_val_ds = val_ds.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y))

vgg_loss, vgg_acc = vgg_model.evaluate(vgg_val_ds, verbose=0)
custom1_loss, custom1_acc = custom_model_1.evaluate(custom_val_ds, verbose=0)
custom2_loss, custom2_acc = custom_model_2.evaluate(custom_val_ds, verbose=0)

print(f"\n Individual Model Accuracies:")
print(f"  VGG16:              {vgg_acc*100:.2f}%")
print(f"  Custom CNN 1:       {custom1_acc*100:.2f}%")
print(f"  Custom CNN 2:       {custom2_acc*100:.2f}%")

# Ensemble predictions
ensemble_preds_probs = final_ensemble.predict(val_ds_eval, verbose=1)
ensemble_preds = np.argmax(ensemble_preds_probs, axis=1)

# True labels. val_ds was created with shuffle=False, so a second pass yields
# the labels in the same order as the predictions above.
y_true = np.concatenate([y for x, y in val_ds], axis=0)
y_true_labels = np.argmax(y_true, axis=1)

# Accuracy
ensemble_accuracy = np.mean(ensemble_preds == y_true_labels)

print("\n" + "="*70)
print(f"🎯 FINAL ENSEMBLE ACCURACY: {ensemble_accuracy * 100:.2f}%")
print("="*70)

# Classification report. Passing `labels` pins the rows to every class index,
# so target_names stays aligned even if some class never appears in the
# labels or the predictions.
print("\n📊 Detailed Classification Report:")
print(classification_report(
    y_true_labels, ensemble_preds,
    labels=list(range(len(class_names))),
    target_names=class_names, digits=4
))

# Confusion matrix. `labels` fixes the matrix to one row/column per entry of
# class_names, so the tick labels below always line up with the data.
cm = confusion_matrix(
    y_true_labels, ensemble_preds,
    labels=list(range(len(class_names)))
)
plt.figure(figsize=(14, 12))
sns.heatmap(
    cm, annot=True, fmt='d', cmap='RdYlGn',
    xticklabels=class_names, yticklabels=class_names,
    cbar_kws={'label': 'Count'}
)
plt.title(
    f'Final Ensemble Confusion Matrix\nAccuracy: {ensemble_accuracy*100:.2f}%',
    fontsize=16, fontweight='bold', pad=20
)
plt.ylabel('True Label', fontsize=13, fontweight='bold')
plt.xlabel('Predicted Label', fontsize=13, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig(
    os.path.join(SAVE_DIR, 'final_confusion_matrix.png'),
    dpi=300, bbox_inches='tight'
)
plt.show()

# Per-class accuracy: diagonal count divided by the number of true samples of
# that class. A class with no true samples is reported as 0 instead of
# triggering a division by zero.
print("\n📈 Per-Class Accuracy:")
class_totals = cm.sum(axis=1)
per_class_acc = np.array([
    correct / total if total else 0.0
    for correct, total in zip(cm.diagonal(), class_totals)
])
for class_name, acc in zip(class_names, per_class_acc):
    print(f"  {class_name:15s}: {acc*100:5.2f}%")

# Comparison plot: one bar per individual model plus the ensemble
models = ['VGG16', 'Custom CNN\n(Attention)', 'Custom CNN\n(Wide)', 'ENSEMBLE']
accuracies = [vgg_acc*100, custom1_acc*100, custom2_acc*100, ensemble_accuracy*100]
colors = ['#3498db', '#e74c3c', '#9b59b6', '#2ecc71']

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(models, accuracies, color=colors, edgecolor='black', linewidth=1.5)
ax.set_ylabel('Accuracy (%)', fontsize=12, fontweight='bold')
ax.set_title('Model Performance Comparison', fontsize=14, fontweight='bold', pad=15)
ax.set_ylim(0, 100)
ax.grid(axis='y', alpha=0.3, linestyle='--')

# Write each accuracy value just above its bar
for bar, acc in zip(bars, accuracies):
    ax.text(
        bar.get_x() + bar.get_width()/2., bar.get_height(),
        f'{acc:.2f}%', ha='center', va='bottom',
        fontweight='bold', fontsize=11
    )

fig.tight_layout()
fig.savefig(
    os.path.join(SAVE_DIR, 'model_comparison.png'),
    dpi=300, bbox_inches='tight'
)
plt.show()

print(f"\n✅ All models and visualizations saved to: {SAVE_DIR}")
print("="*70)
print("🎉 TRAINING COMPLETE!")
print("="*70)