In [None]:
# Cell 1 — Imports
import os, time, math, glob, random
from pathlib import Path
import cv2
import numpy as np
import pandas as pd

from ultralytics import YOLO


In [None]:
# Cell 2 — Config
# Everything a reader is expected to tune lives in this cell.

# Ultralytics-style dataset YAML; the split entries are read from it below.
DATASET_YAML = r"S:\IntelliJ\Projects\ES_Drone_Detection\datasets\drone_mixed.yaml"   # adjust if your YAML has a different name
SPLIT = "test"  # key looked up in the dataset YAML, e.g. "val" or "test"
IMGSZ = (640, 384)  # (w, h); reversed to (h, w) before being passed to model.predict
CONF = 0.25  # passed as `conf=` to model.predict
IOU_NMS = 0.7  # passed as `iou=` to model.predict
MAX_IMAGES = 3000   # keep only the first N image paths; set None to run the full split (500-3000 is handy while iterating)

# Path to the trained weights (edit to point at your actual best.pt)
MODEL_WEIGHTS = r"S:\IntelliJ\Projects\ES_Drone_Detection\runs\detect\yolo11\drone_finetune_full_mixed4\weights\best.pt"

# Single-class models need no further changes here.
# For multi-class models (e.g. drone/bird) the evaluator below only matches a prediction to
# ground truth of the same class id, so multiple labels are supported as-is.


In [None]:
# Cell 3 — Helpers: read YOLO yaml + resolve split paths
import yaml

def load_data_yaml(yaml_path):
    """Read the dataset YAML at ``yaml_path`` (UTF-8) and return the parsed content.

    Uses ``yaml.safe_load``, so the return value is whatever plain Python
    structure the document describes.
    """
    with open(yaml_path, "r", encoding="utf-8") as handle:
        return yaml.safe_load(handle)

