# 11 - SuperPoint + LightGlue Pair Localization Tuning


## Ziel
Dieses Notebook optimiert **nur** SuperPoint+LightGlue für die Paar-Lokalisierung `anchor -> val` gegen einen konstanten Map-Crop.

- Gray-basierte Filter (inkl. CLAHE, Denoise, Edge-Varianten)
- Rotation + Skalierung werden über einen Sweep berücksichtigt
- Auto-Tuning erweitert Hyperparameter stufenweise, bis `TARGET_INLIERS` erreicht ist oder alle Runden durchlaufen sind
- Visualisierung wie in deinem Beispiel: 4-Spalten-Matchbild plus Map-Overlay mit GT vs Prediction


In [None]:
from dataclasses import dataclass
from pathlib import Path
from itertools import product
from copy import deepcopy
from time import perf_counter
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from IPython.display import display


In [None]:
# Dependencies: SuperPoint + LightGlue required
# Any failure while importing the `lightglue` package is re-raised as an
# ImportError that carries an install hint; the original exception is chained
# as the cause so the real reason stays visible in the traceback.
try:
    from lightglue import LightGlue, SuperPoint
    from lightglue.utils import rbd
except Exception as e:
    raise ImportError('This notebook requires LightGlue+SuperPoint. Install with: pip install lightglue') from e

# Paths
CANDIDATE_ROOTS = [Path.cwd(), Path.cwd().parent, Path('.'), Path('..')]


def _locate_project_root(candidates):
    """Return the first resolved candidate directory containing ``data/data``.

    Candidates that cannot be resolved are skipped, and directories that
    resolve to an already-inspected path are only checked once. Returns
    ``None`` when no candidate qualifies.
    """
    inspected = set()
    for candidate in candidates:
        try:
            resolved = candidate.resolve()
        except Exception:
            continue
        marker = str(resolved)
        if marker in inspected:
            continue
        inspected.add(marker)
        if (resolved / 'data' / 'data').exists():
            return resolved
    return None


PROJECT_ROOT = _locate_project_root(CANDIDATE_ROOTS)
if PROJECT_ROOT is None:
    raise FileNotFoundError('Could not find project root containing data/data.')

# Dataset layout below the detected project root.
DATA_ROOT = PROJECT_ROOT / 'data' / 'data'
TRAIN_IMG_DIR = DATA_ROOT / 'train_data' / 'train_images'
TRAIN_POS_CSV = DATA_ROOT / 'train_data' / 'train_pos.csv'
TRAIN_CAM_CSV = DATA_ROOT / 'train_data' / 'train_cam.csv'
MAP_PATH = DATA_ROOT / 'map.png'

# Runtime
# DEVICE: torch device string handed to the SuperPoint/LightGlue modules.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# IMAGE_MAX_SIDE: longest image side allowed when loading train images
# (passed as `max_side` to load_train_image_cached).
IMAGE_MAX_SIDE = 1280
# RNG_SEED: base seed for the config sub-sampling in run_tuning.
RNG_SEED = 7

# Pair selection
# The run cell looks for the pair (PAIR_ANCHOR_ID -> PAIR_VAL_ID) and falls
# back to the pair at index PREVIEW_PAIR_INDEX when it does not exist.
PAIR_ANCHOR_ID = 13
PAIR_VAL_ID = 14
PREVIEW_PAIR_INDEX = 0

# Constant map crop around anchor GT
# Tile side = BASE_PX * SCALE, clipped to [MIN_PX, MAX_PX] and to the map size
# (see prepare_pair_data).
CONST_MAP_CROP_BASE_PX = 900
CONST_MAP_CROP_SCALE = 1.00
CONST_MAP_CROP_MIN_PX = 240
CONST_MAP_CROP_MAX_PX = 2200

# Matching goal
# TARGET_INLIERS: tuning stops early once the best result reaches this count.
TARGET_INLIERS = 100
# MIN_INLIERS_FOR_POSE: minimum homography inliers before a position is predicted.
MIN_INLIERS_FOR_POSE = 8

# Tuning control
# MAX_CONFIGS_PER_LEVEL: cap on configs evaluated per search level (sub-sampled above it).
MAX_CONFIGS_PER_LEVEL = 100
# AUTO_EXPAND_MAX_ROUNDS: number of expansion rounds over all search levels.
AUTO_EXPAND_MAX_ROUNDS = 4

# Gray filter params
# Consumed by preprocess_gray_variant / _edges.
CLAHE_CLIP_LIMIT = 2.5
CLAHE_GRID = (8, 8)
EDGE_CANNY_LOW = 50
EDGE_CANNY_HIGH = 150
EDGE_DILATE_KERNEL = 2
EDGE_BLEND_GRAY = 0.75
EDGE_BLEND_EDGE = 0.25

print('project_root:', PROJECT_ROOT)
print('device:', DEVICE)


In [None]:
# Load train data and build consecutive pairs
# Camera and position tables are inner-joined on `id`, so only images present
# in both CSV files survive; the result is ordered by integer id.
train_pos_df = pd.read_csv(TRAIN_POS_CSV)
train_cam_df = pd.read_csv(TRAIN_CAM_CSV)
train_df = train_cam_df.merge(train_pos_df, on='id', how='inner').copy()
train_df['id'] = train_df['id'].astype(int)
train_df = train_df.sort_values('id').reset_index(drop=True)

# Fail early if the ground-truth pixel columns used below are absent.
# NOTE(review): the 'id' check can never fire here, because the merge above
# already requires that column.
for col in ['id', 'x_pixel', 'y_pixel']:
    if col not in train_df.columns:
        raise KeyError(f'Missing required column: {col}')

# cv2.imread returns None instead of raising when the file cannot be read.
map_bgr = cv2.imread(str(MAP_PATH), cv2.IMREAD_COLOR)
if map_bgr is None:
    raise FileNotFoundError(f'Map not found: {MAP_PATH}')
map_rgb = cv2.cvtColor(map_bgr, cv2.COLOR_BGR2RGB)
MAP_H, MAP_W = map_rgb.shape[:2]

# A pair is two neighbouring rows whose ids differ by exactly one; the first
# row is the anchor, the second the validation image. Gaps in the id sequence
# produce no pair.
pairs = []
for i in range(len(train_df) - 1):
    r0 = train_df.iloc[i]
    r1 = train_df.iloc[i + 1]
    id0 = int(r0['id'])
    id1 = int(r1['id'])
    if id1 != id0 + 1:
        continue
    pairs.append({
        'pair_idx': int(len(pairs)),
        'anchor_id': id0,
        'val_id': id1,
        'anchor_x': float(r0['x_pixel']),
        'anchor_y': float(r0['y_pixel']),
        'val_gt_x': float(r1['x_pixel']),
        'val_gt_y': float(r1['y_pixel']),
    })

pair_df = pd.DataFrame(pairs)
if len(pair_df) == 0:
    raise RuntimeError('No consecutive pairs found in train data.')

print('consecutive pairs:', len(pair_df), '| map size:', (MAP_W, MAP_H))
display(pair_df.head(20))


In [None]:
# Utilities: image loading, crop, gray preprocessing
# In-memory cache of loaded (and possibly downscaled) RGB train images, keyed
# by (image id, max_side) as filled by load_train_image_cached.
_IMAGE_CACHE: Dict[Tuple[int, Optional[int]], np.ndarray] = {}


