# Podejście topologiczne: RF-DETR na punktach orientacyjnych → dominacja

**Cel:** Zamiast klasyfikatora right/left (ResNet) lub głosowania po segmentach — model wykrywa **landmarky anatomiczne**, a dominacja jest wyznaczana **post-processingiem** (odległość Crux vs dystalne końce RCA/LCx).

**Zalety:**
- RT-DETR (self-attention) widzi relacje globalne (ujście RCA ↔ crux cordis).
- Wynik interpretowalny: widać, gdzie model umieścił Crux i końce naczyń.
- Mniej zależny od pojedynczego segmentu (np. 4/PDA), który może być niewidoczny.

**Architektura:**
1. **Backbone + Transformer (RF-DETR)** — jedna klatka angiografii (np. max opacification).
2. **5 klas landmarków:** A=RCA Ostium, B=LCA Ostium, C=Crux Cordis, D=Distal RCA, E=Distal LCx.
3. **Logika dominacji:** odległość euklidesowa (lub IoU) Crux ↔ D vs Crux ↔ E → Right vs Left dominance.

## 1. Definicja 5 klas landmarków i mapowanie z segmentów ARCADE

| Klasa | Nazwa           | Źródło w ARCADE (segmenty) | Opis |
|-------|-----------------|----------------------------|------|
| 1     | RCA Ostium      | segment 1 (RCA prox)       | Ujście prawej tętnicy wieńcowej |
| 2     | LCA Ostium      | segment 5 (LM)             | Ujście lewej tętnicy (LM) |
| 3     | Crux Cordis     | segment 4 (PDA) / koniec RCA | Skrzyżowanie bruzd (tylna ściana) |
| 4     | Distal RCA      | segment 3 (RCA dist) lub 4 (PDA) | Dystalny koniec RCA |
| 5     | Distal LCx      | segment 15 (LCx dist) lub 16 (PLB) | Dystalny koniec gałęzi okalającej |

**Uwaga:** Landmarky można anotować ręcznie (punkt lub mały bbox) albo **wyprowadzić z masek 25-class** (centroid / punkt dystalny maski).

In [1]:
# Stałe dla pipeline'u landmarków (num_classes=5)
LANDMARK_CLASSES = [
    "rca_ostium",   # 1
    "lca_ostium",   # 2
    "crux_cordis",  # 3
    "distal_rca",   # 4
    "distal_lcx",   # 5
]
LANDMARK_ID_CRUX = 3
LANDMARK_ID_DISTAL_RCA = 4
LANDMARK_ID_DISTAL_LCX = 5

# Mapowanie: segment ARCADE (class_id 1–25) → landmark (1–5). Jeden segment → jeden landmark (priorytet).
SEGMENT_TO_LANDMARK = {
    1: 1,   # RCA prox → rca_ostium
    5: 2,   # LM → lca_ostium
    4: 3,   # PDA → crux_cordis
    3: 4,   # RCA dist → distal_rca
    19: 5,  # LCx dist → distal_lcx
    20: 5,  # PLB → distal_lcx
}

## 2. Generowanie datasetu COCO dla 5 landmarków z istniejącego datasetu 25-class

Dla każdego obrazu z `_annotations.coco.json` (25 segmentów):
- Dla każdej kategorii segmentu mapowanej do landmarku: oblicz bbox (lub centroid) maski → zapisz jako ann do kategorii landmarku 1–5.
- Jedna maska segmentu może dać jeden bbox/point na jeden landmark; przy konfliktach (np. segment 4 → crux i distal_rca) można duplikować lub wybrać jedną regułę (np. PDA tylko crux).

In [3]:
import json
import os
from pathlib import Path
from pycocotools import mask as mask_utils
import numpy as np

