In [2]:
pip install opencv-python

Note: you may need to restart the kernel to use updated packages.


In [3]:
import sys
import os
import glob
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt

# Add local PRNU repo to Python path
prnu_repo_path = r"C:\Users\Khushi\prnu-camera-source-detection\prnu-python"
if prnu_repo_path not in sys.path:
    sys.path.append(prnu_repo_path)

# Now import PRNU functions from the local repo
from prnu import extract_multiple_aligned, crosscorr_2d
from prnu.functions import noise_extract_compact, rgb2gray

print("‚úÖ Successfully imported PRNU from local repo")


‚úÖ Successfully imported PRNU from local repo


In [4]:
import sys
sys.path.append(r'c:\Users\inamy\Desktop\minor project\prnu-camera-source-detection\prnu-python')

In [None]:
import os, glob, gc
import numpy as np
from PIL import Image
import cv2
from prnu import extract_multiple_aligned

# -------------------
# SETTINGS
# -------------------

data_directory = r"C:\Users\Khushi\prnu-camera-source-detection\data"
fingerprint_directory = os.path.join(os.path.dirname(data_directory), 'fingerprints')
os.makedirs(fingerprint_directory, exist_ok=True)

TARGET_WIDTH = 512
TARGET_HEIGHT = 384
FRAMES_PER_VIDEO = 60       # Enough for PRNU
BATCH_SIZE = 15             # Process only 15 frames at a time


# -------------------
# HELPERS
# -------------------

def resize_with_padding(img_np, target_width, target_height):
    img = Image.fromarray(img_np)
    img.thumbnail((target_width, target_height), Image.LANCZOS)
    new_img = Image.new("RGB", (target_width, target_height))
    new_img.paste(img, ((target_width - img.width) // 2, (target_height - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)


def extract_frames_from_video(video_path, frames_per_video, TARGET_WIDTH, TARGET_HEIGHT):
    frames = []
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"‚ö†Ô∏è Could not open video file: {video_path}")
        return frames

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames == 0:
        return frames

    frame_indices = np.linspace(0, total_frames - 1, frames_per_video, dtype=int)

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
            frames.append(img)

    cap.release()
    return frames

# -------------------
# MAIN PROCESS
# -------------------

device_folders = [f for f in os.listdir(data_directory) if os.path.isdir(os.path.join(data_directory, f))]
print(f"Found devices: {device_folders}")

for device in device_folders:
    print(f"\nüìç Processing fingerprints for: {device}")

    video_dir = os.path.join(data_directory, device, 'videos', 'fingerprint_set')
    video_paths = glob.glob(os.path.join(video_dir, '*.mp4')) + \
                  glob.glob(os.path.join(video_dir, '*.mov')) + \
                  glob.glob(os.path.join(video_dir, '*.avi'))

    if not video_paths:
        print(f"‚ö†Ô∏è No video files found for {device}. Skipping.")
        continue

    fingerprint = None

    for path in video_paths:
        print(f"  ‚Üí Extracting frames from: {os.path.basename(path)}")
        frames = extract_frames_from_video(path, FRAMES_PER_VIDEO, TARGET_WIDTH, TARGET_HEIGHT)

        if not frames:
            print(f"  ‚ö†Ô∏è No frames extracted from {path}")
            continue

        # Process in batches to avoid memory spike
        for i in range(0, len(frames), BATCH_SIZE):
            batch = frames[i:i+BATCH_SIZE]

            batch_uint8 = np.array(batch, dtype=np.uint8)

            if fingerprint is None:
                fingerprint = extract_multiple_aligned(batch_uint8, processes=0)
            else:
                fp_part = extract_multiple_aligned(batch_uint8, processes=0)
                fingerprint = (fingerprint + fp_part) / 2

            del batch_uint8
            gc.collect()

        del frames
        gc.collect()

    if fingerprint is None:
        print(f"‚ö†Ô∏è No fingerprint generated for {device}")
        continue

    save_path = os.path.join(fingerprint_directory, f"{device}_video_fingerprint.npy")
    np.save(save_path, fingerprint)
    print(f"‚úÖ Fingerprint saved for {device}: {save_path}, shape: {fingerprint.shape}")



Found devices: ['iphone15', 'OnePlus Nord CE4', 'Samsung S21 FE', 'Samsung S23 5g']

üìç Processing fingerprints for: iphone15
  ‚Üí Extracting frames from: IMG_5445.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.57it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.65it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.22it/s]


  ‚Üí Extracting frames from: IMG_5446.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  9.26it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  9.01it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  9.07it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  9.40it/s]


  ‚Üí Extracting frames from: IMG_5448.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.82it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.59it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.80it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.76it/s]


  ‚Üí Extracting frames from: IMG_5449.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.92it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.72it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.97it/s]


  ‚Üí Extracting frames from: IMG_5450.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.41it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.42it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.01it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.38it/s]


  ‚Üí Extracting frames from: IMG_5452.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.34it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.36it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.10it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.08it/s]


  ‚Üí Extracting frames from: IMG_5453.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.21it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.96it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  9.00it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.62it/s]


  ‚Üí Extracting frames from: IMG_5456.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.25it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.90it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.62it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.37it/s]


‚úÖ Fingerprint saved for iphone15: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\iphone15_video_fingerprint.npy, shape: (384, 512)

üìç Processing fingerprints for: OnePlus Nord CE4
  ‚Üí Extracting frames from: VID20251030112943.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.00it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.23it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.57it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.52it/s]


  ‚Üí Extracting frames from: VID20251030112954.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.42it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.08it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.41it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.43it/s]


  ‚Üí Extracting frames from: VID20251030113117.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.19it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.37it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.22it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.45it/s]


  ‚Üí Extracting frames from: VID20251030113151.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.17it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.48it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.44it/s]


  ‚Üí Extracting frames from: VID20251030113219.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.02it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.33it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.14it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.79it/s]


  ‚Üí Extracting frames from: VID20251030113315.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.47it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.22it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.15it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.44it/s]


  ‚Üí Extracting frames from: VID20251030113406.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.13it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.37it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.13it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.88it/s]


  ‚Üí Extracting frames from: VID20251030113428.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.15it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.58it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.65it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.38it/s]


  ‚Üí Extracting frames from: VID20251030113546.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.29it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.61it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.50it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.10it/s]


  ‚Üí Extracting frames from: VID20251030113641.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.96it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.71it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.20it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.36it/s]


  ‚Üí Extracting frames from: VID20251030113846.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.33it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.46it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.42it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.68it/s]


‚úÖ Fingerprint saved for OnePlus Nord CE4: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\OnePlus Nord CE4_video_fingerprint.npy, shape: (384, 512)

üìç Processing fingerprints for: Samsung S21 FE
  ‚Üí Extracting frames from: 20251017_194843.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.62it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.54it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.56it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.36it/s]


  ‚Üí Extracting frames from: 20251017_195010.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.62it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.33it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.23it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 14/14 [00:01<00:00,  7.74it/s]


  ‚Üí Extracting frames from: 20251017_195454.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.68it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.39it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.48it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.62it/s]


  ‚Üí Extracting frames from: 20251017_195706.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.50it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.14it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.46it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.40it/s]


  ‚Üí Extracting frames from: 20251017_200315.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.14it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.88it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.65it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.33it/s]


  ‚Üí Extracting frames from: 20251017_200651.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.81it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.48it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.75it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.51it/s]


  ‚Üí Extracting frames from: 20251017_200928.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.63it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.80it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.46it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.00it/s]


  ‚Üí Extracting frames from: 20251017_201833.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.65it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.16it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.29it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.44it/s]


‚úÖ Fingerprint saved for Samsung S21 FE: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\Samsung S21 FE_video_fingerprint.npy, shape: (384, 512)

üìç Processing fingerprints for: Samsung S23 5g
  ‚Üí Extracting frames from: 20250306_131548.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.92it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.26it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.87it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.34it/s]


  ‚Üí Extracting frames from: 20250402_174439.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.03it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.51it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.48it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.04it/s]


  ‚Üí Extracting frames from: 20250413_081525.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.55it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.60it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.88it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  6.88it/s]


  ‚Üí Extracting frames from: 20250413_085810.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.03it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.51it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.57it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.33it/s]


  ‚Üí Extracting frames from: 20250413_185327.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.69it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.28it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.00it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.22it/s]


  ‚Üí Extracting frames from: 20250530_101017.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.08it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.46it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.64it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.68it/s]


  ‚Üí Extracting frames from: 20250531_171308.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.12it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.47it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.92it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:02<00:00,  7.33it/s]


  ‚Üí Extracting frames from: 20250612_173237.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.67it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.09it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.33it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.27it/s]


  ‚Üí Extracting frames from: 20250620_175107.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.90it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.97it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.75it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.71it/s]


  ‚Üí Extracting frames from: 20250709_204001.mp4


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.56it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.95it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  7.81it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 15/15 [00:01<00:00,  8.08it/s]


‚úÖ Fingerprint saved for Samsung S23 5g: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\Samsung S23 5g_video_fingerprint.npy, shape: (384, 512)


In [13]:
# ---------------------------
# MAX-ACCURACY HYBRID EVALUATOR
# Run this cell after you've defined extract_frames_from_video(...) above.
# ---------------------------

import os, glob, math, gc
import numpy as np
import cv2
from typing import Dict, Tuple, List, Optional
from scipy import signal

