### Dependencies

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.applications import ResNet50, Xception
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt
from pathlib import Path 
import pandas as pd        
import keras
import numpy as np
import os, json
from glob import glob
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    auc,
    cohen_kappa_score,
    confusion_matrix,
    f1_score,
    precision_recall_curve,
    roc_auc_score,
)

### Data Paths

In [None]:
# Dataset folders, given relative to the notebook's working directory.
# Annotations are read by the cells below as "<id>_exp.npy", "<id>_val.npy",
# "<id>_aro.npy" and "<id>_lnd.npy"; images are read as "<id>.jpg".
DATA_PATH_ANN="../data/annotations"
DATA_PATH_IMG="../data/images"

In [None]:
# Image
# Display sample 0 as a quick visual check that the image folder is readable.
print("Image")
img=plt.imread(f"{DATA_PATH_IMG}/0.jpg")
plt.imshow(img)

#### 0: Neutral, 1: Happy, 2: Sad, 3: Surprise, 4: Fear, 5: Disgust, 6: Anger, 7: Contempt

In [None]:
# Expression
# Load the expression annotation of sample 0 and print its value, shape and dtype
# (class ids are listed in the note above this cell).
exp=np.load(f"{DATA_PATH_ANN}/0_exp.npy")
print("Expression: ", exp)
print("Shape:", exp.shape)
print("Data type:", exp.dtype)

#### Range [-1,+1] (for Uncertain and Noface categories the value is -2) 
#### Continuous values from -1 (negative/unpleasant) to +1 (positive/pleasant)

In [None]:
# Valence
# Load the valence annotation of sample 0 and print its value, shape and dtype
# (expected range and the -2 sentinel are described in the note above this cell).
val=np.load(f"{DATA_PATH_ANN}/0_val.npy")
print("Valence: ", val)
print("Shape:", val.shape)
print("Data type:", val.dtype)

#### Range [-1,+1] (for Uncertain and Noface categories the value is -2) 
#### Continuous values from -1 (Tired) to +1 (Active)

In [None]:
# Arousal
# Load the arousal annotation of sample 0 and print its value, shape and dtype
# (expected range and the -2 sentinel are described in the note above this cell).
aro=np.load(f"{DATA_PATH_ANN}/0_aro.npy")
print("Arousal: ", aro)
print("Shape:", aro.shape)
print("Data type:", aro.dtype)

#### Location of the 68 facial landmarks

In [None]:
# Landmark
# Load the facial-landmark annotation of sample 0 and print its value, shape and dtype.
# The printed shape shows whether the points are stored as a 2-D array or already flat.
lnd=np.load(f"{DATA_PATH_ANN}/0_lnd.npy")
print("Landmark: ", lnd)
print("Shape:", lnd.shape)
print("Data type:", lnd.dtype)

In [None]:
# Instantiate the stock ResNet50 (ImageNet weights, default classification top)
# only to inspect its layer list.
# NOTE(review): `resNet` is not referenced by any later cell shown in this notebook;
# the training model is built separately by build_resnet_with_landmarks().
resNet = ResNet50(weights="imagenet")

# Print the architecture
resNet.summary()

In [None]:
# Instantiate the stock Xception (ImageNet weights) only to inspect its layer list.
# NOTE(review): `xception` is not referenced by any later cell shown in this notebook.
xception = Xception(weights="imagenet")

xception.summary()

In [None]:
# Settings for the TensorFlow/Keras part of the notebook.
IMG_SIZE = (128,128)   # (height, width) passed to tf.image.resize in _load_image
BATCH_SIZE = 24        # default batch size of make_multimodal_dataset
EPOCHS = 35            # epoch budget of the single-phase training cell
SEED = 42              # shared seed for NumPy, TensorFlow, the split and the shuffle

# Seed the NumPy and TensorFlow global generators for reproducible runs.
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Index the dataset: one record per image, paired with its annotation files.
img_paths = sorted(glob(os.path.join(DATA_PATH_IMG, "*.jpg")))

records, landmarks_list = [], []

