In [7]:
# Cell 1 — CONFIG (edit these before running)
import os
WORK = r"D:\Infosys_AI-Tracefinder\Notebooks"   # <<-- Set this to your project working dir (where outputs will be written)
OFFICIAL_DIR = r"D:\Infosys_AI-Tracefinder\Data\Official"   # <<-- point to the Official dataset root (scanner folders)
FLATFIELD_DIR = r"D:\Infosys_AI-Tracefinder\Data\Flatfield" # <<-- flatfield images folder (optional but recommended)
OUTPUT_DIR = WORK   # where .pkl/.npy files will be saved
IMG_SIZE = (256, 256)
DENOISE_METHOD = "wavelet"   # keep "wavelet" (mentor's method); "wiener" optional
VALID_EXTS = ('.tif', '.tiff', '.png', '.jpg', '.jpeg')
os.makedirs(OUTPUT_DIR, exist_ok=True)
print("WORK:", WORK)
print("OFFICIAL_DIR exists:", os.path.exists(OFFICIAL_DIR))
print("FLATFIELD_DIR exists:", os.path.exists(FLATFIELD_DIR))


WORK: D:\Infosys_AI-Tracefinder\Notebooks
OFFICIAL_DIR exists: True
FLATFIELD_DIR exists: True


In [8]:
# Cell 2 — Imports & utilities
import cv2
import numpy as np
import pywt
import pickle
from tqdm import tqdm
import glob
from scipy.signal import wiener as scipy_wiener

def safe_makedirs(p): 
    os.makedirs(p, exist_ok=True)
def save_pickle(obj, path):
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    print("Saved:", path)


In [9]:
# Cell 3 — Denoising & preprocessing 
def to_gray(img):
    if img is None:
        return None
    if img.ndim == 3:
        return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return img

def resize_to(img, size=IMG_SIZE):
    return cv2.resize(img, size, interpolation=cv2.INTER_AREA)

def normalize_img(img):
    # map to float32 [0,1] — consistent with mentor notebook
    return img.astype(np.float32) / 255.0

def denoise_wavelet_safe(img, wavelet='db4', level=2):
    """
    Soft-threshold wavelet denoising using robust MAD estimate.
    Returns denoised float32 image same shape as input.
    """
    arr = np.asarray(img, dtype=np.float32)
    try:
        coeffs = pywt.wavedec2(arr, wavelet=wavelet, level=level)
    except Exception as e:
        # fallback: return original if decomposition fails
        return arr.copy()
    # collect detail coeffs
    details = []
    for d in coeffs[1:]:
        for comp in d:
            details.append(np.ravel(np.nan_to_num(comp)))
    if len(details) == 0:
        return arr.copy()
    details = np.concatenate(details)
    mad = np.median(np.abs(details - np.median(details)))
    sigma = mad / 0.6745 if mad > 0 else 0.0
    uthresh = sigma * np.sqrt(2 * np.log(arr.size + 1e-12))
    # threshold details
    new_coeffs = [coeffs[0]]
    for d in coeffs[1:]:
        new_level = tuple(pywt.threshold(np.nan_to_num(comp), value=uthresh, mode='soft') for comp in d)
        new_coeffs.append(new_level)
    den = pywt.waverec2(new_coeffs, wavelet)
    den = den[:arr.shape[0], :arr.shape[1]]
    return np.nan_to_num(den, nan=arr).astype(np.float32)

def gaussian_highpass(img, ksize=11):
    arr = np.asarray(img, dtype=np.float32)
    if ksize % 2 == 0:
        ksize += 1
    blur = cv2.GaussianBlur(arr, (ksize, ksize), 0)
    return (arr - blur).astype(np.float32)

def denoise_mentor(img, method=DENOISE_METHOD):
    """
    Mentor-style denoise: default wavelet (db4 L2). 
    Returns denoised image float32.
    """
    if method == "wiener":
        return scipy_wiener(img, mysize=(5,5)).astype(np.float32)
    # default wavelet fallback
    return denoise_wavelet_safe(img, wavelet='db4', level=2)