def resolve_split_images(data_yaml, split):
    """Return the image paths (as strings) that make up one split of the dataset.

    Parameters
    ----------
    data_yaml : dict
        Parsed dataset YAML. The value stored under ``split`` may be a
        directory, a ``.txt`` file listing one image per line, or a list of
        such entries. An optional ``path`` key names the dataset root.
    split : str
        Key to look up, e.g. ``"val"`` or ``"test"``.

    Returns
    -------
    list[str]
        For a directory: every jpg/jpeg/png/bmp/webp found recursively, sorted.
        For a ``.txt`` list: the non-empty lines in file order; lines starting
        with ``./`` are made relative to the list file's directory.

    Raises
    ------
    ValueError
        If ``split`` is missing from ``data_yaml`` or has no value.
    """
    if split not in data_yaml:
        raise ValueError(f"Split '{split}' not found in data.yaml keys: {list(data_yaml.keys())}")

    entries = data_yaml[split]
    if not entries:
        raise ValueError(f"Split '{split}' has no path configured in data.yaml")
    if isinstance(entries, (str, os.PathLike)):
        entries = [entries]

    def dataset_root():
        # Relative split entries are anchored at the YAML's `path` key when present,
        # otherwise at the directory containing the YAML file itself.
        # NOTE(review): a *relative* `path` is resolved against the YAML's directory here;
        # Ultralytics may resolve it against its own datasets dir — confirm for your setup.
        declared = data_yaml.get("path")
        if declared and os.path.isabs(str(declared)):
            return Path(declared)
        yaml_dir = Path(DATASET_YAML).parent
        return yaml_dir / str(declared) if declared else yaml_dir

    patterns = ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp")
    imgs = []
    for entry in entries:
        entry = str(entry)
        split_path = Path(entry) if os.path.isabs(entry) else (dataset_root() / entry).resolve()

        if split_path.suffix.lower() == ".txt":
            # The split is a list file: one image path per line.
            with open(split_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    if line.startswith("./"):
                        line = str(split_path.parent / line[2:])
                    imgs.append(line)
        else:
            # The split is a directory: collect images recursively.
            # glob.escape keeps directory names containing [ ] * ? from being read as patterns.
            safe_dir = glob.escape(str(split_path))
            found = []
            for pattern in patterns:
                found.extend(glob.glob(os.path.join(safe_dir, "**", pattern), recursive=True))
            imgs.extend(sorted(found))
    return imgs

data_yaml = load_data_yaml(DATASET_YAML)
img_paths = resolve_split_images(data_yaml, SPLIT)

# NOTE(review): this keeps the first MAX_IMAGES paths in listing order, not a random sample,
# so the subset may be skewed towards whichever sources sort first.
if MAX_IMAGES is not None:
    img_paths = img_paths[:MAX_IMAGES]

# Fail early with an actionable message instead of an IndexError on the preview below.
if not img_paths:
    raise RuntimeError(
        f"No images selected for split '{SPLIT}' (dataset yaml: {DATASET_YAML}, MAX_IMAGES={MAX_IMAGES})"
    )

# Preview: number of images and the first path.
len(img_paths), img_paths[0]


In [None]:
# Cell 4 — Label IO (YOLO txt)
def image_to_label_path(img_path):
    """Map an image path to its YOLO label path.

    The LAST directory component named ``images`` is replaced by ``labels``
    and the file suffix becomes ``.txt``. Using the last occurrence matters
    for layouts such as ``/data/images/ds/images/test/a.jpg``, where only the
    innermost ``images`` folder has a sibling ``labels`` folder. The file name
    itself is never rewritten. If there is no ``images`` directory, only the
    suffix changes.

    Returns the label path as a string.
    """
    p = Path(img_path)
    parts = list(p.parts)
    dir_parts = parts[:-1]  # exclude the file name from the search
    if "images" in dir_parts:
        last = len(dir_parts) - 1 - dir_parts[::-1].index("images")
        parts[last] = "labels"
    return str(Path(*parts).with_suffix(".txt"))

def load_yolo_labels(label_path, img_w, img_h):
    """Read a YOLO label file and return boxes in pixel coordinates.

    Parameters
    ----------
    label_path : str
        Path of the ``.txt`` label file. A missing file means "no objects".
    img_w, img_h : int
        Image size used to scale the normalised coordinates.

    Returns
    -------
    list[tuple]
        ``(cls, x1, y1, x2, y2)`` per row.

    Accepted row layouts
    --------------------
    * ``cls xc yc w h``              — normalised centre/size box.
    * ``cls x1 y1 x2 y2 ... xn yn``  — normalised polygon (3+ points); its
      axis-aligned bounding box is used.

    Raises
    ------
    ValueError
        For any other row layout or non-numeric field; the message names the
        file and the line number.
    """
    if not os.path.exists(label_path):
        return []
    out = []
    # utf-8-sig also reads plain UTF-8 and drops a BOM that would corrupt the first field.
    with open(label_path, "r", encoding="utf-8-sig") as f:
        for line_no, line in enumerate(f, start=1):
            fields = line.split()
            if not fields:
                continue
            try:
                cls = int(float(fields[0]))
                coords = [float(v) for v in fields[1:]]
            except ValueError as exc:
                raise ValueError(f"{label_path}:{line_no}: non-numeric label field ({exc})") from exc

            if len(coords) == 4:
                xc, yc, w, h = coords
                nx1, ny1 = xc - w/2, yc - h/2
                nx2, ny2 = xc + w/2, yc + h/2
            elif len(coords) >= 6 and len(coords) % 2 == 0:
                # Polygon row: reduce to the enclosing box.
                xs, ys = coords[0::2], coords[1::2]
                nx1, ny1, nx2, ny2 = min(xs), min(ys), max(xs), max(ys)
            else:
                raise ValueError(
                    f"{label_path}:{line_no}: expected 'cls xc yc w h' or a polygon, got {len(fields)} fields"
                )
            out.append((cls, nx1 * img_w, ny1 * img_h, nx2 * img_w, ny2 * img_h))
    return out

def box_iou(a, b):
    """Intersection-over-union of two axis-aligned boxes given as (x1, y1, x2, y2).

    Negative extents are clamped to zero, and a small epsilon in the
    denominator keeps the result finite when both boxes have no area.
    """
    a_left, a_top, a_right, a_bottom = a
    b_left, b_top, b_right, b_bottom = b

    # Overlap rectangle; a negative side means the boxes do not intersect on that axis.
    overlap_w = min(a_right, b_right) - max(a_left, b_left)
    overlap_h = min(a_bottom, b_bottom) - max(a_top, b_top)
    overlap = max(0.0, overlap_w) * max(0.0, overlap_h)

    size_a = max(0.0, a_right - a_left) * max(0.0, a_bottom - a_top)
    size_b = max(0.0, b_right - b_left) * max(0.0, b_bottom - b_top)

    return overlap / (size_a + size_b - overlap + 1e-9)


In [None]:
# Cell 5 — Preprocessing functions (moving-camera-safe)
def to_ycrcb_clahe(bgr, clip_limit=2.0, tile_grid=(8,8)):
    """Apply CLAHE to the luma (Y) channel only and return a BGR image.

    The image is converted to YCrCb, the first channel is equalised with
    ``cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid)``, and the
    result is converted back; the two chroma channels pass through unchanged.
    """
    channels = list(cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2YCrCb)))
    equalizer = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid)
    channels[0] = equalizer.apply(channels[0])  # index 0 is Y
    return cv2.cvtColor(cv2.merge(channels), cv2.COLOR_YCrCb2BGR)