for img_path in img_paths:
    # "<dir>/0.jpg" -> "0": the prefix shared by this sample's annotation files
    base = os.path.splitext(os.path.basename(img_path))[0]
    exp_file, val_file, aro_file, lnd_file = (
        os.path.join(DATA_PATH_ANN, f"{base}_{kind}.npy")
        for kind in ("exp", "val", "aro", "lnd")
    )

    # Scalar labels live in tiny .npy files; convert them to plain Python numbers
    expression = int(np.load(exp_file))
    valence = float(np.load(val_file))
    arousal = float(np.load(aro_file))

    # Landmarks stored as (N, 2) pairs are flattened to 1-D; any other layout is kept as loaded
    landmark = np.load(lnd_file)
    if landmark.ndim == 2 and landmark.shape[1] == 2:
        landmark = landmark.reshape(-1)

    landmarks_list.append(landmark)
    records.append([img_path, expression, valence, arousal])

# Tabular labels go into a DataFrame, landmarks into a float32 array (row i <-> record i)
landmarks_array = np.array(landmarks_list, dtype="float32")
df = pd.DataFrame(records, columns=["image_path", "expression", "valence", "arousal"])

print("Dataset size:", len(df))
print("Expression classes:", np.unique(df["expression"]))
print("Landmarks shape:", landmarks_array.shape)

In [None]:
# Preview the first rows of the annotation table built above.
df.head()

In [None]:
# --- Split into train/val ---
# 80/20 split, stratified on the expression label and seeded with SEED.
# `df` and `landmarks_array` are split together, so row i of each landmark array
# still belongs to row i of the matching DataFrame.
train_df, val_df, train_landmarks, val_landmarks = train_test_split(
    df, landmarks_array, test_size=0.2, stratify=df["expression"], random_state=SEED
)

# --- Helper to load images ---
def _load_image(path, img_size=IMG_SIZE,
                preprocess_fn=tf.keras.applications.resnet50.preprocess_input):
    """Read a JPEG from disk and return it as a model-ready float32 tensor.

    Args:
        path: scalar string tensor (or str) with the image file path.
        img_size: (height, width) passed to `tf.image.resize`.
        preprocess_fn: callable applied to the resized image while it is still on
            the 0-255 scale. The default is the ResNet50 preprocessing that the
            ImageNet weights used by this notebook were trained with. Pass
            `None` to get the previous behaviour (plain division by 255).

    Returns:
        float32 tensor of shape (height, width, 3).
    """
    img = tf.io.read_file(path)
    img = tf.image.decode_jpeg(img, channels=3)   # force RGB
    img = tf.image.resize(img, img_size)          # values stay on the 0-255 scale
    img = tf.cast(img, tf.float32)
    if preprocess_fn is None:
        return img / 255.0
    # Feeding [0, 1]-scaled pixels to an ImageNet-pretrained ResNet50 mismatches the
    # input distribution its weights expect, so apply the backbone's own preprocessing.
    return preprocess_fn(img)