In [10]:
# Cell 4 — preprocess single file -> residual
def preprocess_image_path(path, method=DENOISE_METHOD, size=IMG_SIZE):
    img = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if img is None:
        return None
    img = to_gray(img)
    img = resize_to(img, size=size)
    img = normalize_img(img)
    den = denoise_mentor(img, method=method)
    # residual used by mentor: img - den
    residual = (img - den).astype(np.float32)
    return residual


In [11]:
# Cell 5 — process dataset directory and save official/flatfield residual dict
def process_dataset_dir(base_dir, save_path=None, expect_dpi=True):
    """
    Walks base_dir scanner folders and returns dict {scanner: [residuals...]}
    Handles DPI subfolders and images directly in scanner folder.
    If save_path is provided, saves the dict to that path.
    """
    residuals = {}
    if not os.path.exists(base_dir):
        print(f"⚠ Dataset not found: {base_dir}")
        return residuals
    scanners = sorted([d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))])
    for scanner in tqdm(scanners, desc=f"Processing {os.path.basename(base_dir)}"):
        scanner_dir = os.path.join(base_dir, scanner)
        collected = []
        # find dpi subfolders
        subdirs = [os.path.join(scanner_dir, s) for s in os.listdir(scanner_dir) if os.path.isdir(os.path.join(scanner_dir, s))]
        if subdirs:
            # prefer scanning DPI subfolders (150,300 etc.)
            for sdir in subdirs:
                files = [os.path.join(sdir, f) for f in os.listdir(sdir) if f.lower().endswith(VALID_EXTS)]
                for fp in files:
                    res = preprocess_image_path(fp)
                    if res is not None:
                        collected.append(res)
        else:
            # images directly inside scanner folder
            files = [os.path.join(scanner_dir, f) for f in os.listdir(scanner_dir) if f.lower().endswith(VALID_EXTS)]
            for fp in files:
                res = preprocess_image_path(fp)
                if res is not None:
                    collected.append(res)
        residuals[scanner] = collected
    if save_path:
        save_pickle(residuals, save_path)
    return residuals

# Run for Official dataset
OFFICIAL_OUT = os.path.join(OUTPUT_DIR, "official_residuals.pkl")
official_residuals = process_dataset_dir(OFFICIAL_DIR, save_path=OFFICIAL_OUT)
print("Scanners in official_residuals:", list(official_residuals.keys())[:10])

# Optionally run for flatfield (if you have flatfield images directory)
if os.path.exists(FLATFIELD_DIR):
    FLATFIELD_OUT = os.path.join(OUTPUT_DIR, "flatfield_residuals.pkl")
    flatfield_residuals = process_dataset_dir(FLATFIELD_DIR, save_path=FLATFIELD_OUT)
    print("Scanners in flatfield_residuals:", list(flatfield_residuals.keys())[:10])
else:
    print("Flatfield dir not found — skipping flatfield generation.")


Processing Official: 100%|██████████| 11/11 [02:51<00:00, 15.57s/it]


Saved: D:\Infosys_AI-Tracefinder\Notebooks\official_residuals.pkl
Scanners in official_residuals: ['Canon120-1', 'Canon120-2', 'Canon220', 'Canon9000-1', 'Canon9000-2', 'EpsonV370-1', 'EpsonV370-2', 'EpsonV39-1', 'EpsonV39-2', 'EpsonV550']


Processing Flatfield: 100%|██████████| 11/11 [00:01<00:00,  5.83it/s]

Saved: D:\Infosys_AI-Tracefinder\Notebooks\flatfield_residuals.pkl
Scanners in flatfield_residuals: ['Canon120-1', 'Canon120-2', 'Canon220', 'Canon9000-1', 'Canon9000-2', 'EpsonV370-1', 'EpsonV370-2', 'EpsonV39-1', 'EpsonV39-2', 'EpsonV550']





In [12]:
# Cell 6 — compute scanner_fingerprints.pkl from flatfield_residuals.pkl
FP_OUT = os.path.join(OUTPUT_DIR, "scanner_fingerprints.pkl")
FP_KEYS_OUT = os.path.join(OUTPUT_DIR, "fp_keys.npy")

