In [1]:
# ==========================================================
# DeepFakeShield — Train & Predict (Full Script)
# ==========================================================
# Inputs (recursive):
#   Train/Real:
#     C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\real
#   Train/Fake:
#     C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\fake
#   Sample/Fake (optional extra test set):
#     C:\Users\sagni\Downloads\DeepFakeShield\archive\Sample_fake_images\Sample_fake_images\fake
#   Test/Fake:
#     C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\fake
#   Test/Real:
#     C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\real
#
# Outputs (saved to C:\Users\sagni\Downloads\DeepFakeShield):
#   - deepfakeshield_model.pkl
#   - deepfakeshield_test_predictions.csv
#   - deepfakeshield_sample_predictions.csv   (if sample set found)
#   - deepfakeshield_metrics.json             (for test set)
#   - deepfakeshield_confusion_matrix.png
#   - deepfakeshield_roc_curve.png
#   - deepfakeshield_pr_curve.png
#   - deepfakeshield_feature_importance.csv
#   - deepfakeshield_classification_report.txt
# ==========================================================

from pathlib import Path
from typing import List, Dict, Any, Tuple
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from skimage.color import rgb2gray
from skimage.feature import canny

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    roc_auc_score, average_precision_score,
    precision_recall_curve, roc_curve,
    confusion_matrix, classification_report
)
import joblib

# ----------------------------
# Paths / Config
# ----------------------------
# Folder that receives every artifact written by this script; created here if missing.
OUT_DIR = Path(r"C:\Users\sagni\Downloads\DeepFakeShield")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Input image folders. Each one is scanned recursively and the class label
# comes from the folder it was read from (real=0, fake=1).
TRAIN_REAL = Path(r"C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\real")
TRAIN_FAKE = Path(r"C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\fake")
TEST_FAKE  = Path(r"C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\fake")
TEST_REAL  = Path(r"C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\real")
SAMPLE_FAKE = Path(r"C:\Users\sagni\Downloads\DeepFakeShield\archive\Sample_fake_images\Sample_fake_images\fake")  # optional; only scanned when the folder exists

# File suffixes (compared lower-cased) that count as images when scanning.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp", ".gif"}
HIST_BINS = 16            # number of grayscale histogram bins (one feature per bin)
EDGE_RESIZE = 256         # target length of the shorter side before Canny edge detection
THRESHOLD = 0.5           # predicted probability >= THRESHOLD is classified as fake (1)

# Output artifact paths (all inside OUT_DIR)
MODEL_PKL = OUT_DIR / "deepfakeshield_model.pkl"
TEST_PRED_CSV = OUT_DIR / "deepfakeshield_test_predictions.csv"
SAMPLE_PRED_CSV = OUT_DIR / "deepfakeshield_sample_predictions.csv"
METRICS_JSON = OUT_DIR / "deepfakeshield_metrics.json"
CM_PNG = OUT_DIR / "deepfakeshield_confusion_matrix.png"
ROC_PNG = OUT_DIR / "deepfakeshield_roc_curve.png"
PR_PNG = OUT_DIR / "deepfakeshield_pr_curve.png"
FI_CSV = OUT_DIR / "deepfakeshield_feature_importance.csv"
CLS_TXT = OUT_DIR / "deepfakeshield_classification_report.txt"

# ----------------------------
# Utilities
# ----------------------------
def list_images(root: Path) -> List[Path]:
    """
    Recursively collect the image files found under ``root``.

    A file counts as an image when its lower-cased suffix is in ``IMG_EXTS``.
    The result is sorted: ``rglob`` yields entries in filesystem order, which
    is not guaranteed to be stable, and the order of these paths determines
    the row order of the training data and of the saved prediction CSVs.

    Returns:
        Sorted list of matching paths. If ``root`` does not exist a warning is
        printed and an empty list is returned.
    """
    if not root.exists():
        print(f"[WARN] Missing: {root}")
        return []
    return sorted(
        p for p in root.rglob("*")
        if p.is_file() and p.suffix.lower() in IMG_EXTS
    )