def gray_world_wb(bgr):
    """Gray-world white balance: scale each channel so its mean equals the overall mean.

    Works in float32, then clips to [0, 255] and returns a uint8 image.
    """
    planes = cv2.split(bgr.astype(np.float32))  # order: b, g, r
    mean_b, mean_g, mean_r = (plane.mean() for plane in planes)
    target = (mean_b + mean_g + mean_r) / 3.0

    for plane, plane_mean in zip(planes, (mean_b, mean_g, mean_r)):
        # Epsilon guards against an all-zero channel.
        plane *= (target / (plane_mean + 1e-6))

    balanced = cv2.merge(list(planes))
    return np.clip(balanced, 0, 255).astype(np.uint8)

def unsharp_mask(bgr, amount=0.6, blur_ksize=3):
    """Mild unsharp-mask sharpening.

    Computes ``(1 + amount) * bgr - amount * blurred`` where ``blurred`` is a
    Gaussian blur of ``bgr`` with a ``blur_ksize`` x ``blur_ksize`` kernel.
    """
    kernel = (blur_ksize, blur_ksize)
    softened = cv2.GaussianBlur(bgr, kernel, 0)
    return cv2.addWeighted(bgr, 1.0 + amount, softened, -amount, 0)

def light_bilateral(bgr, d=5, sigma_color=40, sigma_space=40):
    """Edge-preserving denoise via ``cv2.bilateralFilter``.

    ``d`` is the pixel neighbourhood diameter; keep it small for speed.
    """
    denoised = cv2.bilateralFilter(bgr, d, sigma_color, sigma_space)
    return denoised

def preprocess_pipeline(name, bgr):
    """Run the named preprocessing recipe on a BGR image and return the result.

    ``"baseline"`` returns the input object untouched. Any name that is not one
    of the curated recipes raises ``ValueError``.
    """
    def _clahe(img):
        # Shared first stage of every CLAHE-based recipe.
        return to_ycrcb_clahe(img, clip_limit=2.0, tile_grid=(8,8))

    recipes = {
        "baseline": lambda img: img,
        "clahe_y": _clahe,
        "clahe_y + unsharp": lambda img: unsharp_mask(_clahe(img), amount=0.5, blur_ksize=3),
        "wb + clahe_y": lambda img: _clahe(gray_world_wb(img)),
        "clahe_y + bilateral + unsharp": lambda img: unsharp_mask(
            light_bilateral(_clahe(img), d=5, sigma_color=35, sigma_space=35),
            amount=0.4,
            blur_ksize=3,
        ),
    }

    recipe = recipes.get(name)
    if recipe is None:
        raise ValueError(f"Unknown pipeline: {name}")
    return recipe(bgr)

# Names of the recipes to benchmark, in run order.
# Each entry must be a name that preprocess_pipeline() recognises.
PIPELINES = [
    "baseline",
    "clahe_y",
    "clahe_y + unsharp",
    "wb + clahe_y",
    "clahe_y + bilateral + unsharp",
]


In [None]:
# Cell 6 — Model load
# Load the weights once; the evaluation and visualisation cells below reuse this
# module-level `model` for every pipeline.
model = YOLO(MODEL_WEIGHTS)


In [None]:
# Cell 7 — Inference + evaluation loop
def match_detections_to_gt(pred_boxes, gt_boxes, iou_thr=0.5):
    """
    Greedily pair predictions with ground-truth boxes of the same class.

    pred_boxes: list of (cls, conf, x1,y1,x2,y2)
    gt_boxes:   list of (cls, x1,y1,x2,y2)

    Predictions are visited from highest to lowest confidence. Each one takes
    the still-unclaimed same-class ground-truth box with the highest IoU; the
    pair counts as a true positive when that IoU reaches ``iou_thr``,
    otherwise the prediction is a false positive. Ground-truth boxes left
    unclaimed are false negatives.

    Returns (tp, fp, fn, ious) where ``ious`` holds the IoU of every true positive.
    """
    claimed = set()      # indices of ground-truth boxes already matched
    matched_ious = []
    ranked = sorted(pred_boxes, key=lambda det: det[1], reverse=True)

    for det in ranked:
        det_cls, _score, dx1, dy1, dx2, dy2 = det
        top_iou, top_idx = 0, -1
        for idx, truth in enumerate(gt_boxes):
            if idx in claimed:
                continue
            gt_cls, gx1, gy1, gx2, gy2 = truth
            if det_cls != gt_cls:
                continue
            overlap = box_iou((dx1, dy1, dx2, dy2), (gx1, gy1, gx2, gy2))
            if overlap > top_iou:
                top_iou, top_idx = overlap, idx
        if top_idx >= 0 and top_iou >= iou_thr:
            claimed.add(top_idx)
            matched_ious.append(top_iou)

    tp = len(matched_ious)
    fp = len(ranked) - tp
    fn = len(gt_boxes) - len(claimed)
    return tp, fp, fn, matched_ious

