In [1]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score

def _emb_cols(df, prefix="e"):
    """Return the embedding column names of ``df`` in numeric order.

    A column qualifies when its name is a string of the form
    ``<prefix><integer>`` (``e0``, ``e1``, ...). Columns that merely start
    with ``prefix`` but have no integer suffix (for example ``"epoch"`` with
    the default prefix), as well as non-string column labels, are skipped
    rather than aborting the scan with an exception.

    Args:
        df: Object exposing a ``columns`` iterable (e.g. a DataFrame).
        prefix: Leading text shared by all embedding columns.

    Returns:
        List of matching column names sorted by their integer suffix, so
        ``e2`` comes before ``e10``.
    """
    indexed = []
    for col in df.columns:
        if not isinstance(col, str) or not col.startswith(prefix):
            continue
        try:
            position = int(col[len(prefix):])
        except ValueError:
            # Shares the prefix but is not an embedding dimension.
            continue
        indexed.append((position, col))

    # Sort on the integer only; the stable sort keeps the original column
    # order for equal indices.
    indexed.sort(key=lambda pair: pair[0])
    return [col for _, col in indexed]

def _l2(x, eps=1e-12):
    """Scale every row of ``x`` to unit Euclidean length.

    Args:
        x: 2-D array; the norm is taken along axis 1 (one norm per row).
        eps: Lower bound applied to each row norm before dividing.

    Returns:
        Array of the same shape as ``x`` with each row divided by its
        (floored) norm. An all-zero row stays all-zero.
    """
    row_norms = np.linalg.norm(x, axis=1, keepdims=True)
    # Floor the norms so a zero-length row divides by eps instead of by zero.
    safe_norms = np.clip(row_norms, eps, None)
    normalized = x / safe_norms
    return normalized

def _sim_percent_from_dist(d):
    """Convert Euclidean distance(s) into a similarity percentage.

    The mapping is linear: distance 0 gives 100, distance 2 gives 0, and any
    distance above 2 is clamped to 0. The 0..2 range is the one expected for
    distances between L2-normalized embeddings.

    Args:
        d: Scalar or array of distances.

    Returns:
        ``100 * max(0, 1 - d / 2)``, evaluated element-wise.
    """
    half_dist = d / 2.0
    closeness = 1.0 - half_dist
    # Negative closeness (distance > 2) is reported as 0 % similarity.
    return 100.0 * np.maximum(0.0, closeness)

def _fit_prototypes(train_csv, prefix="e", label_col="folder_name"):
    """Build one unit-length prototype vector per label from a training CSV.

    Each row's embedding is L2-normalized, the normalized rows are averaged
    per label, and every resulting mean is L2-normalized again.

    Args:
        train_csv: Path (or anything ``pd.read_csv`` accepts) of the CSV.
        prefix: Prefix identifying the embedding columns.
        label_col: Column holding the label; values are cast to ``str``.

    Returns:
        Tuple ``(cols, names, P)``: the ordered embedding column names, the
        sorted unique labels, and the prototype matrix whose row ``i`` is the
        prototype of ``names[i]``.
    """
    frame = pd.read_csv(train_csv)
    cols = _emb_cols(frame, prefix)

    features = _l2(frame[cols].to_numpy(dtype=np.float32))
    labels = frame[label_col].astype(str).to_numpy()

    names = np.unique(labels)
    centroids = []
    for name in names:
        members = features[labels == name]
        centroids.append(members.mean(axis=0))

    # Means of unit vectors are generally shorter than 1; renormalize them.
    P = _l2(np.stack(centroids, axis=0))
    return cols, names, P

def _predict_scores(csv_path, cols, names, P, prefix="e", label_col="folder_name"):
    """Assign each row of a CSV to its nearest prototype.

    Args:
        csv_path: Path of the CSV holding embeddings and labels.
        cols: Ordered embedding column names to read.
        names: Array of labels; ``names[i]`` belongs to row ``i`` of ``P``.
        P: Prototype matrix, one row per label.
        prefix: Accepted for signature symmetry; not used in this function.
        label_col: Column holding the true label; values are cast to ``str``.

    Returns:
        Tuple ``(y, best_name, best_sim)``: the true labels, the label of the
        closest prototype for each row, and that closest distance expressed
        as a similarity percentage.
    """
    frame = pd.read_csv(csv_path)
    X = _l2(frame[cols].to_numpy(dtype=np.float32))
    y = frame[label_col].astype(str).to_numpy()

    # Pairwise differences via broadcasting, reduced over the last axis to
    # give one Euclidean distance per (row, prototype) pair.
    diffs = X[:, np.newaxis, :] - P[np.newaxis, :, :]
    dists = np.linalg.norm(diffs, axis=2)

    nearest = dists.argmin(axis=1)
    row_ids = np.arange(len(X))
    nearest_dist = dists[row_ids, nearest]

    return y, names[nearest], _sim_percent_from_dist(nearest_dist)

