In [None]:
# Mount Google Drive into the Colab VM so the dataset under /content/drive is reachable.
# This import only exists inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Sanity check: list the processed-data folder on Drive to confirm the mount worked
# and the split folders (train / val / test) are visible.
!ls /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data

fer_dataset  test  train  val


In [None]:
# Root of the processed dataset on Drive; the train/val/test paths are derived from it.
# NOTE: "proccessed_data" is the actual folder name on Drive (see the listing above);
# do not "correct" the spelling here or the paths will stop resolving.
BASE_PATH = "/content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data"

In [None]:
# ============================
# 0. Imports & basic settings
# ============================

import os
import random
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import mixed_precision
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard

from sklearn.metrics import classification_report, confusion_matrix


In [None]:
from tensorflow.keras.layers import (
    Dense,
    GlobalAveragePooling2D,
    Input,
    Dropout,
    BatchNormalization
)

In [None]:
# ============================
# 1. GPU & mixed precision
# ============================

# Detect GPUs and configure them before anything else touches the devices.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print("✅ GPUs found:", gpus)
    for gpu in gpus:
        try:
            tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as err:
            # Memory growth can only be set before the GPU has been initialised;
            # re-running this cell in a live kernel would otherwise crash here.
            print(f"⚠️ Could not set memory growth for {gpu}: {err}")
else:
    print("⚠️ No GPU found, running on CPU.")

# Mixed precision only pays off on accelerators with fast float16 support.
# On CPU float16 math is slower than float32, so fall back to full precision there.
mixed_precision.set_global_policy('mixed_float16' if gpus else 'float32')
print("Mixed precision policy:", mixed_precision.global_policy())


⚠️ No GPU found, running on CPU.
Mixed precision policy: <DTypePolicy "mixed_float16">


In [None]:
import os

def explore(path):
    """
    Print a per-class image count for a dataset directory.

    Every immediate sub-directory of `path` is treated as one class. Files
    directly inside a class directory whose names end in .jpg, .jpeg or .png
    (case-insensitive) are counted as images. Entries of `path` that are not
    directories are ignored, both for the per-class lines and for the final
    class count.

    Parameters
    ----------
    path : str
        Directory whose sub-directories are the class folders.

    Returns
    -------
    None
        The summary is printed. If `path` does not exist, a message is
        printed and the function returns early.
    """
    print(f"\n=== {path} ===")
    if not os.path.exists(path):
        print("Path does not exist.")
        return

    image_suffixes = ('.jpg', '.jpeg', '.png')

    # Only directories are classes; a stray file next to the class folders
    # must not inflate the "Classes" figure.
    class_dirs = sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )

    total = 0
    for emotion in class_dirs:
        emotion_path = os.path.join(path, emotion)
        count = sum(
            1 for f in os.listdir(emotion_path)
            if f.lower().endswith(image_suffixes)
        )
        total += count
        print(f"{emotion}: {count} images")

    print(f"Total images: {total}")
    print(f"Classes: {len(class_dirs)}")
import time
import functools
import inspect
import time
import functools
import inspect


def timeit(label: str | None = None, precision: int = 4):
    """
    Decorator to measure execution time of a function.

    Supports both sync and async functions.

    Parameters
    ----------
    label : str | None
        Optional label to show before the timing.
    precision : int
        Number of decimal places for seconds.

    Example
    -------
    @timeit("copy operation")
    def copy():
        ...

    @timeit()
    async def run_async():
        ...
    """

    def decorator(func):
        is_coroutine = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            end = time.perf_counter()
            name = label or func.__name__
            print(f"[TIME] {name} took {end - start:.{precision}f}s")
            return result

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = await func(*args, **kwargs)
            end = time.perf_counter()
            name = label or func.__name__
            print(f"[TIME] {name} took {end - start:.{precision}f}s")
            return result

        return async_wrapper if is_coroutine else sync_wrapper

    return decorator


In [None]:
import os
import shutil
import time

# -------------------
# PATH CONFIG
# -------------------


@timeit()
def copy_folder(src, dst):
    """
    Copy the directory tree `src` to `dst` with ``shutil.copytree``.

    If `dst` already exists (in any state) nothing is copied and a notice is
    printed instead. Otherwise the copy runs and its duration in minutes is
    printed; the ``@timeit()`` decorator additionally prints the total call
    time in seconds.

    Parameters
    ----------
    src : path-like
        Source directory.
    dst : path-like
        Destination directory; must not exist for the copy to happen.
    """
    if not os.path.exists(dst):
        print(f"[INFO] Copying from {src} to {dst}...")
        print("       This will take 10-15 mins but will save hours of training time.")

        began = time.time()
        shutil.copytree(src, dst)
        minutes = (time.time() - began) / 60

        print(f"[SUCCESS] Copied in {minutes:.2f} minutes.")
        return

    print(f"[INFO] Data already exists at {dst}. Skipping copy.")

