In [43]:
from pathlib import Path
import cv2
import numpy as np
import json
import shutil, os

In [3]:
# Project root as a relative path (one level above the current working directory);
# the data and config paths in this notebook are built from it.
ROOT = Path("../") 

# 1. YOLO Labels for Kvasir Data:

In [21]:
# Kvasir-SEG inputs: the image folder and the JSON file with per-image bounding boxes.
# The "kavsir" spelling of the JSON file name is kept exactly as it is on disk.
kvasir_images_path = ROOT.joinpath("data/kvasir-seg/images")
kvasir_bboxes_path = ROOT.joinpath("data/kvasir-seg/kavsir_bboxes.json")

# Read the whole annotation file in one go and decode it into a Python object.
kvasir_labels = json.loads(kvasir_bboxes_path.read_text(encoding="utf-8"))


In [22]:
# Preview only the first few annotation entries; displaying the whole mapping
# floods the cell output and bloats the saved notebook.
dict(list(kvasir_labels.items())[:5])

{'cju0qkwl35piu0993l0dewei2': {'height': 529,
  'width': 622,
  'bbox': [{'label': 'polyp',
    'xmin': 38,
    'ymin': 5,
    'xmax': 430,
    'ymax': 338}]},
 'cju0qoxqj9q6s0835b43399p4': {'height': 1070,
  'width': 1348,
  'bbox': [{'label': 'polyp',
    'xmin': 194,
    'ymin': 284,
    'xmax': 913,
    'ymax': 1049}]},
 'cju0qx73cjw570799j4n5cjze': {'height': 529,
  'width': 619,
  'bbox': [{'label': 'polyp',
    'xmin': 187,
    'ymin': 14,
    'xmax': 543,
    'ymax': 526}]},
 'cju0roawvklrq0799vmjorwfv': {'height': 528,
  'width': 622,
  'bbox': [{'label': 'polyp',
    'xmin': 486,
    'ymin': 42,
    'xmax': 622,
    'ymax': 339},
   {'label': 'polyp', 'xmin': 329, 'ymin': 71, 'xmax': 437, 'ymax': 150},
   {'label': 'polyp', 'xmin': 619, 'ymin': 151, 'xmax': 622, 'ymax': 285}]},
 'cju0rx1idathl0835detmsp84': {'height': 528,
  'width': 617,
  'bbox': [{'label': 'polyp',
    'xmin': 292,
    'ymin': 154,
    'xmax': 481,
    'ymax': 375}]},
 'cju0s2a9ekvms080138tjjpxr': {'height

In [37]:
# --------- utilities ---------
# File suffixes (lowercase, with the dot) that count as images.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}

def list_images(d: Path):
    """Return the image files located directly inside ``d``, sorted by path.

    An entry is kept when it is a regular file and its lowercased suffix is in
    ``IMG_EXTS``. Sub-directories are not descended into.
    """
    found = []
    for entry in d.iterdir():
        if entry.suffix.lower() not in IMG_EXTS:
            continue
        if entry.is_file():
            found.append(entry)
    found.sort()
    return found

def parse_bbox_txt(txt_path: Path | None):
    """Accepts lines like: 'polyp 773 760 1056 904' or '773 760 1056 904'."""
    if not txt_path or not txt_path.exists():
        return np.zeros((0,4), np.float32), np.zeros((0,), np.int64)
    boxes = []
    with txt_path.open("r") as f:
        for line in f:
            parts = line.strip().split()
            if not parts: 
                continue
            if parts[0].isalpha():
                parts = parts[1:]
            if len(parts) < 4:
                continue
            x1, y1, x2, y2 = map(float, parts[:4])
            boxes.append([x1, y1, x2, y2])
    if not boxes:
        return np.zeros((0,4), np.float32), np.zeros((0,), np.int64)
    return np.array(boxes, np.float32), np.zeros((len(boxes),), np.int64)  # class 0 = polyp

def clamp_boxes_xyxy(boxes, w, h):
    """Clamp ``x1 y1 x2 y2`` boxes to the image and drop degenerate ones.

    x values are limited to ``[0, w - 1]`` and y values to ``[0, h - 1]``; each
    coordinate pair is then ordered so that column 0 <= column 2 and
    column 1 <= column 3. Rows with zero width or zero height after clamping
    are removed. The input array is not modified; an empty input is returned
    unchanged.
    """
    if boxes.size == 0:
        return boxes

    clamped = boxes.copy()
    for cols, upper in (([0, 2], w - 1), ([1, 3], h - 1)):
        limited = np.clip(clamped[:, cols], 0, upper)
        clamped[:, cols] = np.sort(limited, axis=1)

    has_width = clamped[:, 2] > clamped[:, 0]
    has_height = clamped[:, 3] > clamped[:, 1]
    return clamped[has_width & has_height]

def xyxy_to_yolo(boxes_xyxy, w, h):
    """Convert pixel ``x1 y1 x2 y2`` boxes to normalized ``cx cy bw bh`` rows.

    Centers and sizes are divided by the image width ``w`` (x, bw) and height
    ``h`` (y, bh). Returns a float32 array of shape (N, 4); an empty input
    yields an empty (0, 4) array.
    """
    if boxes_xyxy.size == 0:
        return np.zeros((0,4), np.float32)

    left, top = boxes_xyxy[:, 0], boxes_xyxy[:, 1]
    right, bottom = boxes_xyxy[:, 2], boxes_xyxy[:, 3]

    box_w = right - left
    box_h = bottom - top
    center_x = left + box_w/2.0
    center_y = top + box_h/2.0

    columns = [center_x/w, center_y/h, box_w/w, box_h/h]
    return np.stack(columns, axis=1).astype(np.float32)

def write_yolo_label_file(label_path: Path, rows):
    """Write YOLO label rows to ``label_path``, replacing any existing file.

    ``rows`` is an iterable of ``(cls_id, (cx, cy, bw, bh))``; each becomes one
    line ``"<cls_id> <cx> <cy> <bw> <bh>"`` with six decimals per float.
    Missing parent folders are created. An empty ``rows`` produces an empty file.
    """
    label_path.parent.mkdir(parents=True, exist_ok=True)
    with label_path.open("w") as out:
        out.writelines(
            f"{cls_id} {cx:.6f} {cy:.6f} {bw:.6f} {bh:.6f}\n"
            for cls_id, (cx, cy, bw, bh) in rows
        )

def index_txts_one_dir(bbox_dir: Path | None):
    if not (bbox_dir and bbox_dir.exists()):
        return {}
    return {p.stem: p for p in bbox_dir.iterdir() if p.is_file() and p.suffix.lower() == ".txt"}


In [38]:
def generate_yolo_labels_from_json(
    images_dir: Path,
    labels_dict: dict,
    out_labels_dir: Path | None = None,
    prefer_json_dims: bool = True,
    overwrite: bool = True,
    verbose: bool = True,
):
    """Write one YOLO label file per image in ``images_dir`` from an annotation dict.

    Args:
        images_dir: Folder whose image files are labelled.
        labels_dict: Mapping looked up by image stem. An entry may carry
            ``"width"``/``"height"`` and a ``"bbox"`` list of dicts with
            ``xmin``/``ymin``/``xmax``/``ymax``.
        out_labels_dir: Output folder; defaults to ``<images_dir>_yolo_labels``
            next to ``images_dir``.
        prefer_json_dims: Use the entry's width/height when both are present;
            otherwise the image is read with OpenCV to get its size.
        overwrite: When False, existing label files are kept and only counted.
        verbose: Print warnings and a final summary.

    Returns:
        The folder the label files were written to.

    Raises:
        FileNotFoundError: if ``images_dir`` does not exist.

    Images without an entry, without boxes, or that cannot be read get an
    empty label file. All boxes are written as class 0.
    """
    if not images_dir.exists():
        raise FileNotFoundError(images_dir)

    if not out_labels_dir:
        out_labels_dir = images_dir.parent / f"{images_dir.name}_yolo_labels"
    out_labels_dir.mkdir(parents=True, exist_ok=True)

    total = pos = neg = bad = 0

    for image_path in list_images(images_dir):
        total += 1
        label_path = out_labels_dir / f"{image_path.stem}.txt"

        # Keep an existing label untouched; classify it by whether it has content.
        if not overwrite and label_path.exists():
            if label_path.stat().st_size > 0:
                pos += 1
            else:
                neg += 1
            continue

        entry = labels_dict.get(image_path.stem)

        # Image size: from the annotation entry when allowed and available, else from the file.
        if prefer_json_dims and entry and "width" in entry and "height" in entry:
            W = int(entry["width"])
            H = int(entry["height"])
        else:
            pixels = cv2.imread(str(image_path))
            if pixels is None:
                bad += 1
                write_yolo_label_file(label_path, [])
                neg += 1
                if verbose:
                    print(f"[WARN] unreadable image: {image_path}")
                continue
            H, W = pixels.shape[:2]

        raw_boxes = entry.get("bbox") if entry else None
        if not raw_boxes:
            write_yolo_label_file(label_path, [])
            neg += 1
            continue

        corners = np.array(
            [[box["xmin"], box["ymin"], box["xmax"], box["ymax"]] for box in raw_boxes],
            dtype=np.float32,
        )
        normalized = xyxy_to_yolo(clamp_boxes_xyxy(corners, W, H), W, H)
        rows = [(0, row) for row in normalized]  # class 0 = polyp
        write_yolo_label_file(label_path, rows)

        # Clamping may have removed every box, in which case the image counts as negative.
        if len(rows) > 0:
            pos += 1
        else:
            neg += 1

    if verbose:
        print(f"[{images_dir.name}] → {out_labels_dir}")
        print(f"TOTAL={total}  POS={pos}  NEG={neg}  BAD={bad}")

    return out_labels_dir

In [41]:
# No output folder is passed, so the labels go to the function's default location
# (a "<images folder>_yolo_labels" sibling of the image folder).
kvasir_yolo_labels_path = generate_yolo_labels_from_json(
    images_dir=kvasir_images_path,
    labels_dict=kvasir_labels,
)

[images] → ..\data\kvasir-seg\images_yolo_labels
TOTAL=1000  POS=1000  NEG=0  BAD=0


In [42]:
# Show the folder returned by the Kvasir label-generation call.
kvasir_yolo_labels_path

WindowsPath('../data/kvasir-seg/images_yolo_labels')

# 2. YOLO Labels for PolypGen Data (Each Folder Generated Separately):


### 2.A. Generate Dictionary of All Folders and Paths

In [5]:
# Root folder of each dataset split.
train_path = ROOT / "data/detection2/train"
valid_path = ROOT / "data/detection2/valid"
test_path = ROOT / "data/detection2/test"

def _first_child_startswith(parent: Path, prefix: str, exclude_prefixes=()):
    """Find the first sub-directory of ``parent`` whose name begins with ``prefix``.

    Children are visited in sorted order and names are compared
    case-insensitively. A directory whose name also begins with any entry of
    ``exclude_prefixes`` is passed over. Returns ``None`` when nothing matches.
    """
    wanted = prefix.lower()
    banned = tuple(item.lower() for item in exclude_prefixes)

    for child in sorted(parent.iterdir()):
        if not child.is_dir():
            continue
        lowered = child.name.lower()
        # str.startswith accepts a tuple and is False for an empty one.
        if lowered.startswith(wanted) and not lowered.startswith(banned):
            return child
    return None

def build_sources_dict_from_schema(split_root: Path):
    """
    Describe every image source found under one split folder.

    Returns a dict ``{name: {"type", "images_dir", "bbox_dir"}}`` built in this order:

    - 'single'  → ``split_root/images_single`` (only when it exists), with
                  ``split_root/bbox`` as bbox_dir when that folder exists, else None.
    - one entry per sub-folder of the sequence root, in sorted order. The
      sequence root is ``split_root/seq`` when present, otherwise
      ``split_root/seq_test``:
        * name ends with '_neg' (any case) → type 'seq_neg'; the images are taken
          from the sequence folder itself and bbox_dir is None.
        * otherwise the first ``images*`` sub-folder and the first ``bbox*``
          sub-folder not starting with ``bbox_image`` are used; type is
          'seq_pos' when a bbox folder was found, else 'seq_neg'.
        * when there is no ``images*`` sub-folder the entry falls back to
          'seq_neg' with the sequence folder as images_dir.
    """
    sources = {}

    # 1) Stand-alone images at the top of the split
    single_images = split_root / "images_single"
    if single_images.exists():
        single_bbox = split_root / "bbox"
        sources["single"] = {
            "type": "single",
            "images_dir": single_images,
            "bbox_dir": single_bbox if single_bbox.exists() else None,
        }

    # 2) Sequence folders: the first existing candidate root wins
    seq_root = next(
        (split_root / cand for cand in ("seq", "seq_test") if (split_root / cand).exists()),
        None,
    )
    if not seq_root:
        return sources

    def _negative(folder):
        # Entry for a folder whose images carry no boxes.
        return {"type": "seq_neg", "images_dir": folder, "bbox_dir": None}

    for seq_dir in sorted(p for p in seq_root.iterdir() if p.is_dir()):
        name = seq_dir.name

        if name.lower().endswith("_neg"):
            sources[name] = _negative(seq_dir)
            continue

        images_sub = _first_child_startswith(seq_dir, "images")
        bbox_sub = _first_child_startswith(seq_dir, "bbox", exclude_prefixes=("bbox_image",))

        if not (images_sub and images_sub.exists()):
            # No images_* folder: treat the sequence folder itself as a negative source.
            sources[name] = _negative(seq_dir)
            continue

        has_bbox = bool(bbox_sub and bbox_sub.exists())
        sources[name] = {
            "type": "seq_pos" if has_bbox else "seq_neg",
            "images_dir": images_sub,
            "bbox_dir": bbox_sub if has_bbox else None,
        }

    return sources

# One sources dict per split, built from the split's root folder.
train_sources, valid_sources, test_sources = map(
    build_sources_dict_from_schema,
    (
        ROOT / "data/detection2/train",
        ROOT / "data/detection2/valid",
        ROOT / "data/detection2/test",
    ),
)

In [None]:
# Inspect the sources dict built for the test split (name -> type, images_dir, bbox_dir).
test_sources

{'single': {'type': 'single',
  'images_dir': WindowsPath('../data/detection2/test/images_single'),
  'bbox_dir': WindowsPath('../data/detection2/test/bbox')},
 'seq15': {'type': 'seq_pos',
  'images_dir': WindowsPath('../data/detection2/test/seq_test/seq15/images_seq15'),
  'bbox_dir': WindowsPath('../data/detection2/test/seq_test/seq15/bbox_seq15')},
 'seq15_neg': {'type': 'seq_neg',
  'images_dir': WindowsPath('../data/detection2/test/seq_test/seq15_neg'),
  'bbox_dir': None},
 'seq16': {'type': 'seq_pos',
  'images_dir': WindowsPath('../data/detection2/test/seq_test/seq16/images_seq16'),
  'bbox_dir': WindowsPath('../data/detection2/test/seq_test/seq16/bbox_seq16')},
 'seq16_neg': {'type': 'seq_neg',
  'images_dir': WindowsPath('../data/detection2/test/seq_test/seq16_neg'),
  'bbox_dir': None},
 'seq17': {'type': 'seq_pos',
  'images_dir': WindowsPath('../data/detection2/test/seq_test/seq17/images_seq17'),
  'bbox_dir': WindowsPath('../data/detection2/test/seq_test/seq17/bbox_seq17

### 2.B. PolypGen: Generate YOLO Labels per Folder

In [None]:
# --------- core ---------
def generate_yolo_labels_for_folder(
    images_dir: Path,
    bbox_dir: Path | None = None,
    out_labels_dir: Path | None = None,
    selection_fn=None,
    overwrite: bool = True,
    verbose: bool = True,
):
    """Write one YOLO label file per image of a folder from per-image bbox text files.

    Args:
        images_dir: Folder containing the images.
        bbox_dir: Folder with ``<stem>.txt`` box files; ``None`` means no boxes.
        out_labels_dir: Output folder; defaults to ``<images_dir>_yolo_labels``
            next to ``images_dir``.
        selection_fn: Optional callable applied to the list of image paths;
            its return value replaces the list.
        overwrite: When False, existing label files are kept and only counted.
        verbose: Print warnings and a final summary.

    Returns:
        The folder the label files were written to.

    Raises:
        FileNotFoundError: if ``images_dir`` is falsy or does not exist.

    Images that cannot be read, or that end up with no valid box, get an empty
    label file.
    """
    if not images_dir or not images_dir.exists():
        raise FileNotFoundError(f"images_dir not found: {images_dir}")

    if not out_labels_dir:
        out_labels_dir = images_dir.parent / f"{images_dir.name}_yolo_labels"
    out_labels_dir.mkdir(parents=True, exist_ok=True)

    txt_by_stem = index_txts_one_dir(bbox_dir)
    image_paths = list_images(images_dir)
    if selection_fn:
        image_paths = selection_fn(image_paths)

    total = pos = neg = bad = 0

    for image_path in image_paths:
        total += 1
        label_path = out_labels_dir / f"{image_path.stem}.txt"

        # Keep an existing label untouched; classify it by whether it has content.
        if not overwrite and label_path.exists():
            if label_path.stat().st_size > 0:
                pos += 1
            else:
                neg += 1
            continue

        pixels = cv2.imread(str(image_path))
        if pixels is None:
            bad += 1
            write_yolo_label_file(label_path, [])  # still emit a label so every image has one
            neg += 1
            if verbose:
                print(f"[WARN] unreadable image: {image_path}")
            continue

        height, width = pixels.shape[:2]
        boxes_xyxy, labels = parse_bbox_txt(txt_by_stem.get(image_path.stem))
        boxes_xyxy = clamp_boxes_xyxy(boxes_xyxy, width, height)
        yolo_boxes = xyxy_to_yolo(boxes_xyxy, width, height)
        # NOTE(review): `labels` is not filtered when clamping drops a box; zip()
        # simply stops at the shorter sequence.
        rows = [(int(cls_id), box) for cls_id, box in zip(labels.tolist(), yolo_boxes)]

        write_yolo_label_file(label_path, rows)
        if rows:
            pos += 1
        else:
            neg += 1

    if verbose:
        print(f"[{images_dir.name}] → {out_labels_dir}")
        print(f"TOTAL={total}  POS={pos}  NEG={neg}  BAD={bad}")

    return out_labels_dir


In [17]:
def generate_yolo_labels_for_all_folders(sources_dict, **kwargs):
    """Generate YOLO labels for every source listed in ``sources_dict``.

    Each value must provide ``"images_dir"`` and may provide ``"bbox_dir"``.
    Extra keyword arguments are forwarded to ``generate_yolo_labels_for_folder``.
    A ``None`` or empty ``sources_dict`` does nothing.
    """
    for meta in (sources_dict or {}).values():
        generate_yolo_labels_for_folder(meta["images_dir"], meta.get("bbox_dir"), **kwargs)

# Generate labels split by split; a None/empty sources dict is a no-op inside the helper.
for sources in [train_sources, valid_sources, test_sources]:
    generate_yolo_labels_for_all_folders(
        sources,
        overwrite=True,
        verbose=True,
    )


[images_single] → ..\data\detection2\train\images_single_yolo_labels
TOTAL=1105  POS=1013  NEG=92  BAD=0
[images_seq10] → ..\data\detection2\train\seq\seq10\images_seq10_yolo_labels
TOTAL=25  POS=7  NEG=18  BAD=0
[images_seq11] → ..\data\detection2\train\seq\seq11\images_seq11_yolo_labels
TOTAL=228  POS=136  NEG=92  BAD=0
[images_seq12] → ..\data\detection2\train\seq\seq12\images_seq12_yolo_labels
TOTAL=250  POS=250  NEG=0  BAD=0
[images_seq13] → ..\data\detection2\train\seq\seq13\images_seq13_yolo_labels
TOTAL=250  POS=199  NEG=51  BAD=0
[seq16_neg] → ..\data\detection2\train\seq\seq16_neg_yolo_labels
TOTAL=141  POS=0  NEG=141  BAD=0
[seq1_neg] → ..\data\detection2\train\seq\seq1_neg_yolo_labels
TOTAL=315  POS=0  NEG=315  BAD=0
[images_seq2] → ..\data\detection2\train\seq\seq2\images_seq2_yolo_labels
TOTAL=63  POS=60  NEG=3  BAD=0
[seq22_neg] → ..\data\detection2\train\seq\seq22_neg_yolo_labels
TOTAL=82  POS=0  NEG=82  BAD=0
[seq2_neg] → ..\data\detection2\train\seq\seq2_neg_yolo_labe

# 3. Combine Images and YOLO Labels for Training

In [58]:
# Training set: every source key discovered for the train split.
train_list = list(train_sources.keys())
# Validation set: hand-picked source keys; each key must be resolvable when the splits are merged.
val_list = ['single','seq14', 'seq15_neg', 'kvasir_images']

In [62]:
def _labels_sibling(images_dir: Path) -> Path:
    """Return the ``<name>_yolo_labels`` folder located next to ``images_dir``."""
    labels_name = images_dir.name + "_yolo_labels"
    return images_dir.parent.joinpath(labels_name)

def _copy_or_link(src: Path, dst: Path, mode: str = "copy"):
    """Materialize ``src`` at ``dst`` unless ``dst`` already exists.

    ``mode`` selects how: ``"hardlink"`` (source and destination must be on the
    same volume), ``"symlink"``, or anything else for a metadata-preserving
    copy. Missing parent folders of ``dst`` are created.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists():
        return

    linkers = {
        "hardlink": os.link,      # same volume only
        "symlink": os.symlink,
    }
    place = linkers.get(mode, shutil.copy2)
    place(src, dst)

def _ensure_empty(path: Path):
    """Create ``path`` as an empty file if it is missing; leave an existing file untouched.

    Missing parent folders are created either way.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.exists():
        return
    path.write_text("")

def _resolve_images_dir(
    key: str,
    sources_dicts: list[dict],           # e.g., [train_sources, valid_sources]
    extra_map: dict[str, Path] | None,   # e.g., {"kvasir_images": ROOT/"data/kvasir-seg/images"}
) -> Path:
    # 1) exact key in any sources dict
    for sd in sources_dicts:
        if sd and key in sd:
            return sd[key]["images_dir"]

    # 2) match by folder name (images_dir.name) or parent folder name (seq14, etc.)
    for sd in sources_dicts:
        if not sd: 
            continue
        for _, meta in sd.items():
            imgd: Path = meta["images_dir"]
            if imgd.name == key or imgd.parent.name == key:
                return imgd

    # 3) fallback: extra map (e.g., Kvasir)
    if extra_map and key in extra_map:
        return extra_map[key]

    raise KeyError(f"Could not resolve images_dir for key='{key}'")

def merge_yolo_split_keys(
    dest_split_root: Path,
    keys: list[str],
    sources_dicts: list[dict],
    extra_map: dict[str, Path] | None = None,
    link_mode: str = "copy",
    verbose: bool = True,
):
    images_out = dest_split_root / "images"
    labels_out = dest_split_root / "labels"
    images_out.mkdir(parents=True, exist_ok=True)
    labels_out.mkdir(parents=True, exist_ok=True)

    total = pos = neg = miss = 0

    for key in keys:
        images_dir = _resolve_images_dir(key, sources_dicts, extra_map)
        labels_dir = _labels_sibling(images_dir)
        if verbose:
            print(f"[{dest_split_root.name}] add {key}  imgs={images_dir}  lbls={labels_dir}")

        imgs = list_images(images_dir)
        for img in imgs:
            stem, ext = img.stem, img.suffix.lower()
            dst_img = images_out / f"{key}__{stem}{ext}"
            dst_lbl = labels_out / f"{key}__{stem}.txt"

            _copy_or_link(img, dst_img, link_mode)

            lbl_src = labels_dir / f"{stem}.txt"
            if lbl_src.exists():
                shutil.copy2(lbl_src, dst_lbl)
                if dst_lbl.stat().st_size > 0: pos += 1
                else:                           neg += 1
            else:
                _ensure_empty(dst_lbl)
                neg += 1
                miss += 1

            total += 1

    if verbose:
        print(f"[{dest_split_root.name}] images -> {images_out}")
        print(f"[{dest_split_root.name}] labels -> {labels_out}")
        print(f"TOTAL={total}  POS={pos}  NEG={neg}  MISSING_LABELS={miss}")

    return images_out, labels_out

# ---------------- USE IT ----------------
# Destination of the merged dataset: one sub-folder per split.
dest_root =  ROOT / "data" / "detection2" / "yolo_split2"
train_dest = dest_root / "train"
val_dest   = dest_root / "val"


# Image folders for keys that are not found in the sources dicts passed below (e.g., Kvasir).
extra_map = {
    "kvasir_images": (ROOT / "data/kvasir-seg/images"),
}

# Merge each split. Train keys are resolved against train_sources and validation keys
# against valid_sources, with extra_map as the fallback for both.
# The last call is the cell's final expression, so its return value is displayed.
merge_yolo_split_keys(train_dest, train_list, [train_sources], extra_map, link_mode="copy")
merge_yolo_split_keys(val_dest,   val_list,   [valid_sources], extra_map, link_mode="copy")

[train] add single  imgs=..\data\detection2\train\images_single  lbls=..\data\detection2\train\images_single_yolo_labels
[train] add seq10  imgs=..\data\detection2\train\seq\seq10\images_seq10  lbls=..\data\detection2\train\seq\seq10\images_seq10_yolo_labels
[train] add seq11  imgs=..\data\detection2\train\seq\seq11\images_seq11  lbls=..\data\detection2\train\seq\seq11\images_seq11_yolo_labels
[train] add seq12  imgs=..\data\detection2\train\seq\seq12\images_seq12  lbls=..\data\detection2\train\seq\seq12\images_seq12_yolo_labels
[train] add seq13  imgs=..\data\detection2\train\seq\seq13\images_seq13  lbls=..\data\detection2\train\seq\seq13\images_seq13_yolo_labels
[train] add seq16_neg  imgs=..\data\detection2\train\seq\seq16_neg  lbls=..\data\detection2\train\seq\seq16_neg_yolo_labels
[train] add seq1_neg  imgs=..\data\detection2\train\seq\seq1_neg  lbls=..\data\detection2\train\seq\seq1_neg_yolo_labels
[train] add seq2  imgs=..\data\detection2\train\seq\seq2\images_seq2  lbls=..\data

(WindowsPath('../data/detection2/yolo_split2/val/images'),
 WindowsPath('../data/detection2/yolo_split2/val/labels'))

In [51]:
# Dataset config for training. `Path` and `json` come from the notebook's import cell,
# so nothing is re-imported here.

# Absolute, forward-slash dataset root, so the YAML does not depend on the working directory.
path_val = (ROOT / "data" / "detection2" / "yolo_split2").resolve().as_posix()

# json.dumps yields a double-quoted string, which is also a valid YAML scalar. Quoting keeps
# a root path containing characters such as '#' or ': ' from being misparsed.
yaml_text = f"""\
path: {json.dumps(path_val, ensure_ascii=False)}
train: train/images
val: val/images
# test: test/images
nc: 1
names: [polyp]
"""

dest = ROOT / "configs" / "data2.yaml"
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_text(yaml_text, encoding="utf-8")
print("Wrote:", dest)


Wrote: ..\configs\data2.yaml