def tune_threshold_for_far(val_str_csv, cols, names, P, far_max=0.05, grid=np.linspace(0,100,201)):
    """Pick the first grid threshold whose false-accept rate is within ``far_max``.

    FAR is the fraction of stranger samples whose similarity is at or above
    the threshold, i.e. strangers that would be accepted as a known identity.

    Args:
        val_str_csv: CSV of validation stranger embeddings.
        cols: Ordered embedding column names.
        names: Labels matching the rows of ``P``.
        P: Prototype matrix.
        far_max: Largest acceptable FAR.
        grid: Candidate thresholds, tried in iteration order. With an
            ascending grid the result is the smallest admissible threshold.

    Returns:
        The selected threshold as a ``float``, or ``None`` when no candidate
        in ``grid`` meets the FAR limit.
    """
    _, _, stranger_sim = _predict_scores(val_str_csv, cols, names, P)
    admissible = (
        float(th)
        for th in grid
        if float((stranger_sim >= th).mean()) <= far_max
    )
    return next(admissible, None)

def tune_threshold_max_open_macro_f1(val_known_csv, val_str_csv, cols, names, P, far_max=0.05, grid=np.linspace(0,100,201)):
    """Choose the threshold maximizing open-set macro-F1 under a FAR limit.

    For every candidate threshold, samples whose best similarity falls below
    the threshold are predicted as ``"Stranger"``; all others keep the label
    of their nearest prototype. Candidates whose false-accept rate on the
    stranger set exceeds ``far_max`` are discarded.

    Args:
        val_known_csv: CSV of validation samples from known identities.
        val_str_csv: CSV of validation stranger samples.
        cols: Ordered embedding column names.
        names: Labels matching the rows of ``P``.
        P: Prototype matrix.
        far_max: Largest acceptable false-accept rate on the stranger set.
        grid: Candidate thresholds, evaluated in iteration order.

    Returns:
        Dict with keys ``"th"``, ``"open_macro_f1"`` and ``"far"``. On ties
        the earliest candidate wins (strict ``>`` comparison). If no
        candidate satisfies the FAR limit the initial record is returned
        unchanged: ``{"th": None, "open_macro_f1": -1, "far": None}``.
    """
    yk, pk, sk = _predict_scores(val_known_csv, cols, names, P)
    ys, ps, ss = _predict_scores(val_str_csv, cols, names, P)

    # The ground truth does not depend on the threshold, so build it once
    # instead of on every grid step.
    y_true = np.concatenate([yk, np.array(["Stranger"] * len(ys))])

    best = {"th": None, "open_macro_f1": -1, "far": None}
    for th in grid:
        # One mask serves both the FAR check and the stranger predictions.
        accept_str = ss >= th
        far = float(accept_str.mean())
        if far > far_max:
            continue

        y_pred = np.concatenate([
            np.where(sk >= th, pk, "Stranger"),
            np.where(accept_str, ps, "Stranger"),
        ])

        score = float(f1_score(y_true, y_pred, average="macro"))
        if score > best["open_macro_f1"]:
            best = {"th": float(th), "open_macro_f1": score, "far": far}

    return best

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score  # + roc_auc_score

# ... (everything above unchanged)