def segments_coco_to_landmark_coco(
    coco_path: str,
    images_dir: str,
    output_dir: str,
    segment_to_landmark: dict,
    min_mask_area: int = 100,
) -> str:
    """
    Czyta COCO z 25 segmentami, dla każdej ann mapuje segment -> landmark,
    buduje bbox z maski (lub RLE) i zapisuje nowy _annotations.coco.json z 5 kategoriami.
    """
    with open(coco_path) as f:
        coco = json.load(f)
    os.makedirs(output_dir, exist_ok=True)
    out_images_dir = os.path.join(output_dir, "images")
    os.makedirs(out_images_dir, exist_ok=True)

    cat_id_old_to_new = {}
    new_cats = []
    for i, name in enumerate(LANDMARK_CLASSES, start=1):
        new_cats.append({"id": i, "name": name, "supercategory": "landmark"})
        cat_id_old_to_new[i] = i
    new_anns = []
    ann_id = 1
    for ann in coco["annotations"]:
        cat_old = ann["category_id"]
        if cat_old not in segment_to_landmark:
            continue
        landmark_id = segment_to_landmark[cat_old]
        if "segmentation" in ann and ann["segmentation"]:
            seg = ann["segmentation"]
            if isinstance(seg, list) and len(seg):
                poly = np.array(seg[0]).reshape(-1, 2)
                if poly.size < 6:
                    continue
                x, y = poly[:, 0], poly[:, 1]
                x_min, x_max = float(x.min()), float(x.max())
                y_min, y_max = float(y.min()), float(y.max())
            elif isinstance(seg, dict) and "counts" in seg:
                try:
                    m = mask_utils.decode(seg)
                    if m.size and m.ndim >= 2:
                        ys, xs = np.where(m > 0)
                        if xs.size:
                            x_min, x_max = float(xs.min()), float(xs.max())
                            y_min, y_max = float(ys.min()), float(ys.max())
                        else:
                            continue
                    else:
                        continue
                except Exception:
                    continue
            else:
                continue
        elif "bbox" in ann:
            x_min = ann["bbox"][0]
            y_min = ann["bbox"][1]
            x_max = x_min + ann["bbox"][2]
            y_max = y_min + ann["bbox"][3]
        else:
            continue
        w, h = x_max - x_min, y_max - y_min
        if w * h < min_mask_area:
            continue
        seg_poly = [[x_min, y_min, x_min + w, y_min, x_min + w, y_min + h, x_min, y_min + h]]
        new_anns.append({
            "id": ann_id,
            "image_id": ann["image_id"],
            "category_id": landmark_id,
            "bbox": [x_min, y_min, w, h],
            "area": w * h,
            "iscrowd": 0,
            "segmentation": seg_poly,
        })
        ann_id += 1
    image_ids_with_anns = {a["image_id"] for a in new_anns}
    new_images = [
        {"id": img["id"], "file_name": img["file_name"], "width": img["width"], "height": img["height"]}
        for img in coco["images"] if img["id"] in image_ids_with_anns
    ]
    out_coco = {"images": new_images, "annotations": new_anns, "categories": new_cats}
    out_path = os.path.join(output_dir, "_annotations.coco.json")
    with open(out_path, "w") as f:
        json.dump(out_coco, f, indent=2)
    return out_path

In [12]:
# Jednorazowa poprawka: dodaj supercategory do już wygenerowanego _annotations.coco.json
# (rfdetr wymaga tego pola w categories). Uruchom raz, potem trening zadziała.
def patch_coco_supercategory(dataset_dir: str = "arcade_landmark_5class", supercat: str = "landmark"):
    for split in ["train", "valid", "test"]:
        path = os.path.join(dataset_dir, split, "_annotations.coco.json")
        if not os.path.exists(path):
            continue
        with open(path) as f:
            data = json.load(f)
        for c in data.get("categories", []):
            if "supercategory" not in c:
                c["supercategory"] = supercat
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        print(f"Patched: {path}")
patch_coco_supercategory()

Patched: arcade_landmark_5class/train/_annotations.coco.json
Patched: arcade_landmark_5class/valid/_annotations.coco.json
Patched: arcade_landmark_5class/test/_annotations.coco.json