def resolve_train_image_path(image_id: int) -> Path:
    """Find the image file for ``image_id`` inside ``TRAIN_IMG_DIR``.

    The zero-padded four-digit stem is tried before the plain integer stem,
    each with the extensions .JPG, .jpg, .jpeg, .JPEG, .png, .PNG in that order.

    Raises:
        FileNotFoundError: if none of the candidate files exists.
    """
    numeric_id = int(image_id)
    candidates = (
        TRAIN_IMG_DIR / f'{stem}{suffix}'
        for stem in (f'{numeric_id:04d}', str(numeric_id))
        for suffix in ('.JPG', '.jpg', '.jpeg', '.JPEG', '.png', '.PNG')
    )
    found = next((path for path in candidates if path.exists()), None)
    if found is None:
        raise FileNotFoundError(f'Image not found for id={image_id} in {TRAIN_IMG_DIR}')
    return found


def resize_keep_aspect(img_rgb: np.ndarray, max_side: Optional[int]) -> np.ndarray:
    """Downscale ``img_rgb`` so its longest side is at most ``max_side``.

    The input array is returned unchanged when ``max_side`` is ``None`` or the
    image already fits. Otherwise both sides are scaled by the same factor
    (each clamped to at least 32 px) using area interpolation.
    """
    if max_side is None:
        return img_rgb

    height, width = img_rgb.shape[:2]
    longest = max(height, width)
    if longest <= int(max_side):
        return img_rgb

    ratio = float(max_side) / float(longest)
    # cv2.resize expects the target size as (width, height).
    target = tuple(max(32, int(round(side * ratio))) for side in (width, height))
    return cv2.resize(img_rgb, target, interpolation=cv2.INTER_AREA)


def load_train_image_cached(image_id: int, max_side: Optional[int]) -> np.ndarray:
    """Load a train image as RGB, limited to ``max_side``, with memoisation.

    Results are stored in ``_IMAGE_CACHE`` under ``(int(image_id), max_side)``;
    repeated calls return the very same array object.

    Raises:
        FileNotFoundError: if no file exists for the id.
        RuntimeError: if the file exists but OpenCV cannot decode it.
    """
    cache_key = (int(image_id), max_side)
    try:
        return _IMAGE_CACHE[cache_key]
    except KeyError:
        pass

    path = resolve_train_image_path(int(image_id))
    decoded = cv2.imread(str(path), cv2.IMREAD_COLOR)
    if decoded is None:
        raise RuntimeError(f'Cannot read image: {path}')

    image = resize_keep_aspect(cv2.cvtColor(decoded, cv2.COLOR_BGR2RGB), max_side=max_side)
    _IMAGE_CACHE[cache_key] = image
    return image


def resize_rgb(img: np.ndarray, scale: float) -> np.ndarray:
    """Resize ``img`` by the uniform factor ``scale``.

    Each output side is ``round(side * scale)``, clamped to at least 12 px.

    Area interpolation is used only when the image shrinks. When either side
    grows, bilinear interpolation is used instead: OpenCV documents INTER_AREA
    as behaving like nearest-neighbour when enlarging, which yields blocky
    images and degrades keypoint detection for sweep scales above 1.0.
    """
    h, w = img.shape[:2]
    factor = float(scale)
    nw = max(12, int(round(w * factor)))
    nh = max(12, int(round(h * factor)))
    shrinking = nw <= w and nh <= h
    interpolation = cv2.INTER_AREA if shrinking else cv2.INTER_LINEAR
    return cv2.resize(img, (nw, nh), interpolation=interpolation)


def rotate_rgb_keep_size(img: np.ndarray, angle_deg: float) -> np.ndarray:
    """Rotate ``img`` about its centre by ``angle_deg`` without changing its size.

    Pixels rotated in from outside the frame are filled by reflecting the
    image border (``BORDER_REFLECT101``); sampling is bilinear.
    """
    height, width = img.shape[:2]
    pivot = (width / 2.0, height / 2.0)
    affine = cv2.getRotationMatrix2D(pivot, float(angle_deg), 1.0)
    output_size = (width, height)
    return cv2.warpAffine(
        img,
        affine,
        output_size,
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REFLECT101,
    )


def extract_crop_by_size(map_img: np.ndarray, center_xy: Tuple[float, float], crop_w: int, crop_h: int) -> Tuple[np.ndarray, int, int]:
    """Cut a window of roughly ``crop_w`` x ``crop_h`` around ``center_xy``.

    The requested size is raised to at least 32 px per side. The window is
    shifted so that it stays inside ``map_img``; if the image is smaller than
    the window, the crop is truncated at the image border.

    Returns:
        ``(crop, x0, y0)`` where ``crop`` is a view into ``map_img`` and
        ``(x0, y0)`` is the top-left corner of the window in image coordinates.
    """
    img_h, img_w = map_img.shape[0], map_img.shape[1]
    win_w = int(max(32, crop_w))
    win_h = int(max(32, crop_h))

    center_x = float(center_xy[0])
    center_y = float(center_xy[1])

    # Centre the window, then push it back inside the image bounds.
    left = int(np.clip(int(round(center_x - win_w / 2.0)), 0, max(0, img_w - win_w)))
    top = int(np.clip(int(round(center_y - win_h / 2.0)), 0, max(0, img_h - win_h)))

    right = min(img_w, left + win_w)
    bottom = min(img_h, top + win_h)
    return map_img[top:bottom, left:right], left, top


def to_gray(img_rgb: np.ndarray) -> np.ndarray:
    """Convert an RGB image to a single-channel grayscale image via OpenCV."""
    return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)


def _edges(g: np.ndarray) -> np.ndarray:
    """Return a Canny edge map of the grayscale image ``g``.

    Thresholds come from ``EDGE_CANNY_LOW`` / ``EDGE_CANNY_HIGH``. When
    ``EDGE_DILATE_KERNEL`` is larger than 1, the edges are thickened by one
    dilation pass with a square kernel of that size.
    """
    edge_map = cv2.Canny(g, int(EDGE_CANNY_LOW), int(EDGE_CANNY_HIGH))
    kernel_side = int(max(1, EDGE_DILATE_KERNEL))
    if kernel_side <= 1:
        return edge_map
    structuring = np.ones((kernel_side, kernel_side), dtype=np.uint8)
    return cv2.dilate(edge_map, structuring, iterations=1)


def preprocess_gray_variant(img_rgb: np.ndarray, variant: str) -> np.ndarray:
    """Apply one of the gray-based filters and return a 3-channel uint8 image.

    Supported ``variant`` values: ``gray``, ``gray_clahe``, ``gray_clahe_blur``,
    ``gray_denoise``, ``gray_edge_binary``, ``gray_edge_blend``, ``gray_tophat``.
    The filtered single-channel result is replicated to three channels so it
    can be used wherever an RGB image is expected.

    Raises:
        KeyError: if ``variant`` is not one of the names above.
    """
    gray = to_gray(img_rgb)

    def _clahe():
        equalizer = cv2.createCLAHE(clipLimit=float(CLAHE_CLIP_LIMIT), tileGridSize=tuple(CLAHE_GRID))
        return equalizer.apply(gray)

    def _clahe_blur():
        return cv2.GaussianBlur(_clahe(), (3, 3), 0)

    def _denoise():
        return cv2.bilateralFilter(gray, d=7, sigmaColor=50, sigmaSpace=50)

    def _edge_blend():
        edge_map = _edges(gray)
        return cv2.addWeighted(gray, float(EDGE_BLEND_GRAY), edge_map, float(EDGE_BLEND_EDGE), 0.0)

    def _tophat():
        # Opening estimates the background; subtracting it keeps small bright detail.
        element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
        background = cv2.morphologyEx(gray, cv2.MORPH_OPEN, element)
        detail = cv2.subtract(gray, background)
        return cv2.normalize(detail, None, 0, 255, cv2.NORM_MINMAX)

    recipes = {
        'gray': lambda: gray,
        'gray_clahe': _clahe,
        'gray_clahe_blur': _clahe_blur,
        'gray_denoise': _denoise,
        'gray_edge_binary': lambda: _edges(gray),
        'gray_edge_blend': _edge_blend,
        'gray_tophat': _tophat,
    }

    recipe = recipes.get(variant) if isinstance(variant, str) else None
    if recipe is None:
        raise KeyError(f'Unknown gray variant: {variant}')

    filtered = recipe()
    return cv2.cvtColor(filtered.astype(np.uint8), cv2.COLOR_GRAY2RGB)


