In [2]:
# ============================================================
# Project 9 — Build fer2013_small from folder splits:
# dataset/train/<class>/* and dataset/test/<class>/*
# Saves: fer2013_small.npz (X: float32 [0,1], y: int64), fer2013_small.csv
# ============================================================

import os
from pathlib import Path
import random
import csv
from collections import defaultdict, Counter

import numpy as np
from PIL import Image

# 1) Parameters
DATASET_DIR = Path("dataset")      # root folder; expected to hold train/ and test/
OUTPUT_PREFIX = "fer2013_small"    # base name shared by the .npz and .csv outputs
# Overall sample budget. It is split evenly over the classes further down;
# a class with fewer images than its share contributes everything it has,
# so the final total can end up below this number.
TARGET_TOTAL = 9000
RANDOM_SEED = 42
IMG_SIZE = (48, 48)                # FER2013 images are 48x48 grayscale
VALID_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}

# Class-folder name -> integer label, following the FER2013 convention:
# 0: Angry, 1: Disgust, 2: Fear, 3: Happy, 4: Sad, 5: Surprise, 6: Neutral
LABEL_MAP = {
    name: label_id
    for label_id, name in enumerate(
        ("angry", "disgust", "fear", "happy", "sad", "surprise", "neutral")
    )
}

# Seed both the stdlib and the NumPy global RNGs with the same value.
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# 2) Gather files from BOTH train/ and test/ per class
def collect_files(root: Path, exts=None):
    """Return every image file found under *root*, searched recursively.

    Args:
        root: Directory to search. If it does not exist (or is not a
            directory) an empty list is returned instead of raising.
        exts: Optional iterable of file suffixes (with the leading dot) to
            accept. Matching is case-insensitive. Defaults to the
            module-level ``VALID_EXTS``.

    Returns:
        A sorted list of ``Path`` objects. Sorting matters because the
        directory walk yields entries in a filesystem-dependent order;
        without it, seeded sampling over this list would not be
        reproducible from one machine to another.
    """
    if exts is None:
        exts = VALID_EXTS
    wanted = {ext.lower() for ext in exts}

    if not root.is_dir():
        return []

    return sorted(
        p for p in root.rglob("*")
        if p.is_file() and p.suffix.lower() in wanted
    )

# Pool the image paths of every known class across both splits.
# Keys are class-folder names from LABEL_MAP; values are lists of Paths.
files_by_class = defaultdict(list)
for split in ("train", "test"):
    split_dir = DATASET_DIR / split
    for cls_name in LABEL_MAP:
        cls_dir = split_dir / cls_name
        # collect_files() returns [] for a missing directory, so no separate
        # existence check is needed here.
        matches = collect_files(cls_dir)
        if not matches:
            continue
        # Directory walks yield entries in a filesystem-dependent order.
        # Sorting makes the later seeded random.sample()/shuffle() pick the
        # same images on every machine.
        files_by_class[cls_name].extend(sorted(matches))

# Basic report: number of images discovered per class, then the grand total.
for cls_name in LABEL_MAP:
    print(f"{cls_name:>8}:", len(files_by_class.get(cls_name, [])))
total_found = sum(len(v) for v in files_by_class.values())
print("Total images discovered:", total_found)
# Raise explicitly rather than assert: assert statements are removed when
# Python runs with -O, which would let an empty dataset slip through.
if total_found <= 0:
    raise FileNotFoundError(
        "No images found. Please check dataset/train|test/<class> structure."
    )