In [11]:
# Przykład: konwersja train/valid z datasetu 25-class na dataset 5 landmarków
# ARCADE_COCO = "arcade_coco_dataset_multiclass_seg"
ARCADE_COCO = "/Users/rafalszulinski/Desktop/developing/IVES/coronary/rf-detr-seg/notebooks/notebooks/arcade_coco_detection2"  # COCO multiclass dataset

for split in ["train", "valid", "test"]:
    coco_path = os.path.join(ARCADE_COCO, split, "_annotations.coco.json")
    if os.path.exists(coco_path):
        segments_coco_to_landmark_coco(
            coco_path,
            images_dir=os.path.join(ARCADE_COCO, split),
            output_dir=os.path.join("arcade_landmark_5class", split),
            segment_to_landmark=SEGMENT_TO_LANDMARK,
        )
        print(f"Done: {split}")

Done: train
Done: valid
Done: test


## 3. Trening RF-DETR z 5 klasami (landmarky)

Model ten sam co w pipeline (RFDETRSegLarge), ale `num_classes=5`. Dataset w formacie COCO z 5 kategoriami (jak wyżej).

In [17]:
import logging
if not hasattr(logging, "warn"):
    logging.warn = logging.warning
from rfdetr import RFDETRSegSmall

LANDMARK_DATASET = "arcade_landmark_5class"
model_landmark = RFDETRSegSmall(num_classes=5)
model_landmark.train(
    dataset_dir=LANDMARK_DATASET,
    epochs=10,
    batch_size=2,
    lr=5e-4,
    lr_encoder=5e-5,
    lr_scheduler="cosine",
    warmup_epochs=3,
    use_ema=True,
    early_stopping=True,
    early_stopping_patience=12,
    checkpoint_interval=5,
    output_dir="checkpoints_landmark5",
)
model_landmark.optimize_for_inference()

[2026-02-16 13:25:34] [INFO] rf-detr - File rf-detr-seg-small.pt already exists with correct MD5 hash.




[2026-02-16 13:25:34] [INFO] rf-detr - Loading pretrain weights




[2026-02-16 13:25:35] [INFO] rf-detr - Not using distributed mode
[2026-02-16 13:25:35] [INFO] rf-detr - git:
  unknown