def prepare_pair_data(pr: pd.Series) -> Dict[str, object]:
    """Collect the images and the constant map crop for one anchor/val pair.

    The crop is a square tile centred on the anchor ground-truth position. Its
    side is ``CONST_MAP_CROP_BASE_PX * CONST_MAP_CROP_SCALE``, clipped to the
    configured minimum/maximum and to the map dimensions.

    Returns:
        Dict with ``anchor_rgb``, ``val_rgb``, ``crop_rgb``, the anchor and
        validation ground-truth positions in crop coordinates
        (``anchor_local``, ``val_local``) and the crop's top-left corner in map
        coordinates (``crop_origin``).
    """
    anchor_x, anchor_y = float(pr['anchor_x']), float(pr['anchor_y'])
    val_x, val_y = float(pr['val_gt_x']), float(pr['val_gt_y'])

    anchor_image = load_train_image_cached(int(pr['anchor_id']), max_side=IMAGE_MAX_SIDE)
    val_image = load_train_image_cached(int(pr['val_id']), max_side=IMAGE_MAX_SIDE)

    requested_side = round(float(CONST_MAP_CROP_BASE_PX) * float(CONST_MAP_CROP_SCALE))
    lower_bound = int(CONST_MAP_CROP_MIN_PX)
    upper_bound = min(MAP_W, MAP_H, int(CONST_MAP_CROP_MAX_PX))
    tile_size = int(np.clip(requested_side, lower_bound, upper_bound))

    crop, origin_x, origin_y = extract_crop_by_size(
        map_rgb,
        center_xy=(anchor_x, anchor_y),
        crop_w=tile_size,
        crop_h=tile_size,
    )

    return {
        'anchor_rgb': anchor_image,
        'val_rgb': val_image,
        'crop_rgb': crop,
        'anchor_local': (float(anchor_x - origin_x), float(anchor_y - origin_y)),
        'val_local': (float(val_x - origin_x), float(val_y - origin_y)),
        'crop_origin': (int(origin_x), int(origin_y)),
    }


In [None]:
# SuperPoint + LightGlue matcher
@dataclass
class MatchResult:
    """Keypoints and confidence-filtered correspondences of one matcher call."""

    # Keypoints extracted from the first image (float32 as produced by the matcher).
    k0: np.ndarray
    # Keypoints extracted from the second image (float32 as produced by the matcher).
    k1: np.ndarray
    # Index pairs: column 0 indexes into k0, column 1 indexes into k1.
    matches: np.ndarray
    # One confidence value per row of `matches`.
    conf: np.ndarray


class SuperPointLightGlueMatcher:
    """SuperPoint feature extractor combined with a LightGlue matcher.

    The torch modules are created on first use rather than in the constructor,
    so building an instance is cheap.
    """

    def __init__(self, max_num_keypoints: int, device: Optional[str] = None):
        """Store the keypoint budget and target device (defaults to ``DEVICE``)."""
        self.max_num_keypoints = int(max_num_keypoints)
        self.device = device or DEVICE
        self.extractor = None
        self.matcher = None

    def _lazy_init(self):
        """Instantiate extractor and matcher unless both already exist."""
        already_built = self.extractor is not None and self.matcher is not None
        if already_built:
            return
        self.extractor = SuperPoint(max_num_keypoints=self.max_num_keypoints).eval().to(self.device)
        self.matcher = LightGlue(features='superpoint').eval().to(self.device)

    def _to_tensor(self, img_rgb: np.ndarray) -> torch.Tensor:
        """Convert an RGB image to a 1x1xHxW float tensor scaled by 1/255 on the device."""
        gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
        batched = torch.from_numpy(gray).float()[None, None]
        return (batched / 255.0).to(self.device)

    @torch.inference_mode()
    def match(self, img0_rgb: np.ndarray, img1_rgb: np.ndarray, min_conf: float) -> MatchResult:
        """Match two images and keep correspondences with confidence >= ``min_conf``.

        If the matcher output has no ``scores`` entry, or its length disagrees
        with the number of matches, every match is given confidence 1.0.
        """
        self._lazy_init()

        tensor0 = self._to_tensor(img0_rgb)
        tensor1 = self._to_tensor(img1_rgb)

        feats0 = self.extractor.extract(tensor0)
        feats1 = self.extractor.extract(tensor1)
        raw_out = self.matcher({'image0': feats0, 'image1': feats1})
        # rbd strips the batch dimension from each result dict.
        feats0, feats1, result = (rbd(item) for item in (feats0, feats1, raw_out))

        def _as_numpy(tensor, dtype):
            return tensor.detach().cpu().numpy().astype(dtype)

        k0 = _as_numpy(feats0['keypoints'], np.float32)
        k1 = _as_numpy(feats1['keypoints'], np.float32)
        pairs = _as_numpy(result['matches'], np.int32)

        if pairs.size == 0:
            return MatchResult(k0, k1, np.zeros((0, 2), np.int32), np.zeros((0,), np.float32))

        if pairs.ndim != 2:
            pairs = pairs.reshape(-1, 2)

        n_pairs = pairs.shape[0]
        conf = _as_numpy(result['scores'], np.float32) if 'scores' in result else None
        if conf is None or conf.shape[0] != n_pairs:
            conf = np.ones((n_pairs,), dtype=np.float32)

        confident = conf >= float(min_conf)
        return MatchResult(k0, k1, pairs[confident], conf[confident])


# One matcher instance per keypoint budget, created on demand by get_matcher.
_MATCHER_CACHE: Dict[int, SuperPointLightGlueMatcher] = {}


def get_matcher(max_kp: int) -> SuperPointLightGlueMatcher:
    """Return the shared matcher for ``max_kp`` keypoints, creating it if needed."""
    budget = int(max_kp)
    matcher = _MATCHER_CACHE.get(budget)
    if matcher is None:
        matcher = SuperPointLightGlueMatcher(max_num_keypoints=budget, device=DEVICE)
        _MATCHER_CACHE[budget] = matcher
    return matcher


In [None]:
# Matching geometry + scale/rotation sweep

def magsac_homography(k0: np.ndarray, k1: np.ndarray, matches: np.ndarray, reproj_thr: float) -> Tuple[Optional[np.ndarray], np.ndarray]:
    """Estimate a homography from matched keypoints with OpenCV's USAC_MAGSAC.

    Returns:
        ``(H, inlier_mask)``. ``H`` is ``None`` and the mask is all-False when
        fewer than 8 matches are supplied or OpenCV finds no model; otherwise
        the mask is a boolean array with one entry per match.
    """
    n_matches = matches.shape[0]
    no_inliers = np.zeros((n_matches,), dtype=bool)
    if n_matches < 8:
        return None, no_inliers

    src_pts = k0[matches[:, 0]].astype(np.float32)
    dst_pts = k1[matches[:, 1]].astype(np.float32)

    H, mask = cv2.findHomography(
        src_pts,
        dst_pts,
        method=cv2.USAC_MAGSAC,
        ransacReprojThreshold=float(reproj_thr),
        maxIters=10000,
        confidence=0.999,
    )
    if H is None or mask is None:
        return None, no_inliers
    return H, mask.ravel().astype(bool)