import os
import shutil
import time
from pathlib import Path
from typing import Union, List
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    from tqdm import tqdm
except ImportError:
    tqdm = None  # Fallback if tqdm is not installed


def _fast_copy_file(src: Path, dst: Path, buffer_size: int = 1024 * 1024) -> None:
    """Copy a single file with buffered I/O (good for large files)."""
    dst.parent.mkdir(parents=True, exist_ok=True)
    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        while True:
            buf = fsrc.read(buffer_size)
            if not buf:
                break
            fdst.write(buf)

@timeit()
def copy_folder_fast(
    src: Union[str, Path],
    dst: Union[str, Path],
    overwrite: bool = False,
    workers: int | None = None,
    verbose: bool = True,
    buffer_size: int = 1024 * 1024,
) -> None:
    """
    Copy every file under `src` to the same relative path under `dst`,
    using a thread pool.

    - Multithreaded copying (one task per file, via `_fast_copy_file`)
    - Progress bar (if `tqdm` is installed and `verbose` is True)
    - Auto-detected number of workers if not provided

    Only files are copied: directories that contain no files are not
    recreated, and if `src` contains no files at all `dst` is not created.

    Parameters
    ----------
    src : str | Path
        Source directory.
    dst : str | Path
        Destination directory.
    overwrite : bool
        If True, delete an existing `dst` before copying. If False and `dst`
        exists, the call is a no-op.
    workers : int | None
        Number of threads to use. If None, ``min(32, 2 * cpu_count)`` is used.
    verbose : bool
        Print status messages and show the progress bar.
    buffer_size : int
        Buffer size (bytes) for each file copy.

    Raises
    ------
    FileNotFoundError
        If `src` does not exist.
    NotADirectoryError
        If `src` is not a directory.
    Exception
        The first error raised while copying an individual file is re-raised
        after the pool has shut down. In that case `dst` is left partially
        populated; re-run with ``overwrite=True`` to start clean, because an
        existing `dst` is otherwise skipped.
    """
    src = Path(src)
    dst = Path(dst)

    if not src.exists():
        raise FileNotFoundError(f"[ERROR] Source not found: {src}")
    if not src.is_dir():
        raise NotADirectoryError(f"[ERROR] Source is not a directory: {src}")

    # Destination handling
    if dst.exists():
        if overwrite:
            if verbose:
                print(f"[WARN] Destination {dst} exists, removing (overwrite=True).")
            shutil.rmtree(dst)
        else:
            if verbose:
                print(f"[INFO] Destination {dst} already exists. Skipping.")
            return

    # Auto-detect workers
    if workers is None:
        cpu_count = os.cpu_count() or 4
        # A reasonable heuristic for I/O bound work:
        workers = min(32, cpu_count * 2)
        if verbose:
            print(f"[INFO] Auto-selected workers={workers} (cpu_count={cpu_count}).")

    # Collect files
    if verbose:
        print(f"[INFO] Scanning files in {src}...")
    files: List[Path] = [p for p in src.rglob("*") if p.is_file()]
    num_files = len(files)

    if num_files == 0:
        if verbose:
            print(f"[INFO] No files found in {src}. Nothing to copy.")
        return

    if verbose:
        print(f"[INFO] Found {num_files} files to copy.")
        print(f"[INFO] Copying from {src} → {dst} using {workers} threads...")

    start = time.time()

    progress_bar = None
    if tqdm is not None and verbose:
        progress_bar = tqdm(total=num_files, unit="file", desc="Copying")

    try:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(
                    _fast_copy_file, f, dst / f.relative_to(src), buffer_size
                )
                for f in files
            ]

            for future in as_completed(futures):
                # result() re-raises whatever the worker raised. Without this
                # call a failed copy would be silently discarded and the
                # function would still report success.
                future.result()
                if progress_bar is not None:
                    progress_bar.update(1)

    finally:
        if progress_bar is not None:
            progress_bar.close()

    elapsed = time.time() - start
    if verbose:
        print(f"[SUCCESS] Copied {num_files} files in {elapsed:.2f}s ({elapsed/60:.2f} min).")
        print(f"[INFO] From: {src}")
        print(f"[INFO] To:   {dst}")