# Settings (tweak if needed)
DATA_ROOT = r"C:\Users\Khushi\prnu-camera-source-detection\data"
FINGERPRINT_DIR = r"C:\Users\Khushi\prnu-camera-source-detection\fingerprints"

FRAMES_PER_VIDEO = 60     # more frames -> stronger PRNU evidence
DROP_BOTTOM_PERCENT = 30  # drop bottom 30% by quality
BATCH_SIZE = 15           # used only if we later process in batches (not strictly required here)

# Try BM3D for best denoising if installed; else fallback to OpenCV fastNlMeans
try:
    from bm3d import bm3d_rgb, bm3d
    BM3D_AVAILABLE = True
    print("BM3D available: using BM3D denoiser for best quality.")
except Exception:
    BM3D_AVAILABLE = False
    print("BM3D not available: falling back to OpenCV Non-local Means denoiser.")

# -------------------------
# Utility functions
# -------------------------
def compute_sharpness(gray: np.ndarray) -> float:
    lap = cv2.Laplacian(gray, cv2.CV_64F)
    return float(lap.var())

def compute_texture_strength(gray: np.ndarray, ksize: int = 7) -> float:
    img = gray.astype(np.float32)
    blurred = cv2.GaussianBlur(img, (ksize, ksize), 0)
    mean = cv2.boxFilter(blurred, ddepth=-1, ksize=(ksize, ksize))
    mean_sq = cv2.boxFilter(blurred * blurred, ddepth=-1, ksize=(ksize, ksize))
    local_var = mean_sq - (mean * mean)
    return float(np.median(local_var))

def standardize_scores(arr: np.ndarray) -> np.ndarray:
    arr = np.asarray(arr, dtype=np.float32)
    if arr.size == 0:
        return arr
    mn = arr.min()
    mx = arr.max()
    if mx <= mn:
        return np.ones_like(arr)
    return (arr - mn) / (mx - mn)

# Denoise using best available method
def denoise_frame_rgb(frame_rgb: np.ndarray) -> np.ndarray:
    """
    Input: RGB uint8 image (H, W, 3)
    Output: denoised RGB as float32
    """
    if BM3D_AVAILABLE:
        # bm3d expects float in 0..1 or 0..255 depending on implementation; here we normalize to 0..1
        im_f = frame_rgb.astype(np.float32) / 255.0
        try:
            den = bm3d_rgb(im_f, sigma_psd=10/255.0)  # heuristic sigma; BM3D sigma tuning may be needed
            den = np.clip(den, 0.0, 1.0)
            return (den * 255.0).astype(np.float32)
        except Exception:
            # fallback if bm3d_rgb signature differs
            den = bm3d(frame_rgb.astype(np.float32) / 255.0)
            return (np.clip(den, 0.0, 1.0) * 255.0).astype(np.float32)
    else:
        # OpenCV colored denoising is decent: convert to BGR since opencv function expects that
        bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        # h parameters tuned for strong denoising but preserving PRNU as much as possible
        den_bgr = cv2.fastNlMeansDenoisingColored(bgr, None, h=10, hColor=10, templateWindowSize=7, searchWindowSize=21)
        den_rgb = cv2.cvtColor(den_bgr, cv2.COLOR_BGR2RGB)
        return den_rgb.astype(np.float32)

def extract_noise_residual(frame_rgb: np.ndarray) -> np.ndarray:
    """
    Compute noise residual = frame - denoised_frame, return grayscale residual normalized (zero-mean, unit-std).
    Input frame_rgb: uint8 RGB
    Output: float32 2D array
    """
    den = denoise_frame_rgb(frame_rgb)
    # compute residual in float
    residual = frame_rgb.astype(np.float32) - den.astype(np.float32)
    # convert to grayscale residual (weighted)
    if residual.ndim == 3:
        res_gray = 0.2989 * residual[...,0] + 0.5870 * residual[...,1] + 0.1140 * residual[...,2]
    else:
        res_gray = residual.astype(np.float32)
    # normalize to zero mean and unit var
    mu = np.mean(res_gray)
    sigma = np.std(res_gray) + 1e-10
    res_norm = (res_gray - mu) / sigma
    return res_norm.astype(np.float32)