def project_point_homography(H: np.ndarray, xy: Tuple[float, float]) -> Tuple[float, float]:
    """Map the 2-D point ``xy`` through homography ``H``.

    Returns ``(nan, nan)`` when the projected homogeneous coordinate is
    (numerically) zero, i.e. the point maps to infinity.
    """
    homogeneous = np.array([float(xy[0]), float(xy[1]), 1.0], dtype=np.float64)
    projected = np.matmul(H, homogeneous)
    w = projected[2]
    if abs(float(w)) < 1e-12:
        return np.nan, np.nan
    return float(projected[0] / w), float(projected[1] / w)


def estimate_rot_scale_from_h(H: np.ndarray, q_shape: Tuple[int, int, int]) -> Tuple[float, float]:
    """Derive an approximate rotation (degrees) and scale from homography ``H``.

    Three probe points of the query image are projected: its centre, a point a
    quarter-width to the right and a point a quarter-height below. Rotation is
    the change in direction of the horizontal probe vector; scale is the mean
    length ratio of the horizontal and vertical probe vectors.

    Returns ``(nan, nan)`` if any projected coordinate is not finite.
    """
    height, width = q_shape[:2]
    center = (width * 0.5, height * 0.5)
    right = (width * 0.75, height * 0.5)
    below = (width * 0.5, height * 0.75)

    center_t = project_point_homography(H, center)
    right_t = project_point_homography(H, right)
    below_t = project_point_homography(H, below)

    projected_coords = [*center_t, *right_t, *below_t]
    if not all(np.isfinite(value) for value in projected_coords):
        return np.nan, np.nan

    # Horizontal probe vector before and after projection.
    hx0, hy0 = right[0] - center[0], right[1] - center[1]
    hx1, hy1 = right_t[0] - center_t[0], right_t[1] - center_t[1]
    # Vertical probe vector before and after projection.
    vx0, vy0 = below[0] - center[0], below[1] - center[1]
    vx1, vy1 = below_t[0] - center_t[0], below_t[1] - center_t[1]

    rot_deg = float(np.degrees(np.arctan2(hy1, hx1) - np.arctan2(hy0, hx0)))

    scale_h = np.hypot(hx1, hy1) / max(1e-8, np.hypot(hx0, hy0))
    scale_v = np.hypot(vx1, vy1) / max(1e-8, np.hypot(vx0, vy0))
    return rot_deg, float(0.5 * (scale_h + scale_v))


def build_reference(crop_rgb: np.ndarray, anchor_local: Tuple[float, float], strategy: str, roi_factor: float) -> Tuple[np.ndarray, int, int]:
    """Select the reference image the query is matched against.

    ``'direct'`` uses the whole crop. ``'anchor_roi'`` cuts a sub-window around
    the anchor position whose sides are ``roi_factor`` times the crop sides
    (clipped between 96 px and the crop size).

    Returns:
        ``(reference, x_offset, y_offset)`` with the offset of the reference
        inside ``crop_rgb``.

    Raises:
        ValueError: for any other ``strategy``.
    """
    if strategy == 'direct':
        return crop_rgb, 0, 0

    if strategy != 'anchor_roi':
        raise ValueError(f'Unknown strategy: {strategy}')

    full_h, full_w = crop_rgb.shape[:2]
    factor = float(roi_factor)
    roi_w = int(np.clip(round(float(full_w) * factor), 96, full_w))
    roi_h = int(np.clip(round(float(full_h) * factor), 96, full_h))
    return extract_crop_by_size(crop_rgb, center_xy=anchor_local, crop_w=roi_w, crop_h=roi_h)


def _failed_sweep_candidate(query_used, ref_used, ref_offset, scale_used, rot_used_deg, error=None):
    """Build the result dict for a sweep step that produced no usable match."""
    candidate = {'ok': False}
    if error is not None:
        candidate['error'] = error
    candidate.update({
        'query_used': query_used,
        'ref_used': ref_used,
        'ref_offset': ref_offset,
        'scale_used': scale_used,
        'rot_used_deg': rot_used_deg,
        'match': MatchResult(
            k0=np.zeros((0, 2), dtype=np.float32),
            k1=np.zeros((0, 2), dtype=np.float32),
            matches=np.zeros((0, 2), dtype=np.int32),
            conf=np.zeros((0,), dtype=np.float32),
        ),
        'H': None,
        'inlier_mask': np.zeros((0,), dtype=bool),
        'raw_matches': 0,
        'inliers': 0,
        'pred_local': (np.nan, np.nan),
        'err_px': np.nan,
        'est_rot_deg': np.nan,
        'est_scale': np.nan,
        'score': (-1, -1, -1e9, -1.0),
    })
    return candidate


def run_sp_lg_sweep(
    query_rgb: np.ndarray,
    crop_rgb: np.ndarray,
    anchor_local: Tuple[float, float],
    gt_local: Tuple[float, float],
    strategy: str,
    roi_factor: float,
    max_kp: int,
    min_conf: float,
    reproj_thr: float,
    scales: List[float],
    rots_deg: List[float],
) -> Dict[str, object]:
    """Match the query against the reference over a grid of scales and rotations.

    For every positive scale and every rotation the query is transformed,
    matched with SuperPoint+LightGlue and fitted with a MAGSAC homography.
    When the fit has at least ``MIN_INLIERS_FOR_POSE`` inliers, the query
    centre is projected into crop coordinates as the predicted position and
    compared with ``gt_local``.

    Candidates are ranked by the tuple ``(inliers, raw matches, -error,
    mean confidence)``; the highest-ranked one is returned. A step that raises
    is recorded as a failed candidate (with its error message) instead of
    aborting the sweep. If no step ran at all, a failed placeholder based on
    the untransformed query is returned.
    """
    ref_rgb, rx, ry = build_reference(crop_rgb, anchor_local=anchor_local, strategy=strategy, roi_factor=roi_factor)
    matcher = get_matcher(max_kp=max_kp)
    ref_offset = (int(rx), int(ry))

    best = None

    for raw_scale in scales:
        s = float(raw_scale)
        if s <= 0.0:
            continue
        q_s = query_rgb
        if abs(s - 1.0) > 1e-6:
            q_s = resize_rgb(query_rgb, s)

        for raw_rot in rots_deg:
            rdeg = float(raw_rot)
            q_sr = q_s
            if abs(rdeg) > 1e-6:
                q_sr = rotate_rgb_keep_size(q_s, rdeg)

            try:
                m = matcher.match(q_sr, ref_rgb, min_conf=min_conf)
                H, inl = magsac_homography(m.k0, m.k1, m.matches, reproj_thr=reproj_thr)
                raw = int(m.matches.shape[0])
                inliers = int(inl.sum()) if inl is not None else 0

                # Defaults for "no trustworthy pose".
                pred = (np.nan, np.nan)
                err_px = np.nan
                est_rot_deg, est_scale = np.nan, np.nan

                if H is not None and inliers >= int(MIN_INLIERS_FOR_POSE):
                    q_center = (float(q_sr.shape[1] * 0.5), float(q_sr.shape[0] * 0.5))
                    px_ref, py_ref = project_point_homography(H, q_center)
                    if np.isfinite(px_ref) and np.isfinite(py_ref):
                        # Shift from reference coordinates back to crop coordinates.
                        pred = (float(px_ref + rx), float(py_ref + ry))
                        err_px = float(np.hypot(pred[0] - gt_local[0], pred[1] - gt_local[1]))
                    est_rot_deg, est_scale = estimate_rot_scale_from_h(H, q_sr.shape)

                conf_mean = float(m.conf.mean()) if m.conf.shape[0] > 0 else 0.0
                err_rank = -float(err_px) if np.isfinite(err_px) else -1e6

                cand = {
                    'ok': True,
                    'query_used': q_sr,
                    'ref_used': ref_rgb,
                    'ref_offset': ref_offset,
                    'scale_used': s,
                    'rot_used_deg': rdeg,
                    'match': m,
                    'H': H,
                    'inlier_mask': inl,
                    'raw_matches': raw,
                    'inliers': inliers,
                    'pred_local': pred,
                    'err_px': err_px,
                    'est_rot_deg': est_rot_deg,
                    'est_scale': est_scale,
                    'score': (inliers, raw, err_rank, conf_mean),
                }
            except Exception as e:
                cand = _failed_sweep_candidate(q_sr, ref_rgb, ref_offset, s, rdeg, error=str(e))

            if best is None or cand['score'] > best['score']:
                best = cand

    if best is None:
        best = _failed_sweep_candidate(query_rgb, ref_rgb, ref_offset, 1.0, 0.0)

    return best


