In [None]:
import os, numpy as np, pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import VGG16, ResNet50
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, cohen_kappa_score, precision_recall_curve, auc
from sklearn.preprocessing import LabelBinarizer
from PIL import Image
import json

In [None]:
DATA_DIR = "./dataset"   # change if needed
IMAGES_DIR = os.path.join(DATA_DIR, "images")
ANNOTATIONS_DIR = os.path.join(DATA_DIR, "annotations")
OUTPUT_DIR = "./outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 20
SEED = 42
NUM_CLASSES = 8
LEARNING_RATE = 1e-4
N_SAMPLES = 5496  # adjust if different

In [None]:
def build_dataframe(ann_dir, img_dir, n_samples=N_SAMPLES):
    records = []
    for idx in range(n_samples):
        exp_file = os.path.join(ann_dir, f"{idx}_exp.npy")
        val_file = os.path.join(ann_dir, f"{idx}_val.npy")
        aro_file = os.path.join(ann_dir, f"{idx}_aro.npy")
        img_path = os.path.join(img_dir, f"{idx}.jpg")

        # Skip if any required file missing
        if not (os.path.exists(exp_file) and os.path.exists(val_file) and os.path.exists(aro_file) and os.path.exists(img_path)):
            # optional: print(f"Skipping index {idx} (missing file)")
            continue

        try:
            expression = int(np.load(exp_file))
            valence = float(np.load(val_file))
            arousal = float(np.load(aro_file))
        except Exception as e:
            print(f"Skipping index {idx}: load error -> {e}")
            continue

        records.append({
            "filepath": img_path,
            "expression": expression,
            "valence": valence,
            "arousal": arousal
        })

    df = pd.DataFrame(records)
    return df

df = build_dataframe(ANNOTATIONS_DIR, IMAGES_DIR, n_samples=N_SAMPLES)
print("Dataset size after filtering:", len(df))
print(df.head())



In [None]:
def _load_image_np(path, class_label, reg_label):
    # Convert Tensor -> string
    path = path.numpy().decode("utf-8")
    
    # Open and preprocess image
    img = Image.open(path).convert("RGB")
    img = img.resize(IMG_SIZE)
    arr = np.asarray(img, dtype=np.float32) / 255.0
    
    # Labels
    cls = np.int32(class_label.numpy())   # scalar int
    reg = np.array(reg_label.numpy(), dtype=np.float32)
    reg = reg.reshape(2)                  # enforce shape [2]
    
    return arr, cls, reg

def tf_loader(path, label_class, label_reg):
    arr, c, r = tf.py_function(
        func=_load_image_np,
        inp=[path, label_class, label_reg],
        Tout=[tf.float32, tf.int32, tf.float32]
    )
    arr.set_shape([IMG_SIZE[0], IMG_SIZE[1], 3])
    c.set_shape([])   # scalar
    r.set_shape([2])  # vector length 2
    return arr, {"class_output": c, "reg_output": r}

def make_dataset(df, batch_size=32, shuffle=True):
    paths = df["filepath"].values.astype(str)
    y_class = df["expression"].values.astype(np.int32)
    y_reg = df[["valence", "arousal"]].values.astype(np.float32)

    ds = tf.data.Dataset.from_tensor_slices((paths, y_class, y_reg))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(paths), seed=SEED)
    ds = ds.map(tf_loader, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

# quick test of dataset creation (run one batch)
small_ds = make_dataset(df.head(8), batch_size=4, shuffle=False)
for x, y in small_ds.take(1):
    print("x shape:", x.shape)
    print("y class shape:", y["class_output"].shape)
    print("y reg shape:", y["reg_output"].shape)



In [None]:
def build_two_head_model(base_name="VGG16", input_shape=(224,224,3), num_classes=NUM_CLASSES, lr=LEARNING_RATE):
    if base_name == "VGG16":
        base = VGG16(include_top=False, weights="imagenet", input_shape=input_shape)
    elif base_name == "ResNet50":
        base = ResNet50(include_top=False, weights="imagenet", input_shape=input_shape)
    else:
        raise ValueError("Unsupported base model")

    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dropout(0.4)(x)

    # Classification head
    c = layers.Dense(128, activation="relu")(x)
    c = layers.Dropout(0.3)(c)
    c = layers.Dense(num_classes, activation="softmax", name="class_output")(c)

    # Regression head
    r = layers.Dense(64, activation="relu")(x)
    r = layers.Dropout(0.3)(r)
    r = layers.Dense(2, activation="linear", name="reg_output")(r)

    model = keras.Model(base.input, [c, r])

    losses = {"class_output": "sparse_categorical_crossentropy", "reg_output": "mse"}
    loss_weights = {"class_output": 1.0, "reg_output": 1.0}

    model.compile(optimizer=keras.optimizers.Adam(LEARNING_RATE),
                  loss=losses,
                  loss_weights=loss_weights,
                  metrics={"class_output": ["accuracy"]})
    return model



In [None]:
def evaluate_classification(y_true, y_pred_proba):
    y_pred = np.argmax(y_pred_proba, axis=1)
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")
    kappa = cohen_kappa_score(y_true, y_pred)

    lb = LabelBinarizer()
    y_true_bin = lb.fit_transform(y_true)
    aucs, pr_aucs = [], []
    # handle potential binary case
    if y_true_bin.ndim == 1:
        y_true_bin = y_true_bin.reshape(-1, 1)
    for i in range(y_true_bin.shape[1]):
        try:
            precision, recall, _ = precision_recall_curve(y_true_bin[:, i], y_pred_proba[:, i])
            pr_aucs.append(auc(recall, precision))
            aucs.append(roc_auc_score(y_true_bin[:, i], y_pred_proba[:, i]))
        except Exception:
            pass
    return {"accuracy": acc, "f1_macro": f1, "kappa": kappa,
            "auc": float(np.mean(aucs)) if aucs else float('nan'),
            "pr_auc": float(np.mean(pr_aucs)) if pr_aucs else float('nan')}

def rmse(y_true, y_pred): return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))
def corr(y_true, y_pred):
    try:
        return float(np.corrcoef(y_true.ravel(), y_pred.ravel())[0,1])
    except Exception:
        return float('nan')
