In [1]:
import os
import cv2
import json
import numba
import numpy as np
import pandas as pd
import scipy.optimize
from typing import List
import numpy.typing as npt
from numba import jit, njit
from sklearn.cluster import DBSCAN

In [2]:
''' RLE-Encode '''
@numba.jit(nopython=True)
def _rle_encode_jit(x: npt.NDArray, fg_val: int = 1) -> list[int]:
    dots = np.where(x.T.flatten() == fg_val)[0]
    run_lengths = []
    prev = -2
    for b in dots:
        if b > prev + 1:
            run_lengths.extend((b + 1, 0))
        run_lengths[-1] += 1
        prev = b
    return run_lengths

def rle_encode(masks, fg_val: int = 1) -> str:
    return ';'.join([json.dumps(_rle_encode_jit(x, fg_val)) for x in masks])

In [3]:
''' Configuration '''
class Config:
    PATH = "/kaggle/input/recodai-luc-scientific-image-forgery-detection"
    TEST_IMAGES  = f"{PATH}/test_images"
    AUTH_IMAGES  = f"{PATH}/train_images/authentic"
    FORGE_IMAGES  = f"{PATH}/train_images/forged"

    # SIFT 
    SIFT_FEATURES = 8000
    SIFT_CONTRAST = 0.05
    USE_CLAHE = True
    USE_BLUR = False
    USE_SHARPEN = False
    MAX_IMAGE_SIZE = 2500

    # RANSAC + DBSCAN
    MIN_MATCHES = 2
    DBSCAN_EPS = 6.0
    DBSCAN_MIN_SAMPLES = 3
    RANSAC_THRESH = 5.5
    MASK_RADIUS = 13
    MIN_INLIERS = 10

config = Config()

In [4]:
''' Helpers '''
def preprocess_image(img_array, use_clahe, blur, sharpen, noise_std):
    if len(img_array.shape) == 3:
        if img_array.shape[2] == 4:
            img_array = img_array[:, :, :3]
        gray = cv2.cvtColor(img_array, cv2.COLOR_BGR2GRAY)
    else:
        gray = img_array.copy()
    
    gray = cv2.normalize(gray, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

    if use_clahe:
        clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
        gray = clahe.apply(gray)

    if blur:
        gray = cv2.GaussianBlur(gray, (3, 3), 0)

    if sharpen:
        kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]], dtype=np.float32)
        gray = cv2.filter2D(gray, -1, kernel)

    if noise_std > 0.0:
        noise = np.random.normal(0, noise_std, gray.shape).astype(np.float32)
        gray = np.clip(gray.astype(np.float32) + noise, 0, 255).astype(np.uint8)

    return gray


def resize_smart(image, max_size):
    h, w = image.shape[:2]
    if max(h, w) <= max_size:
        return image, 1.0
    scale = max_size / max(h, w)
    resized = cv2.resize(image, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA)
    return resized, scale


def scale_mask_back(mask, orig_shape, scale):
    if scale == 1.0:
        return mask
    h, w = orig_shape[:2]
    scaled = cv2.resize(mask.astype(np.float32), (w, h), interpolation=cv2.INTER_LINEAR)
    return (scaled > 0.5).astype(np.uint8)


def generate_mask(src_pts, dst_pts, h, w, radius):
    mask = np.zeros((h, w), dtype=np.uint8)
    for pt in np.vstack([src_pts, dst_pts]):
        x, y = int(pt[0]), int(pt[1])
        if 0 <= x < w and 0 <= y < h:
            cv2.circle(mask, (x, y), radius, 1, -1)
    return mask

In [5]:
''' SIFT '''
class SIFTDetector:
    def __init__(self, config):
        self.config = config
        self.sift = cv2.SIFT_create(
            nfeatures=config.SIFT_FEATURES,
            contrastThreshold=config.SIFT_CONTRAST,
            edgeThreshold=10
        )

    def detect_features(self, image):
        gray = preprocess_image(
            image,
            use_clahe=self.config.USE_CLAHE,
            blur=self.config.USE_BLUR,
            sharpen=self.config.USE_SHARPEN,
            noise_std=0.5
        )
        gray_resized, scale = resize_smart(gray, self.config.MAX_IMAGE_SIZE)
        kp, desc = self.sift.detectAndCompute(gray_resized, None)
        return kp, desc, gray_resized, scale