def extract_features_one(path: Path, hist_bins: int = HIST_BINS, edge_resize: int = EDGE_RESIZE) -> Dict[str, Any]:
    """
    Compute a flat dictionary of cheap, hand-crafted features for one image.

    Keys, in insertion order:
      - width, height, aspect_ratio   (pixel dimensions as floats)
      - mean_r/g/b, std_r/g/b         (per-channel statistics on [0, 1] values)
      - brightness, contrast          (mean / std of the grayscale image)
      - edge_density                  (fraction of Canny edge pixels, computed
                                       after rescaling so the shorter side is
                                       about ``edge_resize`` pixels)
      - hist_00 .. hist_{hist_bins-1} (grayscale histogram, normalized to sum to 1)

    If any step raises, every feature above is NaN and an extra "error" key
    holds the exception text, so one bad file does not abort a folder scan.
    """
    scalar_names = (
        "width", "height", "aspect_ratio",
        "mean_r", "mean_g", "mean_b",
        "std_r", "std_g", "std_b",
        "brightness", "contrast",
        "edge_density",
    )

    try:
        # Decode while the file handle is open; the array is a float32 copy in [0, 1].
        with Image.open(path) as src:
            rgb = src.convert("RGB")
            w, h = rgb.size
            pixels = np.asarray(rgb, dtype=np.float32) / 255.0

        channels = [pixels[..., k] for k in range(3)]
        channel_means = [float(c.mean()) for c in channels]
        channel_stds = [float(c.std()) for c in channels]

        gray = rgb2gray(pixels)  # values in [0, 1]

        # Edge detection runs on a copy whose shorter side is rescaled to
        # edge_resize, unless it already has exactly that length.
        short_side = min(h, w)
        if short_side <= 0 or short_side == edge_resize:
            edge_input = gray
        else:
            factor = edge_resize / short_side
            target_size = (int(w * factor), int(h * factor))
            gray_u8 = Image.fromarray((gray * 255).astype(np.uint8))
            edge_input = np.asarray(gray_u8.resize(target_size)) / 255.0
        edge_map = canny(edge_input, sigma=1.5)

        # Histogram of 8-bit gray levels; the epsilon guards against division by zero.
        counts, _ = np.histogram((gray * 255.0).astype(np.uint8), bins=hist_bins, range=(0, 255))
        counts = counts.astype(np.float32)
        counts = counts / (counts.sum() + 1e-9)

        scalar_values = (
            float(w), float(h), float(w / h) if h else np.nan,
            *channel_means,
            *channel_stds,
            float(gray.mean()), float(gray.std()),
            float(edge_map.mean()),
        )
        feats = dict(zip(scalar_names, scalar_values))
        feats.update({f"hist_{i:02d}": float(v) for i, v in enumerate(counts)})
        return feats

    except Exception as exc:
        # Best effort: return an all-NaN row with the same keys plus the error text.
        failed = dict.fromkeys(scalar_names, np.nan)
        failed.update({f"hist_{i:02d}": np.nan for i in range(hist_bins)})
        failed["error"] = str(exc)
        return failed

def scan_folder(root: Path, label: int, split_name: str) -> pd.DataFrame:
    """
    Build one feature row per image found (recursively) under ``root``.

    Every row carries the split name, the integer class label, the file name,
    the resolved absolute path and all keys returned by
    ``extract_features_one``. A one-line summary with the number of images is
    printed. The returned frame is empty when no images are found.
    """
    image_paths = list_images(root)
    records = [
        {
            "split": split_name,
            "label": int(label),  # real=0, fake=1
            "filename": img.name,
            "abspath": str(img.resolve()),
            **extract_features_one(img),
        }
        for img in image_paths
    ]
    print(f"[INFO] {split_name} | label={label}: {len(image_paths)} images from {root}")
    return pd.DataFrame(records)

def plot_confusion_heatmap(cm: np.ndarray, labels: list, title: str, out_path: Path):
    """
    Draw a confusion matrix as an annotated heatmap and save it to ``out_path``.

    Args:
        cm: 2-D array of counts; rows are drawn on the "True" axis and columns
            on the "Predicted" axis.
        labels: tick labels, used for both axes in the given order.
        title: figure title.
        out_path: destination image file (written at 220 dpi).

    The figure is closed even when drawing or saving raises, so a failed save
    does not leave an open figure behind in pyplot's global state.
    """
    fig = plt.figure(figsize=(6, 5))
    try:
        im = plt.imshow(cm, aspect='equal')
        plt.title(title)
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.colorbar(im)
        plt.xticks(range(len(labels)), labels)
        plt.yticks(range(len(labels)), labels)
        # Write each count in the centre of its cell (x = column, y = row).
        for (i, j), v in np.ndenumerate(cm):
            plt.text(j, i, str(v), ha='center', va='center')
        plt.tight_layout()
        plt.savefig(out_path, dpi=220)
    finally:
        # Close this specific figure rather than whichever one is current.
        plt.close(fig)

# ----------------------------
# 1) Load & Feature-ize datasets
# ----------------------------
# Scan every input folder; labels follow the folder (real=0, fake=1).
train_real_df = scan_folder(TRAIN_REAL, label=0, split_name="train")
train_fake_df = scan_folder(TRAIN_FAKE, label=1, split_name="train")
test_real_df  = scan_folder(TEST_REAL,  label=0, split_name="test")
test_fake_df  = scan_folder(TEST_FAKE,  label=1, split_name="test")

dfs = [train_real_df, train_fake_df, test_real_df, test_fake_df]
if SAMPLE_FAKE.exists():
    sample_fake_df = scan_folder(SAMPLE_FAKE, label=1, split_name="sample")
    dfs.append(sample_fake_df)
