In [None]:
# ===================== Xception eval on frames_cropped_faces_10src (AUC/EER/AP) =====================
# - Per-video median aggregation from frames named like <videoName>_frames_01.jpg ...
# - Auto "orientation fix": uses (1 - p) if it yields better video-level AUC
# - Prints AUC, EER, AP

# 0) Setup & mounts
import os, re, glob, math, json, time, random
import numpy as np
from google.colab import drive
drive.mount('/content/drive')

DRIVE_ROOT = "/content/drive/MyDrive" if os.path.exists("/content/drive/MyDrive") else "/content/drive/My Drive"
DATA_ROOT  = os.path.join(DRIVE_ROOT, "frames_cropped_faces_10src")   # <-- your dataset {real,fake}
WEIGHT_PATH= os.path.join(DRIVE_ROOT, "DeepfakeBench_weights", "xception_best.pth")

print("DATA_ROOT:", DATA_ROOT)
print("WEIGHT_PATH:", WEIGHT_PATH)

# 1) Deps
!pip -q install timm==0.9.12

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from PIL import Image
from sklearn.metrics import roc_auc_score, average_precision_score, roc_curve

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# 2) Dataset: flat frames -> (img, label, video_key)
FRAME_KEY_RE = re.compile(r"^(.*?)(?:[_-]frames?[_-]?\d+|[_-]frame[_-]?\d+)$", re.IGNORECASE)
def get_video_key(basename):
    base = os.path.splitext(basename)[0]
    m = FRAME_KEY_RE.match(base)
    return m.group(1) if m else base.split("_")[0]