[2026-02-16 13:25:35] [INFO] rf-detr - Namespace(num_classes=6, grad_accum_steps=4, print_freq=10, amp=True, lr=0.0005, lr_encoder=5e-05, batch_size=2, weight_decay=0.0001, epochs=10, lr_drop=100, clip_max_norm=0.1, lr_vit_layer_decay=0.8, lr_component_decay=0.7, do_benchmark=False, dropout=0, drop_path=0.0, drop_mode='standard', drop_schedule='constant', cutoff_epoch=0, pretrained_encoder=None, pretrain_weights='rf-detr-seg-small.pt', pretrain_exclude_keys=None, pretrain_keys_modify_to_load=None, pretrained_distiller=None, encoder='dinov2_windowed_small', vit_encoder_num_layers=12, window_block_indexes=None, position_embedding='sine', out_feature_indexes=[3, 6, 9, 12], freeze_encoder=False, layer_norm=True, rms_norm=False, backbone_lora=False, force_no_pretrain=False, dec_layers=4, dim_feedforward=2048, hidden_dim=256, sa_nheads=8, ca_nheads=16, num_queries=100, gro

fatal: to nie jest repozytorium gita (ani żaden z katalogów nadrzędnych): .git


KeyboardInterrupt: 

## 4. Logika dominacji (post-processing)

Po inference RF-DETR zwraca bboxe (lub maski) dla każdego landmarku. Dla dominacji potrzebujemy:
- **Crux Cordis** (class_id=3)
- **Distal RCA** (4)
- **Distal LCx** (5)

**Reguła:**
- Środek bboxa = reprezentacja punktu landmarku.
- `d_crux_rca` = odległość euklidesowa (środek Crux, środek Distal RCA).
- `d_crux_lcx` = odległość (Crux, Distal LCx).
- **Right dominance:** `d_crux_rca < d_crux_lcx` (koniec RCA bliżej crux).
- **Left dominance:** `d_crux_lcx < d_crux_rca` (koniec LCx bliżej crux).
- Jeśli brak któregoś z detekcji — fallback (np. confidence-weighted lub "unknown").

In [None]:
import numpy as np
from typing import Optional, Literal

def bbox_center(xyxy: np.ndarray) -> np.ndarray:
    """xyxy [x1,y1,x2,y2] -> [cx, cy]."""
    return np.array([(xyxy[0] + xyxy[2]) / 2, (xyxy[1] + xyxy[3]) / 2])

def landmark_detections_to_centers(detections) -> dict:
    """Ze supervision Detections wyciąga centra bboxów per class_id (1–5)."""
    centers = {}
    if hasattr(detections, "xyxy") and detections.xyxy is not None and len(detections.xyxy):
        for i, cid in enumerate(detections.class_id):
            cid_int = int(cid) if hasattr(cid, "__int__") else cid
            centers[cid_int] = bbox_center(detections.xyxy[i])
    return centers

def dominance_from_landmarks(
    centers: dict,
    crux_id: int = LANDMARK_ID_CRUX,
    distal_rca_id: int = LANDMARK_ID_DISTAL_RCA,
    distal_lcx_id: int = LANDMARK_ID_DISTAL_LCX,
) -> Literal["right", "left", "unknown"]:
    """
    Right: dystalny RCA bliżej Crux. Left: dystalny LCx bliżej Crux.
    """
    if crux_id not in centers:
        return "unknown"
    crux = np.array(centers[crux_id])
    d_rca = np.inf
    d_lcx = np.inf
    if distal_rca_id in centers:
        d_rca = np.linalg.norm(crux - np.array(centers[distal_rca_id]))
    if distal_lcx_id in centers:
        d_lcx = np.linalg.norm(crux - np.array(centers[distal_lcx_id]))
    if d_rca == np.inf and d_lcx == np.inf:
        return "unknown"
    if d_rca < d_lcx:
        return "right"
    if d_lcx < d_rca:
        return "left"
    return "unknown"

In [None]:
# Przykład użycia na wyjściu modelu (detections = sv.Detections z model_landmark.predict(...))
# centers = landmark_detections_to_centers(detections)
# dom = dominance_from_landmarks(centers)
# print("Dominance:", dom)

## 5. Pełna inferencja: obraz → detekcje landmarków → dominacja

Dla każdej detekcji per klasa można brać **najwyższy confidence** (jedna reprezentacja punktu na klasę). Integracja z pipeline: zamiast `side_classifier` można użyć wyjścia tego modelu + `dominance_from_landmarks`.

In [None]:
def landmarks_to_centers_best_per_class(detections, class_ids: list = None):
    """
    Dla każdej klasy landmarku (1–5) wybiera detekcję z najwyższym confidence,
    zwraca dict class_id -> center [cx, cy].
    """
    if class_ids is None:
        class_ids = list(range(1, 6))
    centers = {}
    if detections.xyxy is None or len(detections.xyxy) == 0:
        return centers
    conf = getattr(detections, "confidence", None)
    if conf is None:
        conf = np.ones(len(detections.class_id))
    for cid in class_ids:
        idx = np.where(np.array(detections.class_id) == cid)[0]
        if len(idx) == 0:
            continue
        best_i = idx[np.argmax(conf[idx])]
        centers[int(cid)] = bbox_center(detections.xyxy[best_i])
    return centers

# Pełny flow:
# detections = model_landmark.predict(image, threshold=0.3)
# centers = landmarks_to_centers_best_per_class(detections)
# dominance = dominance_from_landmarks(centers)
# print("Dominance:", dominance)