# --- Dataset builder ---
def make_multimodal_dataset(df_subset, lm_array, batch_size=BATCH_SIZE, shuffle=False):
    """Build a batched `tf.data.Dataset` of (inputs, targets) for the two-input model.

    Args:
        df_subset: DataFrame with columns "image_path", "expression", "valence"
            and "arousal".
        lm_array: 2-D array of landmarks, row-aligned with `df_subset`.
        batch_size: number of samples per batch.
        shuffle: if True, reshuffle the samples on every pass over the dataset.

    Returns:
        Dataset yielding
        ({"image_input": image, "lm_input": landmarks},
         {"expr_out": one-hot expression, "va_out": [valence, arousal]}).
    """
    num_samples = len(df_subset)
    # One-hot depth comes from the full annotation table so every subset produces
    # targets of the same width.
    # NOTE(review): assumes expression ids are exactly 0..K-1 — confirm against the data.
    num_classes = df["expression"].nunique()

    def gen():
        for p, exp, val, aro, lm in zip(
            df_subset["image_path"], df_subset["expression"],
            df_subset["valence"], df_subset["arousal"], lm_array
        ):
            yield p.encode("utf-8"), lm.astype("float32"), exp, np.array([val, aro], dtype="float32")

    # `output_signature` replaces the deprecated output_types/output_shapes pair.
    output_signature = (
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(lm_array.shape[1],), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int64),
        tf.TensorSpec(shape=(2,), dtype=tf.float32),
    )
    ds = tf.data.Dataset.from_generator(gen, output_signature=output_signature)

    if shuffle:
        # Shuffle BEFORE decoding: the buffer then holds only paths and labels, so it
        # can cover the whole subset (uniform shuffle) at negligible memory cost,
        # instead of buffering 1024 decoded images.
        ds = ds.shuffle(max(num_samples, 1), seed=SEED)

    def _map(path, lm, exp, va):
        img = _load_image(path)
        return {"image_input": img, "lm_input": lm}, {"expr_out": tf.one_hot(exp, num_classes), "va_out": va}

    ds = ds.map(_map, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

# --- Create train/val datasets ---
# Only the training set is shuffled; validation keeps a fixed order.
train_ds = make_multimodal_dataset(train_df, train_landmarks, shuffle=True)
val_ds   = make_multimodal_dataset(val_df, val_landmarks, shuffle=False)

# Printing a tf.data.Dataset shows its element spec (shapes and dtypes).
print(train_ds)
print(val_ds)

In [None]:
# Model input/output sizes, used as defaults by build_resnet_with_landmarks().
IMG_SHAPE = (128, 128, 3)                   # must agree with the IMG_SIZE used when loading images
LM_DIM = train_landmarks.shape[1]           # length of one flattened landmark vector
NUM_CLASSES = df["expression"].nunique()    # number of distinct expression labels in the data

def build_resnet_with_landmarks(img_shape=IMG_SHAPE, lm_dim=LM_DIM, freeze_backbone=True,
                                num_classes=None):
    """Build the two-input, two-output Keras model (image + landmarks).

    Args:
        img_shape: shape of one input image, (height, width, channels).
        lm_dim: length of one flattened landmark vector.
        freeze_backbone: if True, every ResNet50 layer is marked non-trainable.
        num_classes: width of the expression softmax; defaults to the
            module-level NUM_CLASSES when None.

    Returns:
        Uncompiled model with inputs "image_input" and "lm_input" and outputs
        "expr_out" (softmax over expressions) and "va_out" (2 tanh units, so
        predictions are bounded to [-1, 1]).
    """
    if num_classes is None:
        num_classes = NUM_CLASSES

    # --- Image branch (ResNet50) ---
    img_in = layers.Input(shape=img_shape, name="image_input")
    # input_tensor=img_in keeps the backbone layers flat inside the final model,
    # so callers can address them through model.layers.
    base = ResNet50(weights="imagenet", include_top=False, input_tensor=img_in)
    if freeze_backbone:
        base.trainable = False
    x = base.output
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(512, activation="relu")(x)
    x = layers.Dropout(0.4)(x)
    img_feat = layers.Dense(256, activation="relu")(x)

    # --- Landmark branch ---
    lm_in = layers.Input(shape=(lm_dim,), name="lm_input")
    y = layers.Dense(128, activation="relu")(lm_in)
    y = layers.Dropout(0.3)(y)
    lm_feat = layers.Dense(64, activation="relu")(y)

    # --- Fusion ---
    fused = layers.Concatenate()([img_feat, lm_feat])
    fused = layers.Dense(256, activation="relu")(fused)
    fused = layers.Dropout(0.4)(fused)

    # --- Heads ---
    expr_out = layers.Dense(num_classes, activation="softmax", name="expr_out")(fused)
    va_out = layers.Dense(2, activation="tanh", name="va_out")(fused)

    # Use tf.keras.Model rather than `models.Model`: a later cell runs
    # `from torchvision import models`, which rebinds the name `models` and would
    # break this function if it were called again afterwards.
    model = tf.keras.Model(inputs=[img_in, lm_in], outputs=[expr_out, va_out])
    return model

# Build model with frozen backbone first
model = build_resnet_with_landmarks(freeze_backbone=True)

# Unfreeze the last 100 layers of the full model for fine-tuning. The slice is taken
# over model.layers, so it covers the layers added on top of ResNet50 as well as the
# tail of the backbone. BatchNormalization layers are skipped and stay frozen.
for layer in model.layers[-100:]:
    if not isinstance(layer, layers.BatchNormalization):
        layer.trainable = True

# Compile with a small learning rate, since backbone layers are being updated.
# Both losses are weighted equally; accuracy is tracked for the expression head only.
model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-5),   # <-- smaller LR
    loss={"expr_out": "categorical_crossentropy", "va_out": "mse"},
    loss_weights={"expr_out": 1.0, "va_out": 1.0},
    metrics={"expr_out": "accuracy"}
)