if os.path.exists(os.path.join(OUTPUT_DIR, "flatfield_residuals.pkl")):
    with open(os.path.join(OUTPUT_DIR, "flatfield_residuals.pkl"), "rb") as f:
        flatfield_res = pickle.load(f)
    scanner_fps = {}
    for scanner, res_list in tqdm(flatfield_res.items(), desc="Computing fingerprints"):
        if not res_list:
            continue
        stack = np.stack(res_list, axis=0)   # shape (N, H, W)
        scanner_fps[scanner] = np.mean(stack, axis=0).astype(np.float32)
    save_pickle(scanner_fps, FP_OUT)
    np.save(FP_KEYS_OUT, np.array(sorted(scanner_fps.keys())))
    print("Saved fingerprints:", FP_OUT, "keys:", FP_KEYS_OUT)
else:
    print("flatfield_residuals.pkl not found — run Cell 5 for FLATFIELD_DIR first.")


Computing fingerprints: 100%|██████████| 11/11 [00:00<00:00, 3550.94it/s]

Saved: D:\Infosys_AI-Tracefinder\Notebooks\scanner_fingerprints.pkl
Saved fingerprints: D:\Infosys_AI-Tracefinder\Notebooks\scanner_fingerprints.pkl keys: D:\Infosys_AI-Tracefinder\Notebooks\fp_keys.npy





In [13]:
# Cell 7 — extract handcrafted features (requires scanner_fps)
from skimage.feature import local_binary_pattern
from scipy.fft import fft2, fftshift
import math

def corr2d(a, b):
    a = a.astype(np.float32).ravel()
    b = b.astype(np.float32).ravel()
    a -= a.mean(); b -= b.mean()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float((a @ b) / denom) if denom > 0 else 0.0

def fft_radial_energy(img, K=6):
    f = fftshift(fft2(img))
    mag = np.abs(f)
    h, w = mag.shape
    cy, cx = h//2, w//2
    yy, xx = np.ogrid[:h, :w]
    r = np.sqrt((yy - cy)**2 + (xx - cx)**2)
    bins = np.linspace(0, r.max()+1e-6, K+1)
    feats = []
    for i in range(K):
        mask = (r >= bins[i]) & (r < bins[i+1])
        sel = mag[mask]
        feats.append(float(np.mean(sel)) if sel.size else 0.0)
    return feats

def lbp_hist_safe(img, P=8, R=1.0):
    rng = float(np.ptp(img))
    if rng < 1e-12:
        return [0.0] * (P + 2)
    g = (img - img.min()) / (rng + 1e-8)
    g8 = (g * 255).astype(np.uint8)
    codes = local_binary_pattern(g8, P=P, R=R, method="uniform")
    hist, _ = np.histogram(codes, bins=np.arange(P+3), density=True)
    return hist.astype(np.float32).tolist()

# require fingerprints
if not os.path.exists(FP_OUT):
    print("Fingerprints (scanner_fingerprints.pkl) missing. Run Cell 6 first.")
else:
    with open(FP_OUT, "rb") as f:
        scanner_fps = pickle.load(f)
    fp_keys = np.load(FP_KEYS_OUT, allow_pickle=True).tolist()
    features = []
    labels = []
    # official_residuals must exist (we want features for Official)
    if not os.path.exists(OFFICIAL_OUT):
        raise FileNotFoundError("official_residuals.pkl missing. Run Cell 5 for OFFICIAL_DIR.")
    with open(OFFICIAL_OUT, "rb") as f:
        official = pickle.load(f)
    for scanner, res_list in tqdm(official.items(), desc="Extracting features (official)"):
        for res in res_list:
            v_corr = [corr2d(res, scanner_fps[k]) for k in fp_keys]
            v_fft = fft_radial_energy(res)
            v_lbp = lbp_hist_safe(res)
            feat_vec = v_corr + v_fft + v_lbp
            features.append(feat_vec)
            labels.append(scanner)
    FEATURES_OUT = os.path.join(OUTPUT_DIR, "official_features.pkl")
    save_pickle({"features": features, "labels": labels}, FEATURES_OUT)
    print("Feature length:", len(features[0]), "samples:", len(features))