# FFT-based cross-correlation (returns full correlation map)
def crosscorr_2d_fft(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """
    Compute normalized cross-correlation map using FFT convolution.
    Both a and b are same-shape 2D arrays. We compute cross-correlation and normalize.
    """
    # ensure float32
    a = a.astype(np.float32)
    b = b.astype(np.float32)
    # zero-mean
    a = a - a.mean()
    b = b - b.mean()
    # flip b for cross-correlation equivalence with convolution
    b_flip = np.flipud(np.fliplr(b))
    conv = signal.fftconvolve(a, b_flip, mode='same')
    # normalisation: divide by sqrt(Ea * Eb) where Ea are local energy maps (approx)
    # We'll approximate denominator with global energies for speed (consistent across comparisons)
    denom = np.sqrt((a**2).sum() * (b**2).sum()) + 1e-10
    return conv / denom

def compute_pce_from_corrmap(corr: np.ndarray, ignore_radius: int = 11) -> float:
    """
    Compute Peak-to-Correlation Energy (PCE) from correlation map.
    PCE = (peak^2) / (variance of correlation excluding small neighborhood around peak)
    """
    if corr.size == 0:
        return float('-inf')
    peak_idx = np.unravel_index(np.argmax(corr), corr.shape)
    peak_val = corr[peak_idx]
    # mask out small neighborhood
    h, w = corr.shape
    Y, X = np.ogrid[:h, :w]
    mask = np.ones_like(corr, dtype=bool)
    y0, x0 = peak_idx
    rr = (Y - y0)**2 + (X - x0)**2
    mask[rr <= (ignore_radius**2)] = False
    outside = corr[mask]
    if outside.size <= 1:
        return float('-inf')
    var_out = outside.var()
    if var_out <= 1e-12:
        return float('inf') if peak_val > 0 else float('-inf')
    pce = (peak_val**2) / var_out
    return float(pce)

# rotation + flip scanning for best PCE
def best_pce_between(residual_frame: np.ndarray, fp_map: np.ndarray) -> float:
    best = float('-inf')
    # ensure same shape: if fingerprint differs, resize fingerprint to frame size
    if fp_map.shape != residual_frame.shape:
        fp_resized = cv2.resize(fp_map.astype(np.float32), (residual_frame.shape[1], residual_frame.shape[0]), interpolation=cv2.INTER_LINEAR)
    else:
        fp_resized = fp_map.astype(np.float32)
    # normalize fingerprint similarly
    fp_resized = (fp_resized - fp_resized.mean()) / (fp_resized.std() + 1e-10)
    # try 4 rotations and flips
    for k in range(4):
        rot = np.rot90(residual_frame, k)
        for flip in (False, True):
            test = np.fliplr(rot) if flip else rot
            corr = crosscorr_2d_fft(test, fp_resized)
            pce = compute_pce_from_corrmap(corr, ignore_radius=11)
            if pce > best:
                best = pce
    return best

# Adaptive aggregation across frames
def aggregate_video_scores(frame_pces: List[float], quality_vals: List[float], drop_bottom_pct: float = DROP_BOTTOM_PERCENT) -> Tuple[float, List[int]]:
    if len(frame_pces) == 0:
        return float('-inf'), []
    frame_pces = np.asarray(frame_pces, dtype=np.float32)
    quality_vals = np.asarray(quality_vals, dtype=np.float32)
    cutoff = np.percentile(quality_vals, drop_bottom_pct)
    keep_mask = quality_vals > cutoff
    if not np.any(keep_mask):
        keep_mask[int(np.argmax(quality_vals))] = True
    kept_idx = np.where(keep_mask)[0].tolist()
    kept_pces = frame_pces[keep_mask]
    kept_quals = quality_vals[keep_mask]
    q_norm = standardize_scores(kept_quals)
    if q_norm.sum() <= 0:
        weights = np.ones_like(q_norm) / len(q_norm)
    else:
        weights = q_norm / q_norm.sum()
    agg = float(np.sum(kept_pces * weights))
    return agg, kept_idx

# -------------------------
# Matching per video
# -------------------------
def match_video_to_fingerprints(video_path: str, fingerprints: Dict[str, np.ndarray],
                                frames_per_video: int = FRAMES_PER_VIDEO,
                                target_w: int = None, target_h: int = None) -> Tuple[Optional[str], float, Dict[str, float]]:
    """
    Returns (predicted_device, aggregated_score, per_device_score_map)
    """
    # Use your earlier extract_frames_from_video function ‚Äî ensure it's defined in the notebook
    if target_w is None or target_h is None:
        frames = extract_frames_from_video(video_path, frames_per_video, TARGET_WIDTH, TARGET_HEIGHT)
    else:
        frames = extract_frames_from_video(video_path, frames_per_video, target_w, target_h)
    if not frames:
        return None, float('-inf'), {}

    frame_pces_per_device = {dev: [] for dev in fingerprints.keys()}
    quality_vals = []

    # Process frames sequentially (memory safe)
    for frame in frames:
        # ensure uint8 RGB
        frame_uint8 = frame.astype(np.uint8)
        # compute quality from original frame
        gray = cv2.cvtColor(frame_uint8, cv2.COLOR_RGB2GRAY)
        sharp = compute_sharpness(gray)
        text = compute_texture_strength(gray)
        quality_vals.append(0.5 * sharp + 0.5 * text)

        # compute residual
        res = extract_noise_residual(frame_uint8)  # float32 normalized

        # for each device fingerprint compute pce
        for dev, fp in fingerprints.items():
            # prepare fingerprint grayscale map
            if fp.ndim == 3:
                fp_gray = 0.2989 * fp[...,0] + 0.5870 * fp[...,1] + 0.1140 * fp[...,2]
            else:
                fp_gray = fp.astype(np.float32)
            pce = best_pce_between(res, fp_gray)
            frame_pces_per_device[dev].append(pce)

    # Aggregate per device
    agg_scores = {}
    for dev, pces in frame_pces_per_device.items():
        agg, kept_idx = aggregate_video_scores(pces, quality_vals, drop_bottom_pct=DROP_BOTTOM_PERCENT)
        agg_scores[dev] = agg

    # choose best device
    best_dev = max(agg_scores.items(), key=lambda x: x[1])[0]
    best_score = agg_scores[best_dev]
    return best_dev, best_score, agg_scores

# -------------------------
# Main evaluation loop across devices' query_set
# -------------------------
def evaluate_all_query_sets(data_root: str = DATA_ROOT, fingerprint_dir: str = FINGERPRINT_DIR):
    # Load fingerprints from fingerprint_dir
    fingerprints = {}
    for f in os.listdir(fingerprint_dir):
        if f.endswith('_video_fingerprint.npy'):
            dev = f.replace('_video_fingerprint.npy', '')
            fp = np.load(os.path.join(fingerprint_dir, f))
            fingerprints[dev] = fp
    if not fingerprints:
        print("No fingerprints found. Generate fingerprints first.")
        return

    devices = [d for d in os.listdir(data_root) if os.path.isdir(os.path.join(data_root, d))]
    overall_total = 0
    overall_correct = 0

    print("\n=== QUERY SET EVALUATION (MAX ACCURACY HYBRID) ===\n")
    for dev in devices:
        qdir = os.path.join(data_root, dev, 'videos', 'query_set')
        if not os.path.isdir(qdir):
            print(f"No query_set for device {dev}, skipping.")
            continue
        video_paths = sorted(glob.glob(os.path.join(qdir, '*.mp4')) + glob.glob(os.path.join(qdir, '*.mov')) + glob.glob(os.path.join(qdir, '*.avi')))
        if not video_paths:
            print(f"No query videos found for {dev}.")
            continue

        correct = 0
        matched_list, notmatched_list = [], []
        print(f"\nüìç Device: {dev}  (query videos: {len(video_paths)})")

        for v in video_paths:
            print(f"  ‚Üí Evaluating: {os.path.basename(v)}")
            pred, score, per_dev = match_video_to_fingerprints(v, fingerprints)
            print(f"     predicted: {pred}   agg_score: {score:.4f}")
            if pred == dev:
                correct += 1
                matched_list.append(os.path.basename(v))
            else:
                notmatched_list.append(os.path.basename(v))

        overall_total += len(video_paths)
        overall_correct += correct

        pct = (correct / len(video_paths) * 100) if video_paths else 0.0
        print(f"\n  ‚úÖ Query results for {dev}: {correct}/{len(video_paths)} matched ({pct:.1f}%)")
        if matched_list:
            print(f"     ‚úì matched: {', '.join(matched_list)}")
        if notmatched_list:
            print(f"     ‚úó not matched: {', '.join(notmatched_list)}")

    overall_acc = (overall_correct / overall_total * 100) if overall_total > 0 else 0.0
    print("\n=== OVERALL SUMMARY ===")
    print(f"Total matched: {overall_correct} / {overall_total}  -> Accuracy: {overall_acc:.2f}%")
    print("===============================================")

# Run evaluation
evaluate_all_query_sets()


BM3D not available: falling back to OpenCV Non-local Means denoiser.

=== QUERY SET EVALUATION (MAX ACCURACY HYBRID) ===


üìç Device: iphone15  (query videos: 3)
  ‚Üí Evaluating: IMG_5454.mp4
     predicted: iphone15   agg_score: 71.2470
  ‚Üí Evaluating: IMG_5455.mp4
     predicted: iphone15   agg_score: 108.5406
  ‚Üí Evaluating: IMG_5456 copy.mp4
     predicted: iphone15   agg_score: 242.2021

  ‚úÖ Query results for iphone15: 3/3 matched (100.0%)
     ‚úì matched: IMG_5454.mp4, IMG_5455.mp4, IMG_5456 copy.mp4

üìç Device: OnePlus Nord CE4  (query videos: 4)
  ‚Üí Evaluating: VID20251030112624.mp4
     predicted: Samsung S21 FE   agg_score: 69.9761
  ‚Üí Evaluating: VID20251030112641.mp4
     predicted: Samsung S21 FE   agg_score: 72.2863
  ‚Üí Evaluating: VID20251030112807.mp4
     predicted: Samsung S21 FE   agg_score: 96.7378
  ‚Üí Evaluating: VID20251030112925.mp4
     predicted: iphone15   agg_score: 81.5727

  ‚úÖ Query results for OnePlus Nord CE4: 0/4 matched (0.0%)
    

In [22]:
import os
import numpy as np
import cv2
from glob import glob
from prnu import extract_multiple_aligned
from PIL import Image

# -------- CONFIG --------
DEVICES = ['iphone15', 'OnePlus Nord CE4', 'Samsung S21 FE', 'Samsung S23 5g']
FINGERPRINT_DIR = r"fingerprints"
BASE_QUERY_PATH = r"C:\Users\Khushi\prnu-camera-source-detection\data"

TARGET_WIDTH = 512
TARGET_HEIGHT = 384
FRAMES_PER_VIDEO = 50   # can reduce for speed
BATCH_SIZE = 10
# -------------------------


def resize_with_padding(img_np, target_width, target_height):
    img = Image.fromarray(img_np)
    img.thumbnail((target_width, target_height), Image.LANCZOS)
    new_img = Image.new("RGB", (target_width, target_height))
    new_img.paste(img, ((target_width - img.width) // 2, (target_height - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)


def extract_prnu_query(video_path):
    """Extract PRNU using SAME method as fingerprint generation."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames == 0:
        cap.release()
        return None

    frame_indices = np.linspace(0, total_frames - 1, FRAMES_PER_VIDEO, dtype=int)

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            continue

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
        frames.append(img)

    cap.release()

    if len(frames) == 0:
        return None

    # Compute PRNU using extract_multiple_aligned to match fingerprint method
    prnu = None
    for i in range(0, len(frames), BATCH_SIZE):
        batch = np.array(frames[i:i+BATCH_SIZE], dtype=np.uint8)
        prnu_part = extract_multiple_aligned(batch, processes=0)

        prnu = prnu_part if prnu is None else (prnu + prnu_part) / 2

    return prnu


def normalized_cross_correlation(a, b):
    a = (a - np.mean(a)) / (np.std(a) + 1e-8)
    b = (b - np.mean(b)) / (np.std(b) + 1e-8)
    return np.mean(a * b)


# Load stored fingerprints
fingerprints = {}
for device in DEVICES:
    fp_path = os.path.join(FINGERPRINT_DIR, f"{device}_video_fingerprint.npy")
    if os.path.exists(fp_path):
        fingerprints[device] = np.load(fp_path)
        print(f"[+] Loaded fingerprint for {device}, shape: {fingerprints[device].shape}")
    else:
        print(f"[!] Missing fingerprint for {device}")


results = {d: {"correct": 0, "total": 0} for d in DEVICES}


# ---- MATCHING ----
for device in DEVICES:
    query_folder = os.path.join(BASE_QUERY_PATH, device, "videos", "query_set")
    query_videos = glob(os.path.join(query_folder, "*.mp4"))

    print(f"\n--- Checking videos for {device} ---")
    for video in query_videos:
        query_prnu = extract_prnu_query(video)

        if query_prnu is None:
            print(f"‚ùå Could not extract PRNU for {video}")
            continue

        scores = {dev_name: normalized_cross_correlation(query_prnu, fp)
                  for dev_name, fp in fingerprints.items()}

        predicted_device = max(scores, key=scores.get)

        results[device]["total"] += 1
        if predicted_device == device:
            results[device]["correct"] += 1

        print(f"{os.path.basename(video)} ‚Üí Predicted: {predicted_device}")


# ---- SUMMARY ----
print("\n======== FINAL RESULTS ========")
total_correct = 0
total_videos = 0

for device, data in results.items():
    correct = data["correct"]
    total = data["total"]
    total_correct += correct
    total_videos += total
    print(f"{device} has {total} videos: correct categorizations = {correct}/{total}")

print("\nOverall Accuracy:", f"{total_correct}/{total_videos}")



[+] Loaded fingerprint for iphone15, shape: (384, 512)
[+] Loaded fingerprint for OnePlus Nord CE4, shape: (384, 512)
[+] Loaded fingerprint for Samsung S21 FE, shape: (384, 512)
[+] Loaded fingerprint for Samsung S23 5g, shape: (384, 512)

--- Checking videos for iphone15 ---


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.18it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.27it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.34it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.56it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.67it/s]


IMG_5454.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.73it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.50it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.70it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.15it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.33it/s]


IMG_5455.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.54it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.92it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.60it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.89it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  9.15it/s]


IMG_5456 copy.mp4 ‚Üí Predicted: iphone15

--- Checking videos for OnePlus Nord CE4 ---


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.84it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.97it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.51it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.65it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.08it/s]


VID20251030112624.mp4 ‚Üí Predicted: Samsung S21 FE


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.25it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.44it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.27it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 9/9 [00:01<00:00,  8.21it/s]


VID20251030112641.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.34it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.84it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.42it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.32it/s]


VID20251030112807.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.69it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.07it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.18it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.45it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.03it/s]


VID20251030112925.mp4 ‚Üí Predicted: OnePlus Nord CE4

--- Checking videos for Samsung S21 FE ---


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.73it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.60it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.91it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.72it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.53it/s]


20251017_194729.mp4 ‚Üí Predicted: Samsung S23 5g


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.87it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.88it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.58it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.51it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.81it/s]


20251017_202243.mp4 ‚Üí Predicted: Samsung S21 FE

--- Checking videos for Samsung S23 5g ---


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.52it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.31it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.21it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.60it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.14it/s]


20250712_120212.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.83it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.68it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.23it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.09it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.79it/s]


20250914_130351.mp4 ‚Üí Predicted: iphone15


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  7.58it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.56it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.60it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.41it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.49it/s]


20250927_205316.mp4 ‚Üí Predicted: Samsung S21 FE


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  5.74it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  5.92it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.74it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  6.43it/s]
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 10/10 [00:01<00:00,  8.35it/s]

20250930_173755.mp4 ‚Üí Predicted: iphone15

iphone15 has 3 videos: correct categorizations = 3/3
OnePlus Nord CE4 has 4 videos: correct categorizations = 1/4
Samsung S21 FE has 2 videos: correct categorizations = 1/2
Samsung S23 5g has 4 videos: correct categorizations = 0/4

Overall Accuracy: 5/13





In [18]:
def identify_device_from_video(video_path: str,
                               fingerprint_dir: str = r"C:\Users\Khushi\prnu-camera-source-detection\fingerprints") -> None:
    """
    Identify which device a single video most likely came from.
    Prints the result and returns (predicted_device, score, per_device_scores)
    """
    if not os.path.exists(video_path):
        print(f"‚ùå Video not found: {video_path}")
        return None, None, {}

    # Load fingerprints
    fingerprints = {}
    for f in os.listdir(fingerprint_dir):
        if f.endswith('_video_fingerprint.npy'):
            dev = f.replace('_video_fingerprint.npy', '')
            fp = np.load(os.path.join(fingerprint_dir, f))
            fingerprints[dev] = fp

    if not fingerprints:
        print("‚ùå No fingerprints found. Generate them first.")
        return None, None, {}

    print(f"\nüîç Identifying device for video: {os.path.basename(video_path)}")

    pred, score, per_device_scores = match_video_to_fingerprints(video_path, fingerprints)

    print("\n===== RESULT =====")
    print(f"üìå Predicted Device: **{pred}**")
    print(f"üìà Confidence Score: {score:.4f}")

    print("\n--- Scores for All Devices (Higher = Better) ---")
    for dev, sc in sorted(per_device_scores.items(), key=lambda x: x[1], reverse=True):
        print(f"{dev:20} : {sc:.4f}")

    print("==================\n")

    return pred, score, per_device_scores


In [None]:
# CELL 4: MATCHING WITH ROTATION HANDLING

import numpy as np

def normalize(img):
    """Zero-mean, unit-variance normalization."""
    return (img - np.mean(img)) / (np.std(img) + 1e-8)

best_match_device = None
highest_correlation = -1.0

print("\n--- Starting Cross-Correlation Matching (Including Rotation + Flipping) ---")

for device, fingerprint in fingerprints.items():
    
    # Convert fingerprint to grayscale if needed and normalize
    fp_gray = rgb2gray(fingerprint) if fingerprint.ndim == 3 else fingerprint
    fp_gray = normalize(fp_gray)

    device_scores = []
    
    for frame_idx, query_noise_gray in enumerate(query_noise_residuals):
        query_noise_gray = normalize(query_noise_gray)
        
        # Test 4 rotations and horizontal flipping
        frame_scores = []
        for k in range(4):  # 0, 90, 180, 270 degrees
            rotated_noise = np.rot90(query_noise_gray, k)
            
            for flip in [False, True]:
                noise_test = np.fliplr(rotated_noise) if flip else rotated_noise
                correlation_map = crosscorr_2d(noise_test, fp_gray)
                frame_scores.append(float(np.max(correlation_map)))
        
        device_scores.append(np.max(frame_scores))

    max_device_correlation = np.max(device_scores)
    print(f"  - Max Correlation with {device} (over {len(device_scores)} frames): {max_device_correlation:.4f}")

    if max_device_correlation > highest_correlation:
        highest_correlation = max_device_correlation
        best_match_device = device

print("\n--- Final Results ---")
if best_match_device is not None:
    print(f"üöÄ The best match for the test video is: **{best_match_device}** with a score of {highest_correlation:.4f}")
else:
    print("‚ö†Ô∏è No reliable match found.")


--- Starting Cross-Correlation Matching (Including Rotation + Flipping) ---
  - Max Correlation with iphone15 (over 5 frames): 7258.4644
  - Max Correlation with OnePlus Nord CE4 (over 5 frames): 7681.9429
  - Max Correlation with Samsung S21 FE (over 5 frames): 6285.2227
  - Max Correlation with Samsung S23 5g (over 5 frames): 7248.3530

--- Final Results ---
üöÄ The best match for the test video is: **OnePlus Nord CE4** with a score of 7681.9429


In [None]:
import os, glob, gc
import numpy as np
from PIL import Image
import cv2
from prnu import extract_multiple_aligned

# -------------------
# SETTINGS
# -------------------

data_directory = r"C:\Users\Khushi\prnu-camera-source-detection\data"
fingerprint_directory = os.path.join(os.path.dirname(data_directory), 'fingerprints')
os.makedirs(fingerprint_directory, exist_ok=True)

TARGET_WIDTH = 512
TARGET_HEIGHT = 384
FRAMES_PER_VIDEO = 60
BATCH_SIZE = 15
MAX_VIDEOS = 8    # Use EXACTLY 8 videos per device


# -------------------
# HELPERS
# -------------------

def resize_with_padding(img_np, target_width, target_height):
    img = Image.fromarray(img_np)
    img.thumbnail((target_width, target_height), Image.LANCZOS)
    new_img = Image.new("RGB", (target_width, target_height))
    new_img.paste(img, ((target_width - img.width) // 2, (target_height - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)


def extract_frames_g_channel(video_path, frames_per_video):
    frames = []
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames == 0:
        cap.release()
        return frames

    frame_indices = np.linspace(0, total_frames - 1, frames_per_video, dtype=int)

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            resized = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)

            # Keep **only green channel**
            g_channel = resized[:, :, 1]
            frames.append(g_channel)

    cap.release()
    return frames


# -------------------
# MAIN PROCESS
# -------------------

device_folders = [f for f in os.listdir(data_directory) if os.path.isdir(os.path.join(data_directory, f))]
print(f"Found devices: {device_folders}")

for device in device_folders:
    print(f"\nüìç Processing fingerprints for: {device}")

    video_dir = os.path.join(data_directory, device, 'videos', 'fingerprint_set')
    video_paths = sorted(glob.glob(os.path.join(video_dir, '*.mp4')) +
                         glob.glob(os.path.join(video_dir, '*.mov')) +
                         glob.glob(os.path.join(video_dir, '*.avi')))

    if len(video_paths) == 0:
        print(f"‚ö†Ô∏è No fingerprint videos found for {device}. Skipping.")
        continue

    # Balance: Use only first 8 videos
    video_paths = video_paths[:MAX_VIDEOS]
    print(f"Using {len(video_paths)} videos for fingerprint.")

    fp_sum = None
    count = 0

    for path in video_paths:
        print(f"  ‚Üí Extracting frames from: {os.path.basename(path)}")
        frames = extract_frames_g_channel(path, FRAMES_PER_VIDEO)

        if not frames:
            print(f"  ‚ö†Ô∏è No frames extracted from {path}")
            continue

        # Process in batches to avoid memory spike
        for i in range(0, len(frames), BATCH_SIZE):
            batch = np.array(frames[i:i+BATCH_SIZE], dtype=np.uint8)
            batch = np.expand_dims(batch, axis=-1)  # shape (batch, H, W, 1)

            prnu_part = extract_multiple_aligned(batch, processes=0)

            if fp_sum is None:
                fp_sum = prnu_part
            else:
                fp_sum += prnu_part

            count += 1
            del batch
            gc.collect()

        del frames
        gc.collect()

    if fp_sum is None:
        print(f"‚ö†Ô∏è No fingerprint generated for {device}")
        continue

    fingerprint = fp_sum / count
    save_path = os.path.join(fingerprint_directory, f"{device}_video_fingerprint.npy")
    np.save(save_path, fingerprint)

    print(f"‚úÖ Saved fingerprint for {device}: {save_path}, shape: {fingerprint.shape}, averaged over {count} PRNU chunks")

In [24]:
!pip install bm3d
!pip install scikit-image opencv-python numpy

Collecting bm3d
  Downloading bm3d-4.0.3-py3-none-any.whl.metadata (2.3 kB)
Collecting bm4d>=4.2.5 (from bm3d)
  Downloading bm4d-4.2.5-py3-none-any.whl.metadata (3.1 kB)
Downloading bm3d-4.0.3-py3-none-any.whl (10 kB)
Downloading bm4d-4.2.5-py3-none-any.whl (862 kB)
   ---------------------------------------- 0.0/862.0 kB ? eta -:--:--
   ------------ --------------------------- 262.1/862.0 kB ? eta -:--:--
   ---------------------------------------- 862.0/862.0 kB 3.0 MB/s  0:00:00
Installing collected packages: bm4d, bm3d

   ---------------------------------------- 0/2 [bm4d]
   ---------------------------------------- 2/2 [bm3d]

Successfully installed bm3d-4.0.3 bm4d-4.2.5



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


Collecting scikit-image
  Downloading scikit_image-0.25.2-cp312-cp312-win_amd64.whl.metadata (14 kB)
Collecting imageio!=2.35.0,>=2.33 (from scikit-image)
  Downloading imageio-2.37.0-py3-none-any.whl.metadata (5.2 kB)
Collecting tifffile>=2022.8.12 (from scikit-image)
  Downloading tifffile-2025.10.16-py3-none-any.whl.metadata (31 kB)
Collecting lazy-loader>=0.4 (from scikit-image)
  Downloading lazy_loader-0.4-py3-none-any.whl.metadata (7.6 kB)
Downloading scikit_image-0.25.2-cp312-cp312-win_amd64.whl (12.9 MB)
   ---------------------------------------- 0.0/12.9 MB ? eta -:--:--
    --------------------------------------- 0.3/12.9 MB ? eta -:--:--
   - -------------------------------------- 0.5/12.9 MB 2.1 MB/s eta 0:00:06
   --- ------------------------------------ 1.0/12.9 MB 1.8 MB/s eta 0:00:07
   ---- ----------------------------------- 1.6/12.9 MB 2.1 MB/s eta 0:00:06
   ------ --------------------------------- 2.1/12.9 MB 2.2 MB/s eta 0:00:05
   -------- ---------------------


[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [26]:
# Jupyter / VSCode cell
!pip install bm3d opencv-python-headless numpy scipy tqdm

Collecting opencv-python-headless
  Downloading opencv_python_headless-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (20 kB)
Downloading opencv_python_headless-4.12.0.88-cp37-abi3-win_amd64.whl (38.9 MB)
   ---------------------------------------- 0.0/38.9 MB ? eta -:--:--
   ---------------------------------------- 0.0/38.9 MB ? eta -:--:--
   ---------------------------------------- 0.3/38.9 MB ? eta -:--:--
    --------------------------------------- 0.8/38.9 MB 1.3 MB/s eta 0:00:30
   - -------------------------------------- 1.0/38.9 MB 1.4 MB/s eta 0:00:28
   - -------------------------------------- 1.6/38.9 MB 1.6 MB/s eta 0:00:25
   - -------------------------------------- 1.8/38.9 MB 1.6 MB/s eta 0:00:23
   -- ------------------------------------- 2.6/38.9 MB 1.9 MB/s eta 0:00:20
   --- ------------------------------------ 3.1/38.9 MB 2.0 MB/s eta 0:00:18
   ---- ----------------------------------- 3.9/38.9 MB 2.2 MB/s eta 0:00:16
   ---- ----------------------------------- 4.5/38

ERROR: Could not install packages due to an OSError: [WinError 5] Access is denied: 'C:\\Users\\Khushi\\AppData\\Local\\Programs\\Python\\Python312\\Lib\\site-packages\\cv2\\cv2.pyd'
Consider using the `--user` option or check the permissions.


[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
"""
generate_fingerprints_fast.py
- Fast BM3D-based fingerprint generation (parallel per device)
- Y-channel, BM3D mild, target size 1024x768
- Uses up to 8 videos per device, 8 frames per video
"""

import os, glob, gc
import numpy as np
from PIL import Image
import cv2
from bm3d import bm3d
from tqdm import tqdm
from multiprocessing import Pool, cpu_count

# ------- CONFIG -------
DATA_DIR = r"C:\Users\Khushi\prnu-camera-source-detection\data"
FINGERPRINT_DIR = os.path.join(os.path.dirname(DATA_DIR), "fingerprints")
os.makedirs(FINGERPRINT_DIR, exist_ok=True)

TARGET_WIDTH = 1024
TARGET_HEIGHT = 768
MAX_VIDEOS = 8
FRAMES_PER_VIDEO = 8          # reduced for speed, still good quality
BATCH_SIZE = 8
BM3D_SIGMA = 2.5 / 255.0      # mild BM3D
WORKERS = max(1, min(cpu_count() - 1, 6))  # cap workers to avoid oversubscribe
# ----------------------

def resize_with_padding(img_np, target_w, target_h):
    img = Image.fromarray(img_np)
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    new_img = Image.new("RGB", (target_w, target_h))
    new_img.paste(img, ((target_w - img.width) // 2, (target_h - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)

def extract_y_frames_from_video(video_path, frames_per_video):
    frames_y = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return frames_y
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    if total == 0:
        cap.release()
        return frames_y
    indices = np.linspace(0, total - 1, frames_per_video, dtype=int)
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret:
            continue
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
        ycc = cv2.cvtColor(resized, cv2.COLOR_RGB2YCrCb)
        y = ycc[:, :, 0]
        frames_y.append(y.astype(np.float32) / 255.0)  # normalize
    cap.release()
    return frames_y

def process_video_frames_to_residuals(frames_y):
    """Denoise frames with BM3D and return residuals list."""
    residuals = []
    for imgf in frames_y:
        try:
            den = bm3d(imgf, BM3D_SIGMA, stage_arg=bm3d.STAGE_ALL)
        except Exception:
            den = bm3d(imgf, BM3D_SIGMA)
        resid = imgf - den
        residuals.append(resid.astype(np.float32))
    return residuals

def compute_device_fingerprint(device_name):
    """Compute fingerprint for one device (used by Pool)."""
    video_dir = os.path.join(DATA_DIR, device_name, "videos", "fingerprint_set")
    video_paths = sorted(glob.glob(os.path.join(video_dir, "*.mp4")) +
                         glob.glob(os.path.join(video_dir, "*.mov")) +
                         glob.glob(os.path.join(video_dir, "*.avi")))
    if len(video_paths) == 0:
        print(f"[!] No fingerprint videos for {device_name} in {video_dir}")
        return device_name, None

    use_paths = video_paths[:MAX_VIDEOS]
    fp_sum = None
    count = 0

    # iterate videos, process frames -> residuals
    for vp in use_paths:
        frames_y = extract_y_frames_from_video(vp, FRAMES_PER_VIDEO)
        if not frames_y:
            continue
        residuals = process_video_frames_to_residuals(frames_y)
        for r in residuals:
            if fp_sum is None:
                fp_sum = np.zeros_like(r, dtype=np.float64)
            fp_sum += r
            count += 1
        # free
        del frames_y, residuals
        gc.collect()

    if fp_sum is None or count == 0:
        print(f"[!] Could not produce fingerprint for {device_name}")
        return device_name, None

    fingerprint = (fp_sum / float(count)).astype(np.float32)
    save_path = os.path.join(FINGERPRINT_DIR, f"{device_name}_video_fingerprint.npy")
    np.save(save_path, fingerprint)
    print(f"[+] Saved fingerprint for {device_name}: {save_path} shape={fingerprint.shape} (avg over {count} frames)")
    return device_name, save_path

def main():
    device_name = "Samsung S23 5g"   # <-- change this to your device folder name
    print(f"Generating fingerprint for: {device_name}")
    dev, path = compute_device_fingerprint(device_name)

    if path is None:
        print(f"[!] Fingerprint generation failed for {device_name}")
    else:
        print(f"[+] Fingerprint generated and saved at: {path}")

if __name__ == "__main__":
    main()


'''
def main():
    devices = [d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d))]
    if not devices:
        print("[!] No device folders found in DATA_DIR")
        return

    print("Devices:", devices)
    print("\n[Running sequential mode to avoid Windows multiprocessing hang]\n")

    results = []
    for dev in devices:
        print(f"\nProcessing fingerprint for: {dev}")
        result = compute_device_fingerprint(dev)
        results.append(result)

    # summary
    print("\n======== SUMMARY ========")
    for dev, path in results:
        if path is None:
            print(f"[!] Fingerprint failed for {dev}")
        else:
            print(f"[+] Fingerprint ready: {dev} -> {path}")

if __name__ == "__main__":
    main()
'''

Generating fingerprint for: Samsung S23 5g
[+] Saved fingerprint for Samsung S23 5g: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\Samsung S23 5g_video_fingerprint.npy shape=(768, 1024) (avg over 64 frames)
[+] Fingerprint generated and saved at: C:\Users\Khushi\prnu-camera-source-detection\fingerprints\Samsung S23 5g_video_fingerprint.npy




In [9]:
"""
match_query_videos_fast.py
- Load fingerprints and classify query videos using hybrid PCE+NCC
- Query extraction uses BM3D mild on Y-channel, resized to 1024x768
- FRAMES_PER_VIDEO_QUERY set to 6 for speed
"""

import os, glob
import numpy as np
import cv2
from PIL import Image
from bm3d import bm3d
from tqdm import tqdm

# ------- CONFIG -------
DATA_DIR = r"C:\Users\Khushi\prnu-camera-source-detection\data"
FINGERPRINT_DIR = os.path.join(os.path.dirname(DATA_DIR), "fingerprints")
DEVICES = ['iphone15', 'OnePlus Nord CE4', 'Samsung S21 FE', 'Samsung S23 5g']
TARGET_WIDTH = 1024
TARGET_HEIGHT = 768
FRAMES_PER_VIDEO_QUERY = 6
BM3D_SIGMA = 2.6 / 255.0 #(2.5 was giving 9/13)
ALPHA = 0.7   # weight for PCE in hybrid score
EPS = 1e-12
# ----------------------

def resize_with_padding(img_np, target_w, target_h):
    img = Image.fromarray(img_np)
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    new_img = Image.new("RGB", (target_w, target_h))
    new_img.paste(img, ((target_w - img.width) // 2, (target_h - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)

def extract_query_prnu(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    if total == 0:
        cap.release()
        return None
    indices = np.linspace(0, total - 1, FRAMES_PER_VIDEO_QUERY, dtype=int)
    residuals = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret:
            continue
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
        ycc = cv2.cvtColor(resized, cv2.COLOR_RGB2YCrCb)
        y = ycc[:, :, 0].astype(np.float32) / 255.0
        try:
            den = bm3d(y, BM3D_SIGMA, stage_arg=bm3d.STAGE_ALL)
        except Exception:
            den = bm3d(y, BM3D_SIGMA)
        resid = y - den
        residuals.append(resid.astype(np.float32))
    cap.release()
    if not residuals:
        return None
    return np.mean(residuals, axis=0).astype(np.float32)

def ncc_score(a, b):
    a_flat = a.ravel()
    b_flat = b.ravel()
    a_z = a_flat - a_flat.mean()
    b_z = b_flat - b_flat.mean()
    denom = np.sqrt((a_z**2).sum() * (b_z**2).sum()) + EPS
    return float((a_z * b_z).sum() / denom)

def compute_pce(a, b):
    # For speed
    H, W = a.shape
    fa = a - a.mean()
    fb = b - b.mean()
    FA = np.fft.fft2(fa)
    FB = np.fft.fft2(fb)
    corr = np.fft.ifft2(FA * np.conj(FB)).real
    corr = np.fft.fftshift(corr)
    peak = corr.max()
    peak_idx = np.unravel_index(np.argmax(corr), corr.shape)
    # exclude small neighborhood
    rx = max(3, min(H // 40, 10))
    ry = max(3, min(W // 40, 10))
    y0, x0 = peak_idx
    mask = np.ones_like(corr, dtype=bool)
    y1 = max(0, y0 - rx); y2 = min(H, y0 + rx + 1)
    x1 = max(0, x0 - ry); x2 = min(W, x0 + ry + 1)
    mask[y1:y2, x1:x2] = False
    outside = corr[mask]
    outside_energy = (outside**2).mean() if outside.size > 0 else EPS
    pce = (peak**2) / (outside_energy + EPS)
    return float(pce)

def load_fingerprints(devices):
    fps = {}
    for d in devices:
        p = os.path.join(FINGERPRINT_DIR, f"{d}_video_fingerprint.npy")
        if not os.path.exists(p):
            print(f"[!] Fingerprint missing for {d}: {p}")
            fps[d] = None
        else:
            fps[d] = np.load(p).astype(np.float32)
            if fps[d].shape != (TARGET_HEIGHT, TARGET_WIDTH):
                print(f"[!] Warning: {d} fingerprint shape {fps[d].shape} != expected {(TARGET_HEIGHT, TARGET_WIDTH)}")
    return fps

def hybrid_scores(query_prnu, fingerprints):
    pce_vals = {}
    ncc_vals = {}
    for d, fp in fingerprints.items():
        if fp is None:
            pce_vals[d] = -1.0
            ncc_vals[d] = -1.0
            continue
        pce_vals[d] = compute_pce(query_prnu, fp)
        ncc_vals[d] = ncc_score(query_prnu, fp)
    # normalize
    pce_arr = np.array([max(0.0, pce_vals[d]) for d in fingerprints.keys()], dtype=np.float32)
    ncc_arr = np.array([ncc_vals[d] for d in fingerprints.keys()], dtype=np.float32)
    pce_min, pce_max = float(pce_arr.min()), float(pce_arr.max())
    denom_pce = (pce_max - pce_min) if (pce_max - pce_min) > EPS else 1.0
    ncc_min, ncc_max = float(ncc_arr.min()), float(ncc_arr.max())
    denom_ncc = (ncc_max - ncc_min) if (ncc_max - ncc_min) > EPS else 1.0
    pce_norm = {d: float((max(0.0, pce_vals[d]) - pce_min) / denom_pce) for d in fingerprints.keys()}
    ncc_norm = {d: float((ncc_vals[d] - ncc_min) / denom_ncc) for d in fingerprints.keys()}
    hybrid = {d: ALPHA * pce_norm[d] + (1.0 - ALPHA) * ncc_norm[d] for d in fingerprints.keys()}
    return hybrid, pce_vals, ncc_vals

def main():
    fingerprints = load_fingerprints(DEVICES)
    results = {d: {"correct": 0, "total": 0} for d in DEVICES}

    for dev in DEVICES:
        query_folder = os.path.join(DATA_DIR, dev, "videos", "query_set")
        qpaths = sorted(glob.glob(os.path.join(query_folder, "*.mp4")) +
                        glob.glob(os.path.join(query_folder, "*.mov")) +
                        glob.glob(os.path.join(query_folder, "*.avi")))
        print(f"\n--- Checking videos for {dev}: {len(qpaths)} files found ---")
        for q in qpaths:
            query_prnu = extract_query_prnu(q)
            if query_prnu is None:
                print(f"[!] Could not extract PRNU for {q}")
                continue
            hybrid, pce_vals, ncc_vals = hybrid_scores(query_prnu, fingerprints)
            predicted = max(hybrid, key=hybrid.get)
            results[dev]["total"] += 1
            if predicted == dev:
                results[dev]["correct"] += 1
            # print concise info
            top_k = sorted(hybrid.items(), key=lambda x: x[1], reverse=True)[:2]
            print(f"{os.path.basename(q)} -> Pred: {predicted} (hybrid={hybrid[predicted]:.3f}) | top2: {top_k}")

    # Summary
    print("\n======== FINAL RESULTS ========")
    total_correct = 0
    total_videos = 0
    for d, vals in results.items():
        c = vals["correct"]; t = vals["total"]
        total_correct += c; total_videos += t
        print(f"{d} has {t} videos: correct categorizations = {c}/{t}")
    print(f"\nOverall Accuracy: {total_correct}/{total_videos}")

if __name__ == "__main__":
    main()



--- Checking videos for iphone15: 3 files found ---
IMG_5454.mp4 -> Pred: iphone15 (hybrid=0.897) | top2: [('iphone15', 0.8970188550468834), ('OnePlus Nord CE4', 0.7)]
IMG_5455.mp4 -> Pred: iphone15 (hybrid=1.000) | top2: [('iphone15', 1.0), ('OnePlus Nord CE4', 0.2976830026209783)]
IMG_5456 copy.mp4 -> Pred: iphone15 (hybrid=1.000) | top2: [('iphone15', 1.0), ('OnePlus Nord CE4', 0.0038381056888906733)]

--- Checking videos for OnePlus Nord CE4: 4 files found ---
VID20251030112624.mp4 -> Pred: OnePlus Nord CE4 (hybrid=0.818) | top2: [('OnePlus Nord CE4', 0.8178160013724185), ('Samsung S23 5g', 0.7264715105406965)]
VID20251030112641.mp4 -> Pred: OnePlus Nord CE4 (hybrid=0.943) | top2: [('OnePlus Nord CE4', 0.9430856511899364), ('Samsung S23 5g', 0.7)]
VID20251030112807.mp4 -> Pred: iphone15 (hybrid=0.739) | top2: [('iphone15', 0.7386631117895808), ('Samsung S23 5g', 0.7)]
VID20251030112925.mp4 -> Pred: OnePlus Nord CE4 (hybrid=0.700) | top2: [('OnePlus Nord CE4', 0.7), ('Samsung S23 5

In [8]:
import os, glob
import numpy as np
import cv2
from PIL import Image
from bm3d import bm3d
from tqdm import tqdm

# ------- CONFIG -------
DATA_DIR = r"C:\Users\Khushi\prnu-camera-source-detection\data"
FINGERPRINT_DIR = os.path.join(os.path.dirname(DATA_DIR), "fingerprints")
DEVICES = ['iphone15', 'OnePlus Nord CE4', 'Samsung S21 FE', 'Samsung S23 5g']
TARGET_WIDTH = 1024
TARGET_HEIGHT = 768
FRAMES_PER_VIDEO_QUERY = 6

# Adaptive BM3D bounds (in image-value units where images are in [0,1])
MIN_SIGMA = 1.0 / 255.0    # mild floor
MAX_SIGMA = 3.0 / 255.0    # avoid over-denoising
ALPHA = 0.7   # weight for PCE in hybrid score
EPS = 1e-12
# ----------------------

def resize_with_padding(img_np, target_w, target_h):
    img = Image.fromarray(img_np)
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    new_img = Image.new("RGB", (target_w, target_h))
    new_img.paste(img, ((target_w - img.width) // 2, (target_h - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)

def estimate_frame_noise_sigma(imgf):
    """
    Robust noise estimate for a single grayscale image in [0,1].
    Uses MAD on a small high-pass (image - gaussian_blur) map.
    Returns sigma estimate (float).
    """
    # small Gaussian blur to remove high-frequency noise, keep low-freq content
    blur = cv2.GaussianBlur(imgf.astype(np.float32), (3, 3), 0)
    high = imgf - blur
    mad = np.median(np.abs(high))
    # Convert MAD to sigma estimate (for Gaussian noise)
    sigma_est = mad / 0.6745
    # clamp to realistic bounds
    sigma_est = float(max(MIN_SIGMA, min(MAX_SIGMA, sigma_est)))
    return sigma_est

def extract_query_prnu(video_path):
    """
    Extract query PRNU using adaptive BM3D sigma:
    1) sample FRAMES_PER_VIDEO_QUERY frames
    2) estimate sigma per frame via estimate_frame_noise_sigma
    3) use median sigma across frames for BM3D denoising for that video
    4) residuals = img - denoised; return mean residual
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    if total == 0:
        cap.release()
        return None

    indices = np.linspace(0, total - 1, FRAMES_PER_VIDEO_QUERY, dtype=int)
    frames = []
    sigmas = []

    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret:
            continue
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
        ycc = cv2.cvtColor(resized, cv2.COLOR_RGB2YCrCb)
        y = ycc[:, :, 0].astype(np.float32) / 255.0  # normalized [0,1]
        frames.append(y)
        sigmas.append(estimate_frame_noise_sigma(y))

    cap.release()

    if len(frames) == 0:
        return None

    # Use median sigma for stability
    median_sigma = float(np.median(sigmas))
    # final safety clamps (redundant but explicit)
    median_sigma = max(MIN_SIGMA, min(MAX_SIGMA, median_sigma))

    print(f"[INFO] {os.path.basename(video_path)} -> median estimated sigma = {median_sigma:.6f}")

    residuals = []
    for y in frames:
        try:
            den = bm3d(y, median_sigma, stage_arg=bm3d.STAGE_ALL)
        except Exception:
            den = bm3d(y, median_sigma)
        resid = y - den
        residuals.append(resid.astype(np.float32))

    prnu = np.mean(residuals, axis=0).astype(np.float32)
    return prnu