In [None]:
# Auto-tuning (hyperparameter search + auto-expansion until target)
# Three search levels of increasing breadth. Within a level, build_configs
# forms the Cartesian product of max_kp, min_conf, reproj_thr, filter,
# strategy, roi_factor and crop_scale; `scales` and `rots` are not part of the
# product but are swept inside every single config by run_sp_lg_sweep.
BASE_SEARCH_LEVELS = [
    {
        'name': 'L1_coarse',
        'max_kp': [4096],
        'min_conf': [0.05, 0.03],
        'reproj_thr': [3.0, 4.0],
        'filter': ['gray', 'gray_clahe', 'gray_clahe_blur'],
        'strategy': ['direct'],
        'roi_factor': [1.0],
        'crop_scale': [1.0],
        'scales': [0.25, 0.35, 0.50, 0.70, 1.00],
        'rots': [-20, -10, 0, 10, 20],
    },
    {
        'name': 'L2_wider',
        'max_kp': [4096, 8192],
        'min_conf': [0.03, 0.01],
        'reproj_thr': [3.0, 4.0, 6.0],
        'filter': ['gray', 'gray_clahe', 'gray_clahe_blur', 'gray_denoise', 'gray_edge_blend'],
        'strategy': ['direct', 'anchor_roi'],
        'roi_factor': [1.0, 0.85, 0.70],
        'crop_scale': [1.0, 1.2],
        'scales': [0.18, 0.25, 0.35, 0.50, 0.70, 1.00, 1.30],
        'rots': [-35, -25, -15, -8, 0, 8, 15, 25, 35],
    },
    {
        'name': 'L3_aggressive',
        'max_kp': [8192, 12000],
        'min_conf': [0.01, 0.005],
        'reproj_thr': [4.0, 6.0, 8.0],
        'filter': ['gray', 'gray_clahe', 'gray_clahe_blur', 'gray_denoise', 'gray_edge_blend', 'gray_edge_binary', 'gray_tophat'],
        'strategy': ['direct', 'anchor_roi'],
        'roi_factor': [1.0, 0.85, 0.70, 0.55],
        'crop_scale': [1.0, 1.2, 1.5],
        'scales': [0.12, 0.16, 0.24, 0.35, 0.50, 0.70, 1.00, 1.40, 1.80],
        'rots': [-60, -45, -35, -25, -15, 0, 15, 25, 35, 45, 60],
    },
]


def expand_level(level: Dict[str, object], round_idx: int) -> Dict[str, object]:
    """Return a widened copy of a search level for expansion round ``round_idx``.

    Round 0 (or negative) yields an unmodified deep copy. Later rounds add one
    larger keypoint budget, one lower confidence threshold, a pair of wider
    rotation angles, a smaller and a larger scale, and one larger crop scale.
    From round 2 onwards the ``anchor_roi`` strategy and ROI factor 0.45 are
    added as well. The input ``level`` is never modified.
    """
    expanded = deepcopy(level)
    if round_idx <= 0:
        return expanded

    def _merged(key, cast, extras, descending=False):
        """Existing values of ``key`` (cast) united with ``extras``, sorted."""
        values = {cast(item) for item in expanded[key]}
        values.update(extras)
        return sorted(values, reverse=descending)

    # progressively relax and enlarge search
    larger_kp = int(round(max(expanded['max_kp']) * (1.0 + 0.35 * round_idx)))
    lower_conf = max(0.001, float(min(expanded['min_conf'])) * (0.6 ** round_idx))
    widest_rot = max(abs(float(angle)) for angle in expanded['rots']) + 10.0 * round_idx
    smallest_scale = min(float(value) for value in expanded['scales'])
    largest_scale = max(float(value) for value in expanded['scales'])
    largest_crop = max(float(value) for value in expanded['crop_scale'])

    expanded['max_kp'] = _merged('max_kp', int, [larger_kp])
    expanded['min_conf'] = _merged('min_conf', float, [lower_conf], descending=True)
    expanded['rots'] = _merged('rots', float, [-widest_rot, widest_rot])
    expanded['scales'] = _merged(
        'scales',
        float,
        [max(0.08, smallest_scale * 0.8), min(2.6, largest_scale * 1.2)],
    )
    expanded['crop_scale'] = _merged('crop_scale', float, [min(2.2, largest_crop + 0.2 * round_idx)])

    if round_idx >= 2:
        expanded['strategy'] = _merged('strategy', str, ['anchor_roi'])
        expanded['roi_factor'] = _merged('roi_factor', float, [0.45])

    return expanded


def build_configs(level: Dict[str, object]) -> List[Dict[str, object]]:
    """Expand a search level into the list of concrete configurations.

    Every combination of ``max_kp``, ``min_conf``, ``reproj_thr``, ``filter``,
    ``strategy``, ``roi_factor`` and ``crop_scale`` becomes one config, except
    that the ``direct`` strategy is only paired with ROI factor 1.0 (the factor
    has no effect there). Each config carries its own copy of the level's
    ``scales`` and ``rots`` lists.
    """
    grid_keys = ('max_kp', 'min_conf', 'reproj_thr', 'filter', 'strategy', 'roi_factor', 'crop_scale')
    combinations = product(*(level[key] for key in grid_keys))

    def _is_redundant(strategy, roi_factor):
        return str(strategy) == 'direct' and float(roi_factor) != 1.0

    return [
        {
            'level': str(level['name']),
            'max_kp': int(max_kp),
            'min_conf': float(min_conf),
            'reproj_thr': float(reproj_thr),
            'filter': str(filter_name),
            'strategy': str(strategy),
            'roi_factor': float(roi_factor),
            'crop_scale': float(crop_scale),
            'scales': [float(value) for value in level['scales']],
            'rots': [float(value) for value in level['rots']],
        }
        for max_kp, min_conf, reproj_thr, filter_name, strategy, roi_factor, crop_scale in combinations
        if not _is_redundant(strategy, roi_factor)
    ]


def sample_configs(cfgs: List[Dict[str, object]], max_n: int, seed: int = 7) -> List[Dict[str, object]]:
    """Limit ``cfgs`` to at most ``max_n`` entries by seeded random sampling.

    When the list already fits it is returned as-is (same object). Otherwise
    ``max_n`` distinct entries are drawn without replacement and returned in
    their original relative order.
    """
    limit = int(max_n)
    total = len(cfgs)
    if total <= limit:
        return cfgs

    generator = np.random.default_rng(int(seed))
    chosen = generator.choice(np.arange(total), size=limit, replace=False)
    chosen.sort()
    return [cfgs[int(position)] for position in chosen]