# 3) Balanced downsampling (train and test are pooled; the original split is ignored)
# Every class gets the same quota: an equal share of TARGET_TOTAL, at least 1.
target_per_class = max(1, TARGET_TOTAL // len(LABEL_MAP))
print("Target per class ≈", target_per_class)

sampled_paths = []
for cls_name in LABEL_MAP:
    pool = files_by_class.get(cls_name, [])
    if not pool:
        continue
    # Classes at or below the quota are kept whole; larger ones are sampled
    # without replacement down to the quota.
    fits_in_quota = len(pool) <= target_per_class
    sampled_paths += list(pool) if fits_in_quota else random.sample(pool, target_per_class)

# Mix the classes together so the result is not grouped by class
random.shuffle(sampled_paths)
print("Sampled images:", len(sampled_paths))

# 4) Load → grayscale → resize to 48x48 → normalize to [0,1]
def load_as_gray48(img_path: Path):
    """Load one image as a normalized grayscale float32 array.

    The file is opened with PIL, converted to mode "L" (8-bit grayscale),
    resized to ``IMG_SIZE`` with bilinear resampling when its size differs,
    and finally divided by 255 so the values lie in [0, 1].

    Args:
        img_path: Path of the image file to read.

    Returns:
        A 2-D ``numpy.float32`` array of the pixel intensities.
    """
    with Image.open(img_path) as source:
        gray = source.convert("L")
        # Only resample when needed; images already at the target size are
        # used as they are.
        if gray.size != IMG_SIZE:
            gray = gray.resize(IMG_SIZE, resample=Image.BILINEAR)
        pixels = np.asarray(gray, dtype=np.float32)
        scaled = pixels / 255.0
    return scaled

# Parallel accumulators: entry i of each list describes the same image.
X_list, y_list, keep_paths = [], [], []
skipped = 0

for p in sampled_paths:
    # The label is the nearest enclosing directory whose (lower-cased) name is
    # a known class. p.parents starts with p.parent, so this single scan
    # covers both the immediate-parent case and deeper nesting.
    cls_name = next(
        (anc.name.lower() for anc in p.parents if anc.name.lower() in LABEL_MAP),
        None,
    )
    if cls_name is None:
        print(f"Skipping (unknown class folder): {p}")
        skipped += 1
        continue

    # Compute everything that can fail BEFORE touching the accumulators, so a
    # failure can never leave X_list/y_list one entry longer than keep_paths.
    try:
        arr = load_as_gray48(p)
        # store path relative to DATASET_DIR for traceability
        rel_path = str(p.relative_to(DATASET_DIR))
    except Exception as e:
        # Deliberately broad: any unreadable file is reported and skipped so
        # that one bad image does not abort the whole build.
        print(f"Skipping corrupt/unreadable image: {p} ({e})")
        skipped += 1
        continue

    X_list.append(arr)
    y_list.append(LABEL_MAP[cls_name])
    keep_paths.append(rel_path)

if X_list:
    X = np.stack(X_list, axis=0)
else:
    # IMG_SIZE is (width, height) in PIL terms while arrays are (rows, cols),
    # hence the swapped indices. This keeps the empty case consistent with
    # the configured image size instead of a hard-coded 48x48.
    X = np.empty((0, IMG_SIZE[1], IMG_SIZE[0]), dtype=np.float32)
y = np.array(y_list, dtype=np.int64)

print("Loaded X shape:", X.shape, "y shape:", y.shape, "skipped:", skipped)
if X.size:
    print("Pixel range:", float(X.min()), "→", float(X.max()))

# 5) Final class distribution
from collections import Counter
# Count how many loaded samples carry each integer label id.
label_ids = y.tolist()
dist = Counter(label_ids)
print("Class distribution (label_id: count):", dict(dist))

# 6) Save NPZ (X in [0,1], y in {0..6})
# Both arrays go into one compressed archive under the keys "X" and "y".
npz_name = OUTPUT_PREFIX + ".npz"
np.savez_compressed(npz_name, X=X.astype(np.float32), y=y)
print(f"Saved: {Path(npz_name).resolve()}")

# 7) Save CSV manifest (emotion id + relative filepath)
# One row per loaded image, in the same order as the rows of X / y.
# The encoding is given explicitly: without it open() falls back to the
# locale default, which on some systems cannot represent non-ASCII path names.
with open(f"{OUTPUT_PREFIX}.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["emotion", "filepath"])
    # int() turns the NumPy integer into a plain Python int before writing.
    w.writerows((int(lbl), relp) for lbl, relp in zip(y, keep_paths))
print(f"Saved: {Path(OUTPUT_PREFIX + '.csv').resolve()}")

# 8) Reminder for NB01 wording
# Ready-to-paste description of the generated subset for the NB01 notebook.
nb01_note = (
    "> We use a downsampled and balanced subset of FER2013 prepared beforehand "
    "to keep training under five minutes on CPU. Images are grayscale 48x48 across seven emotions. "
    "We load it from 'fer2013_small.npz' (X in [0,1], y in {0..6})."
)
print("\nNB01 note:", nb01_note, sep="\n")


   angry: 4953
 disgust: 547
    fear: 5121
   happy: 8989
     sad: 6077
surprise: 4002
 neutral: 6198
Total images discovered: 35887
Target per class ≈ 1285
Sampled images: 8257
Loaded X shape: (8257, 48, 48) y shape: (8257,) skipped: 0
Pixel range: 0.0 → 1.0
Class distribution (label_id: count): {5: 1285, 4: 1285, 0: 1285, 2: 1285, 3: 1285, 1: 547, 6: 1285}
Saved: D:\OneDrive\WorldQuant_DeepLearning\Nomans_Work_DL_WQU\Project_09\fer2013_small.npz
Saved: D:\OneDrive\WorldQuant_DeepLearning\Nomans_Work_DL_WQU\Project_09\fer2013_small.csv

NB01 note:
> We use a downsampled and balanced subset of FER2013 prepared beforehand to keep training under five minutes on CPU. Images are grayscale 48x48 across seven emotions. We load it from 'fer2013_small.npz' (X in [0,1], y in {0..6}).