model.summary()


In [None]:
# --- Callbacks ---
# All three callbacks watch the validation accuracy of the expression head
# ("val_expr_out_accuracy") and treat higher as better.
callbacks = [
    # Keep only the best model seen so far on disk.
    tf.keras.callbacks.ModelCheckpoint(
        "best_resnet_landmarks.h5", monitor="val_expr_out_accuracy",
        save_best_only=True, mode="max", verbose=1
    ),
    # Halve the learning rate after 3 epochs without improvement.
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_expr_out_accuracy", factor=0.5, patience=3,
        mode="max", verbose=1
    ),
    # Stop after 7 epochs without improvement and restore the best weights.
    tf.keras.callbacks.EarlyStopping(
        monitor="val_expr_out_accuracy", patience=7,
        mode="max", restore_best_weights=True, verbose=1
    )
]

# --- Train ---
# Runs for at most EPOCHS epochs; early stopping may end training sooner.
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks
)


In [None]:
# Phase 1: Build frozen model
# With the ResNet50 backbone frozen, only the layers added on top of it are trained.
# NOTE: this rebinds `model`, replacing the model trained in the previous cell.
model = build_resnet_with_landmarks(freeze_backbone=True)

model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-4),
    loss={"expr_out": "categorical_crossentropy", "va_out": "mse"},
    loss_weights={"expr_out": 1.0, "va_out": 1.0},
    metrics={"expr_out": "accuracy"}
)

# Save the best warm-up model (by expression validation accuracy); the next cell
# reloads its weights from "warmup_resnet.h5".
callbacks = [
    tf.keras.callbacks.ModelCheckpoint("warmup_resnet.h5", monitor="val_expr_out_accuracy",
                                       save_best_only=True, mode="max", verbose=1)
]

history_warmup = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    callbacks=callbacks
)


In [None]:
# Clear old graph
tf.keras.backend.clear_session()

# Reload best warmup weights into a fresh model
model = build_resnet_with_landmarks(freeze_backbone=True)
model.load_weights("warmup_resnet.h5")

# Phase 2: actually fine-tune. The model is rebuilt with a frozen backbone, so without
# this step the phase would only keep training the heads at a lower learning rate.
# Unfreeze the last 100 layers of the model (same slice as the single-phase cell),
# leaving BatchNormalization layers frozen so their statistics are not disturbed.
for layer in model.layers[-100:]:
    if not isinstance(layer, layers.BatchNormalization):
        layer.trainable = True

# Compile AFTER changing `trainable` so the change takes effect, with a smaller LR
model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-5),
    loss={"expr_out": "categorical_crossentropy", "va_out": "mse"},
    loss_weights={"expr_out": 1.0, "va_out": 1.0},
    metrics={"expr_out": "accuracy"}
)

# Checkpoint the best model, halve the LR on plateaus and stop early; all three
# callbacks monitor the expression head's validation accuracy.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint("finetuned_resnet.h5", monitor="val_expr_out_accuracy",
                                       save_best_only=True, mode="max", verbose=1),
    tf.keras.callbacks.ReduceLROnPlateau(monitor="val_expr_out_accuracy", factor=0.5, patience=3, mode="max"),
    tf.keras.callbacks.EarlyStopping(monitor="val_expr_out_accuracy", patience=7, mode="max", restore_best_weights=True)
]

history_finetune = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=25,
    callbacks=callbacks
)


In [None]:
# ==========================
# 1. Imports
# ==========================
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import numpy as np
from pathlib import Path
import json