In [None]:
# ============================
# 2. Paths & hyperparameters
# ============================
# Your Shared Drive Paths
# Dataset split folders on the mounted Drive (slow, network-backed).
DRIVE_TRAIN, DRIVE_VAL, DRIVE_TEST = (
    f'{BASE_PATH}/{split}' for split in ('train', 'val', 'test')
)

# Matching folders on the VM's local disk (fast to read during training).
LOCAL_TRAIN, LOCAL_VAL, LOCAL_TEST = (
    f'/content/data/{split}' for split in ('train', 'val', 'test')
)

# Mirror each split from Drive to local disk: train first, then val, then test.
# copy_folder_fast skips a split whose local folder already exists.
for drive_dir, local_dir in (
    (DRIVE_TRAIN, LOCAL_TRAIN),
    (DRIVE_VAL, LOCAL_VAL),
    (DRIVE_TEST, LOCAL_TEST),
):
    copy_folder_fast(drive_dir, local_dir)


[INFO] Auto-selected workers=4 (cpu_count=2).
[INFO] Scanning files in /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/train...
[INFO] Found 29008 files to copy.
[INFO] Copying from /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/train → /content/data/train using 4 threads...


Copying: 100%|██████████| 29008/29008 [33:18<00:00, 14.51file/s]


[SUCCESS] Copied 29008 files in 1998.62s (33.31 min).
[INFO] From: /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/train
[INFO] To:   /content/data/train
[TIME] copy_folder_fast took 2059.2548s
[INFO] Auto-selected workers=4 (cpu_count=2).
[INFO] Scanning files in /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/val...
[INFO] Found 6216 files to copy.
[INFO] Copying from /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/val → /content/data/val using 4 threads...


Copying: 100%|██████████| 6216/6216 [06:51<00:00, 15.12file/s]


[SUCCESS] Copied 6216 files in 411.10s (6.85 min).
[INFO] From: /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/val
[INFO] To:   /content/data/val
[TIME] copy_folder_fast took 425.9078s
[INFO] Auto-selected workers=4 (cpu_count=2).
[INFO] Scanning files in /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/test...
[INFO] Found 6216 files to copy.
[INFO] Copying from /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/test → /content/data/test using 4 threads...


Copying: 100%|██████████| 6216/6216 [06:59<00:00, 14.83file/s]

[SUCCESS] Copied 6216 files in 419.25s (6.99 min).
[INFO] From: /content/drive/MyDrive/Project_2_mnt/Project_2/data/proccessed_data/test
[INFO] To:   /content/data/test
[TIME] copy_folder_fast took 430.7404s





In [None]:



# Side length (pixels) images are resized to before being fed to the ResNet50 backbone.
IMG_SIZE = 224          # ResNet50 expects 224x224
# Number of images per batch produced by the data generators.
BATCH_SIZE = 64
# Validation hold-out fraction (only meaningful for a generator configured with validation_split).
VAL_SPLIT = 0.2
# Maximum number of epochs for the first (frozen-backbone) training phase.
EPOCHS = 30  # you can increase later

# Reproducibility: seed Python's, NumPy's and TensorFlow's random generators with the same value.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)


In [None]:
# ============================
# 3. Data generators
# ============================

# The ImageNet weights of ResNet50 were trained on inputs run through
# resnet50.preprocess_input (RGB->BGR plus per-channel mean subtraction, no
# 1/255 scaling). Feeding [0, 1]-rescaled images to the frozen backbone would
# give it inputs far outside the distribution it was trained on.
_resnet_preprocess = tf.keras.applications.resnet50.preprocess_input

# Training: preprocessing + light augmentation.
# No validation_split here: train/val/test already live in separate folders,
# so splitting again would silently discard part of the training data.
train_datagen = ImageDataGenerator(
    preprocessing_function=_resnet_preprocess,
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True
)

# Validation: same preprocessing, but NO augmentation, so val metrics measure
# the model on unmodified images.
val_datagen = ImageDataGenerator(preprocessing_function=_resnet_preprocess)

# Test: identical treatment to validation.
test_datagen = ImageDataGenerator(preprocessing_function=_resnet_preprocess)

train_generator = train_datagen.flow_from_directory(
    LOCAL_TRAIN,
    target_size=(IMG_SIZE, IMG_SIZE),
    color_mode="rgb",
    batch_size=BATCH_SIZE,
    class_mode="categorical",
    seed=SEED
)