def eval_with_threshold(test_known_csv, test_str_csv, cols, names, P, th):
    """Evaluate open-set recognition at a fixed similarity threshold.

    A sample is accepted as known when its best similarity is ``>= th``;
    otherwise it is predicted as ``"Stranger"``.

    Args:
        test_known_csv: CSV of test samples from known identities.
        test_str_csv: CSV of test stranger samples.
        cols: Ordered embedding column names.
        names: Labels matching the rows of ``P``.
        P: Prototype matrix.
        th: Similarity threshold (same scale as the similarity percentage).

    Returns:
        Dict with ``threshold``, ``FAR`` (share of strangers accepted),
        ``TPR_known`` (share of known samples accepted), open-set accuracy
        and macro-F1 over identities plus ``"Stranger"``, binary
        known-vs-unknown accuracy and macro-F1, and the ROC AUC computed
        from the raw similarity scores.

    Raises:
        TypeError: If ``th`` is ``None``. Checked before any file is read so
            the failure is immediate and names the real problem.
    """
    if th is None:
        raise TypeError(
            "eval_with_threshold: 'th' must be a number, got None "
            "(did threshold tuning find no admissible value?)"
        )

    yk, pk, sk = _predict_scores(test_known_csv, cols, names, P)
    ys, ps, ss = _predict_scores(test_str_csv, cols, names, P)

    # Acceptance decisions, computed once and reused by every metric below.
    accept_known = sk >= th
    accept_str = ss >= th

    # Open-set metrics (identities + Stranger).
    pred_known = np.where(accept_known, pk, "Stranger")
    pred_str = np.where(accept_str, ps, "Stranger")

    y_true_open = np.concatenate([yk, np.array(["Stranger"] * len(ys))])
    y_pred_open = np.concatenate([pred_known, pred_str])

    open_macro_f1 = float(f1_score(y_true_open, y_pred_open, average="macro"))
    open_acc = float(accuracy_score(y_true_open, y_pred_open))

    # Binary known-vs-unknown: thresholded decisions for accuracy / F1.
    y_true_bin = np.concatenate([np.ones(len(yk)), np.zeros(len(ys))]).astype(int)
    y_pred_bin = np.concatenate([accept_known, accept_str]).astype(int)
    bin_acc = float(accuracy_score(y_true_bin, y_pred_bin))
    bin_macro_f1 = float(f1_score(y_true_bin, y_pred_bin, average="macro"))

    # ROC AUC is threshold-free: use the raw similarity as the score of the
    # positive (known) class, higher meaning more likely known.
    y_scores_bin = np.concatenate([sk, ss])
    bin_roc_auc = float(roc_auc_score(y_true_bin, y_scores_bin))

    far = float(accept_str.mean())
    tpr_known = float(accept_known.mean())

    return {
        "threshold": float(th),
        "FAR": far,
        "TPR_known": tpr_known,
        "open_acc": open_acc,
        "open_macro_f1": open_macro_f1,
        "bin_acc": bin_acc,
        "bin_macro_f1": bin_macro_f1,
        "bin_roc_auc": bin_roc_auc,
    }

# -------------------------
# Usage
# -------------------------
# Fit prototypes on TRAIN, tune the acceptance threshold on the validation
# splits under a FAR limit, then report metrics on the test splits.
TRAIN = "emb_csv/train_embeddings.csv"
VAL   = "emb_csv/val_embeddings.csv"
TEST  = "emb_csv/test_embeddings.csv"
VAL_STR  = "emb_csv/val_strangers_embeddings.csv"   # make this split beforehand
TEST_STR = "emb_csv/test_strangers_embeddings.csv"

# Single source for the FAR limit so the tuning call and the printed message
# cannot drift apart.
FAR_MAX = 0.05

cols, names, P = _fit_prototypes(TRAIN)

best = tune_threshold_max_open_macro_f1(
    val_known_csv=VAL,
    val_str_csv=VAL_STR,
    cols=cols, names=names, P=P,
    far_max=FAR_MAX,
    grid=np.linspace(0, 100, 201)
)
print(f"Tuned on VAL (FAR<={FAR_MAX:.0%})")
print(best)

# The tuner leaves "th" as None when no grid threshold keeps FAR within the
# limit; stop with a clear message instead of failing inside evaluation.
if best["th"] is None:
    raise RuntimeError(
        f"No threshold on the grid achieves FAR <= {FAR_MAX} on the validation strangers"
    )

res = eval_with_threshold(TEST, TEST_STR, cols, names, P, th=best["th"])
print("\nTEST results @ tuned threshold")
for k, v in res.items():
    print(f"  {k}: {v}")


Tuned on VAL (FAR<=5%)
{'th': 61.5, 'open_macro_f1': 0.9045024655168937, 'far': 0.043478260869565216}

TEST results @ tuned threshold
  threshold: 61.5
  FAR: 0.0
  TPR_known: 0.9259259259259259
  open_acc: 0.9342723004694836
  open_macro_f1: 0.9459997290403167
  bin_acc: 0.9342723004694836
  bin_macro_f1: 0.8678660049627791
  bin_roc_auc: 0.9880952380952381