def evaluate_config_for_pair(pr: pd.Series, cfg: Dict[str, object]) -> Dict[str, object]:
    """Run one tuning configuration on one anchor/val pair.

    The constant map crop is optionally enlarged by ``cfg['crop_scale']``
    (re-cut around the anchor position), the gray filter is applied to anchor,
    validation and crop images, and the validation image is matched against
    the crop with the scale/rotation sweep.

    Returns:
        Dict with the config, the pair data, the three filtered images, the
        sweep result (``val_res``) and a scalar ``score`` that rewards inliers
        and raw matches and penalises the position error (a missing error
        counts as 1e5 px).
    """
    data = prepare_pair_data(pr)

    # optional larger crop around anchor
    crop_scale = float(cfg['crop_scale'])
    if abs(crop_scale - 1.0) > 1e-6:
        base_h, base_w = data['crop_rgb'].shape[:2]
        min_px = int(CONST_MAP_CROP_MIN_PX)
        max_px = int(CONST_MAP_CROP_MAX_PX)
        new_w = int(np.clip(round(base_w * crop_scale), min_px, min(MAP_W, max_px)))
        new_h = int(np.clip(round(base_h * crop_scale), min_px, min(MAP_H, max_px)))

        anchor_global = (float(pr['anchor_x']), float(pr['anchor_y']))
        enlarged, ox, oy = extract_crop_by_size(map_rgb, center_xy=anchor_global, crop_w=new_w, crop_h=new_h)
        # The crop origin moved, so all crop-local coordinates must be recomputed.
        data.update({
            'crop_rgb': enlarged,
            'crop_origin': (int(ox), int(oy)),
            'anchor_local': (float(anchor_global[0] - ox), float(anchor_global[1] - oy)),
            'val_local': (float(pr['val_gt_x'] - ox), float(pr['val_gt_y'] - oy)),
        })

    variant = cfg['filter']
    anchor_f, val_f, crop_f = (
        preprocess_gray_variant(data[key], variant)
        for key in ('anchor_rgb', 'val_rgb', 'crop_rgb')
    )

    val_res = run_sp_lg_sweep(
        query_rgb=val_f,
        crop_rgb=crop_f,
        anchor_local=data['anchor_local'],
        gt_local=data['val_local'],
        strategy=cfg['strategy'],
        roi_factor=cfg['roi_factor'],
        max_kp=cfg['max_kp'],
        min_conf=cfg['min_conf'],
        reproj_thr=cfg['reproj_thr'],
        scales=cfg['scales'],
        rots_deg=cfg['rots'],
    )

    if np.isfinite(val_res['err_px']):
        err_term = float(val_res['err_px'])
    else:
        err_term = 1e5
    score = float(2000.0 * val_res['inliers'] + 2.0 * val_res['raw_matches'] - 0.4 * err_term)

    return {
        'cfg': cfg,
        'data': data,
        'anchor_f': anchor_f,
        'val_f': val_f,
        'crop_f': crop_f,
        'val_res': val_res,
        'score': score,
    }


def _finite_or_nan(value) -> float:
    """Return ``value`` as a float, or NaN when it is not finite."""
    return float(value) if np.isfinite(value) else np.nan


def run_tuning(pr: pd.Series, target_inliers: int = 100) -> Tuple[pd.DataFrame, Dict[str, object]]:
    """Search hyperparameters for one pair until ``target_inliers`` is reached.

    For each of ``AUTO_EXPAND_MAX_ROUNDS`` rounds, every level in
    ``BASE_SEARCH_LEVELS`` is widened with ``expand_level``, expanded into
    configs, sub-sampled to ``MAX_CONFIGS_PER_LEVEL`` and evaluated. The search
    stops after the first level whose overall best result reaches the target.

    A config whose evaluation raises is recorded with zero matches and a very
    low score; its error message is stored in the ``error`` column of the
    result table so that failures are visible instead of silently lost.

    Args:
        pr: one row of the pair table (anchor/val ids and ground-truth positions).
        target_inliers: inlier count at which the search stops early.

    Returns:
        ``(df, best)``: ``df`` has one row per evaluated config, sorted by
        inliers, raw matches and score (descending); ``best`` is the output
        dict of the highest-scoring config.

    Raises:
        RuntimeError: if not a single configuration was evaluated.
    """
    rows = []
    best = None
    reached = False

    for round_idx in range(int(AUTO_EXPAND_MAX_ROUNDS)):
        print(f'===== round {round_idx + 1}/{AUTO_EXPAND_MAX_ROUNDS} =====')

        for lvl_i, base_lvl in enumerate(BASE_SEARCH_LEVELS, 1):
            lvl = expand_level(base_lvl, round_idx=round_idx)
            cfgs_all = build_configs(lvl)
            cfgs = sample_configs(cfgs_all, max_n=int(MAX_CONFIGS_PER_LEVEL), seed=int(RNG_SEED + 100 * round_idx + lvl_i))
            print(f"[{lvl['name']}] evaluating {len(cfgs)} configs (from {len(cfgs_all)})")

            t0 = perf_counter()
            for j, cfg in enumerate(cfgs, 1):
                try:
                    out = evaluate_config_for_pair(pr, cfg)
                except Exception as e:
                    # Best effort: keep tuning, but remember why this config failed.
                    out = {
                        'cfg': cfg,
                        'data': None,
                        'anchor_f': None,
                        'val_f': None,
                        'crop_f': None,
                        'val_res': {
                            'raw_matches': 0,
                            'inliers': 0,
                            'err_px': np.nan,
                            'scale_used': np.nan,
                            'rot_used_deg': np.nan,
                            'est_rot_deg': np.nan,
                            'est_scale': np.nan,
                        },
                        'score': -1e12,
                        'error': str(e),
                    }

                vr = out['val_res']
                rows.append({
                    'round': int(round_idx + 1),
                    'level': cfg['level'],
                    'backend': 'superpoint_lightglue',
                    'filter': cfg['filter'],
                    'strategy': cfg['strategy'],
                    'roi_factor': cfg['roi_factor'],
                    'crop_scale': cfg['crop_scale'],
                    'max_kp': cfg['max_kp'],
                    'min_conf': cfg['min_conf'],
                    'reproj_thr': cfg['reproj_thr'],
                    'scale_used': _finite_or_nan(vr['scale_used']),
                    'rot_used_deg': _finite_or_nan(vr['rot_used_deg']),
                    'est_rot_deg': _finite_or_nan(vr['est_rot_deg']),
                    'est_scale': _finite_or_nan(vr['est_scale']),
                    'raw_matches': int(vr['raw_matches']),
                    'inliers': int(vr['inliers']),
                    'err_px': _finite_or_nan(vr['err_px']),
                    'score': float(out['score']),
                    # Failure reason of the config itself, or of its best sweep step.
                    'error': str(out.get('error') or vr.get('error') or ''),
                })

                if best is None or out['score'] > best['score']:
                    best = out

                if j % 20 == 0:
                    bvr = best['val_res']
                    print(f"  {j:>3}/{len(cfgs)} | best inl={int(bvr['inliers'])} raw={int(bvr['raw_matches'])} err={bvr['err_px']}")

            dt = perf_counter() - t0
            if best is None:
                # Nothing has been evaluated yet (this level produced no
                # configs), so there is no best result to report or test.
                print(f"[{lvl['name']}] done in {dt:.1f}s | no configs evaluated")
                continue

            bvr = best['val_res']
            print(f"[{lvl['name']}] done in {dt:.1f}s | best inl={int(bvr['inliers'])} raw={int(bvr['raw_matches'])} err={bvr['err_px']}")

            if int(bvr['inliers']) >= int(target_inliers):
                print(f"Target reached: {int(bvr['inliers'])} >= {target_inliers}")
                reached = True
                break

        if reached:
            break

    if best is None:
        raise RuntimeError('No configuration could be evaluated.')

    df = pd.DataFrame(rows)
    if len(df):
        df = df.sort_values(['inliers', 'raw_matches', 'score'], ascending=False).reset_index(drop=True)
    return df, best