def sagr(y_true, y_pred): return float(np.mean(np.sign(y_true) == np.sign(y_pred)))
def ccc(y_true, y_pred):
    y_true, y_pred = y_true.ravel(), y_pred.ravel()
    mean_true, mean_pred = np.mean(y_true), np.mean(y_pred)
    var_true, var_pred = np.var(y_true), np.var(y_pred)
    cov = np.mean((y_true-mean_true)*(y_pred-mean_pred))
    denom = (var_true + var_pred + (mean_true-mean_pred)**2)
    return float((2*cov) / denom) if denom != 0 else float('nan')

def evaluate_regression(y_true, y_pred):
    return {
        "rmse": rmse(y_true, y_pred),
        "corr": corr(y_true, y_pred),
        "sagr": sagr(y_true, y_pred),
        "ccc": ccc(y_true, y_pred)
    }



In [None]:
def run_experiment(df, base="VGG16", exp_name="exp"):
    # ensure enough samples and class distribution for stratify
    if len(df) < 10:
        raise ValueError("Not enough samples in df to train.")
    df_train, df_temp = train_test_split(df, test_size=0.2, random_state=SEED, stratify=df["expression"])
    df_val, df_test = train_test_split(df_temp, test_size=0.5, random_state=SEED, stratify=df_temp["expression"])

    train_ds = make_dataset(df_train, batch_size=BATCH_SIZE, shuffle=True)
    val_ds = make_dataset(df_val, batch_size=BATCH_SIZE, shuffle=False)
    test_ds = make_dataset(df_test, batch_size=BATCH_SIZE, shuffle=False)

    model = build_two_head_model(base_name=base, num_classes=NUM_CLASSES, lr=LEARNING_RATE)
    history = model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)

    # Predictions
    y_true_cls, y_true_reg, y_pred_proba, y_pred_reg = [], [], [], []
    for imgs, labels in test_ds:
        cls_true = labels["class_output"].numpy()
        reg_true = labels["reg_output"].numpy()
        cls_pred, reg_pred = model.predict(imgs, verbose=0)
        y_true_cls.extend(cls_true.tolist())
        y_true_reg.extend(reg_true.tolist())
        y_pred_proba.extend(cls_pred.tolist())
        y_pred_reg.extend(reg_pred.tolist())

    y_true_cls = np.array(y_true_cls)
    y_true_reg = np.array(y_true_reg)
    y_pred_proba = np.array(y_pred_proba)
    y_pred_reg = np.array(y_pred_reg)

    results = {
        "classification": evaluate_classification(y_true_cls, y_pred_proba),
        "regression": evaluate_regression(y_true_reg, y_pred_reg),
        "history": history.history
    }

    with open(os.path.join(OUTPUT_DIR, f"{exp_name}_results.json"), "w") as f:
        json.dump(results, f, indent=2)
    return results


results_vgg = run_experiment(df, base="VGG16", exp_name="VGG16")
results_resnet = run_experiment(df, base="ResNet50", exp_name="ResNet50")
print("VGG16:", results_vgg)
print("ResNet50:", results_resnet)