else:
    sample_fake_df = pd.DataFrame()

# pd.concat raises ValueError when given no frames at all, so the
# "nothing found" case has to be detected before concatenating.
non_empty_dfs = [d for d in dfs if not d.empty]
if not non_empty_dfs:
    raise SystemExit("No images found. Please check your paths.")
all_df = pd.concat(non_empty_dfs, ignore_index=True)

# Drop rows missing core features (images whose feature extraction failed)
# and say how many were lost instead of discarding them silently.
core = ["width","height","aspect_ratio","brightness","contrast","edge_density"]
n_before_drop = len(all_df)
all_df = all_df.dropna(subset=core).reset_index(drop=True)
n_dropped = n_before_drop - len(all_df)
if n_dropped:
    print(f"[WARN] Dropped {n_dropped} image(s) whose features could not be extracted.")

# Feature columns: every numeric column that is not bookkeeping metadata
non_feat = {"split","label","filename","abspath","error"}
feat_cols = [c for c in all_df.columns if c not in non_feat and pd.api.types.is_numeric_dtype(all_df[c])]

# Train / Test (sample is extra)
train_df = all_df[all_df["split"] == "train"].copy()
test_df  = all_df[all_df["split"] == "test"].copy()
# Decide on the *filtered* rows: if every sample image failed extraction,
# there is nothing to predict on even though the folder contained files.
sample_df = all_df[all_df["split"] == "sample"].copy()
has_sample = not sample_df.empty

if train_df.empty or test_df.empty:
    raise SystemExit("Train or Test split is empty. Ensure train/test folders have images.")

X_train = train_df[feat_cols].astype(float).values
y_train = train_df["label"].astype(int).values
X_test  = test_df[feat_cols].astype(float).values
y_test  = test_df["label"].astype(int).values

# ----------------------------
# 2) Pipeline: Standardize + Balanced Logistic Regression
# ----------------------------
# Step 1: standardize every feature (zero mean, unit variance).
scaler = StandardScaler(with_mean=True, with_std=True)

# Step 2: L2-regularized logistic regression; class_weight="balanced"
# reweights the classes so the majority class does not dominate the fit.
log_reg = LogisticRegression(
    solver="saga",
    penalty="l2",
    class_weight="balanced",
    max_iter=1000,
    n_jobs=-1,
    random_state=42,
)

pipe = Pipeline(steps=[("scaler", scaler), ("clf", log_reg)])

print("[INFO] Training model...")
pipe.fit(X_train, y_train)

# ----------------------------
# 3) Predict on Test (+ Sample)
# ----------------------------
# Test
# Test set: take column 1 of predict_proba (used here as the "fake"
# probability) and threshold it into a hard 0/1 prediction.
probs_test = pipe.predict_proba(X_test)[:, 1]
preds_test = (probs_test >= THRESHOLD).astype(int)

test_out = (
    test_df[["filename", "abspath"]]
    .reset_index(drop=True)
    .assign(y_true=y_test, prob_fake=probs_test, y_pred=preds_test)
)
test_out.to_csv(TEST_PRED_CSV, index=False)
print(f"[SAVED] Test predictions -> {TEST_PRED_CSV}")

# Sample set (if present). No y_true column is written: every sample image
# is fake (label 1) by construction.
if has_sample:
    X_sample = sample_df[feat_cols].astype(float).values
    probs_sample = pipe.predict_proba(X_sample)[:, 1]
    preds_sample = (probs_sample >= THRESHOLD).astype(int)
    sample_out = (
        sample_df[["filename", "abspath"]]
        .reset_index(drop=True)
        .assign(prob_fake=probs_sample, y_pred=preds_sample)
    )
    sample_out.to_csv(SAMPLE_PRED_CSV, index=False)
    print(f"[SAVED] Sample predictions -> {SAMPLE_PRED_CSV}")

# ----------------------------
# 4) Metrics & Plots (on Test set)
# ----------------------------
# Threshold-free scores use the probabilities; the confusion matrix and the
# classification report use the hard predictions made at THRESHOLD.
roc_auc = roc_auc_score(y_test, probs_test)
pr_auc  = average_precision_score(y_test, probs_test)
# labels=[0,1] pins the matrix to 2x2 with row/column 0 = real, 1 = fake.
cm = confusion_matrix(y_test, preds_test, labels=[0,1])
report = classification_report(y_test, preds_test, digits=4)

print(f"[INFO] Test ROC-AUC: {roc_auc:.4f} | PR-AUC: {pr_auc:.4f}")
print("[INFO] Confusion matrix:\n", cm)
print("[INFO] Classification report:\n", report)