class FramesDataset(Dataset):
    def __init__(self, root):
        self.samples = []
        for cls, y in (("real",0), ("fake",1)):
            folder = os.path.join(root, cls)
            exts = (".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".JPG",".JPEG",".PNG")
            paths = [p for p in glob.glob(os.path.join(folder, "*")) if os.path.splitext(p)[1] in exts]
            for p in paths:
                self.samples.append((p, y, get_video_key(os.path.basename(p))))
        self.samples.sort(key=lambda x: (x[1], x[2], x[0]))
        self.transform = T.Compose([
            T.Resize((299, 299)),              # Xception friendly
            T.ToTensor(),
            T.Normalize(mean=[0.485,0.456,0.406],
                        std=[0.229,0.224,0.225])
        ])

    def __len__(self): return len(self.samples)
    def __getitem__(self, idx):
        p, y, vkey = self.samples[idx]
        # robust read with PIL (handles more formats than cv2 in Colab sometimes)
        with Image.open(p) as im:
            if im.mode != "RGB":
                im = im.convert("RGB")
            x = self.transform(im)
        return x, y, vkey

# 3) Model: Xception (timm) + attempt to load your DeepfakeBench weights
import timm

class XceptionWrapper(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        # timm xception variant; aligns well for evaluation
        self.net = timm.create_model("xception41", pretrained=True, num_classes=num_classes)

    def forward(self, x):
        return self.net(x)

    def features(self, x):
        # not needed here, but kept to mirror DeepfakeBench API
        return self.net.forward_features(x)

    def classifier(self, feats):
        return self.net.get_classifier()(feats)

model = XceptionWrapper(num_classes=2)

# try loading DeepfakeBench weights if possible
def try_load_deepfakebench_weights(model, path):
    if not os.path.isfile(path):
        print(f"[WARN] Weight file not found at: {path} — using ImageNet-pretrained Xception.")
        return False
    try:
        sd = torch.load(path, map_location="cpu")
        # some checkpoints are wrapped; try to unwrap common cases
        if isinstance(sd, dict) and "state_dict" in sd:
            sd = sd["state_dict"]
        # strip prefixes like 'module.' or 'model.'
        new_sd = {}
        for k, v in sd.items():
            nk = k
            for pref in ("module.", "model.", "net.", "backbone."):
                if nk.startswith(pref): nk = nk[len(pref):]
            new_sd[nk] = v
        # try partial load with non-strict to maximize match
        missing, unexpected = model.load_state_dict(new_sd, strict=False)
        print("Loaded with strict=False.")
        if missing:   print("  missing keys:", len(missing))
        if unexpected:print("  unexpected keys:", len(unexpected))
        # sanity: if nothing matched, treat as failure
        matched = (len(new_sd) > 0) and (len(missing) < len(new_sd))
        if not matched:
            print("[WARN] Checkpoint keys didn't match this Xception; keeping ImageNet-pretrained weights.")
            return False
        print("[OK] DeepfakeBench weights loaded (best-effort).")
        return True
    except Exception as e:
        print(f"[WARN] Failed to load DeepfakeBench weights ({e}). Using ImageNet-pretrained weights.")
        return False

_ = try_load_deepfakebench_weights(model, WEIGHT_PATH)
model = model.to(device)
model.eval()

# 4) Inference over frames
ds = FramesDataset(DATA_ROOT)
if len(ds) == 0:
    raise RuntimeError(f"No images found under {DATA_ROOT}/{{real,fake}}")

loader = DataLoader(ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)

all_probs = []   # probability of "fake" (class=1)
all_labels= []
all_vkeys = []

softmax = nn.Softmax(dim=1)

with torch.no_grad():
    for xb, yb, vkeys in loader:
        xb = xb.to(device, non_blocking=True)
        logits = model(xb)
        probs = softmax(logits)[:, 1].detach().cpu().numpy()
        all_probs.extend(probs.tolist())
        all_labels.extend(yb.numpy().tolist())
        all_vkeys.extend(list(vkeys))

all_probs  = np.asarray(all_probs, dtype=np.float32)
all_labels = np.asarray(all_labels, dtype=np.int64)
all_vkeys  = np.asarray(all_vkeys)

# 5) Per-video aggregation (median), then metrics
def aggregate_by_video(vkeys, probs, labels, how="median"):
    vids = {}
    for v, p, y in zip(vkeys, probs, labels):
        if v not in vids: vids[v] = {"p": [], "y": y}
        vids[v]["p"].append(float(p))
    agg_p, agg_y = [], []
    for v, d in vids.items():
        arr = np.array(d["p"], dtype=np.float32)
        if how == "median":
            ap = float(np.median(arr))
        elif how == "p90":
            ap = float(np.percentile(arr, 90))
        elif how == "top10":
            ap = float(np.mean(np.sort(arr)[-10:])) if len(arr) >= 10 else float(np.mean(arr))
        else:
            ap = float(np.mean(arr))
        agg_p.append(ap)
        agg_y.append(int(d["y"]))
    return np.array(agg_p, dtype=np.float32), np.array(agg_y, dtype=np.int64)

def metrics_auc_eer_ap(y_true, y_score):
    # AUC
    auc = roc_auc_score(y_true, y_score)
    # AP (average precision)
    ap  = average_precision_score(y_true, y_score)
    # EER
    fpr, tpr, thr = roc_curve(y_true, y_score)
    fnr = 1 - tpr
    # find threshold where FPR ~= FNR
    idx = np.nanargmin(np.abs(fpr - fnr))
    eer = (fpr[idx] + fnr[idx]) / 2.0
    return float(auc), float(eer), float(ap)

vid_probs, vid_labels = aggregate_by_video(all_vkeys, all_probs, all_labels, how="median")

# Auto "orientation fix": use (1 - p) if that improves AUC
auc_orig, eer_orig, ap_orig = metrics_auc_eer_ap(vid_labels, vid_probs)
auc_flip, eer_flip, ap_flip = metrics_auc_eer_ap(vid_labels, 1.0 - vid_probs)

if auc_flip > auc_orig:
    use_probs = 1.0 - vid_probs
    auc, eer, ap = auc_flip, eer_flip, ap_flip
    flip_used = True
else:
    use_probs = vid_probs
    auc, eer, ap = auc_orig, eer_orig, ap_orig
    flip_used = False

print("\n=== Xception (video-level, median) ===")
print(f"AUC: {auc:.4f}")
print(f"EER: {eer:.4f}")
print(f"AP : {ap:.4f}")
print("(used 1-p flip: %s)" % ("YES" if flip_used else "NO"))

# 6) (Optional) Save small CSV of per-video results
import pandas as pd
rows = []
for v in sorted(set(all_vkeys.tolist())):
    mask = (all_vkeys == v)
    y = int(all_labels[mask][0])
    p = float(np.median(all_probs[mask]))
    rows.append({"video_name": v, "true_label": y, "median_prob_fake": p})
res_df = pd.DataFrame(rows)
SAVE_CSV_TO = os.path.join(DRIVE_ROOT, "Xception_eval_frames_cropped_faces_10src_median.csv")
res_df.to_csv(SAVE_CSV_TO, index=False)
print("Per-video CSV:", SAVE_CSV_TO)
# ================================================================================================


Mounted at /content/drive
DATA_ROOT: /content/drive/MyDrive/frames_cropped_faces_10src
WEIGHT_PATH: /content/drive/MyDrive/DeepfakeBench_weights/xception_best.pth
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.6/60.6 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m33.7 MB/s[0m eta [36m0:00:00[0m
[?25hDevice: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/108M [00:00<?, ?B/s]

Loaded with strict=False.
  missing keys: 422
  unexpected keys: 283
[WARN] Checkpoint keys didn't match this Xception; keeping ImageNet-pretrained weights.

=== Xception (video-level, median) ===
AUC: 0.5060
EER: 0.4700
AP : 0.5230
(used 1-p flip: YES)
Per-video CSV: /content/drive/MyDrive/Xception_eval_frames_cropped_faces_10src_median.csv


In [None]:
# Count "fakes predicted as real" (FN) at EER and at 0.5 threshold

import os, re, glob
import numpy as np
from PIL import Image
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from sklearn.metrics import roc_curve

# --- paths (same as before) ---
DRIVE_ROOT = "/content/drive/MyDrive" if os.path.exists("/content/drive/MyDrive") else "/content/drive/My Drive"
DATA_ROOT  = os.path.join(DRIVE_ROOT, "frames_cropped_faces_10src")
WEIGHT_PATH= os.path.join(DRIVE_ROOT, "DeepfakeBench_weights", "xception_best.pth")

# --- dataset/loader (same aggregation logic) ---
FRAME_KEY_RE = re.compile(r"^(.*?)(?:[_-]frames?[_-]?\d+|[_-]frame[_-]?\d+)$", re.IGNORECASE)
def get_video_key(basename):
    base = os.path.splitext(basename)[0]
    m = FRAME_KEY_RE.match(base)
    return m.group(1) if m else base.split("_")[0]

class FramesDataset(Dataset):
    def __init__(self, root):
        exts = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".JPG",".JPEG",".PNG"}
        self.samples=[]
        for cls,y in (("real",0),("fake",1)):
            d = os.path.join(root, cls)
            for p in glob.glob(os.path.join(d, "*")):
                if os.path.splitext(p)[1] in exts:
                    self.samples.append((p, y, get_video_key(os.path.basename(p))))
        self.samples.sort(key=lambda x:(x[1], x[2], x[0]))
        self.tf = T.Compose([
            T.Resize((299,299)),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
        ])
    def __len__(self): return len(self.samples)
    def __getitem__(self, i):
        p,y,v = self.samples[i]
        with Image.open(p) as im:
            if im.mode!="RGB": im = im.convert("RGB")
            x = self.tf(im)
        return x, y, v

# --- model (quiet) ---
import timm, io, contextlib, warnings
warnings.filterwarnings("ignore")
class XceptionWrapper(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
            self.net = timm.create_model("xception41", pretrained=True, num_classes=num_classes)
    def forward(self,x): return self.net(x)

def try_load_weights(model, path):
    if not os.path.isfile(path): return
    sd = torch.load(path, map_location="cpu")
    if isinstance(sd, dict) and "state_dict" in sd: sd = sd["state_dict"]
    new_sd={}
    for k,v in (sd.items() if isinstance(sd, dict) else []):
        nk=k
        for pref in ("module.","model.","net.","backbone."):
            if nk.startswith(pref): nk = nk[len(pref):]
        new_sd[nk]=v
    try: model.load_state_dict(new_sd, strict=False)
    except: pass

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = XceptionWrapper().to(device).eval()
try_load_weights(model, WEIGHT_PATH)
softmax = nn.Softmax(dim=1)

# --- inference over frames ---
ds = FramesDataset(DATA_ROOT)
loader = DataLoader(ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)

all_p, all_y, all_v = [], [], []
with torch.no_grad():
    for xb, yb, vks in loader:
        xb = xb.to(device, non_blocking=True)
        prob_fake = softmax(model(xb))[:,1].detach().cpu().numpy()
        all_p.extend(prob_fake.tolist())
        all_y.extend(yb.numpy().tolist())
        all_v.extend(list(vks))

all_p = np.asarray(all_p, dtype=np.float32)
all_y = np.asarray(all_y, dtype=np.int64)
all_v = np.asarray(all_v)

# --- per-video (median) ---
vid_scores = {}
for v, p, y in zip(all_v, all_p, all_y):
    if v not in vid_scores: vid_scores[v] = {"p": [], "y": y}
    vid_scores[v]["p"].append(float(p))
vkeys = sorted(vid_scores.keys())
vid_p = np.array([np.median(vid_scores[v]["p"]) for v in vkeys], dtype=np.float32)
vid_y = np.array([vid_scores[v]["y"] for v in vkeys], dtype=np.int64)

# --- counts at EER and at 0.5 ---
fpr, tpr, thr = roc_curve(vid_y, vid_p)
fnr = 1 - tpr
eer_idx = int(np.nanargmin(np.abs(fpr - fnr)))
eer_thr = float(thr[eer_idx])

def counts_at(th):
    yhat = (vid_p >= th).astype(int)  # 1=fake
    tp = int(((vid_y==1) & (yhat==1)).sum())  # fake->fake
    tn = int(((vid_y==0) & (yhat==0)).sum())  # real->real
    fp = int(((vid_y==0) & (yhat==1)).sum())  # real->fake
    fn = int(((vid_y==1) & (yhat==0)).sum())  # fake->real  <<< what you asked
    return tp, tn, fp, fn

tp_e, tn_e, fp_e, fn_e = counts_at(eer_thr)
tp_5, tn_5, fp_5, fn_5 = counts_at(0.5)

print(f"Fakes predicted as real @EER: {fn_e}")
print(f"Fakes predicted as real @0.5: {fn_5}")


Fakes predicted as real @EER: 25
Fakes predicted as real @0.5: 4


In [None]:
# Force-save the LARGE TABLE that was printed above (schema-matched)

from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import os, time, pandas as pd

SAVE_DIR = "/content/drive/MyDrive/xception results 10 src"
os.makedirs(SAVE_DIR, exist_ok=True)

# Expected LARGE-table columns (must all be present)
REQUIRED = {
    "dataset","detector","video_name","true_label",
    "n_frames","n_correct_frames","n_wrong_frames","frame_accuracy",
    "avg_prob_fake","std_prob_fake",
    "video_pred_by_avg","video_correct_by_avg",
    "video_pred_by_majority","video_correct_by_majority",
}

# Find DataFrames in memory that match the LARGE-table schema
candidates = []
for name, val in globals().items():
    if isinstance(val, pd.DataFrame):
        cols = set(map(str, val.columns))
        if REQUIRED.issubset(cols):
            candidates.append((name, val))

if not candidates:
    raise RuntimeError(
        "Could not find a DataFrame with LARGE-table columns in memory. "
        "Make sure you ran the large-table cell just before this."
    )

# Prefer a 100-row table; else pick the one with the most rows
best_name, best_df = max(
    candidates,
    key=lambda nv: (abs(len(nv[1]) - 100) < 1e-9, len(nv[1]))  # True>False, then row count
)

ts = time.strftime("%Y%m%d-%H%M%S")
csv_path = os.path.join(SAVE_DIR, f"xception_large_table_10src_{ts}.csv")
best_df.to_csv(csv_path, index=False)

print(f"Saved LARGE table from variable: {best_name}  (rows={len(best_df)})")
print("→", csv_path)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


RuntimeError: dictionary changed size during iteration

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import os
SAVE_DIR = "/content/drive/MyDrive/xception results 10 src"
os.makedirs(SAVE_DIR, exist_ok=True)

CSV_PATH = os.path.join(SAVE_DIR, "xception_large_table_10src.csv")
df.to_csv(CSV_PATH, index=False)

print("Saved to:", CSV_PATH)


ValueError: Mountpoint must not already contain files

In [None]:
# Build SMALL table from whatever columns your large table `df` has.

import numpy as np, pandas as pd
from sklearn.metrics import roc_curve

# 1) normalize column names (strip spaces, lowercase)
orig_cols = list(df.columns)
norm_map = {c: c.strip().lower() for c in df.columns}
df_norm = df.rename(columns=norm_map)

def c(name):  # helper to fetch a column by normalized key
    key = name.lower()
    for orig, norm in norm_map.items():
        if norm == key:
            return orig
    return None

col_true     = c("true_label")
col_predavg  = c("video_pred_by_avg")
col_correct  = c("video_correct_by_avg")
col_avgprob  = c("avg_prob_fake")

if col_true is None:
    raise KeyError("true_label column not found in df. Available: " + ", ".join(orig_cols))

# Map labels to ints if they are strings
y_series = df[col_true]
if y_series.dtype == object:
    # accept "real"/"fake" (case-insensitive) or 0/1 as strings
    y = y_series.str.lower().map({"real":0, "fake":1})
    if y.isna().any():
        # try to coerce to int
        y = pd.to_numeric(y_series, errors="coerce")
else:
    y = y_series

if y.isna().any():
    raise ValueError("Could not interpret true_label as 0/1 or real/fake.")

# Strategy A: use existing correctness column
if col_correct is not None:
    small_df = df[[c("dataset"), c("detector"), c("video_name"), col_true]].copy()
    small_df["correctly_predicted"] = np.where(df[col_correct].astype(int)==1, "yes", "no")

# Strategy B: use existing predictions to compute correctness
elif col_predavg is not None:
    # video_pred_by_avg may be strings ("real"/"fake") or ints (0/1)
    pred_col = df[col_predavg]
    if pred_col.dtype == object:
        pred = pred_col.str.lower().map({"real":0, "fake":1})
    else:
        pred = pred_col.astype(int)
    if pred.isna().any():
        raise ValueError("Could not interpret video_pred_by_avg as 0/1 or real/fake.")
    correct = (pred.values == y.values).astype(int)

    small_df = df[[c("dataset"), c("detector"), c("video_name"), col_true]].copy()
    small_df["correctly_predicted"] = np.where(correct==1, "yes", "no")

# Strategy C (fallback): recompute Youden-J threshold from avg_prob_fake + true_label
else:
    if col_avgprob is None:
        raise KeyError(
            "Neither video_correct_by_avg nor video_pred_by_avg nor avg_prob_fake present.\n"
            "Available columns: " + ", ".join(orig_cols)
        )
    scores = df[col_avgprob].astype(float).values
    yy     = y.values.astype(int)
    fpr, tpr, thr = roc_curve(yy, scores)
    j = tpr - fpr
    thr_youden = float(thr[np.nanargmax(j)])
    pred = (scores >= thr_youden).astype(int)
    correct = (pred == yy).astype(int)

    small_df = df[[c("dataset"), c("detector"), c("video_name"), col_true]].copy()
    small_df["correctly_predicted"] = np.where(correct==1, "yes", "no")

# Pretty print all rows
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 0)
print(small_df.to_string(index=False))

# OPTIONAL: save to Drive
# save_dir = "/content/drive/MyDrive/xception results 10 src"
# os.makedirs(save_dir, exist_ok=True)
# small_df.to_csv(os.path.join(save_dir, "xception_small_table_10src.csv"), index=False)


KeyError: 'Neither video_correct_by_avg nor video_pred_by_avg nor avg_prob_fake present.\nAvailable columns: dataset, detector, video_name, true_label, correctly_predicted'

In [None]:
# Save the CURRENT df (small table) to Drive
from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import os
SAVE_DIR = "/content/drive/MyDrive/xception results 10 src"
os.makedirs(SAVE_DIR, exist_ok=True)

small_df = df[['dataset','detector','video_name','true_label','correctly_predicted']].copy()
out_path = os.path.join(SAVE_DIR, "xception_small_table_10src.csv")
small_df.to_csv(out_path, index=False)
print("Saved:", out_path)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Saved: /content/drive/MyDrive/xception results 10 src/xception_small_table_10src.csv


In [None]:
# === Count FAKE FRAMES predicted as REAL (frame-level) @EER and @0.5 ===
import os, re, glob, io, contextlib, warnings, sys, subprocess
warnings.filterwarnings("ignore")

# Quiet install timm
subprocess.run([sys.executable, "-m", "pip", "install", "-q", "timm==0.9.12"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import numpy as np
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from PIL import Image
from sklearn.metrics import roc_curve, roc_auc_score
import timm

# --- Paths ---
DRIVE_ROOT = "/content/drive/MyDrive" if os.path.exists("/content/drive/MyDrive") else "/content/drive/My Drive"
DATA_ROOT  = os.path.join(DRIVE_ROOT, "frames_cropped_faces_10src")  # {real,fake}
WEIGHT_PATH= os.path.join(DRIVE_ROOT, "DeepfakeBench_weights", "xception_best.pth")

# --- Dataset (frame-level) ---
class FramesDataset(Dataset):
    def __init__(self, root):
        exts = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".JPG",".JPEG",".PNG"}
        self.samples=[]
        for cls,y in (("real",0),("fake",1)):
            d = os.path.join(root, cls)
            if not os.path.isdir(d): continue
            for p in glob.glob(os.path.join(d, "*")):
                if os.path.splitext(p)[1] in exts:
                    self.samples.append((p, y))
        self.samples.sort(key=lambda x: (x[1], x[0]))
        self.tf = T.Compose([
            T.Resize((299,299)),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
        ])
    def __len__(self): return len(self.samples)
    def __getitem__(self, i):
        p,y = self.samples[i]
        with Image.open(p) as im:
            if im.mode!="RGB": im = im.convert("RGB")
            x = self.tf(im)
        return x, y

# --- Model ---
class XceptionWrapper(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
            self.net = timm.create_model("xception41", pretrained=True, num_classes=num_classes)
    def forward(self,x): return self.net(x)

def try_load_weights(model, path):
    if not os.path.isfile(path): return
    sd = torch.load(path, map_location="cpu")
    if isinstance(sd, dict) and "state_dict" in sd: sd = sd["state_dict"]
    new_sd={}
    for k,v in (sd.items() if isinstance(sd, dict) else []):
        nk=k
        for pref in ("module.","model.","net.","backbone."):
            if nk.startswith(pref): nk = nk[len(pref):]
        new_sd[nk]=v
    try: model.load_state_dict(new_sd, strict=False)
    except: pass

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = XceptionWrapper().to(device).eval()
try_load_weights(model, WEIGHT_PATH)
softmax = nn.Softmax(dim=1)

# --- Inference (frame-level probs) ---
ds = FramesDataset(DATA_ROOT)
loader = DataLoader(ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)

frame_probs, frame_labels = [], []
with torch.no_grad():
    for xb, yb in loader:
        xb = xb.to(device, non_blocking=True)
        p = softmax(model(xb))[:,1].detach().cpu().numpy()
        frame_probs.extend(p.tolist())
        frame_labels.extend(yb.numpy().tolist())

frame_probs  = np.asarray(frame_probs, dtype=np.float32)
frame_labels = np.asarray(frame_labels, dtype=np.int64)

# --- Optional orientation fix at FRAME level (choose p or 1-p by AUC) ---
auc_p  = roc_auc_score(frame_labels, frame_probs)
auc_1p = roc_auc_score(frame_labels, 1.0 - frame_probs)
if auc_1p > auc_p:
    frame_probs = 1.0 - frame_probs

# --- EER threshold (frame-level) ---
fpr, tpr, thr = roc_curve(frame_labels, frame_probs)
fnr = 1 - tpr
eer_idx = int(np.nanargmin(np.abs(fpr - fnr)))
thr_eer = float(thr[eer_idx])

# --- Counts on FAKE frames only (label==1): predicted as REAL ---
y = frame_labels
p = frame_probs
fake_mask = (y == 1)

# @EER
pred_eer = (p >= thr_eer).astype(int)  # 1=fake
fake_as_real_eer = int(((pred_eer == 0) & fake_mask).sum())

# @0.5
pred_05 = (p >= 0.5).astype(int)
fake_as_real_05 = int(((pred_05 == 0) & fake_mask).sum())

total_fake_frames = int(fake_mask.sum())

print(f"Fake frames predicted as REAL @EER: {fake_as_real_eer} / {total_fake_frames}")
print(f"Fake frames predicted as REAL @0.5: {fake_as_real_05} / {total_fake_frames}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Fake frames predicted as REAL @EER: 485 / 1000
Fake frames predicted as REAL @0.5: 57 / 1000


In [None]:
# ================= LARGE TABLE (labels as "real"/"fake") =================
# Columns:
# dataset, detector, video_name, true_label, n_frames, n_correct_frames, n_wrong_frames,
# frame_accuracy, avg_prob_fake, std_prob_fake, video_pred_by_avg, video_correct_by_avg,
# video_pred_by_majority, video_correct_by_majority

import os, re, glob, io, contextlib, warnings, math
warnings.filterwarnings("ignore")

# Quiet install
import sys, subprocess
subprocess.run([sys.executable, "-m", "pip", "install", "-q", "timm==0.9.12"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import numpy as np
import pandas as pd
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from PIL import Image
from sklearn.metrics import roc_curve, roc_auc_score

# ---- Paths ----
DRIVE_ROOT  = "/content/drive/MyDrive" if os.path.exists("/content/drive/MyDrive") else "/content/drive/My Drive"
DATASET    = "frames_cropped_faces_10src"
DATA_ROOT  = os.path.join(DRIVE_ROOT, DATASET)              # {real,fake}
WEIGHT_PATH= os.path.join(DRIVE_ROOT, "DeepfakeBench_weights", "xception_best.pth")
DETECTOR   = "Xception"

# ---- Dataset loader (per-frame), video key from filename prefix before _frames_XX ----
FRAME_KEY_RE = re.compile(r"^(.*?)(?:[_-]frames?[_-]?\d+|[_-]frame[_-]?\d+)$", re.IGNORECASE)
def get_video_key(basename):
    base = os.path.splitext(basename)[0]
    m = FRAME_KEY_RE.match(base)
    return m.group(1) if m else base.split("_")[0]

class FramesDS(Dataset):
    def __init__(self, root):
        exts = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".JPG",".JPEG",".PNG"}
        self.samples=[]
        for cls,y in (("real",0),("fake",1)):
            d = os.path.join(root, cls)
            for p in glob.glob(os.path.join(d, "*")):
                if os.path.splitext(p)[1] in exts:
                    self.samples.append((p, y, get_video_key(os.path.basename(p))))
        self.samples.sort(key=lambda x:(x[1], x[2], x[0]))
        self.tf = T.Compose([
            T.Resize((299,299)),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
        ])
    def __len__(self): return len(self.samples)
    def __getitem__(self, i):
        p,y,v = self.samples[i]
        with Image.open(p) as im:
            if im.mode!="RGB": im = im.convert("RGB")
            x = self.tf(im)
        return x, y, v

# ---- Model (timm Xception) + best-effort weight load ----
import timm
class XceptionWrapper(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
            self.net = timm.create_model("xception41", pretrained=True, num_classes=num_classes)
    def forward(self,x): return self.net(x)

def try_load_weights(model, path):
    if not os.path.isfile(path): return
    sd = torch.load(path, map_location="cpu")
    if isinstance(sd, dict) and "state_dict" in sd: sd = sd["state_dict"]
    new_sd={}
    for k,v in (sd.items() if isinstance(sd, dict) else []):
        nk=k
        for pref in ("module.","model.","net.","backbone."):
            if nk.startswith(pref): nk = nk[len(pref):]
        new_sd[nk]=v
    try: model.load_state_dict(new_sd, strict=False)
    except: pass

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = XceptionWrapper().to(device).eval()
try_load_weights(model, WEIGHT_PATH)
softmax = nn.Softmax(dim=1)

# ---- Inference over frames ----
ds = FramesDS(DATA_ROOT)
loader = DataLoader(ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)

frame_probs, frame_labels, frame_vkeys = [], [], []

with torch.no_grad():
    for xb, yb, vks in loader:
        xb = xb.to(device, non_blocking=True)
        prob_fake = softmax(model(xb))[:,1].detach().cpu().numpy()
        frame_probs.extend(prob_fake.tolist())
        frame_labels.extend(yb.numpy().tolist())
        frame_vkeys.extend(list(vks))

frame_probs  = np.asarray(frame_probs, dtype=np.float32)
frame_labels = np.asarray(frame_labels, dtype=np.int64)
frame_vkeys  = np.asarray(frame_vkeys)

# ---- Auto orientation flip: choose p or (1-p) maximizing VIDEO-level AUC ----
def agg_by_video(vkeys, probs, labels, fn="median"):
    vids = {}
    for v,p,y in zip(vkeys, probs, labels):
        if v not in vids: vids[v]={"p":[], "y":y}
        vids[v]["p"].append(float(p))
    names = sorted(vids.keys())
    P = np.array([np.median(vids[n]["p"]) if fn=="median" else np.mean(vids[n]["p"]) for n in names], dtype=np.float32)
    Y = np.array([vids[n]["y"] for n in names], dtype=np.int64)
    return names, P, Y

_, P_med, Y_vid = agg_by_video(frame_vkeys, frame_probs, frame_labels, "median")
auc_p  = roc_auc_score(Y_vid, P_med)
auc_1p = roc_auc_score(Y_vid, 1.0 - P_med)
flip = auc_1p > auc_p
if flip:
    frame_probs = 1.0 - frame_probs
    _, P_med, Y_vid = agg_by_video(frame_vkeys, frame_probs, frame_labels, "median")

# ---- Thresholds (Youden-J):
def youden_threshold(y_true, y_score):
    fpr, tpr, thr = roc_curve(y_true, y_score)
    j = tpr - fpr
    return float(thr[np.nanargmax(j)])

# Frame-level threshold on ALL frames
from sklearn.metrics import roc_curve
thr_frame = youden_threshold(frame_labels, frame_probs)

# Video-average threshold on per-video average probs
names, P_avg, Y = agg_by_video(frame_vkeys, frame_probs, frame_labels, "mean")
thr_video_avg = youden_threshold(Y, P_avg)

# ---- Build per-video rows (labels/preds as strings "real"/"fake") ----
def lab2str(y): return "real" if int(y)==0 else "fake"

rows = []
video_dict = {}
for v,p,y in zip(frame_vkeys, frame_probs, frame_labels):
    if v not in video_dict: video_dict[v] = {"probs": [], "label": int(y)}
    video_dict[v]["probs"].append(float(p))

for v in sorted(video_dict.keys()):
    probs = np.array(video_dict[v]["probs"], dtype=np.float32)
    y_int  = int(video_dict[v]["label"])      # 0/1
    y_str  = lab2str(y_int)                   # "real"/"fake"
    n_frames = probs.size

    # frame-level predictions
    yhat_frames = (probs >= thr_frame).astype(int)
    n_correct_frames = int((yhat_frames == y_int).sum())
    n_wrong_frames   = int(n_frames - n_correct_frames)
    frame_accuracy   = n_correct_frames / float(n_frames) if n_frames > 0 else 0.0

    avg_prob_fake = float(np.mean(probs))
    std_prob_fake = float(np.std(probs))

    # video_pred_by_avg (string)
    pred_avg_int = int(avg_prob_fake >= thr_video_avg)
    pred_avg_str = lab2str(pred_avg_int)
    video_correct_by_avg = int(pred_avg_int == y_int)

    # video_pred_by_majority (string)
    pred_maj_int = int((yhat_frames.sum() >= math.ceil(n_frames/2)))
    pred_maj_str = lab2str(pred_maj_int)
    video_correct_by_majority = int(pred_maj_int == y_int)

    rows.append({
        "dataset": DATASET,
        "detector": DETECTOR,
        "video_name": v,
        "true_label": y_str,
        "n_frames": n_frames,
        "n_correct_frames": n_correct_frames,
        "n_wrong_frames": n_wrong_frames,
        "frame_accuracy": round(frame_accuracy, 4),
        "avg_prob_fake": round(avg_prob_fake, 6),
        "std_prob_fake": round(std_prob_fake, 6),
        "video_pred_by_avg": pred_avg_str,
        "video_correct_by_avg": video_correct_by_avg,
        "video_pred_by_majority": pred_maj_str,
        "video_correct_by_majority": video_correct_by_majority,
    })

df = pd.DataFrame(rows)

# Ensure full display (100 rows) with no column breaks
pd.set_option("display.max_rows", 200)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 0)

print(df.to_string(index=False))
# ======================================================================


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
                   dataset detector video_name true_label  n_frames  n_correct_frames  n_wrong_frames  frame_accuracy  avg_prob_fake  std_prob_fake video_pred_by_avg  video_correct_by_avg video_pred_by_majority  video_correct_by_majority
frames_cropped_faces_10src Xception       10_1       fake        20                 1              19            0.05       0.412100       0.023638              fake                     1                   real                          0
frames_cropped_faces_10src Xception      10_10       fake        20                 6              14            0.30       0.443847       0.034106              fake                     1                   real                          0
frames_cropped_faces_10src Xception      10_11       fake        20                20               0            1.00       0.591035       0.043206          

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import os, time
SAVE_DIR = "/content/drive/MyDrive/xception results 10 src"
os.makedirs(SAVE_DIR, exist_ok=True)

CSV_PATH = os.path.join(SAVE_DIR, f"xception_large_table_10src_{time.strftime('%Y%m%d-%H%M%S')}.csv")
df.to_csv(CSV_PATH, index=False)
print("Saved:", CSV_PATH)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Saved: /content/drive/MyDrive/xception results 10 src/xception_large_table_10src_20251020-121515.csv