# Uses every image in LOCAL_VAL (no subset) in a fixed order.
val_generator = val_datagen.flow_from_directory(
    LOCAL_VAL,
    target_size=(IMG_SIZE, IMG_SIZE),
    color_mode="rgb",
    batch_size=BATCH_SIZE,
    class_mode="categorical",
    shuffle=False
)

# shuffle=False keeps predictions aligned with test_generator.classes.
test_generator = test_datagen.flow_from_directory(
    LOCAL_TEST,
    target_size=(IMG_SIZE, IMG_SIZE),
    color_mode="rgb",
    batch_size=BATCH_SIZE,
    class_mode="categorical",
    shuffle=False
)

# Class bookkeeping derived from the training folder names.
num_classes = train_generator.num_classes
class_indices = train_generator.class_indices
idx_to_class = {v: k for k, v in class_indices.items()}

# All three splits must agree on the label -> index mapping, otherwise
# validation/test metrics would be computed against the wrong classes.
assert val_generator.class_indices == class_indices, "val classes differ from train"
assert test_generator.class_indices == class_indices, "test classes differ from train"

print("Class indices:", class_indices)
print("idx_to_class:", idx_to_class)


Found 23212 images belonging to 7 classes.
Found 1239 images belonging to 7 classes.
Found 6216 images belonging to 7 classes.
Class indices: {'angry': 0, 'disgust': 1, 'fear': 2, 'happy': 3, 'neutral': 4, 'sad': 5, 'surprise': 6}
idx_to_class: {0: 'angry', 1: 'disgust', 2: 'fear', 3: 'happy', 4: 'neutral', 5: 'sad', 6: 'surprise'}


In [None]:
# ============================
# 4. Build ResNet50 model
# ============================

def build_resnet50_emotion_model(num_classes):
    """
    Assemble an ImageNet-initialised ResNet50 with a new classification head.

    The backbone is built directly on the model's input tensor, so its layers
    appear individually in the returned model's ``layers`` list (it is not a
    nested sub-model). Every backbone layer is frozen; only the head is
    trainable when the model is returned.

    Head: global average pooling, then two Dense(relu) -> BatchNormalization
    -> Dropout(0.5) stages with 512 and 256 units, then a softmax layer.

    Parameters
    ----------
    num_classes : int
        Number of output classes (units of the final softmax layer).

    Returns
    -------
    Model
        Uncompiled Keras model taking ``(IMG_SIZE, IMG_SIZE, 3)`` images.
    """
    image_input = Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    backbone = ResNet50(
        include_top=False,
        weights="imagenet",
        input_tensor=image_input
    )

    # Start with the whole backbone frozen.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    features = GlobalAveragePooling2D()(backbone.output)

    # Two identical regularised dense stages, widest first.
    for units in (512, 256):
        features = Dense(units, activation="relu")(features)
        features = BatchNormalization()(features)
        features = Dropout(0.5)(features)

    # The output layer is pinned to float32 regardless of the global dtype policy.
    probabilities = Dense(num_classes, activation="softmax", dtype="float32")(features)

    return Model(image_input, probabilities)


In [None]:
# ============================
# 5. Strategy & compile
# ============================

# MirroredStrategy replicates the model across the devices it manages; the number of
# replicas actually in use is printed below.
strategy = tf.distribute.MirroredStrategy()
print("Replicas in sync:", strategy.num_replicas_in_sync)

# Model variables and the optimizer are created inside the strategy scope.
with strategy.scope():
    model = build_resnet50_emotion_model(num_classes=num_classes)

    # Learning rate for the first phase, where only the new head is trainable.
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

    # categorical_crossentropy matches the one-hot labels produced by class_mode="categorical".
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )


# Print the layer-by-layer architecture and parameter counts.
model.summary()


Replicas in sync: 1
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [None]:
# ============================
# 6. Callbacks
# ============================

# Relative paths: both are created in the notebook's current working directory.
CHECKPOINT_PATH = "best_resnet50_fer2013.keras"
LOG_DIR = "logs_resnet50"

# Write the model to CHECKPOINT_PATH only when val_accuracy reaches a new maximum.
checkpoint_cb = ModelCheckpoint(
    CHECKPOINT_PATH,
    monitor="val_accuracy",
    save_best_only=True,
    mode="max",
    verbose=1
)

# Stop after 7 epochs without val_loss improvement and roll the in-memory model back
# to its best weights. NOTE: this monitors val_loss while the checkpoint monitors
# val_accuracy, so "best" can refer to different epochs for the two callbacks.
earlystop_cb = EarlyStopping(
    monitor="val_loss",
    patience=7,
    restore_best_weights=True,
    verbose=1
)