# metrics
from sklearn.metrics import accuracy_score, f1_score, cohen_kappa_score
from scipy.stats import pearsonr

# plotting (optional for confusion matrix / report)
import matplotlib.pyplot as plt

# if needed
from PIL import Image

from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import models, transforms

# optional timm
# Record whether `timm` could be imported in TIMM_AVAILABLE instead of failing the cell.
try:
    import timm

    TIMM_AVAILABLE = True
except Exception:
    # Any import-time failure (not only ImportError) is treated as "timm not available".
    TIMM_AVAILABLE = False

In [None]:
# ==========================
# 2. Config
# ==========================
# Settings for the PyTorch part of the notebook.
# NOTE: these rebind names already defined for the TensorFlow part. In particular
# IMG_SIZE changes from the tuple (128, 128) to the int 128, so TensorFlow cells
# that read IMG_SIZE should not be re-run after this cell without restarting.
DATA_PATH_IMG = "../data/images"        # <-- change if needed
DATA_PATH_ANN = "../data/annotations"   # <-- change if needed
IMG_SIZE = 128                          # side length used by transforms.Resize below
BATCH_SIZE = 24
SEED = 42
# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class CFG:
    """Run settings grouped as class attributes (read as `CFG.<NAME>`, never instantiated here)."""

    # --- reproducibility / hardware ---
    SEED = 42
    # GPU when available, CPU otherwise
    DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # --- data ---
    IMG_SIZE = 128
    BATCH_SIZE = 24
    NUM_WORKERS = 4

    # --- optimisation ---
    LR = 1e-4
    EPOCHS = 25

    # --- bookkeeping ---
    SAVE_DIR = Path("outputs")
    PRINT_FREQ = 50

In [None]:
# ==========================
# 3. Dataset
# ==========================
class AffectDataset(Dataset):
    """Map-style dataset pairing each "*.jpg" image with its four .npy annotations.

    For an image "<id>.jpg" in `img_dir`, the annotations are read from
    "<id>_exp.npy", "<id>_val.npy", "<id>_aro.npy" and "<id>_lnd.npy" in `ann_dir`.
    """

    def __init__(self, img_dir, ann_dir, transform=None):
        """Store the folders and the optional image transform, and index the images.

        Args:
            img_dir: folder scanned (non-recursively) for "*.jpg" files.
            ann_dir: folder holding the per-image annotation files.
            transform: optional callable applied to the RGB PIL image.
        """
        self.img_dir, self.ann_dir = img_dir, ann_dir
        self.transform = transform

        # Sorted so that index -> sample is stable across runs
        pattern = os.path.join(img_dir, "*.jpg")
        self.img_paths = sorted(glob(pattern))

    def __len__(self):
        """Number of indexed images."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Load sample `idx` and return it as a dict of tensors.

        Keys: "image" (transform output, or the PIL image when no transform is set),
        "landmarks" (float32), "expression" (long), "valence" and "arousal" (float32).
        """
        img_path = self.img_paths[idx]
        stem = os.path.splitext(os.path.basename(img_path))[0]

        def annotation_path(kind):
            # "<ann_dir>/<id>_<kind>.npy"
            return os.path.join(self.ann_dir, f"{stem}_{kind}.npy")

        # Image first, then the scalar labels, then the landmarks
        image = Image.open(img_path).convert("RGB")
        expression = int(np.load(annotation_path("exp")))
        valence = float(np.load(annotation_path("val")))
        arousal = float(np.load(annotation_path("aro")))
        landmark = np.load(annotation_path("lnd"))

        # Any 2-D landmark array is flattened (e.g. 68x2 -> 136)
        if landmark.ndim == 2:
            landmark = landmark.reshape(-1)

        if self.transform:
            image = self.transform(image)

        sample = {"image": image}
        sample["landmarks"] = torch.tensor(landmark, dtype=torch.float32)
        sample["expression"] = torch.tensor(expression, dtype=torch.long)
        sample["valence"] = torch.tensor(valence, dtype=torch.float32)
        sample["arousal"] = torch.tensor(arousal, dtype=torch.float32)
        return sample