In [6]:
''' RANSAC with clustering '''
class RANSACVerifier:
    def __init__(self, config):
        self.config = config

    def verify(self, kp, desc, resized_shape):
        if desc is None or len(desc) < self.config.MIN_MATCHES:
            return []

        index_params = dict(algorithm=1, trees=6)
        search_params = dict(checks=64)
        flann = cv2.FlannBasedMatcher(index_params, search_params)
        matches = flann.knnMatch(desc, desc, k=2)

        good = []
        ratio = 0.75
        for m, n in matches:
            if m.queryIdx == m.trainIdx:
                if n.queryIdx != n.trainIdx:
                    good.append(n)
            elif m.distance < ratio * n.distance:
                good.append(m)

        if len(good) < self.config.MIN_MATCHES:
            return []

        src_pts = np.float32([kp[m.queryIdx].pt for m in good])
        dst_pts = np.float32([kp[m.trainIdx].pt for m in good])
        disp = dst_pts - src_pts

        clustering = DBSCAN(
            eps=self.config.DBSCAN_EPS,
            min_samples=max(2, self.config.DBSCAN_MIN_SAMPLES)
        )
        labels = clustering.fit_predict(disp)
        unique_labels = [l for l in set(labels) if l != -1]
        if len(unique_labels) == 0:
            return []

        h, w = resized_shape[:2]
        instances = []

        for label in unique_labels:
            cluster_src = src_pts[labels == label]
            cluster_dst = dst_pts[labels == label]

            if len(cluster_src) < self.config.MIN_MATCHES:
                continue

            M, inliers = cv2.estimateAffine2D(
                cluster_src, cluster_dst,
                method=cv2.RANSAC,
                ransacReprojThreshold=self.config.RANSAC_THRESH
            )
            if M is None:
                continue

            inliers = inliers.flatten()
            n_inliers = inliers.sum()
            if n_inliers < self.config.MIN_MATCHES:
                continue

            mask = generate_mask(
                cluster_src[inliers > 0],
                cluster_dst[inliers > 0],
                h, w,
                radius=self.config.MASK_RADIUS
            )

            instances.append({
                "mask": mask,
                "num_inliers": int(n_inliers)
            })

        return instances

In [7]:
''' Final Mask '''
def final_decision(instances, img_shape, scale):
    if len(instances) == 0:
        return "authentic", np.zeros(img_shape[:2], dtype=np.uint8)

    H, W = img_shape[:2]
    final_mask = np.zeros((H, W), dtype=np.uint8)

    inliers_arr = np.array([inst["num_inliers"] for inst in instances])
    max_inliers = inliers_arr.max()
    median_inliers = np.median(inliers_arr)

    # absolute_floor = 2 if max_inliers < 10 else 10
    absolute_floor = 200

    min_threshold = int(max(
        absolute_floor,
        0.35 * max_inliers,
        median_inliers * 1.5
    ))

    kept = [inst for inst in instances if inst["num_inliers"] >= min_threshold]

    if len(kept) < 2:
        return "authentic", np.zeros(img_shape[:2], dtype=np.uint8)

    for inst in kept:
        mask_scaled = scale_mask_back(inst["mask"], img_shape, scale)
        final_mask = np.maximum(final_mask, mask_scaled)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15,15))
    final_mask = cv2.dilate(final_mask, kernel, iterations=1)
    final_mask = cv2.morphologyEx(final_mask, cv2.MORPH_CLOSE, kernel)

    final_mask = (final_mask > 0).astype(np.uint8)

    return "forged", final_mask

In [8]:
''' Prediction '''
detector = SIFTDetector(config)
verifier = RANSACVerifier(config)

def predict_image(path):
    img = cv2.imread(path)
    kp, desc, gray_resized, scale = detector.detect_features(img)
    instances = verifier.verify(kp, desc, gray_resized.shape)
    decision, mask = final_decision(instances, img.shape, scale)

    if decision == "authentic":
        return "authentic"
    else:
        return rle_encode([mask])

In [9]:
dirs = config.AUTH_IMAGES,config.FORGE_IMAGES
for d in dirs:
    auth_count,forge_count = 0,0
    print("testing directory: "+d)
    path_images = sorted(
        f for f in os.listdir(d)
        if f.lower().endswith((".png", ".jpg", ".jpeg"))
    )[1000:1200]
    for fname in path_images:
        pred = predict_image(os.path.join(d, fname))
        if pred[:4] == 'auth':
            auth_count += 1
        else:
            forge_count += 1
    print(f"auth count:{auth_count}, forge count: {forge_count}")
        

testing directory: /kaggle/input/recodai-luc-scientific-image-forgery-detection/train_images/authentic
auth count:200, forge count: 0
testing directory: /kaggle/input/recodai-luc-scientific-image-forgery-detection/train_images/forged
auth count:170, forge count: 30


In [10]:
''' Generate Submission File '''
test_dir = config.TEST_IMAGES
test_images = sorted(
    f for f in os.listdir(test_dir)
    if f.lower().endswith((".png", ".jpg", ".jpeg"))
)

rows = []

for fname in test_images:
    case_id = os.path.splitext(fname)[0]
    pred = predict_image(os.path.join(test_dir, fname))
    rows.append({"case_id": case_id, "annotation": pred})

submission = pd.DataFrame(rows)

sample_path = os.path.join(config.PATH, "sample_submission.csv")
if os.path.exists(sample_path):
    ss = pd.read_csv(sample_path)
    ss["case_id"] = ss["case_id"].astype(str)
    submission["case_id"] = submission["case_id"].astype(str)
    submission = ss[["case_id"]].merge(submission, on="case_id", how="left")
    submission["annotation"] = submission["annotation"].fillna("authentic")
else:
    submission["case_id"] = submission["case_id"].astype(str)

submission.to_csv("/kaggle/working/submission.csv", index=False)
submission.head()

Unnamed: 0,case_id,annotation
0,45,authentic