def run_experiment(pipeline_name):
    """Evaluate one preprocessing pipeline over ``img_paths`` and return a metrics row.

    For every readable image the function preprocesses it, runs the detector,
    and matches predictions to ground truth at IoU 0.5.

    Timing
    ------
    ``fps_e2e`` covers image read + preprocessing + inference + extraction of
    the predicted boxes. Ground-truth loading and matching are evaluation
    bookkeeping and are not timed. One untimed warm-up inference runs first so
    that one-off model/device initialisation is not charged to whichever
    pipeline happens to run first.

    Returns
    -------
    dict
        Keys: pipeline, images, tp, fp, fn, precision@0.5, recall@0.5, f1@0.5,
        mean_iou(tp), fps_e2e. ``images`` counts only the images that were
        actually read and evaluated.
    """
    total_tp = total_fp = total_fn = 0
    all_ious = []
    processed = 0
    skipped = 0
    pipeline_secs = 0.0

    # IMGSZ is stored as (w, h); model.predict is given (h, w).
    imgsz_hw = list(IMGSZ)[::-1]

    # No explicit `device`: Ultralytics then selects a GPU when one is available and
    # falls back to CPU otherwise, instead of failing on machines without CUDA device 0.
    predict_kwargs = dict(imgsz=imgsz_hw, conf=CONF, iou=IOU_NMS, verbose=False)

    # Untimed warm-up on a blank frame of the configured size.
    warmup_frame = np.zeros((IMGSZ[1], IMGSZ[0], 3), dtype=np.uint8)
    model.predict(source=warmup_frame, **predict_kwargs)

    for img_path in img_paths:
        t_start = time.perf_counter()

        bgr = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if bgr is None:
            # Unreadable file: exclude it from both the metrics and the FPS figure.
            skipped += 1
            continue

        # preprocess
        bgr2 = preprocess_pipeline(pipeline_name, bgr)

        # YOLO inference (Ultralytics accepts numpy BGR)
        results = model.predict(source=bgr2, **predict_kwargs)

        # Extract predictions as (cls, conf, x1, y1, x2, y2)
        r0 = results[0]
        pred_boxes = []
        if r0.boxes is not None and len(r0.boxes) > 0:
            xyxy = r0.boxes.xyxy.cpu().numpy()
            confs = r0.boxes.conf.cpu().numpy()
            clss  = r0.boxes.cls.cpu().numpy().astype(int)
            for (x1,y1,x2,y2), c, k in zip(xyxy, confs, clss):
                pred_boxes.append((int(k), float(c), float(x1), float(y1), float(x2), float(y2)))

        pipeline_secs += time.perf_counter() - t_start
        processed += 1

        # Ground truth, scaled with the original image size
        h, w = bgr.shape[:2]
        gt = load_yolo_labels(image_to_label_path(img_path), w, h)

        # Match @ IoU 0.5
        tp, fp, fn, ious = match_detections_to_gt(pred_boxes, gt, iou_thr=0.5)
        total_tp += tp
        total_fp += fp
        total_fn += fn
        all_ious.extend(ious)

    if skipped:
        print(f"[{pipeline_name}] skipped {skipped} unreadable image(s)")

    secs = max(1e-9, pipeline_secs)
    n = processed
    fps = n / secs

    # Epsilons keep the ratios defined when there are no detections / no ground truth.
    precision = total_tp / (total_tp + total_fp + 1e-9)
    recall    = total_tp / (total_tp + total_fn + 1e-9)
    f1        = 2*precision*recall / (precision + recall + 1e-9)
    miou      = float(np.mean(all_ious)) if all_ious else 0.0

    return {
        "pipeline": pipeline_name,
        "images": n,
        "tp": total_tp,
        "fp": total_fp,
        "fn": total_fn,
        "precision@0.5": precision,
        "recall@0.5": recall,
        "f1@0.5": f1,
        "mean_iou(tp)": miou,
        "fps_e2e": fps,
    }