def collate_fn(batch):
    """Stack a list of per-sample dicts into one dict of batched tensors.

    Each of the five fields is stacked along a new leading batch dimension.
    """
    fields = ("image", "landmarks", "expression", "valence", "arousal")
    return {
        field: torch.stack([sample[field] for sample in batch])
        for field in fields
    }

In [None]:
# ==========================
# 4. Model
# ==========================
class MultiTaskModel(nn.Module):
    """ResNet image branch fused with a landmark MLP, with two output heads.

    The expression head returns raw class logits; the valence/arousal head returns
    two unbounded values (no activation is applied to either head).
    """

    # torchvision ResNet variants share the layout this class relies on: a final
    # `fc` layer that can be dropped, and `fc.in_features` giving the feature size.
    _SUPPORTED_BACKBONES = ("resnet18", "resnet34", "resnet50", "resnet101", "resnet152")

    def __init__(self, num_classes=8, lm_dim=136, backbone_name="resnet50", pretrained=False):
        """
        Args:
            num_classes: number of expression classes (width of the logits).
            lm_dim: length of the flattened landmark vector.
            backbone_name: one of `_SUPPORTED_BACKBONES`. It must match the
                architecture a checkpoint was trained with, otherwise
                `load_state_dict` fails on mismatched keys or shapes.
            pretrained: load torchvision's "IMAGENET1K_V1" weights when True.

        Raises:
            ValueError: if `backbone_name` is not supported.
        """
        super().__init__()
        if backbone_name not in self._SUPPORTED_BACKBONES:
            raise ValueError(
                f"Unsupported backbone {backbone_name!r}; "
                f"expected one of {self._SUPPORTED_BACKBONES}"
            )
        base = getattr(models, backbone_name)(weights="IMAGENET1K_V1" if pretrained else None)

        # remove FC: keep everything up to and including the global average pool
        modules = list(base.children())[:-1]
        self.cnn = nn.Sequential(*modules)
        feat_dim = base.fc.in_features

        # CNN branch
        self.cnn_fc = nn.Sequential(
            nn.Linear(feat_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.3)
        )

        # Landmarks branch
        self.lm_fc = nn.Sequential(
            nn.Linear(lm_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3)
        )

        # Fusion of the 256-d image features and the 128-d landmark features
        self.fusion = nn.Sequential(
            nn.Linear(256 + 128, 256),
            nn.ReLU(),
            nn.Dropout(0.4)
        )

        # Heads
        self.expr_head = nn.Linear(256, num_classes)
        self.va_head = nn.Linear(256, 2)

    def forward(self, x_img, x_lm):
        """Return `(expr_logits, va_out)` for a batch of images and landmark vectors."""
        # Flatten the pooled feature map to (batch, feat_dim)
        feat_img = torch.flatten(self.cnn(x_img), 1)
        feat_img = self.cnn_fc(feat_img)

        feat_lm = self.lm_fc(x_lm)

        fused = self.fusion(torch.cat([feat_img, feat_lm], dim=1))

        expr_logits = self.expr_head(fused)
        va_out = self.va_head(fused)

        return expr_logits, va_out

In [None]:
# ==========================
# 5. Metrics (classification + regression)
# ==========================
def ccc(a, b):
    """Concordance correlation coefficient between two equally shaped arrays.

    Computed as 2*cov(a, b) / (var(a) + var(b) + (mean(a) - mean(b))**2 + 1e-8),
    using the arrays' own `.mean()` / `.var()`; the 1e-8 term guards against a
    zero denominator.
    """
    mean_a = a.mean()
    mean_b = b.mean()
    covariance = ((a - mean_a) * (b - mean_b)).mean()
    denominator = a.var() + b.var() + (mean_a - mean_b) ** 2 + 1e-8
    return (2 * covariance) / denominator