Extracting features (official): 100%|██████████| 11/11 [00:18<00:00,  1.69s/it]

Saved: D:\Infosys_AI-Tracefinder\Notebooks\official_features.pkl
Feature length: 27 samples: 2200





In [14]:
# Cell 8 — prepare final dataset (X_img.npy, X_feat.npy, y.npy) and save encoders
from sklearn.preprocessing import LabelEncoder, StandardScaler

FEATURES_OUT = os.path.join(OUTPUT_DIR, "official_features.pkl")
if not os.path.exists(FEATURES_OUT):
    raise FileNotFoundError("official_features.pkl not found — run Cell 7 first.")

with open(FEATURES_OUT, "rb") as f:
    feat_data = pickle.load(f)
features = np.array(feat_data["features"], dtype=np.float32)
labels = np.array(feat_data["labels"])

# X_img: build from official_residuals (order must match labels)
with open(OFFICIAL_OUT, "rb") as f:
    official = pickle.load(f)

X_img = []
y_img = []
for scanner, res_list in official.items():
    for res in res_list:
        X_img.append(np.expand_dims(res, -1))   # (H,W,1)
        y_img.append(scanner)

X_img = np.array(X_img, dtype=np.float32)
y_img = np.array(y_img)

# Quick check: lengths must match features/labels
print("X_img shape:", X_img.shape)
print("features shape:", features.shape)
print("num labels:", labels.shape)

# Encode labels
le = LabelEncoder()
y_encoded = le.fit_transform(labels)

# Scale handcrafted features
scaler = StandardScaler()
X_feat_scaled = scaler.fit_transform(features)

# Save final artifacts
np.save(os.path.join(OUTPUT_DIR, "X_img.npy"), X_img)
np.save(os.path.join(OUTPUT_DIR, "X_feat.npy"), X_feat_scaled)
np.save(os.path.join(OUTPUT_DIR, "y.npy"), y_encoded)

with open(os.path.join(OUTPUT_DIR, "hybrid_label_encoder.pkl"), "wb") as f:
    pickle.dump(le, f)
with open(os.path.join(OUTPUT_DIR, "hybrid_feat_scaler.pkl"), "wb") as f:
    pickle.dump(scaler, f)

print("Saved: X_img.npy, X_feat.npy, y.npy and encoder/scaler in", OUTPUT_DIR)


X_img shape: (2200, 256, 256, 1)
features shape: (2200, 27)
num labels: (2200,)
Saved: X_img.npy, X_feat.npy, y.npy and encoder/scaler in D:\Infosys_AI-Tracefinder\Notebooks


In [15]:
# Cell 9 — sanity checks
for p in ["official_residuals.pkl", "flatfield_residuals.pkl", "scanner_fingerprints.pkl", 
          "fp_keys.npy", "official_features.pkl", "X_img.npy", "X_feat.npy", "y.npy"]:
    full = os.path.join(OUTPUT_DIR, p)
    print(p, "->", "FOUND" if os.path.exists(full) else "MISSING")
# show classes
import numpy as np
with open(os.path.join(OUTPUT_DIR, "hybrid_label_encoder.pkl"), "rb") as f:
    le = pickle.load(f)
print("Classes:", list(le.classes_))


official_residuals.pkl -> FOUND
flatfield_residuals.pkl -> FOUND
scanner_fingerprints.pkl -> FOUND
fp_keys.npy -> FOUND
official_features.pkl -> FOUND
X_img.npy -> FOUND
X_feat.npy -> FOUND
y.npy -> FOUND
Classes: [np.str_('Canon120-1'), np.str_('Canon120-2'), np.str_('Canon220'), np.str_('Canon9000-1'), np.str_('Canon9000-2'), np.str_('EpsonV370-1'), np.str_('EpsonV370-2'), np.str_('EpsonV39-1'), np.str_('EpsonV39-2'), np.str_('EpsonV550'), np.str_('HP')]