# Evaluate every pipeline on the same image list; each run contributes one metrics row.
rows = []
for p in PIPELINES:
    print("Running:", p)
    rows.append(run_experiment(p))

# Rank by F1 first, then by throughput (both descending), and display the table.
df = pd.DataFrame(rows).sort_values(by=["f1@0.5", "fps_e2e"], ascending=False)
df


In [None]:
# Cell 8 — Quick qualitative visualization (side-by-side)
def draw_boxes(img, boxes, color=(0,255,0), label_prefix="P"):
    """Return a copy of ``img`` with every box outlined and captioned.

    6-tuples are treated as predictions ``(cls, conf, x1, y1, x2, y2)`` and
    captioned ``"<label_prefix>:<cls> <conf>"``; anything else is unpacked as
    ground truth ``(cls, x1, y1, x2, y2)`` and captioned ``"GT:<cls>"``.
    The input image is not modified.
    """
    canvas = img.copy()
    for box in boxes:
        if len(box) != 6:
            # ground truth (cls, x1,y1,x2,y2)
            cls, x1, y1, x2, y2 = box
            caption = f"GT:{cls}"
        else:
            # prediction (cls, conf, x1,y1,x2,y2)
            cls, conf, x1, y1, x2, y2 = box
            caption = f"{label_prefix}:{cls} {conf:.2f}"

        left, top, right, bottom = (int(v) for v in (x1, y1, x2, y2))
        cv2.rectangle(canvas, (left, top), (right, bottom), color, 2)
        # Keep the caption inside the image when the box touches the top edge.
        anchor = (left, max(0, top - 5))
        cv2.putText(canvas, caption, anchor, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
    return canvas

def sample_and_show(pipeline_name, k=6, seed=0):
    """Show GT vs. prediction side by side for a random sample of images.

    Parameters
    ----------
    pipeline_name : str
        Recipe name understood by ``preprocess_pipeline``.
    k : int
        Number of images to show (capped at ``len(img_paths)``).
    seed : int
        Seed for the sample; the same seed yields the same picks.

    Each panel pair is shown with ``cv2.imshow`` and the function waits for a
    key press before moving on. Unreadable images are skipped with a message.
    All windows are closed on exit, including when an error interrupts the loop.

    NOTE(review): ``cv2.imshow`` needs a desktop session with GUI support;
    confirm it works where this notebook's kernel runs.
    """
    # A private generator gives the same picks as seeding the global one,
    # without disturbing the global `random` state used elsewhere in the notebook.
    rng = random.Random(seed)
    picks = rng.sample(img_paths, k=min(k, len(img_paths)))

    panel_size = tuple(IMGSZ)       # (w, h), the order cv2.resize expects
    imgsz_hw = list(IMGSZ)[::-1]    # (h, w) for model.predict

    try:
        for img_path in picks:
            bgr = cv2.imread(img_path)
            if bgr is None:
                print("Skipping unreadable image:", img_path)
                continue
            bgr2 = preprocess_pipeline(pipeline_name, bgr)

            # No explicit `device`: let Ultralytics pick GPU when available, CPU otherwise.
            res = model.predict(source=bgr2, imgsz=imgsz_hw, conf=CONF, iou=IOU_NMS, verbose=False)[0]
            pred = []
            if res.boxes is not None and len(res.boxes) > 0:
                xyxy = res.boxes.xyxy.cpu().numpy()
                confs = res.boxes.conf.cpu().numpy()
                clss  = res.boxes.cls.cpu().numpy().astype(int)
                for (x1,y1,x2,y2), c, kcls in zip(xyxy, confs, clss):
                    pred.append((int(kcls), float(c), float(x1), float(y1), float(x2), float(y2)))

            h, w = bgr.shape[:2]
            gt = load_yolo_labels(image_to_label_path(img_path), w, h)

            vis0 = draw_boxes(bgr, gt, color=(255,255,0), label_prefix="")   # GT on the raw image
            vis1 = draw_boxes(bgr2, pred, color=(0,255,0), label_prefix="P") # Pred on preprocessed

            stacked = np.hstack([
                cv2.resize(vis0, panel_size),
                cv2.resize(vis1, panel_size),
            ])
            cv2.imshow(f"{pipeline_name} | Left: GT, Right: Pred (preprocessed)", stacked)
            cv2.waitKey(0)
    finally:
        cv2.destroyAllWindows()


In [None]:
# Example: inspect 10 seeded-random images for the "clahe_y + unsharp" pipeline
# (press any key in the image window to advance).
sample_and_show("clahe_y + unsharp", k=10, seed=42)