def evaluate(model, loader, device):
    """Run `model` over `loader` and compute classification and regression metrics.

    Args:
        model: callable as `model(images, landmarks)` returning
            `(expr_logits, va_out)`; it is switched to eval mode.
        loader: iterable of batches with keys "image", "landmarks", "expression",
            "valence" and "arousal".
        device: device the inputs are moved to before the forward pass.

    Returns:
        dict with accuracy, macro-F1 and Cohen's kappa for the expression head,
        plus RMSE, Pearson r and CCC for valence (column 0) and arousal (column 1).
    """
    model.eval()
    labels_true, labels_pred = [], []
    va_true, va_pred = [], []

    with torch.no_grad():
        for batch in loader:
            images = batch["image"].to(device)
            landmarks = batch["landmarks"].to(device)
            labels = batch["expression"].cpu().numpy()
            # One row per sample: [valence, arousal]
            targets = np.column_stack(
                [batch["valence"].cpu().numpy(), batch["arousal"].cpu().numpy()]
            )

            expr_logits, va_out = model(images, landmarks)

            labels_true.extend(labels)
            labels_pred.extend(expr_logits.argmax(dim=1).cpu().numpy())
            va_true.extend(targets)
            va_pred.extend(va_out.cpu().numpy())

    labels_true, labels_pred = np.array(labels_true), np.array(labels_pred)
    va_true, va_pred = np.array(va_true), np.array(va_pred)

    # regression metrics, computed per output column
    rmse = np.sqrt(((va_true - va_pred) ** 2).mean(axis=0))
    pearson = [pearsonr(va_true[:, col], va_pred[:, col])[0] for col in (0, 1)]
    concordance = [ccc(va_true[:, col], va_pred[:, col]) for col in (0, 1)]

    # classification metrics first, then the regression metrics
    results = {
        "acc": accuracy_score(labels_true, labels_pred),
        "f1": f1_score(labels_true, labels_pred, average="macro"),
        "kappa": cohen_kappa_score(labels_true, labels_pred),
    }
    results["rmse_val"], results["rmse_aro"] = rmse[0], rmse[1]
    results["pear_val"], results["pear_aro"] = pearson[0], pearson[1]
    results["ccc_val"], results["ccc_aro"] = concordance[0], concordance[1]
    return results


In [None]:
# ==========================
# 6. Run evaluation
# ==========================
# Resize to the configured square size and convert to a [0, 1] float tensor.
# NOTE(review): no mean/std normalisation is applied — confirm this matches the
# preprocessing used when the checkpoint was trained.
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

# Seeded 70/15/15 split; only the test part is used here.
ds = AffectDataset(DATA_PATH_IMG, DATA_PATH_ANN, transform=transform)
n = len(ds)
test_size = int(0.15 * n)
val_size = test_size
train_size = n - val_size - test_size
_, _, test_ds = random_split(ds, [train_size, val_size, test_size],
                             generator=torch.Generator().manual_seed(SEED))

test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE,
                         shuffle=False, collate_fn=collate_fn)

# Count the expression classes from the small "<id>_exp.npy" files only.
# Iterating the dataset itself would open, resize and tensorise every image
# just to read one integer per sample.
expr_labels = {
    int(np.load(os.path.join(
        DATA_PATH_ANN, f"{os.path.splitext(os.path.basename(p))[0]}_exp.npy")))
    for p in ds.img_paths
}

# build model
model = MultiTaskModel(num_classes=len(expr_labels),
                       lm_dim=ds[0]['landmarks'].numel(),
                       pretrained=False).to(DEVICE)

# Allow numpy scalar types for this session. The allowlist only affects
# `weights_only=True` loads; it has no effect on the load below.
torch.serialization.add_safe_globals([np._core.multiarray.scalar])

# load weights
# SECURITY: weights_only=False performs a full pickle load, which can execute
# arbitrary code. Only load checkpoints from a trusted source.
# NOTE(review): the file name says "resnet18" but MultiTaskModel defaults to a
# resnet50 backbone — confirm the checkpoint matches the architecture built above.
checkpoint = torch.load("../output/best_resnet18.pth", map_location=DEVICE, weights_only=False)

# Access model weights
model.load_state_dict(checkpoint["model_state"])
model.eval()

# evaluate
metrics = evaluate(model, test_loader, DEVICE)
print("\n📊 Evaluation Results:")
for k, v in metrics.items():
    print(f"{k:12s}: {v:.4f}")