# Halve the learning rate after 3 epochs without val_loss improvement, down to 1e-6.
reduce_lr_cb = ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1
)

# TensorBoard logs, including weight histograms every epoch.
tensorboard_cb = TensorBoard(
    log_dir=LOG_DIR,
    histogram_freq=1
)

# The same callback instances are passed to every model.fit call in this notebook.
callbacks = [checkpoint_cb, earlystop_cb, reduce_lr_cb, tensorboard_cb]


In [None]:
# ============================
# 7. Training (frozen backbone)
# ============================

# Phase 1: train with the backbone layers frozen (set in build_resnet50_emotion_model),
# so only the classification head is updated. `history` is plotted later.
history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=val_generator,
    callbacks=callbacks
)


  self._warn_if_super_not_called()


In [None]:
# ============================
# 8. Fine-tuning (top of the backbone unfrozen)
# ============================

# Unfreeze the last N layers of the model. Because the backbone was built on the
# model's own input tensor, model.layers is a flat list: the final 8 entries are
# the classification head, so N = 40 reaches 32 layers into the ResNet50 backbone.
N = 40
for layer in model.layers[-N:]:
    # Dense head layers are already trainable.
    # BatchNormalization layers are left exactly as they are: the head's BN layers
    # stay trainable, while the backbone's BN layers stay frozen so they keep using
    # their pretrained moving statistics instead of re-estimating them from small,
    # augmented batches (which tends to wreck the pretrained features).
    if isinstance(layer, (Dense, BatchNormalization)):
        continue
    layer.trainable = True

# Changing `trainable` only takes effect after re-compiling. A 10x smaller learning
# rate than phase 1 keeps the updates to the pretrained weights small.
with strategy.scope():
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

# Phase 2: continue training from the phase-1 weights with the same callbacks.
history_finetune = model.fit(
    train_generator,
    epochs=10,
    validation_data=val_generator,
    callbacks=callbacks
)

In [None]:
# ============================
# 9. Training curves
# ============================

def plot_history(h, title_prefix=""):
    """
    Draw the training curves stored in a Keras ``History`` object.

    Two separate figures are shown, in this order: accuracy, then loss. Each
    figure overlays the training series and its ``val_`` counterpart against
    the epoch index.

    Parameters
    ----------
    h : object with a ``history`` dict
        Must contain the keys 'accuracy', 'val_accuracy', 'loss', 'val_loss'.
    title_prefix : str
        Text placed before the metric name in each figure title.
    """
    panels = (
        # (history key, train legend label, val legend label, axis/title text)
        ("accuracy", "train_acc", "val_acc", "Accuracy"),
        ("loss", "train_loss", "val_loss", "Loss"),
    )

    for metric, train_label, val_label, pretty in panels:
        plt.figure()
        plt.plot(h.history[metric], label=train_label)
        plt.plot(h.history[f"val_{metric}"], label=val_label)
        plt.title(f'{title_prefix} {pretty}')
        plt.xlabel('Epoch')
        plt.ylabel(pretty)
        plt.legend()
        plt.show()

# Curves for the frozen-backbone phase.
plot_history(history, "ResNet50 (frozen)")
# The fine-tuning cell is optional; only plot its curves if it has been run in this kernel.
if 'history_finetune' in globals():
    plot_history(history_finetune, "ResNet50 (fine-tune)")


In [None]:
# ============================
# 10. Evaluation on test set
# ============================

from tensorflow.keras.models import load_model

# Reload the checkpoint written by ModelCheckpoint (the epoch with the highest
# val_accuracy) rather than using the in-memory `model`.
best_model = load_model(CHECKPOINT_PATH)

# evaluate() returns the loss followed by the compiled metrics (here: accuracy).
test_loss, test_acc = best_model.evaluate(test_generator)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")


In [None]:
# ============================
# 11. Save model & TFLite
# ============================

# Save a second copy of the best checkpoint under its final name (relative path,
# i.e. in the notebook's current working directory).
best_model.save("final_resnet50_fer2013.keras")
print("Saved Keras model as final_resnet50_fer2013.keras")

# TFLite export
# Converter defaults are used (no optimizations are configured here).
# NOTE(review): consumers of the .tflite file must preprocess inputs the same way
# the data generators in this notebook do.
converter = tf.lite.TFLiteConverter.from_keras_model(best_model)
tflite_model = converter.convert()

# convert() returns the serialized model as bytes, hence binary write mode.
with open("resnet50_fer2013.tflite", "wb") as f:
    f.write(tflite_model)

print("Saved TFLite model as resnet50_fer2013.tflite")