In [None]:
# Run tuning on selected pair
# Look up the configured anchor->val pair; if it is not among the consecutive
# pairs, fall back to the pair at PREVIEW_PAIR_INDEX (clamped to a valid index).
sel = pair_df[
    (pair_df['anchor_id'].astype(int) == int(PAIR_ANCHOR_ID)) &
    (pair_df['val_id'].astype(int) == int(PAIR_VAL_ID))
]
if len(sel) == 0:
    idx = int(np.clip(PREVIEW_PAIR_INDEX, 0, len(pair_df) - 1))
    pr = pair_df.iloc[idx]
    print(f'Pair {PAIR_ANCHOR_ID}->{PAIR_VAL_ID} not found, using pair index {idx}.')
else:
    pr = sel.iloc[0]

print('using pair:', int(pr['anchor_id']), '->', int(pr['val_id']))

# preview constant crop before tuning
# Three panels: anchor image, validation image, and the map crop with both
# ground-truth positions marked in crop coordinates.
preview = prepare_pair_data(pr)
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
axes[0].imshow(preview['anchor_rgb'])
axes[0].set_title(f"Anchor image | id={int(pr['anchor_id'])}")
axes[0].axis('off')

axes[1].imshow(preview['val_rgb'])
axes[1].set_title(f"Validation image | id={int(pr['val_id'])}")
axes[1].axis('off')

axes[2].imshow(preview['crop_rgb'])
axes[2].scatter([preview['anchor_local'][0]], [preview['anchor_local'][1]], s=35, c='red', label='anchor GT')
axes[2].scatter([preview['val_local'][0]], [preview['val_local'][1]], s=35, c='cyan', label='val GT')
axes[2].set_title(f"Constant map crop | {preview['crop_rgb'].shape[1]}x{preview['crop_rgb'].shape[0]}")
axes[2].legend(loc='upper right')
axes[2].axis('off')
plt.tight_layout()
plt.show()

# Run the staged search; `tune_df` holds one row per evaluated config and
# `best` the output dict of the highest-scoring one.
tune_df, best = run_tuning(pr=pr, target_inliers=int(TARGET_INLIERS))

# Summarise the winner; non-finite values are reported as NaN.
print('best config:', best['cfg'])
print('best val result:', {
    'raw_matches': int(best['val_res']['raw_matches']),
    'inliers': int(best['val_res']['inliers']),
    'err_px': float(best['val_res']['err_px']) if np.isfinite(best['val_res']['err_px']) else np.nan,
    'scale_used': float(best['val_res']['scale_used']) if np.isfinite(best['val_res']['scale_used']) else np.nan,
    'rot_used_deg': float(best['val_res']['rot_used_deg']) if np.isfinite(best['val_res']['rot_used_deg']) else np.nan,
    'est_rot_deg': float(best['val_res']['est_rot_deg']) if np.isfinite(best['val_res']['est_rot_deg']) else np.nan,
    'est_scale': float(best['val_res']['est_scale']) if np.isfinite(best['val_res']['est_scale']) else np.nan,
})

display(tune_df.head(30))


In [None]:
# Visualization: 4 columns with matches + map overlay

def pick_draw_idx(m: MatchResult, inl: np.ndarray, max_draw: int = 160, seed: int = 7):
    """Choose which matches to draw, returning at most ``max_draw`` indices.

    Inlier matches are preferred; if the inlier mask is missing, has the wrong
    length or marks nothing, all matches are candidates. When there are too
    many candidates, the ones with the highest confidence are kept (in
    descending confidence order). Only if no usable confidence array exists
    is a seeded random subset drawn instead.
    """
    total = int(m.matches.shape[0])
    if total == 0:
        return np.zeros((0,), dtype=np.int32)

    mask_usable = inl is not None and inl.shape[0] == total and np.any(inl)
    if mask_usable:
        candidates = np.where(inl.astype(bool))[0]
    else:
        candidates = np.arange(total, dtype=np.int32)

    if candidates.size <= max_draw:
        return candidates

    confidences = m.conf
    if confidences is not None and confidences.shape[0] == total:
        strongest_first = np.argsort(-confidences[candidates])
        return candidates[strongest_first[:max_draw]]

    sampler = np.random.default_rng(int(seed))
    return np.sort(sampler.choice(candidates, size=max_draw, replace=False))


def draw_heading(ax, x: float, y: float, deg: float, color: str, length: float = 55.0):
    """Draw an arrow of ``length`` from ``(x, y)`` pointing along ``deg`` degrees.

    Nothing is drawn when ``deg`` is not finite. The direction vector is
    ``(cos, sin)`` of the angle in the axes' own data coordinates.
    """
    if np.isfinite(deg):
        theta = np.deg2rad(float(deg))
        step_x = float(length * np.cos(theta))
        step_y = float(length * np.sin(theta))
        ax.arrow(
            x,
            y,
            step_x,
            step_y,
            color=color,
            width=0.8,
            head_width=10,
            head_length=12,
            length_includes_head=True,
        )


def draw_best_4col(best: Dict[str, object], pr: pd.Series, max_draw: int = 160, seed: int = 7):
    """Plot the best tuning result as one wide strip: [anchor | crop | val | crop].

    The anchor image is matched again with the tuned configuration, using the
    anchor's own crop-local position as ground truth, so the anchor and val
    pairs can be compared side by side. Match lines, both ground-truth points
    and the predicted val position with its heading arrow are drawn on top.

    Args:
        best: Tuning result; the keys read here are 'cfg', 'data', 'anchor_f',
            'crop_f' and 'val_res'.
        pr: Pair row; not read by this function.
        max_draw: Upper bound on drawn matches per pair (forwarded to pick_draw_idx).
        seed: Seed forwarded to pick_draw_idx.
    """
    cfg = best['cfg']
    data = best['data']

    # Same tuned settings as the val run, but querying with the anchor image.
    anchor_res = run_sp_lg_sweep(
        query_rgb=best['anchor_f'],
        crop_rgb=best['crop_f'],
        anchor_local=data['anchor_local'],
        gt_local=data['anchor_local'],
        strategy=cfg['strategy'],
        roi_factor=cfg['roi_factor'],
        max_kp=cfg['max_kp'],
        min_conf=cfg['min_conf'],
        reproj_thr=cfg['reproj_thr'],
        scales=cfg['scales'],
        rots_deg=cfg['rots'],
    )
    val_res = best['val_res']

    anchor_img = anchor_res['query_used']
    val_img = val_res['query_used']
    crop_img = best['crop_f']

    # Lay the four panels out left to right, each vertically centred on a black canvas.
    panels = [anchor_img, crop_img, val_img, crop_img]
    canvas_h = max(im.shape[0] for im in panels)
    canvas_w = sum(im.shape[1] for im in panels)
    canvas = np.zeros((canvas_h, canvas_w, 3), dtype=np.uint8)

    origins = []
    x_cursor = 0
    for im in panels:
        y_top = (canvas_h - im.shape[0]) // 2
        canvas[y_top:y_top + im.shape[0], x_cursor:x_cursor + im.shape[1]] = im
        origins.append((x_cursor, y_top))
        x_cursor += im.shape[1]
    (x1, y1), (x2, y2), (x3, y3), (x4, y4) = origins

    fig, ax = plt.subplots(1, 1, figsize=(30, 8))
    ax.imshow(canvas)
    ax.axis('off')

    # Thin separators on the left border of panels 2-4.
    for x_left in (x2, x3, x4):
        ax.axvline(x_left - 0.5, color='white', linewidth=1.0, alpha=0.55)

    def plot_matches(res, query_origin, crop_origin, color):
        # One line per selected match, from the query keypoint to the crop keypoint
        # (shifted by the result's ref_offset). Inliers are drawn bold, the rest faint.
        match = res['match']
        mask = res['inlier_mask']
        chosen = pick_draw_idx(match, mask, max_draw=max_draw, seed=seed)
        qx0, qy0 = query_origin
        cx0, cy0 = crop_origin
        off_x, off_y = res['ref_offset']
        for raw_i in chosen:
            k = int(raw_i)
            q = match.k0[int(match.matches[k, 0])]
            r = match.k1[int(match.matches[k, 1])]
            is_inlier = mask is not None and k < len(mask) and bool(mask[k])
            line_w, line_alpha = (1.3, 0.9) if is_inlier else (0.6, 0.2)
            ax.plot(
                [float(q[0] + qx0), float(r[0] + off_x + cx0)],
                [float(q[1] + qy0), float(r[1] + off_y + cy0)],
                color=color, linewidth=line_w, alpha=line_alpha,
            )

    # anchor -> crop lines (cyan), then val -> crop lines (orange)
    plot_matches(anchor_res, (x1, y1), (x2, y2), 'cyan')
    plot_matches(val_res, (x3, y3), (x4, y4), 'orange')

    al = data['anchor_local']
    vl = data['val_local']

    # GT points in both crop columns: anchor position in red, val position in cyan.
    for cx0, cy0 in ((x2, y2), (x4, y4)):
        ax.scatter([al[0] + cx0], [al[1] + cy0], s=32, c='red')
        ax.scatter([vl[0] + cx0], [vl[1] + cy0], s=30, c='cyan')

    # Predicted val centre (with estimated heading) in the right-hand crop only.
    pred = val_res['pred_local']
    if np.isfinite(pred[0]) and np.isfinite(pred[1]):
        px = float(pred[0] + x4)
        py = float(pred[1] + y4)
        ax.scatter([px], [py], s=100, c='yellow', marker='*')
        draw_heading(ax, px, py, float(val_res['est_rot_deg']), color='yellow', length=50)

    ax.set_title(
        f"{cfg['filter']} | superpoint_lightglue | {cfg['strategy']} | "
        f"A raw={anchor_res['raw_matches']} inl={anchor_res['inliers']} | "
        f"V raw={val_res['raw_matches']} inl={val_res['inliers']} err={val_res['err_px']:.1f} "
        f"est_rot={val_res['est_rot_deg']:.1f}deg est_scale={val_res['est_scale']:.3f}"
    )
    plt.tight_layout()
    plt.show()