# Save metrics json (numpy scalars are converted to plain Python numbers
# because json cannot serialize them directly)
metrics = {
    "threshold": THRESHOLD,
    "roc_auc": float(roc_auc),
    "pr_auc": float(pr_auc),
    "confusion_matrix": {
        "tn": int(cm[0,0]), "fp": int(cm[0,1]),
        "fn": int(cm[1,0]), "tp": int(cm[1,1])
    }
}
with open(METRICS_JSON, "w", encoding="utf-8") as f:
    json.dump(metrics, f, indent=2)
print(f"[SAVED] Metrics JSON -> {METRICS_JSON}")

# Save classification report
with open(CLS_TXT, "w", encoding="utf-8") as f:
    f.write("=== DeepFakeShield: Balanced Logistic Regression (Test) ===\n\n")
    f.write(report + "\n")
print(f"[SAVED] Classification report -> {CLS_TXT}")

# ROC curve (the dashed diagonal is the chance-level reference)
fpr, tpr, _ = roc_curve(y_test, probs_test)
plt.figure(figsize=(6,5))
plt.plot(fpr, tpr, label=f"ROC-AUC={roc_auc:.3f}")
plt.plot([0,1],[0,1],"--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate (Recall)")
plt.title("DeepFakeShield: ROC Curve")
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.savefig(ROC_PNG, dpi=220)
plt.close()
print(f"[SAVED] ROC curve -> {ROC_PNG}")

# PR curve
prec, rec, _ = precision_recall_curve(y_test, probs_test)
plt.figure(figsize=(6,5))
plt.plot(rec, prec, label=f"PR-AUC={pr_auc:.3f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("DeepFakeShield: Precision-Recall Curve")
plt.legend(loc="upper right")
plt.grid(True)
plt.tight_layout()
plt.savefig(PR_PNG, dpi=220)
plt.close()
print(f"[SAVED] PR curve -> {PR_PNG}")

# Confusion matrix heatmap: drawn by the plot_confusion_heatmap helper
# defined earlier in this script rather than repeating the same drawing
# code inline, so the figure size now comes from the helper.
plot_confusion_heatmap(
    cm,
    ["Real (0)", "Fake (1)"],
    f"DeepFakeShield: Confusion Matrix (th={THRESHOLD})",
    CM_PNG,
)
print(f"[SAVED] Confusion matrix -> {CM_PNG}")

# ----------------------------
# 5) Feature importance (LogReg coefficients)
# ----------------------------
# Coefs correspond to standardized features
# The classifier sits behind the scaler, so each coefficient is expressed
# per one standard deviation of its feature; absolute values are used to rank.
clf = pipe.named_steps["clf"]
if hasattr(clf, "coef_"):
    coefs = clf.coef_.ravel()
    fi = pd.DataFrame({"feature": feat_cols, "coef": coefs})
    fi["importance_abs"] = fi["coef"].abs()
    fi = fi.sort_values("importance_abs", ascending=False)
    fi.to_csv(FI_CSV, index=False)
    print(f"[SAVED] Feature importance -> {FI_CSV}")

# ----------------------------
# 6) Save trained model
# ----------------------------
# Persist the whole pipeline (scaler + classifier) so it can be reloaded
# and applied to raw feature rows without refitting.
joblib.dump(pipe, MODEL_PKL)
print(f"[SAVED] Model -> {MODEL_PKL}")

print("\n[DONE] All artifacts saved in:", OUT_DIR)




[INFO] train | label=0: 326 images from C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\real
[INFO] train | label=1: 153 images from C:\Users\sagni\Downloads\DeepFakeShield\archive\train-20250112T065955Z-001\train\fake
[INFO] test | label=0: 110 images from C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\real




[INFO] test | label=1: 389 images from C:\Users\sagni\Downloads\DeepFakeShield\archive\test-20250112T065939Z-001\test\fake
[INFO] sample | label=1: 5 images from C:\Users\sagni\Downloads\DeepFakeShield\archive\Sample_fake_images\Sample_fake_images\fake
[INFO] Training model...
[SAVED] Test predictions -> C:\Users\sagni\Downloads\DeepFakeShield\deepfakeshield_test_predictions.csv
[SAVED] Sample predictions -> C:\Users\sagni\Downloads\DeepFakeShield\deepfakeshield_sample_predictions.csv
[INFO] Test ROC-AUC: 0.7170 | PR-AUC: 0.8948
[INFO] Confusion matrix:
 [[ 70  40]
 [132 257]]
[INFO] Classification report:
               precision    recall  f1-score   support

           0     0.3465    0.6364    0.4487       110
           1     0.8653    0.6607    0.7493       389

    accuracy                         0.6553       499
   macro avg     0.6059    0.6485    0.5990       499
weighted avg     0.7510    0.6553    0.6830       499

[SAVED] Metrics JSON -> C:\Users\sagni\Downloads\DeepFakeS