def ncc_score(a, b):
    a_flat = a.ravel()
    b_flat = b.ravel()
    a_z = a_flat - a_flat.mean()
    b_z = b_flat - b_flat.mean()
    denom = np.sqrt((a_z**2).sum() * (b_z**2).sum()) + EPS
    return float((a_z * b_z).sum() / denom)

def compute_pce(a, b):
    # For speed
    H, W = a.shape
    fa = a - a.mean()
    fb = b - b.mean()
    FA = np.fft.fft2(fa)
    FB = np.fft.fft2(fb)
    corr = np.fft.ifft2(FA * np.conj(FB)).real
    corr = np.fft.fftshift(corr)
    peak = corr.max()
    peak_idx = np.unravel_index(np.argmax(corr), corr.shape)
    # exclude small neighborhood
    rx = max(3, min(H // 40, 10))
    ry = max(3, min(W // 40, 10))
    y0, x0 = peak_idx
    mask = np.ones_like(corr, dtype=bool)
    y1 = max(0, y0 - rx); y2 = min(H, y0 + rx + 1)
    x1 = max(0, x0 - ry); x2 = min(W, x0 + ry + 1)
    mask[y1:y2, x1:x2] = False
    outside = corr[mask]
    outside_energy = (outside**2).mean() if outside.size > 0 else EPS
    pce = (peak**2) / (outside_energy + EPS)
    return float(pce)

def load_fingerprints(devices):
    fps = {}
    for d in devices:
        p = os.path.join(FINGERPRINT_DIR, f"{d}_video_fingerprint.npy")
        if not os.path.exists(p):
            print(f"[!] Fingerprint missing for {d}: {p}")
            fps[d] = None
        else:
            fps[d] = np.load(p).astype(np.float32)
            if fps[d].shape != (TARGET_HEIGHT, TARGET_WIDTH):
                print(f"[!] Warning: {d} fingerprint shape {fps[d].shape} != expected {(TARGET_HEIGHT, TARGET_WIDTH)}")
    return fps

def hybrid_scores(query_prnu, fingerprints):
    pce_vals = {}
    ncc_vals = {}
    for d, fp in fingerprints.items():
        if fp is None:
            pce_vals[d] = -1.0
            ncc_vals[d] = -1.0
            continue
        pce_vals[d] = compute_pce(query_prnu, fp)
        ncc_vals[d] = ncc_score(query_prnu, fp)
    # normalize
    pce_arr = np.array([max(0.0, pce_vals[d]) for d in fingerprints.keys()], dtype=np.float32)
    ncc_arr = np.array([ncc_vals[d] for d in fingerprints.keys()], dtype=np.float32)
    pce_min, pce_max = float(pce_arr.min()), float(pce_arr.max())
    denom_pce = (pce_max - pce_min) if (pce_max - pce_min) > EPS else 1.0
    ncc_min, ncc_max = float(ncc_arr.min()), float(ncc_arr.max())
    denom_ncc = (ncc_max - ncc_min) if (ncc_max - ncc_min) > EPS else 1.0
    pce_norm = {d: float((max(0.0, pce_vals[d]) - pce_min) / denom_pce) for d in fingerprints.keys()}
    ncc_norm = {d: float((ncc_vals[d] - ncc_min) / denom_ncc) for d in fingerprints.keys()}
    hybrid = {d: ALPHA * pce_norm[d] + (1.0 - ALPHA) * ncc_norm[d] for d in fingerprints.keys()}
    return hybrid, pce_vals, ncc_vals

def main():
    fingerprints = load_fingerprints(DEVICES)
    results = {d: {"correct": 0, "total": 0} for d in DEVICES}

    for dev in DEVICES:
        query_folder = os.path.join(DATA_DIR, dev, "videos", "query_set")
        qpaths = sorted(glob.glob(os.path.join(query_folder, "*.mp4")) +
                        glob.glob(os.path.join(query_folder, "*.mov")) +
                        glob.glob(os.path.join(query_folder, "*.avi")))
        print(f"\n--- Checking videos for {dev}: {len(qpaths)} files found ---")
        for q in qpaths:
            query_prnu = extract_query_prnu(q)
            if query_prnu is None:
                print(f"[!] Could not extract PRNU for {q}")
                continue
            hybrid, pce_vals, ncc_vals = hybrid_scores(query_prnu, fingerprints)
            predicted = max(hybrid, key=hybrid.get)
            results[dev]["total"] += 1
            if predicted == dev:
                results[dev]["correct"] += 1
            # print concise info
            top_k = sorted(hybrid.items(), key=lambda x: x[1], reverse=True)[:2]
            print(f"{os.path.basename(q)} -> Pred: {predicted} (hybrid={hybrid[predicted]:.3f}) | top2: {top_k}")

    # Summary
    print("\n======== FINAL RESULTS ========")
    total_correct = 0
    total_videos = 0
    for d, vals in results.items():
        c = vals["correct"]; t = vals["total"]
        total_correct += c; total_videos += t
        print(f"{d} has {t} videos: correct categorizations = {c}/{t}")
    print(f"\nOverall Accuracy: {total_correct}/{total_videos}")

if __name__ == "__main__":
    main()



--- Checking videos for iphone15: 3 files found ---
[INFO] IMG_5454.mp4 -> median estimated sigma = 0.003922
IMG_5454.mp4 -> Pred: iphone15 (hybrid=1.000) | top2: [('iphone15', 1.0), ('Samsung S23 5g', 0.6997091065168597)]
[INFO] IMG_5455.mp4 -> median estimated sigma = 0.003922
IMG_5455.mp4 -> Pred: iphone15 (hybrid=1.000) | top2: [('iphone15', 1.0), ('OnePlus Nord CE4', 0.3791451073468297)]
[INFO] IMG_5456 copy.mp4 -> median estimated sigma = 0.003922
IMG_5456 copy.mp4 -> Pred: iphone15 (hybrid=1.000) | top2: [('iphone15', 1.0), ('Samsung S21 FE', 0.005413854386907948)]

--- Checking videos for OnePlus Nord CE4: 4 files found ---
[INFO] VID20251030112624.mp4 -> median estimated sigma = 0.003922
VID20251030112624.mp4 -> Pred: Samsung S23 5g (hybrid=0.762) | top2: [('Samsung S23 5g', 0.7615750160454334), ('OnePlus Nord CE4', 0.6964218053709379)]
[INFO] VID20251030112641.mp4 -> median estimated sigma = 0.003922
VID20251030112641.mp4 -> Pred: Samsung S23 5g (hybrid=0.903) | top2: [('Sam

In [6]:
"""
match_query_videos_hybrid.py
- Loads fingerprints from fingerprints/{device}_video_fingerprint.npy
- Extracts PRNU from each query video using SAME BM3D+Y extraction (resizing/padding to 512x384)
- Computes PCE and NCC between query residual and each device fingerprint
- Uses hybrid score: 0.7*PCE_norm + 0.3*NCC_norm to pick predicted device
- Prints results in your requested format.
"""

import os, glob
import numpy as np
import cv2
from PIL import Image
from bm3d import bm3d
from tqdm import tqdm

# -------- CONFIG (match generate script) --------
DATA_DIR = r"C:\Users\Khushi\prnu-camera-source-detection\data"
FINGERPRINT_DIR = os.path.join(os.path.dirname(DATA_DIR), "fingerprints")
DEVICES = ['iphone15', 'OnePlus Nord CE4', 'Samsung S21 FE', 'Samsung S23 5g']
TARGET_WIDTH = 1024
TARGET_HEIGHT = 768
FRAMES_PER_VIDEO = 50
BATCH_SIZE = 15
BM3D_SIGMA = 2.5 / 255.0
ALPHA = 0.7  # weight for PCE in hybrid score
EPS = 1e-12
# ------------------------------------------------

def resize_with_padding(img_np, target_w, target_h):
    img = Image.fromarray(img_np)
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    new_img = Image.new("RGB", (target_w, target_h))
    new_img.paste(img, ((target_w - img.width) // 2, (target_h - img.height) // 2))
    return np.array(new_img, dtype=np.uint8)

def extract_query_prnu(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    if total == 0:
        cap.release()
        return None
    indices = np.linspace(0, total-1, FRAMES_PER_VIDEO, dtype=int)
    residuals = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret:
            continue
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = resize_with_padding(rgb, TARGET_WIDTH, TARGET_HEIGHT)
        ycc = cv2.cvtColor(resized, cv2.COLOR_RGB2YCrCb)
        y = ycc[:, :, 0].astype(np.float32) / 255.0
        try:
            den = bm3d(y, BM3D_SIGMA, stage_arg=bm3d.STAGE_ALL)
        except Exception:
            den = bm3d(y, BM3D_SIGMA)
        resid = y - den
        residuals.append(resid)
    cap.release()
    if not residuals:
        return None
    prnu = np.mean(residuals, axis=0).astype(np.float32)
    return prnu

def ncc_score(a, b):
    # a,b floats same shape
    a_flat = a.ravel()
    b_flat = b.ravel()
    a_z = a_flat - a_flat.mean()
    b_z = b_flat - b_flat.mean()
    denom = np.sqrt((a_z**2).sum() * (b_z**2).sum()) + EPS
    return float((a_z * b_z).sum() / denom)

def compute_pce(a, b):
    """
    Approximate PCE:
    - compute cross-correlation via FFT
    - find peak value
    - compute energy of correlation excluding small neighborhood around peak
    - return peak^2 / mean(outside_energy)
    """
    # both a and b are floats (residuals), same shape
    H, W = a.shape
    # zero-mean
    fa = a - a.mean()
    fb = b - b.mean()
    # compute FFTs
    FA = np.fft.fft2(fa)
    FB = np.fft.fft2(fb)
    corr = np.fft.ifft2(FA * np.conj(FB))
    corr = np.fft.fftshift(corr.real)
    # find peak
    peak = corr.max()
    # exclude small window around peak
    peak_idx = np.unravel_index(np.argmax(corr), corr.shape)
    # define exclusion radius
    rx = max(3, min(H//40, 10))
    ry = max(3, min(W//40, 10))
    y0, x0 = peak_idx
    mask = np.ones_like(corr, dtype=bool)
    y1 = max(0, y0 - rx); y2 = min(H, y0 + rx + 1)
    x1 = max(0, x0 - ry); x2 = min(W, x0 + ry + 1)
    mask[y1:y2, x1:x2] = False
    outside = corr[mask]
    # energy measures
    outside_energy = (outside**2).mean() if outside.size > 0 else EPS
    pce = (peak**2) / (outside_energy + EPS)
    return float(pce)

def load_fingerprints(devices):
    fps = {}
    for d in devices:
        p = os.path.join(FINGERPRINT_DIR, f"{d}_video_fingerprint.npy")
        if not os.path.exists(p):
            print(f"[!] Fingerprint missing for {d}: {p}")
            fps[d] = None
        else:
            fps[d] = np.load(p).astype(np.float32)
            # ensure shape matches target
            if fps[d].shape != (TARGET_HEIGHT, TARGET_WIDTH):
                print(f"[!] Warning: fingerprint shape {fps[d].shape} for {d} doesn't match expected {(TARGET_HEIGHT, TARGET_WIDTH)}")
    return fps

def hybrid_scores_for_query(query_prnu, fingerprints):
    # compute raw PCE and NCC for each device
    pce_vals = {}
    ncc_vals = {}
    for d, fp in fingerprints.items():
        if fp is None:
            pce_vals[d] = -1.0
            ncc_vals[d] = -1.0
            continue
        pce_vals[d] = compute_pce(query_prnu, fp)
        ncc_vals[d] = ncc_score(query_prnu, fp)
    # normalize PCE (non-negative) and NCC across devices to [0,1]
    pce_arr = np.array([max(0.0, pce_vals[d]) for d in fingerprints.keys()], dtype=np.float32)
    ncc_arr = np.array([ncc_vals[d] for d in fingerprints.keys()], dtype=np.float32)
    # PCE normalization
    pce_min, pce_max = float(pce_arr.min()), float(pce_arr.max())
    denom_pce = (pce_max - pce_min) if (pce_max - pce_min) > EPS else 1.0
    pce_norm = {d: float((max(0.0, pce_vals[d]) - pce_min) / denom_pce) for d in fingerprints.keys()}
    # NCC normalization (map from observed range to [0,1])
    ncc_min, ncc_max = float(ncc_arr.min()), float(ncc_arr.max())
    denom_ncc = (ncc_max - ncc_min) if (ncc_max - ncc_min) > EPS else 1.0
    ncc_norm = {d: float((ncc_vals[d] - ncc_min) / denom_ncc) for d in fingerprints.keys()}
    # hybrid score
    hybrid = {d: ALPHA * pce_norm[d] + (1.0 - ALPHA) * ncc_norm[d] for d in fingerprints.keys()}
    return hybrid, pce_vals, ncc_vals

def main():
    fingerprints = load_fingerprints(DEVICES)
    results = {d: {"correct": 0, "total": 0} for d in DEVICES}

    for dev in DEVICES:
        query_folder = os.path.join(DATA_DIR, dev, "videos", "query_set")
        qpaths = sorted(glob.glob(os.path.join(query_folder, "*.mp4")) +
                        glob.glob(os.path.join(query_folder, "*.mov")) +
                        glob.glob(os.path.join(query_folder, "*.avi")))
        print(f"\n--- Checking videos for {dev}: found {len(qpaths)} files ---")
        for q in qpaths:
            query_prnu = extract_query_prnu(q)
            if query_prnu is None:
                print(f"Could not extract PRNU for {q}")
                continue
            hybrid, pce_vals, ncc_vals = hybrid_scores_for_query(query_prnu, fingerprints)
            predicted = max(hybrid, key=hybrid.get)
            results[dev]["total"] += 1
            if predicted == dev:
                results[dev]["correct"] += 1
            # Print short per-video line with PCE and NCC for top-2 (helpful)
            sorted_by_score = sorted(hybrid.items(), key=lambda x: x[1], reverse=True)
            top = sorted_by_score[0][0]
            second = sorted_by_score[1][0] if len(sorted_by_score) > 1 else None
            print(f"{os.path.basename(q)} -> Pred: {predicted} (top PCE:{pce_vals[predicted]:.2f}, NCC:{ncc_vals[predicted]:.4f}) ; 2nd:{second} PCE:{pce_vals.get(second,0):.2f}")

    # Summary output in requested format
    print("\n======== FINAL RESULTS ========")
    total_correct = 0
    total_videos = 0
    for d, vals in results.items():
        c = vals["correct"]; t = vals["total"]
        total_correct += c; total_videos += t
        print(f"{d} has {t} videos: correct categorizations = {c}/{t}")
    print(f"\nOverall Accuracy: {total_correct}/{total_videos}")

if __name__ == "__main__":
    main()



--- Checking videos for iphone15: found 3 files ---
IMG_5454.mp4 -> Pred: iphone15 (top PCE:96.71, NCC:0.0102) ; 2nd:OnePlus Nord CE4 PCE:46.35
IMG_5455.mp4 -> Pred: iphone15 (top PCE:56.53, NCC:0.0096) ; 2nd:OnePlus Nord CE4 PCE:46.67
IMG_5456 copy.mp4 -> Pred: iphone15 (top PCE:12114.03, NCC:0.1941) ; 2nd:Samsung S23 5g PCE:45.87

--- Checking videos for OnePlus Nord CE4: found 4 files ---
VID20251030112624.mp4 -> Pred: OnePlus Nord CE4 (top PCE:57.34, NCC:0.0010) ; 2nd:Samsung S23 5g PCE:51.87
VID20251030112641.mp4 -> Pred: OnePlus Nord CE4 (top PCE:51.89, NCC:0.0055) ; 2nd:Samsung S23 5g PCE:48.20
VID20251030112807.mp4 -> Pred: OnePlus Nord CE4 (top PCE:45.16, NCC:0.0015) ; 2nd:Samsung S23 5g PCE:46.75
VID20251030112925.mp4 -> Pred: OnePlus Nord CE4 (top PCE:46.39, NCC:0.0003) ; 2nd:Samsung S23 5g PCE:47.67

--- Checking videos for Samsung S21 FE: found 2 files ---
20251017_194729.mp4 -> Pred: OnePlus Nord CE4 (top PCE:50.48, NCC:-0.0005) ; 2nd:iphone15 PCE:47.25
20251017_202243.m

KeyboardInterrupt: 