def draw_map_overlay(best: Dict[str, object], pr: pd.Series):
    """Show anchor GT, val GT and the val prediction on the full map and on the crop.

    Left panel: the global map with positions in map pixels; the prediction is
    the crop-local prediction shifted by the crop origin and clipped to the map
    bounds. Right panel: the filtered crop with crop-local positions. On each
    panel the prediction and the GT-to-prediction line are drawn only when the
    prediction coordinates used for that panel are finite.

    Args:
        best: Tuning result; the keys read here are 'data', 'val_res' and 'crop_f'.
        pr: Pair row providing 'val_gt_x', 'val_gt_y', 'anchor_x' and 'anchor_y'.
    """
    data = best['data']
    x0, y0 = data['crop_origin']
    vpred = best['val_res']['pred_local']

    # Crop-local prediction -> map coordinates, kept inside the map bounds.
    pred_x, pred_y = np.nan, np.nan
    if np.isfinite(vpred[0]) and np.isfinite(vpred[1]):
        pred_x = float(np.clip(vpred[0] + x0, 0, MAP_W - 1))
        pred_y = float(np.clip(vpred[1] + y0, 0, MAP_H - 1))

    gt_x, gt_y = float(pr['val_gt_x']), float(pr['val_gt_y'])
    anc_x, anc_y = float(pr['anchor_x']), float(pr['anchor_y'])

    _fig, axes = plt.subplots(1, 2, figsize=(20, 8))

    def overlay(ax, image, anchor_xy, gt_xy, pred_xy, title):
        # Shared drawing recipe for both panels.
        ax.imshow(image)
        ax.scatter([anchor_xy[0]], [anchor_xy[1]], s=45, c='red', label='anchor GT')
        ax.scatter([gt_xy[0]], [gt_xy[1]], s=45, c='cyan', label='val GT')
        if np.isfinite(pred_xy[0]) and np.isfinite(pred_xy[1]):
            ax.scatter([pred_xy[0]], [pred_xy[1]], s=80, c='yellow', marker='*', label='val pred')
            ax.plot(
                [gt_xy[0], pred_xy[0]], [gt_xy[1], pred_xy[1]],
                color='orange', linewidth=1.3, alpha=0.8,
            )
        ax.set_title(title)
        ax.legend(loc='upper right')
        ax.axis('off')

    overlay(axes[0], map_rgb, (anc_x, anc_y), (gt_x, gt_y), (pred_x, pred_y), 'Global map overlay')
    overlay(axes[1], best['crop_f'], data['anchor_local'], data['val_local'], vpred, 'Local crop overlay')

    plt.tight_layout()
    plt.show()


# Render both views of the best configuration for the selected pair.
draw_best_4col(best=best, pr=pr, max_draw=180, seed=RNG_SEED)
draw_map_overlay(best=best, pr=pr)


In [None]:
# Diagnostics: which hyperparameters worked best?
if len(tune_df) > 0:
    # Per-run overview restricted to the hyperparameters and result metrics.
    cols = [
        'round', 'level', 'filter', 'strategy', 'crop_scale',
        'max_kp', 'min_conf', 'reproj_thr',
        'scale_used', 'rot_used_deg', 'est_rot_deg', 'est_scale',
        'raw_matches', 'inliers', 'err_px', 'score',
    ]
    display(tune_df[cols].head(25))

    # One row per filter: best/mean inliers, lowest error and number of runs,
    # ordered by best then mean inliers (descending).
    per_filter_stats = {
        'best_inliers': ('inliers', 'max'),
        'mean_inliers': ('inliers', 'mean'),
        'best_err_px': ('err_px', 'min'),
        'runs': ('inliers', 'size'),
    }
    by_filter = tune_df.groupby('filter', as_index=False).agg(**per_filter_stats)
    by_filter = by_filter.sort_values(['best_inliers', 'mean_inliers'], ascending=False)
    by_filter = by_filter.reset_index(drop=True)
    display(by_filter)

    # Horizontal bars for the first rows of the table; the order is reversed so
    # that the table's first row ends up at the top of the chart.
    topn = min(20, len(tune_df))
    plot_df = tune_df.head(topn).iloc[::-1]
    bar_pos = np.arange(topn)
    labels = [
        f"{flt} | kp={int(kp)} conf={conf:.3f}"
        for flt, kp, conf in zip(plot_df['filter'], plot_df['max_kp'], plot_df['min_conf'])
    ]
    fig, ax = plt.subplots(1, 1, figsize=(10, max(4, 0.35 * topn)))
    ax.barh(bar_pos, plot_df['inliers'].to_numpy(), color='tab:blue', alpha=0.85)
    ax.set_yticks(bar_pos)
    ax.set_yticklabels(labels)
    ax.set_xlabel('Inliers')
    ax.set_title('Top configurations by inliers')
    ax.grid(axis='x', alpha=0.25)
    plt.tight_layout()
    plt.show()
else:
    print('No tuning rows available.')

# Final verdict: compare the best val inlier count against the target.
best_inl = int(best['val_res']['inliers'])
if best_inl < int(TARGET_INLIERS):
    print(f'Target not reached: {best_inl} inliers < {TARGET_INLIERS}')
    print('Increase AUTO_EXPAND_MAX_ROUNDS or MAX_CONFIGS_PER_LEVEL, then rerun tuning cell.')
else:
    print(f'Target achieved: {best_inl} inliers >= {TARGET_INLIERS}')
