# Label Studio to COCO Converter (LS2COCO.ipynb) — ARI3129 Assignment 2025/26

This notebook provides a structured and reliable method for converting **Label Studio (JSON) annotation exports** into a **COCO-formatted dataset** suitable for model training within the **ARI3129 Object Detection and Classification Assignment (2025/26)**.

## Preliminaries 

### Purpose of this Conversion
Label Studio produces annotation files in its own JSON format, which cannot be used directly by object detection frameworks such as **YOLO**, **Detectron2**, or **MMDetection**.  
The **COCO (Common Objects in Context)** format, on the other hand, is a well-defined and widely adopted standard for representing object detection datasets.  
This notebook bridges the gap by automatically converting and restructuring the exported data into the correct COCO format, ensuring full compatibility with training pipelines.
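
As a minimal sketch of the central geometric step, assuming (as the notebook's `_bbox_pct_to_abs` helper does) that Label Studio stores rectangle coordinates as percentages of the image size, the conversion to COCO's absolute `[x, y, width, height]` pixel box could look like this. The function name `pct_box_to_coco` and the sample values are illustrative and not part of the notebook.

```python
from typing import Dict, List


def pct_box_to_coco(value: Dict[str, float], img_w: int, img_h: int) -> List[float]:
    """Convert a percentage-based rectangle to a COCO [x, y, w, h] box in pixels."""
    return [
        value["x"] / 100.0 * img_w,
        value["y"] / 100.0 * img_h,
        value["width"] / 100.0 * img_w,
        value["height"] / 100.0 * img_h,
    ]


# Hypothetical region: top-left corner at (10%, 20%), size 50% x 25%, on a 640x480 image.
sample = {"x": 10.0, "y": 20.0, "width": 50.0, "height": 25.0}
box = pct_box_to_coco(sample, 640, 480)
print(box)
```

The image width and height are needed because percentages alone do not fix a pixel position; the notebook reads them from the `original_width` and `original_height` fields of each result.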

### Functionality Overview
The notebook:
- Accepts a **Label Studio `.json` export** and the corresponding **`.zip` of annotated images**.  
- Validates, cleans, and normalises the annotations to remove inconsistencies or formatting issues.  
- Generates the following outputs:  
  - **`COCO.json`** – a unified dataset containing all annotations in COCO format.  
  - **`COCO_[attribute].json`** – additional datasets automatically derived according to selected attributes such as *view_angle*, *mounting*, *condition*, and *sign_shape*.  
- Includes an optional **train/validation/test splitting utility** that produces reproducible dataset partitions with configurable ratios.
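
The splitting utility itself is not shown in this section. As a rough sketch of what "reproducible partitions with configurable ratios" can mean, one common approach is to shuffle the image ids with a fixed seed and cut the shuffled list at the requested proportions. The function name `split_image_ids`, the default ratios, and the seed below are assumptions for illustration, not the notebook's actual settings.

```python
import random
from typing import Dict, List, Sequence, Tuple


def split_image_ids(
    image_ids: Sequence[int],
    ratios: Tuple[float, float, float] = (0.7, 0.2, 0.1),  # assumed defaults
    seed: int = 42,                                         # assumed seed
) -> Dict[str, List[int]]:
    """Shuffle ids with a fixed seed and cut them into train/val/test lists."""
    if abs(sum(ratios) - 1.0) > 1e-9:
        raise ValueError("ratios must sum to 1.0")
    ids = sorted(image_ids)            # canonical order before shuffling
    random.Random(seed).shuffle(ids)   # local RNG, global random state untouched
    n_train = int(len(ids) * ratios[0])
    n_val = int(len(ids) * ratios[1])
    return {
        "train": ids[:n_train],
        "val": ids[n_train:n_train + n_val],
        "test": ids[n_train + n_val:],  # remainder absorbs any rounding
    }


splits = split_image_ids(range(1, 11))
print({name: len(ids) for name, ids in splits.items()})
```

Splitting by image id rather than by annotation keeps all boxes of one image in the same partition, and sorting before the seeded shuffle makes the result independent of the input order.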

### When and How to Use
Run this notebook **after completing annotation in Label Studio** and **before initiating model training**.  
It ensures that all annotations are consistent, properly structured, and formatted in accordance with COCO standards, allowing for seamless integration into YOLO and other object detection frameworks.

<div style="border:3px solid #c00; background:#fff7f7; padding:12px; border-radius:6px">
    <strong style="color:#000000">⚠️ IMPORTANT</strong>
    <ul>
        <li style="color:#000000"><strong>Do NOT use "Run All".</strong> Execute cells one at a time, in order, to avoid re-initialising the UI or overwriting the environment.</li>
        <li style="color:#000000">If you see unexpected errors, re-run only the specific setup or UI cell needed — do not re-run all notebook cells.</li>
    </ul>
</div>

## 1) Environment and Dependency Setup

This section ensures that all required Python packages are available before the notebook is run.  
If any dependency such as `ipywidgets` or `IPython` is missing, it is automatically installed using `pip`.

In [None]:
from __future__ import annotations  # must be the first statement in the cell

import importlib
import subprocess
import sys

def ensure_package(pkg: str, import_name: str | None = None):
    """Import a module, installing its pip package first if it is missing."""
    try:
        return importlib.import_module(import_name or pkg)
    except ImportError:
        print(f"Installing missing package: {pkg}")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        return importlib.import_module(import_name or pkg)

# --- Core dependencies ---
ipywidgets = ensure_package("ipywidgets")
# The pip package is "ipython"; "IPython.display" is only the import path.
IPython_display = ensure_package("ipython", "IPython.display")

# --- Standard library ---
from pathlib import Path
from typing import Any, Dict, List, Tuple, Set
from collections import defaultdict
from datetime import datetime, timezone
from time import monotonic
import hashlib, json, os, shutil, zipfile

# --- Imports from installed packages ---
from IPython.display import display, Markdown, clear_output
import ipywidgets as widgets

print("All required packages are installed and imported.")

## 2) Singleton Guard and Conversion Setup

This cell ensures that the LS2COCO app is initialised only **once** per session.  
If the app already exists (for example, after re-running the cell), the existing instance is redisplayed instead of a new one being created.  
This prevents duplicate widgets and click handlers from accumulating in the notebook.

After that, the notebook creates the folders it needs (`Temp`, `Outputs`, and `Outputs/images`) and sets up some helper functions that will be used later during the conversion process.
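
The guard can be pictured as a "get or create" lookup on a shared registry. The sketch below uses a plain dictionary in place of `globals()` and a placeholder in place of the real widgets; `get_or_create`, `build_app`, and `build_count` are hypothetical names introduced only for this illustration.

```python
from typing import Any, Callable, Dict


def get_or_create(registry: Dict[str, Any], key: str, factory: Callable[[], Any]) -> Any:
    """Return registry[key], calling factory() only the first time the key is requested."""
    if key not in registry:
        registry[key] = factory()
    return registry[key]


build_count = {"n": 0}


def build_app() -> Dict[str, Any]:
    # Stand-in for the widget construction done by the notebook.
    build_count["n"] += 1
    return {"controls": "controls-placeholder"}


registry: Dict[str, Any] = {}  # the notebook uses globals() in this role
first = get_or_create(registry, "__LS2COCO_APP__", build_app)
second = get_or_create(registry, "__LS2COCO_APP__", build_app)  # simulates re-running the cell
print(first is second, build_count["n"])
```

Both lookups return the same object and the factory runs once, which is the behaviour the guard aims for when a cell is executed again.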

In [None]:
# If an app instance already exists, reuse it and skip setup.
_SKIP_INIT = False
if "__LS2COCO_APP__" in globals():
    app = globals()["__LS2COCO_APP__"]
    with app["status_output"]:
        clear_output(wait=False)
    display(app["controls"])
    _SKIP_INIT = True

if not _SKIP_INIT:
    # ---- CONSTANTS & DIRECTORIES ----
    ROOT_DIR   = Path.cwd()
    TEMP_DIR   = ROOT_DIR / "Temp"          # was: Inputs
    OUTPUT_DIR = ROOT_DIR / "Outputs"
    IMAGES_DIR = OUTPUT_DIR / "images"
    LOCK_FILE  = OUTPUT_DIR / ".run.lock"

    for d in [TEMP_DIR, OUTPUT_DIR, IMAGES_DIR]:
        d.mkdir(parents=True, exist_ok=True)

    IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}

    # ---- HELPERS: geometry, normalisation, COCO conversion ----
    def _bbox_pct_to_abs(val: Dict[str, Any], W: float, H: float) -> Tuple[float, float, float, float]:
        x = float(val["x"]) * 0.01 * W
        y = float(val["y"]) * 0.01 * H
        w = float(val["width"]) * 0.01 * W
        h = float(val["height"]) * 0.01 * H
        return x, y, w, h

    def _float_box_key(val: Dict[str, Any]) -> Tuple[float, float, float, float]:
        return (round(val.get("x", 0), 4),
                round(val.get("y", 0), 4),
                round(val.get("width", 0), 4),
                round(val.get("height", 0), 4))

    def _norm_attr_name(name: str) -> str:
        name = (name or "").strip()
        return "".join(ch.lower() if ch.isalnum() else "_" for ch in name).strip("_")

    def _has_region_geometry(val: Dict[str, Any]) -> bool:
        return all(k in val for k in ("x", "y", "width", "height"))

    def convert_labelstudio_to_coco(ls_tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
        images, annotations = [], []
        categories_by_name: Dict[str, int] = {}
        next_ann_id, next_img_id = 1, 1

        # Category discovery
        label_names = set()
        for task in ls_tasks:
            for ann in task.get("annotations", []):
                for res in ann.get("result", []):
                    if res.get("type") == "rectanglelabels":
                        labs = (res.get("value", {}) or {}).get("rectanglelabels") or []
                        if labs:
                            label_names.add(labs[0])
        if not label_names:
            label_names = {"object"}
        for i, name in enumerate(sorted(label_names), start=1):
            categories_by_name[name] = i

        # Build images + annotations
        for task in ls_tasks:
            file_name = task.get("file_upload") or Path(task.get("data", {}).get("image", "")).name

            # pick any dims
            W = H = None
            for ann in task.get("annotations", []):
                for res in ann.get("result", []):
                    ow, oh = res.get("original_width"), res.get("original_height")
                    if ow and oh:
                        W, H = float(ow), float(oh)
                        break
                if W and H:
                    break
            if not (W and H):
                continue

            image_id = next_img_id
            next_img_id += 1
            img_obj: Dict[str, Any] = {"id": image_id, "file_name": file_name, "width": int(W), "height": int(H)}
            images.append(img_obj)

            # Prepare holders
            attrs_by_region_id: Dict[str, Dict[str, Any]] = {}
            attrs_by_box_key: Dict[Tuple[float, float, float, float], Dict[str, Any]] = {}
            image_level_attrs: Dict[str, Any] = {}

            # Collect non-rectangle results
            for ann in task.get("annotations", []):
                for res in ann.get("result", []):
                    rtype = res.get("type")
                    if rtype == "rectanglelabels":
                        continue
                    val = res.get("value", {}) or {}
                    if not isinstance(val, dict):
                        continue
                    from_name = _norm_attr_name(res.get("from_name") or "")
                    if not from_name:
                        continue

                    # payload extraction
                    if rtype in ("choices", "labels", "select"):
                        payload = val.get("choices") or val.get("labels") or []
                    elif rtype in ("textarea", "text"):
                        t = val.get("text")
                        payload = t if isinstance(t, list) else (t or val.get("value"))
                    elif rtype == "number":
                        payload = val.get("number")
                    elif rtype == "rating":
                        payload = val.get("rating")
                    else:
                        payload = val  # fallback

                    # Decide: region-linked or GLOBAL
                    linked_region_id = res.get("from_id")
                    if linked_region_id or _has_region_geometry(val):
                        key = _float_box_key(val)
                        if linked_region_id:
                            attrs_by_region_id.setdefault(linked_region_id, {})
                            attrs_by_region_id[linked_region_id][from_name] = payload
                        if any(key):
                            attrs_by_box_key.setdefault(key, {})
                            attrs_by_box_key[key][from_name] = payload
                    else:
                        # GLOBAL (image-level) attribute
                        if isinstance(payload, list):
                            existing = image_level_attrs.get(from_name, [])
                            if not isinstance(existing, list):
                                existing = [existing] if existing is not None else []
                            image_level_attrs[from_name] = list(dict.fromkeys(existing + payload))
                        else:
                            image_level_attrs[from_name] = payload

            if image_level_attrs:
                img_obj["attributes"] = image_level_attrs

            # Emit rectangles with attributes
            for ann in task.get("annotations", []):
                for res in ann.get("result", []):
                    if res.get("type") != "rectanglelabels":
                        continue
                    rect_id = res.get("id")
                    val = res.get("value", {}) or {}
                    labs = val.get("rectanglelabels") or ["object"]
                    label = labs[0]
                    cat_id = categories_by_name[label]
                    x, y, w, h = _bbox_pct_to_abs(val, W, H)

                    merged: Dict[str, Any] = {}
                    if rect_id and rect_id in attrs_by_region_id:
                        merged.update(attrs_by_region_id[rect_id])
                    else:
                        k = _float_box_key(val)
                        if k in attrs_by_box_key:
                            merged.update(attrs_by_box_key[k])

                    alias_map = {
                        "view_angle": {"view_angle", "viewangle", "view", "view_angles"},
                        "mounting": {"mounting", "mount"},
                        "condition": {"condition", "state"},
                        "sign_shape": {"sign_shape", "shape", "signshape"},
                    }
                    surfaced = {}
                    for key, aliases in alias_map.items():
                        for a in aliases:
                            if a in merged:
                                surfaced[key] = merged[a]
                                break

                    attrs_out = dict(merged)
                    for k in ("view_angle", "mounting", "condition", "sign_shape"):
                        if k not in attrs_out:
                            attrs_out[k] = []
                    attrs_out.update(surfaced)

                    ann_obj = {
                        "id": next_ann_id,
                        "image_id": image_id,
                        "category_id": cat_id,
                        "bbox": [x, y, w, h],
                        "iscrowd": 0,
                        "area": float(w) * float(h),
                        "attributes": attrs_out,
                    }
                    annotations.append(ann_obj)
                    next_ann_id += 1

        coco = {
            "info": {
                "description": "LS→COCO with region + global attributes (view_angle, mounting, condition, sign_shape surfaced on annotations)",
                "date_created": datetime.now(timezone.utc).isoformat(),
            },
            "licenses": [],
            "images": images,
            "annotations": annotations,
            "categories": [{"id": cid, "name": name} for name, cid in categories_by_name.items()],
        }
        return coco

    def _ensure_coco_annotation(src_path: Path, out_path: Path) -> Path:
        with open(src_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Already COCO
        if isinstance(data, dict) and {"images", "annotations", "categories"}.issubset(data.keys()):
            for ann in data.get("annotations", []):
                attrs = ann.setdefault("attributes", {})
                for k in ("view_angle", "mounting", "condition", "sign_shape"):
                    attrs.setdefault(k, [])
            # Always write to out_path: src_path lives in Temp, which is deleted
            # after each run, so returning it would lose the final COCO file.
            with open(out_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            return out_path

        # LS array → COCO
        if isinstance(data, list):
            coco = convert_labelstudio_to_coco(data)
            with open(out_path, "w", encoding="utf-8") as f:
                json.dump(coco, f, indent=2)
            print(f"Converted Label Studio JSON → COCO at {out_path}")
            return out_path

        raise ValueError(f"{src_path} is neither valid COCO nor LS array.")

    # ---- REMAP HELPERS ----
    def _build_image_index(images_root: Path):
        by_base = defaultdict(list)
        by_base_lower = defaultdict(list)
        by_stem_lower = defaultdict(list)
        for p in images_root.rglob("*"):
            if p.is_file() and not p.name.startswith("._") and p.suffix.lower() in IMAGE_EXTENSIONS:
                rel = p.relative_to(images_root).as_posix()
                base = p.name
                stem = p.stem
                by_base[base].append(rel)
                by_base_lower[base.lower()].append(rel)
                by_stem_lower[stem.lower()].append(rel)
        return by_base, by_base_lower, by_stem_lower

    def _pick_one(candidates):
        return sorted(candidates, key=lambda s: (len(s), s))[0]

    def remap_coco_file_names(coco_json_path: Path, images_root: Path):
        with open(coco_json_path, "r", encoding="utf-8") as f:
            coco = json.load(f)
        by_base, by_base_lower, by_stem_lower = _build_image_index(images_root)

        missing, fixed = [], 0
        for im in coco.get("images", []):
            fname = im.get("file_name", "")
            if not fname:
                missing.append("(empty)")
                continue

            candidate = images_root / fname
            if candidate.exists():
                continue

            base = Path(fname).name
            stem = Path(fname).stem
            lower_base, lower_stem = base.lower(), stem.lower()
            new_rel = None

            if base in by_base:
                new_rel = _pick_one(by_base[base])
            elif lower_base in by_base_lower:
                new_rel = _pick_one(by_base_lower[lower_base])
            elif lower_stem in by_stem_lower:
                new_rel = _pick_one(by_stem_lower[lower_stem])
            else:
                prefix = stem.split("-")[0].lower()
                if prefix and len(prefix) >= 4:
                    prefixed = []
                    for key_stem, paths in by_stem_lower.items():
                        if key_stem.startswith(prefix):
                            prefixed.extend(paths)
                    if prefixed:
                        new_rel = _pick_one(prefixed)

            if new_rel:
                im["file_name"] = new_rel
                fixed += 1
            else:
                missing.append(fname)

        with open(coco_json_path, "w", encoding="utf-8") as f:
            json.dump(coco, f, indent=2)

        print(f"Remapped {fixed} / {len(coco.get('images', []))} image paths. Missing/Duplicate: {len(missing)}")
        if missing:
            print('Missing/Duplicate examples:', ', '.join(missing[:10]), '...')
        return coco_json_path

    # Stash the environment so the next cell can build the UI.
    globals()["__LS2COCO_ENV__"] = {
        "ROOT_DIR": ROOT_DIR,
        "TEMP_DIR": TEMP_DIR,               
        "OUTPUT_DIR": OUTPUT_DIR,
        "IMAGES_DIR": IMAGES_DIR,
        "LOCK_FILE": LOCK_FILE,
        "IMAGE_EXTENSIONS": IMAGE_EXTENSIONS,
        "helpers": {
            "_ensure_coco_annotation": _ensure_coco_annotation,
            "remap_coco_file_names": remap_coco_file_names,
        },
    }

## 3) Collect Dataset Inputs and Convert to COCO

### What You Need to Upload

1. **Label Studio JSON file (`.json`)**  
   - This should be the *exported annotations file* containing all labelled tasks from Label Studio.  
   - Make sure it includes the annotations from all team members before uploading.  

2. **Images ZIP file (`.zip`)**  
   - This must contain *all the images used in annotation*.  
   - Combine all image folders into one archive before uploading.  
   - The file names in the ZIP must correspond exactly to those referenced in the Label Studio JSON file.

Once both files are uploaded:
- Press **“Run Conversion”** to start the process.  
- The notebook will automatically extract and validate the images, convert the JSON into COCO format, and generate a `COCO.json` file inside the `Outputs` directory.

### Output Files

-  **`COCO.json`** — Final, cleaned dataset in COCO format.  
- NB: A `COCO.json.bak` backup of the unpruned file is written to `Outputs` before entries for missing images are removed, so you can review the original data if needed.
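
The pruning step that follows conversion can be sketched in memory as below: an image entry is kept only if its base name matches an extracted file (compared case-insensitively), and annotations follow their image. This is a simplified illustration under those assumptions. The function name `prune_missing` and the sample data are hypothetical, and unlike the notebook it does not read or write files.

```python
from pathlib import PurePosixPath
from typing import Any, Dict, Set


def prune_missing(coco: Dict[str, Any], existing_names: Set[str]) -> Dict[str, Any]:
    """Keep images whose base name exists (case-insensitive) plus their annotations."""
    existing = {name.lower() for name in existing_names}
    kept_images = [
        im for im in coco.get("images", [])
        if PurePosixPath(im.get("file_name", "")).name.lower() in existing
    ]
    kept_ids = {im["id"] for im in kept_images}
    kept_anns = [a for a in coco.get("annotations", []) if a.get("image_id") in kept_ids]
    # Return a new dict so the input stays available, much like the .bak copy on disk.
    return {**coco, "images": kept_images, "annotations": kept_anns}


coco = {
    "images": [
        {"id": 1, "file_name": "a.jpg"},
        {"id": 2, "file_name": "sub/B.PNG"},
        {"id": 3, "file_name": "gone.jpg"},
    ],
    "annotations": [{"id": 1, "image_id": 1}, {"id": 2, "image_id": 3}],
    "categories": [],
}
pruned = prune_missing(coco, {"a.jpg", "b.png"})
print(len(pruned["images"]), len(pruned["annotations"]))
```

Here the entry for `gone.jpg` and its annotation are dropped, while `sub/B.PNG` survives because only the lower-cased base name is compared.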


In [None]:
# If an app already exists, show it and stop.
if "__LS2COCO_APP__" in globals():
    app = globals()["__LS2COCO_APP__"]
    with app["status_output"]:
        clear_output(wait=False)
    display(app["controls"])
else:
    # Ensure setup cell has been run
    if "__LS2COCO_ENV__" not in globals():
        raise RuntimeError("Please run the setup cell (Section 2) before this UI cell (Section 3).")

    # Pull environment
    ENV = globals()["__LS2COCO_ENV__"]
    ROOT_DIR   = ENV["ROOT_DIR"]
    TEMP_DIR   = ENV["TEMP_DIR"]          # was: INPUT_DIR
    OUTPUT_DIR = ENV["OUTPUT_DIR"]
    IMAGES_DIR = ENV["IMAGES_DIR"]
    LOCK_FILE  = ENV["LOCK_FILE"]
    IMAGE_EXTENSIONS = ENV["IMAGE_EXTENSIONS"]
    _ensure_coco_annotation = ENV["helpers"]["_ensure_coco_annotation"]
    remap_coco_file_names   = ENV["helpers"]["remap_coco_file_names"]

    # Widgets
    json_uploader = widgets.FileUpload(accept=".json", multiple=False, description="Upload JSON")
    zip_uploader  = widgets.FileUpload(accept=".zip",  multiple=False, description="Upload ZIP")
    run_button    = widgets.Button(description="Run Conversion", button_style="success", icon="check")
    status_output = widgets.Output(layout=widgets.Layout(border="1px solid #ddd", padding="8px"))

    controls = widgets.VBox([
        widgets.HTML("<b>1)</b> Upload the consolidated Label Studio JSON annotation file (include all team members)."),
        json_uploader,
        widgets.HTML("<b>2)</b> Upload a ZIP containing all images referenced by the JSON (merged into one archive)."),
        zip_uploader,
        run_button,
        status_output,
    ])
    display(controls)

    # --- Handler utilities ---
    def _purge_all_click_handlers(btn: widgets.Button):
        try:
            for cb in list(getattr(btn._click_handlers, "callbacks", [])):
                try:
                    btn.on_click(cb, remove=True)
                except Exception:
                    pass
            btn._click_handlers.callbacks = []
        except Exception:
            pass

    _purge_all_click_handlers(run_button)

    _last_click_ts = {"t": 0.0}
    _busy          = {"flag": False}
    LAST_RUN_SIG   = {"v": None}

    def _acquire_file_lock(lock_path: Path) -> bool:
        try:
            lock_path.parent.mkdir(parents=True, exist_ok=True)
            if lock_path.exists():
                return False
            lock_path.write_text(str(datetime.now()))
            return True
        except Exception:
            return False

    def _release_file_lock(lock_path: Path):
        try:
            if lock_path.exists():
                lock_path.unlink()
        except Exception:
            pass

    def _as_bytes(x):
        if isinstance(x, (bytes, bytearray)):
            return bytes(x)
        if isinstance(x, memoryview):
            return x.tobytes()
        try:
            return bytes(x)
        except Exception:
            raise TypeError(f"Unsupported content type for bytes: {type(x)}")

    def _get_upload_bytes(uploader: widgets.FileUpload) -> bytes:
        v = uploader.value
        if not v:
            raise RuntimeError("No file uploaded.")
        if isinstance(v, dict):          # ipywidgets 7
            first = next(iter(v.values()))
        elif isinstance(v, (tuple, list)):  # ipywidgets 8
            first = v[0]
        else:
            raise RuntimeError(f"Unexpected FileUpload.value type: {type(v)}")
        return _as_bytes(first["content"])

    # --- NEW: prune COCO to only include images that physically exist in IMAGES_DIR ---
    def _prune_missing_images(coco_path: Path, images_dir: Path) -> tuple[int, int, int]:
        """
        Returns (kept_images, kept_annotations, removed_images)
        """
        with coco_path.open("r", encoding="utf-8") as f:
            data = json.load(f)

        # Build a case-insensitive set of existing basenames
        existing = {p.name.lower() for p in images_dir.rglob("*") if p.is_file()}

        # Keep only images whose basename exists
        orig_images = data.get("images", [])
        keep_images = []
        kept_ids = set()
        removed = 0
        for img in orig_images:
            fname = Path(img.get("file_name", "")).name.lower()
            if fname in existing:
                keep_images.append(img)
                kept_ids.add(img.get("id"))
            else:
                removed += 1

        # Drop annotations whose image_id was removed
        orig_anns = data.get("annotations", [])
        keep_anns = [a for a in orig_anns if a.get("image_id") in kept_ids]

        # Write back (make backup first)
        backup = coco_path.with_suffix(".json.bak")
        try:
            if backup.exists():
                backup.unlink()
        except Exception:
            pass
        shutil.copy2(coco_path, backup)

        data["images"] = keep_images
        data["annotations"] = keep_anns
        with coco_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return len(keep_images), len(keep_anns), removed

    # --- Click handler ---
    def _handle_run(_):
        # Debounce: ignore clicks within 1.0s
        now = monotonic()
        if now - _last_click_ts["t"] < 1.0:
            return
        _last_click_ts["t"] = now

        # Busy gate
        if _busy["flag"]:
            return
        _busy["flag"] = True

        # File lock
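        got_lock = _acquire_file_lock(LOCK_FILE)
        if not got_lock:
            # Report the skipped run instead of returning silently.
            with status_output:
                print(f"Another run appears to be in progress (lock file: {LOCK_FILE}).")
            _busy["flag"] = False
            return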

        run_button.disabled = True
        status_output.clear_output(wait=True)
        with status_output:
            try:
                if not json_uploader.value:
                    raise RuntimeError("Please upload the annotation JSON.")
                if not zip_uploader.value:
                    raise RuntimeError("Please upload the images ZIP.")

                # Content-hash: suppress duplicate work for same inputs
                json_bytes = _get_upload_bytes(json_uploader)
                zip_bytes  = _get_upload_bytes(zip_uploader)
                run_sig = hashlib.sha256(json_bytes + b"|" + zip_bytes).hexdigest()
                if LAST_RUN_SIG["v"] == run_sig:
                    display(Markdown("**Inputs match the last run; no changes made.**"))
                    return
                LAST_RUN_SIG["v"] = run_sig

                # Clean temp/output folders
                for d in [TEMP_DIR, OUTPUT_DIR]:
                    if d.exists():
                        shutil.rmtree(d)
                    d.mkdir(parents=True, exist_ok=True)
                IMAGES_DIR.mkdir(parents=True, exist_ok=True)

                # Save uploads into Temp
                ann_in_path = TEMP_DIR / "input.json"
                zip_path    = TEMP_DIR / "images.zip"
                with open(ann_in_path, "wb") as f:
                    f.write(json_bytes)
                with open(zip_path, "wb") as f:
                    f.write(zip_bytes)

                # Extract and flatten images
                temp_extract = IMAGES_DIR.parent / "_temp_extract"
                if temp_extract.exists():
                    shutil.rmtree(temp_extract)
                temp_extract.mkdir(parents=True, exist_ok=True)
                with zipfile.ZipFile(zip_path, "r") as zf:
                    zf.extractall(temp_extract)

                IMAGES_DIR.mkdir(parents=True, exist_ok=True)
                for p in temp_extract.rglob("*"):
                    if p.is_file():
                        if p.name.startswith("._") or "DS_Store" in p.name:
                            try: p.unlink()
                            except Exception: pass
                            continue
                        if p.suffix.lower() in IMAGE_EXTENSIONS:
                            target = IMAGES_DIR / p.name
                            if not target.exists():
                                shutil.move(str(p), str(target))
                shutil.rmtree(temp_extract, ignore_errors=True)

                # Count images
                n_imgs = sum(1 for p in IMAGES_DIR.rglob("*")
                             if p.suffix.lower() in IMAGE_EXTENSIONS and not p.name.startswith("._"))
                if n_imgs == 0:
                    raise ValueError(f"No valid images found after extracting ZIP into {IMAGES_DIR}")

                # Convert / ensure COCO
                coco_out_path = OUTPUT_DIR / "COCO.json"
                coco_path = _ensure_coco_annotation(ann_in_path, coco_out_path)

                # Remap names to actual files
                coco_path = remap_coco_file_names(coco_path, IMAGES_DIR)

                # NEW: prune any images/annotations whose files are missing
                kept_imgs, kept_anns, removed_imgs = _prune_missing_images(coco_path, IMAGES_DIR)

                display(Markdown("**Your annotations are ready.**"))
                print(f"Temp:     {TEMP_DIR} (will be deleted after this run)")
                print(f"Outputs:  {OUTPUT_DIR}")
                print(f"COCO JSON:{coco_path}")
                print(f"Images:   {IMAGES_DIR} ({n_imgs} files)")
                print(f"Pruned:   removed {removed_imgs} missing/duplicate image entries; kept {kept_imgs} images and {kept_anns} annotations.")
            except Exception as exc:
                # Forget the signature so the same inputs can be retried after a failure.
                LAST_RUN_SIG["v"] = None
                display(Markdown(f"**Error:** {exc}"))
            finally:
                # Always clean up Temp and release the lock
                shutil.rmtree(TEMP_DIR, ignore_errors=True)
                _release_file_lock(LOCK_FILE)
                _busy["flag"] = False
                run_button.disabled = False

    # Bind once and register app
    run_button.on_click(_handle_run)
    globals()["__LS2COCO_APP__"] = {
        "controls": controls,
        "run_button": run_button,
        "status_output": status_output,
    }

## 4) Attribute-Specific COCO File Generator

This section creates **attribute-specific COCO datasets** derived from the main `COCO.json` file produced in the previous step.  
It provides a graphical interface where you can select an attribute (*view_angle*, *mounting*, *condition*, or *sign_shape*) and generate a new COCO file whose categories are the values of that attribute.

### Purpose of this Step
In the ARI3129 assignment, you need to train and evaluate models based on specific object attributes rather than the overall dataset.  
For instance, you might wish to train one model for different *sign shapes* and another for *view angles*.  
This tool simplifies that process by extracting attribute-focused subsets of your dataset while maintaining full COCO compatibility.

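After running this step, one or more files named **`COCO_<attribute>.json`** are created inside your `Outputs` directory. In each file, the categories are the values of the chosen attribute, and an annotation carrying several values is emitted once per value. Generation stops with an error listing the affected images if any annotation has no value for the chosen attribute.  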
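
To make the re-labelling concrete, the sketch below turns the values of one attribute into the category set and emits one annotation per value. It is a simplified, non-strict variant written for illustration: `as_list` and `relabel_by_attribute` are hypothetical names, and annotations without a value are skipped here, whereas the notebook's `build_attribute_coco_strict` raises an error instead.

```python
from typing import Any, Dict, List


def as_list(value: Any) -> List[str]:
    """Normalise a scalar, list, or None into a list of non-empty strings."""
    if value is None:
        return []
    items = value if isinstance(value, list) else [value]
    return [str(x).strip() for x in items if str(x).strip()]


def relabel_by_attribute(coco: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Build a COCO dict whose categories are the values of one attribute."""
    per_ann = [as_list((a.get("attributes") or {}).get(key)) for a in coco["annotations"]]
    names = sorted({v for vals in per_ann for v in vals})
    cat_id = {name: i for i, name in enumerate(names, start=1)}
    new_anns: List[Dict[str, Any]] = []
    for ann, vals in zip(coco["annotations"], per_ann):
        for v in vals:  # one output annotation per attribute value
            new_anns.append({**ann, "id": len(new_anns) + 1, "category_id": cat_id[v]})
    return {
        **coco,
        "annotations": new_anns,
        "categories": [{"id": i, "name": n} for n, i in cat_id.items()],
    }


coco = {
    "images": [{"id": 1, "file_name": "a.jpg"}],
    "annotations": [
        {"id": 10, "image_id": 1, "category_id": 1, "bbox": [0, 0, 10, 10],
         "attributes": {"sign_shape": ["circle"]}},
        {"id": 11, "image_id": 1, "category_id": 1, "bbox": [5, 5, 10, 10],
         "attributes": {"sign_shape": ["triangle", "circle"]}},
    ],
    "categories": [{"id": 1, "name": "sign"}],
}
shape_coco = relabel_by_attribute(coco, "sign_shape")
print([c["name"] for c in shape_coco["categories"]], len(shape_coco["annotations"]))
```

The second box carries two shapes, so it appears twice in the output, once under each category; the bounding boxes and image list are carried over unchanged.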

In [None]:
# Reuse paths from earlier section
DATASET_DIR = OUTPUT_DIR
INPUT_JSON_PATH = OUTPUT_DIR / "COCO.json"

# ---- Constants ----
PLACEHOLDER = "Click to select attribute"
ALL_LABEL   = "All (Create JSONs for all attributes)"
ALL_KEYS    = ["view_angle", "mounting", "condition", "sign_shape"]

# ---- IO helpers ----
def load_coco(p: Path) -> Dict[str, Any]:
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)

def save_coco(obj: Dict[str, Any], p: Path) -> None:
    p.parent.mkdir(parents=True, exist_ok=True)
    with p.open("w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)

# ---- Scan attributes present on annotations ----
def scan_annotation_attributes(coco: Dict[str, Any]) -> Tuple[Set[str], Dict[str, Set[str]]]:
    keys: Set[str] = set()
    values_map: Dict[str, Set[str]] = {}
    for ann in coco.get("annotations", []):
        attrs = ann.get("attributes", {}) or {}
        for k, v in attrs.items():
            keys.add(k)
            if isinstance(v, list):
                vals = [str(x) for x in v if str(x).strip()]
            elif v is None:
                vals = []
            else:
                vals = [str(v)] if str(v).strip() else []
            if vals:
                values_map.setdefault(k, set()).update(vals)
    return keys, values_map

# ---- Core builder (strict: no missing values allowed) ----
def build_attribute_coco_strict(full_coco: Dict[str, Any], attribute_key: str) -> Dict[str, Any]:
    images = full_coco.get("images", [])
    anns   = full_coco.get("annotations", [])
    img_by_id = {im["id"]: im for im in images}

    # Validate: collect image names for any annotation missing/empty attribute
    missing_img_names: List[str] = []
    def _vals_of(v):
        if isinstance(v, list):
            return [str(x) for x in v if str(x).strip()]
        elif v is None:
            return []
        else:
            s = str(v).strip()
            return [s] if s else []

    for ann in anns:
        attrs = ann.get("attributes", {}) or {}
        vals = _vals_of(attrs.get(attribute_key, []))
        if not vals:
            im = img_by_id.get(ann["image_id"], {})
            fname = im.get("file_name", f"<image_id:{ann['image_id']}>")
            missing_img_names.append(fname)

    if missing_img_names:
        missing_img_names = sorted(set(missing_img_names))
        raise ValueError(
            "Missing required attribute values for '{}'.\nThe following image(s) contain "
            "at least one annotation with an empty/missing value:\n- ".format(attribute_key)
            + "\n- ".join(missing_img_names)
        )

    # Collect unique values and assign 1-based category ids in sorted order.
    # `or {}` guards against annotations whose "attributes" field is None.
    value_set_sorted: List[str] = sorted({
        v
        for ann in anns
        for v in _vals_of((ann.get("attributes") or {}).get(attribute_key))
    })
    value_to_id: Dict[str, int] = {v: i+1 for i, v in enumerate(value_set_sorted)}
    categories = [{"id": cid, "name": v} for v, cid in value_to_id.items()]

    # Build new annotations (explode multi)
    new_annotations: List[Dict[str, Any]] = []
    next_ann_id = 1
    for ann in anns:
        # Reuse _vals_of so the values match the keys of value_to_id exactly
        # (scalar values are stripped there; re-deriving them unstripped can raise KeyError).
        vals = _vals_of((ann.get("attributes") or {}).get(attribute_key))
        for v in vals:
            new_annotations.append({
                "id": next_ann_id,
                "image_id": ann["image_id"],
                "category_id": value_to_id[v],
                "bbox": ann["bbox"],
                "iscrowd": ann.get("iscrowd", 0),
                "area": ann.get("area", float(ann["bbox"][2]) * float(ann["bbox"][3])),
                "attributes": ann.get("attributes", {}),
            })
            next_ann_id += 1

    out = {
        "info": {"description": f"Attribute COCO for '{attribute_key}'"},
        "licenses": full_coco.get("licenses", []),
        "images": images,
        "annotations": new_annotations,
        "categories": categories,
    }
    return out

# ---- UI (placeholder default + 'All' option) ----
attr_dropdown = widgets.Dropdown(
    options=[PLACEHOLDER], 
    value=PLACEHOLDER,
    description="Attribute:",
    layout=widgets.Layout(width="400px")
)
make_button   = widgets.Button(description="Create Attribute COCO", button_style="info", icon="cogs")
output_area   = widgets.Output(layout=widgets.Layout(border="1px solid #ddd", padding="8px"))

ui = widgets.VBox([
    widgets.HTML("<b>Create attribute-specific COCO Files</b>"),
    widgets.HBox([attr_dropdown]),
    make_button,
    output_area
])

def _refresh_attribute_choices():
    output_area.clear_output()
    if not INPUT_JSON_PATH.exists():
        with output_area:
            display(Markdown("**Outputs/COCO.json not found.** Run the conversion step first."))
        attr_dropdown.options = [PLACEHOLDER]
        attr_dropdown.value = PLACEHOLDER
        return

    coco = load_coco(INPUT_JSON_PATH)
    keys, values_map = scan_annotation_attributes(coco)
    viable_keys = sorted([k for k in keys if len(values_map.get(k, [])) > 0])

    # Assemble options: placeholder + ALL + viable keys
    options = [PLACEHOLDER, ALL_LABEL] + viable_keys
    attr_dropdown.options = options
    attr_dropdown.value = PLACEHOLDER

    with output_area:
        display(Markdown(f"Found {len(viable_keys)} candidate attribute key(s)."))
        for k in viable_keys[:12]:
            vals = sorted(list(values_map.get(k, [])))
            sample = ", ".join(vals[:6]) + (" ..." if len(vals) > 6 else "")
            print(f"- {k}: {len(vals)} values [{sample}]")
        if len(viable_keys) > 12:
            print(f"... and {len(viable_keys)-12} more.")

def _make_attribute_coco(_):
    with output_area:
        output_area.clear_output()
        if not INPUT_JSON_PATH.exists():
            display(Markdown("**COCO.json not found.** Run the previous section first."))
            return

        sel = attr_dropdown.value
        if sel == PLACEHOLDER:
            display(Markdown("**Select an attribute from the dropdown first.**"))
            return

        try:
            coco = load_coco(INPUT_JSON_PATH)

            if sel == ALL_LABEL:
                # Run for all four canonical keys; error if any is missing/empty anywhere
                created = []
                failures = []
                for key in ALL_KEYS:
                    try:
                        derived = build_attribute_coco_strict(coco, attribute_key=key)
                        safe_key = "".join(ch if ch.isalnum() or ch in ("_", "-") else "_" for ch in key)
                        out_path = DATASET_DIR / f"COCO_{safe_key}.json"
                        save_coco(derived, out_path)
                        created.append((key, out_path, len(derived["annotations"]), len(derived["categories"])))
                    except Exception as e:
                        failures.append((key, str(e)))

                if created:
                    display(Markdown("**Created attribute COCO files:**"))
                    for key, path, nann, ncat in created:
                        print(f"- {key}: {path}  | annotations={nann}  categories={ncat}")

                if failures:
                    display(Markdown("**Some attributes failed validation:**"))
                    for key, msg in failures:
                        print(f"\n[{key}]")
                        print(msg)
                return

            # Single selected attribute
            derived = build_attribute_coco_strict(coco, attribute_key=sel)
            safe_key = "".join(ch if ch.isalnum() or ch in ("_", "-") else "_" for ch in sel)
            out_path = DATASET_DIR / f"COCO_{safe_key}.json"
            save_coco(derived, out_path)

            display(Markdown(f"**Attribute '{sel}' COCO file created.**"))
            print(f"Saved to: {out_path}")
            print(f"Images: {len(derived['images'])}")
            print(f"Annotations: {len(derived['annotations'])}")
            print(f"Categories ({len(derived['categories'])}):")
            for c in derived["categories"]:
                print(f"  {c['id']}: {c['name']}")

        except ValueError as ve:
            display(Markdown(f"**Validation failed:**\n\n```\n{ve}\n```"))
        except Exception as exc:
            display(Markdown(f"**Error:** {exc}"))

make_button.on_click(_make_attribute_coco)
display(ui)

# Populate dropdown initially
_refresh_attribute_choices()
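
To illustrate what `build_attribute_coco_strict` produces, the following is a minimal, self-contained sketch of the same idea: attribute values become categories (numbered from 1 in sorted order) and an annotation carrying several values is expanded into one annotation per value. The function name `explode_attribute` and the sample values are hypothetical, and the strict missing-value validation is deliberately left out.

```python
from typing import Any, Dict, List


def explode_attribute(annotations: List[Dict[str, Any]], key: str) -> Dict[str, Any]:
    """Toy version: one output annotation per attribute value, ids from sorted names."""

    def values_of(ann: Dict[str, Any]) -> List[str]:
        v = (ann.get("attributes") or {}).get(key)
        items = v if isinstance(v, list) else [v]
        # Drop None and blank entries, normalise everything else to a stripped string
        return [str(x).strip() for x in items if x is not None and str(x).strip()]

    names = sorted({v for ann in annotations for v in values_of(ann)})
    name_to_id = {name: i + 1 for i, name in enumerate(names)}

    out: List[Dict[str, Any]] = []
    for ann in annotations:
        for v in values_of(ann):
            out.append({
                "id": len(out) + 1,
                "image_id": ann["image_id"],
                "category_id": name_to_id[v],
                "bbox": ann["bbox"],
            })
    return {
        "categories": [{"id": i, "name": n} for n, i in name_to_id.items()],
        "annotations": out,
    }


# Hypothetical sample data: the second annotation carries two values
toy = [
    {"image_id": 1, "bbox": [0, 0, 10, 10], "attributes": {"condition": "Good"}},
    {"image_id": 2, "bbox": [5, 5, 20, 20], "attributes": {"condition": ["Damaged", "Good"]}},
]
result = explode_attribute(toy, "condition")
print(result["categories"])
print(len(result["annotations"]))
```

Two input annotations yield three output annotations here because the multi-valued one is duplicated once per value.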

## 5) Train/Validation/Test Splitter and Format Converter

This section provides an interactive tool to (i) **split a COCO dataset** deterministically into Train/Validation/Test subsets and (ii) **materialise the dataset** in one of two formats:
- **YOLO (Ultralytics v8/v11/v12)**: creates `images/{train,val,test}` and `labels/{train,val,test}` with YOLO-TXT labels.
- **COCO-based**: creates `images/{train,val,test}` and `annotations/{train.json,val.json,test.json}`.
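
The difference between the two label formats comes down to the box encoding: COCO stores `[x, y, width, height]` in pixels with `(x, y)` at the top-left corner, whereas a YOLO-TXT line stores the class index followed by the box centre and size, each normalised by the image dimensions. A minimal sketch of that conversion, using a hypothetical helper name and made-up numbers:

```python
from typing import Sequence, Tuple


def coco_bbox_to_yolo(bbox: Sequence[float], img_w: float, img_h: float) -> Tuple[float, float, float, float]:
    """Convert a COCO [x, y, w, h] pixel box to normalised YOLO (xc, yc, w, h)."""
    x, y, w, h = bbox
    return ((x + w / 2.0) / img_w, (y + h / 2.0) / img_h, w / img_w, h / img_h)


# Hypothetical example: a 200x100 px box at (100, 50) in a 640x480 image, class index 0
xc, yc, nw, nh = coco_bbox_to_yolo([100, 50, 200, 100], 640, 480)
line = f"0 {xc:.6f} {yc:.6f} {nw:.6f} {nh:.6f}"
print(line)  # 0 0.312500 0.208333 0.312500 0.208333
```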

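You will frequently need reproducible splits and a dataset layout that matches your chosen training framework. Each image is assigned to a split by hashing its filename, so the same filenames and ratios always produce the same split, although the resulting counts only approximate the requested percentages. The tool also offers a one-click conversion to either the YOLO or the COCO split structure.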
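
The reproducibility comes from deriving each image's split from a hash of its filename rather than from a random shuffle. The sketch below mirrors the approach of `_split_ids_deterministic_3way` in the code cell that follows; the helper name `split_of` and the generated filenames are assumptions made for illustration.

```python
import hashlib


def split_of(file_name: str, train: float, val: float) -> str:
    """Map a filename to 'train', 'val' or 'test' using a stable MD5-based fraction."""
    digest = int(hashlib.md5(file_name.encode("utf-8")).hexdigest(), 16)
    r = (digest % 10_000_000) / 10_000_000.0  # pseudo-uniform value in [0, 1)
    if r < train:
        return "train"
    if r < train + val:
        return "val"
    return "test"


# Hypothetical filenames; the assignment does not depend on processing order
names = [f"img_{i:04d}.jpg" for i in range(1000)]
first = [split_of(n, 0.70, 0.15) for n in names]
second = [split_of(n, 0.70, 0.15) for n in reversed(names)][::-1]
print(first == second)  # True: same filename, same split

counts = {s: first.count(s) for s in ("train", "val", "test")}
print(counts)  # close to 70/15/15, but not exact
```

Because each image is placed independently, small datasets can deviate noticeably from the requested ratios, which is why the live readout is worth checking before converting.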

#### Usage
1. Select the **Source** COCO file (e.g., `COCO.json` or `COCO_view_angle.json`).  
2. Pick **Architecture** (YOLO or COCO-based).  
3. Set the **Train/Val/Test** sliders; choose a **Freeze** policy if needed (to keep one split fixed).  
4. Check the **live readout** to confirm counts.  
5. Click **Convert**. The output directory will be created with a name like:
   - `YOLO_COCO` (for `COCO.json`) or `YOLO_COCO_view_angle` (for an attribute file), or
   - `COCO-based_COCO` / `COCO-based_COCO_view_angle` for COCO-based splits.

In [None]:
# === PATHS ===
COCO_SRC_DIR = OUTPUT_DIR                 # where COCO*.json files live
DATASET_DIR  = OUTPUT_DIR.parent          # where new converted datasets will be created
IMAGES_ROOT  = OUTPUT_DIR / "images"      # extracted images live here

# ---------- helpers ----------
def _load_json(p: Path) -> Dict[str, Any]:
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)

def _save_json(obj: Dict[str, Any], p: Path) -> None:
    p.parent.mkdir(parents=True, exist_ok=True)
    with p.open("w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)

def _list_candidate_coco_files() -> List[Path]:
    return sorted([p for p in COCO_SRC_DIR.glob("COCO*.json") if p.is_file()])

def _safe_name(s: str) -> str:
    return "".join(ch if ch.isalnum() or ch in ("_", "-", ".") else "_" for ch in s)

def _link_or_copy(src: Path, dst: Path):
    dst.parent.mkdir(parents=True, exist_ok=True)
    try:
        if dst.exists():
            return
        os.link(src, dst)
    except Exception:
        try:
            if dst.exists():
                return
            os.symlink(src, dst)
        except Exception:
            shutil.copy2(src, dst)

def _split_ids_deterministic_3way(images: List[Dict[str, Any]], tr: float, va: float) -> Tuple[Set[int], Set[int], Set[int]]:
    train_ids, val_ids, test_ids = set(), set(), set()
    for im in images:
        fname = str(im.get("file_name", ""))
        h = int(hashlib.md5(fname.encode("utf-8")).hexdigest(), 16)
        r = (h % 10_000_000) / 10_000_000.0
        if r < tr:
            train_ids.add(im["id"])
        elif r < tr + va:
            val_ids.add(im["id"])
        else:
            test_ids.add(im["id"])
    return train_ids, val_ids, test_ids

# ---------- converters ----------
def _coco_to_yolo(coco: Dict[str, Any], out_dir: Path, tr: float, va: float) -> Dict[str, Any]:
    out_dir.mkdir(parents=True, exist_ok=True)
    cats = sorted(coco.get("categories", []), key=lambda c: c["id"])
    class_names = [c["name"] for c in cats]
    catid_to_idx = {c["id"]: i for i, c in enumerate(cats)}

    images = coco.get("images", [])
    id2img = {im["id"]: im for im in images}
    train_ids, val_ids, test_ids = _split_ids_deterministic_3way(images, tr, va)

    for split in ("train", "val", "test"):
        (out_dir / f"images/{split}").mkdir(parents=True, exist_ok=True)
        (out_dir / f"labels/{split}").mkdir(parents=True, exist_ok=True)

    img_to_anns: Dict[int, List[Dict[str, Any]]] = {}
    for ann in coco.get("annotations", []):
        img_to_anns.setdefault(ann["image_id"], []).append(ann)

    def _write_split(split_name: str, id_set: Set[int]):
        for img_id in sorted(id_set):
            im = id2img[img_id]
            w, h = float(im["width"]), float(im["height"])
            src_img = (IMAGES_ROOT / im["file_name"]).resolve()
            if not src_img.exists():
                raise FileNotFoundError(f"Image not found: {src_img}")
            dst_img = out_dir / f"images/{split_name}/{_safe_name(Path(im['file_name']).name)}"
            _link_or_copy(src_img, dst_img)

            anns = img_to_anns.get(img_id, [])
            yolo_lines = []
            for a in anns:
                cls_idx = catid_to_idx[a["category_id"]]
                x, y, bw, bh = a["bbox"]
                xc = (x + bw / 2.0) / w
                yc = (y + bh / 2.0) / h
                nw = bw / w
                nh = bh / h
                yolo_lines.append(f"{cls_idx} {xc:.6f} {yc:.6f} {nw:.6f} {nh:.6f}")

            lbl_path = out_dir / f"labels/{split_name}/{dst_img.stem}.txt"
            with lbl_path.open("w", encoding="utf-8") as f:
                f.write("\n".join(yolo_lines))

    for s, ids in (("train", train_ids), ("val", val_ids), ("test", test_ids)):
        _write_split(s, ids)

    data_yaml = {
        "path": str(out_dir.resolve()),
        "train": "images/train",
        "val": "images/val",
        "test": "images/test",
        "names": class_names,
    }
    _save_json(data_yaml, out_dir / "data.yaml.json")

    return {
        "classes": class_names,
        "train": len(train_ids),
        "val": len(val_ids),
        "test": len(test_ids),
        "format": "YOLO",
    }

def _coco_to_coco_splits(coco: Dict[str, Any], out_dir: Path, tr: float, va: float) -> Dict[str, Any]:
    out_dir.mkdir(parents=True, exist_ok=True)
    images = coco.get("images", [])
    id2img = {im["id"]: im for im in images}
    train_ids, val_ids, test_ids = _split_ids_deterministic_3way(images, tr, va)

    def _filtered(ids: Set[int]) -> Dict[str, Any]:
        imgs = [id2img[i] for i in sorted(ids)]
        img_set = {im["id"] for im in imgs}
        anns = [a for a in coco.get("annotations", []) if a["image_id"] in img_set]
        return {
            "info": coco.get("info", {}),
            "licenses": coco.get("licenses", []),
            "images": imgs,
            "annotations": anns,
            "categories": coco.get("categories", []),
        }

    train_coco = _filtered(train_ids)
    val_coco   = _filtered(val_ids)
    test_coco  = _filtered(test_ids)

    ann_dir = out_dir / "annotations"
    _save_json(train_coco, ann_dir / "train.json")
    _save_json(val_coco,   ann_dir / "val.json")
    _save_json(test_coco,  ann_dir / "test.json")

    for split, ids in (("train", train_ids), ("val", val_ids), ("test", test_ids)):
        split_img_dir = out_dir / f"images/{split}"
        for img_id in ids:
            im = id2img[img_id]
            src = (IMAGES_ROOT / im["file_name"]).resolve()
            if not src.exists():
                raise FileNotFoundError(f"Image not found: {src}")
            dst = split_img_dir / _safe_name(Path(im["file_name"]).name)
            _link_or_copy(src, dst)

    return {
        "classes": [c["name"] for c in sorted(coco.get("categories", []), key=lambda c: c["id"])],
        "train": len(train_ids),
        "val": len(val_ids),
        "test": len(test_ids),
        "format": "COCO",
    }

# ---------- UI ----------
ARCH_OPTS = [
    "YOLO (Ultralytics v8/v11/v12)",
    "COCO-based (MMDetection, RF-DETR, RetinaNet, Faster R-CNN, EfficientDet)"
]

_src_files = _list_candidate_coco_files()
src_dropdown = widgets.Dropdown(
    options=[str(p.name) for p in _src_files] if _src_files else ["<no COCO files found>"],
    description="Source:",
    layout=widgets.Layout(width="420px")
)
arch_dropdown = widgets.Dropdown(
    options=ARCH_OPTS,
    description="Architecture:",
    layout=widgets.Layout(width="420px")
)

# Three sliders + "Freeze" selector
train_slider = widgets.IntSlider(value=70, min=1, max=98, step=1, description="Train (%)", continuous_update=True)
val_slider   = widgets.IntSlider(value=15, min=1, max=98, step=1, description="Val (%)",   continuous_update=True)
test_slider  = widgets.IntSlider(value=15, min=1, max=98, step=1, description="Test (%)",  continuous_update=True)

hold_dropdown = widgets.Dropdown(
    options=["Freeze Test", "Freeze Val", "Freeze Train"],
    value="Freeze Test",
    description="Freeze:",
    layout=widgets.Layout(width="200px")
)

live_readout = widgets.HTML("")
convert_btn  = widgets.Button(description="Convert", button_style="warning", icon="exchange", disabled=False)
conv_out     = widgets.Output(layout=widgets.Layout(border="1px solid #ddd", padding="8px"))

ui_box = widgets.VBox([
    widgets.HTML("<b>Convert COCO with Train/Val/Test Split for Object Detection</b>"),
    src_dropdown, arch_dropdown,
    widgets.HBox([train_slider, val_slider, test_slider, hold_dropdown]),
    live_readout,
    convert_btn,
    conv_out
])
display(ui_box)

# ----- slider coupling -----
_updating = False

def _recompute(changed: str):
    global _updating
    if _updating:
        return
    _updating = True

    tr, va, te = train_slider.value, val_slider.value, test_slider.value
    hold = hold_dropdown.value

    tr = max(1, min(98, tr))
    va = max(1, min(98, va))
    te = max(1, min(98, te))

    if changed == "train":
        if hold == "Freeze Test":
            te = test_slider.value
            va = max(1, 100 - tr - te)
        elif hold == "Freeze Val":
            va = val_slider.value
            te = max(1, 100 - tr - va)
        else:
            tr = train_slider.value
            va = max(1, 100 - tr - te)

    elif changed == "val":
        if hold == "Freeze Test":
            te = test_slider.value
            tr = max(1, 100 - va - te)
        elif hold == "Freeze Val":
            va = val_slider.value
            te = max(1, 100 - tr - va)
        else:
            tr = train_slider.value
            te = max(1, 100 - tr - va)

    elif changed == "test":
        if hold == "Freeze Test":
            te = test_slider.value
            va = max(1, 100 - tr - te)
        elif hold == "Freeze Val":
            va = val_slider.value
            tr = max(1, 100 - va - te)
        else:
            tr = train_slider.value
            va = max(1, 100 - tr - te)

    total = tr + va + te
    if total != 100:
        # A mismatch only occurs when the dependent slider was clamped at its 1% floor,
        # so pull the slider that was just moved back by the excess; adjusting the
        # clamped slider again would have no effect and the total would stay above 100.
        delta = 100 - total
        if changed == "train":
            tr = max(1, min(98, tr + delta))
        elif changed == "val":
            va = max(1, min(98, va + delta))
        else:
            te = max(1, min(98, te + delta))

    train_slider.value, val_slider.value, test_slider.value = tr, va, te
    _updating = False
    _update_readout()

def _current_coco():
    candidates = {p.name: p for p in _list_candidate_coco_files()}
    sel = src_dropdown.value
    if sel not in candidates:
        return None, None
    p = candidates[sel]
    return _load_json(p), p

def _update_readout(*_):
    coco, path = _current_coco()
    if coco is None:
        live_readout.value = "No COCO selected."
        convert_btn.disabled = True
        return
    tr, va, te = train_slider.value, val_slider.value, test_slider.value
    images = coco.get("images", [])
    tr_ids, va_ids, te_ids = _split_ids_deterministic_3way(images, tr/100.0, va/100.0)
    live_readout.value = (
        f"<b>Split:</b> Train {tr}% ({len(tr_ids)}) | Val {va}% ({len(va_ids)}) | "
        f"Test {te}% ({len(te_ids)}) | Total: {len(images)} | Source: {path.name}"
    )
    convert_btn.disabled = (tr + va + te != 100) or (tr < 1 or va < 1 or te < 1)

def _on_train_change(change):
    if change["name"] == "value":
        _recompute("train")

def _on_val_change(change):
    if change["name"] == "value":
        _recompute("val")

def _on_test_change(change):
    if change["name"] == "value":
        _recompute("test")

def _handle_convert(_):
    with conv_out:
        conv_out.clear_output()
        coco, src_path = _current_coco()
        if coco is None:
            display(Markdown("No valid COCO source selected."))
            return
        tr, va, te = train_slider.value, val_slider.value, test_slider.value
        if tr + va + te != 100 or min(tr, va, te) < 1:
            display(Markdown("Invalid split: ensure Train+Val+Test=100 and each ≥ 1%."))
            return

        arch = arch_dropdown.value
        arch_name = _safe_name(arch.split()[0])
        out_dir = DATASET_DIR / f"{arch_name}_{src_path.stem}"

        try:
            if arch.startswith("YOLO"):
                result = _coco_to_yolo(coco, out_dir, tr/100.0, va/100.0)
                display(Markdown(f"YOLO dataset ready at <b>{out_dir}</b>"))
            else:
                result = _coco_to_coco_splits(coco, out_dir, tr/100.0, va/100.0)
                display(Markdown(f"COCO split ready at <b>{out_dir}</b>"))
            print(f"Images — train: {result['train']}, val: {result['val']}, test: {result['test']}")
            print(f"Classes: {', '.join(result['classes'])}")
        except Exception as exc:
            display(Markdown(f"Error: {exc}"))

# wire events
train_slider.observe(_on_train_change, names="value")
val_slider.observe(_on_val_change, names="value")
test_slider.observe(_on_test_change, names="value")
src_dropdown.observe(lambda _ : _update_readout(), names="value")
hold_dropdown.observe(lambda _ : _update_readout(), names="value")

convert_btn.on_click(_handle_convert)

# initial normalize & preview
_recompute("train")

# What’s Next

After completing the conversion step above, your dataset has been properly split and formatted for training.  

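If you selected **YOLO**, your next step is to **create the `data.yaml` file** that defines the dataset paths and class names. The converter already records these values in `data.yaml.json` inside the output directory, so they can be taken from there. The `data.yaml` file is required by YOLO to begin training.  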
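
One possible way to produce `data.yaml` is to read the `data.yaml.json` file written by the converter and emit the same fields as YAML. The sketch below avoids external dependencies by formatting the file by hand and assumes the Ultralytics-style layout (`path`, `train`, `val`, `test`, and an index-to-name mapping under `names`). The helper name `write_data_yaml`, the temporary directory, and the class names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_data_yaml(dataset_dir: Path) -> Path:
    """Write data.yaml next to data.yaml.json, using the values stored in the latter."""
    meta = json.loads((dataset_dir / "data.yaml.json").read_text(encoding="utf-8"))
    lines = [
        f"path: {json.dumps(meta['path'])}",  # JSON string quoting is also valid YAML
        f"train: {meta['train']}",
        f"val: {meta['val']}",
        f"test: {meta['test']}",
        "names:",
    ]
    lines += [f"  {i}: {json.dumps(name)}" for i, name in enumerate(meta["names"])]
    out = dataset_dir / "data.yaml"
    out.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return out


# Demo on a throwaway directory with hypothetical class names
demo_dir = Path(tempfile.mkdtemp())
(demo_dir / "data.yaml.json").write_text(json.dumps({
    "path": str(demo_dir),
    "train": "images/train",
    "val": "images/val",
    "test": "images/test",
    "names": ["Stop", "No Entry"],
}), encoding="utf-8")

yaml_text = write_data_yaml(demo_dir).read_text(encoding="utf-8")
print(yaml_text)
```

In practice you would call the helper with the generated dataset directory (for example the `YOLO_COCO` folder) instead of the temporary one.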

If you selected a **COCO-based architecture** (such as RF-DETR, RetinaNet, Faster R-CNN, or EfficientDet), your next step is to **register the dataset inside your chosen framework** by linking the generated `train.json`, `val.json`, and `test.json` files with their image directories.  

In both cases, ensure your splits are balanced and your class definitions are correct before proceeding to the model training phase.
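
A quick way to check balance and class definitions for a COCO-based split is to count annotations per category and confirm that every annotation refers to an image in the same file. The following is a minimal sketch under those assumptions; the helper names and the toy data are hypothetical, and for a real dataset you would load `annotations/train.json`, `val.json`, and `test.json` with `json.load` and compare the results.

```python
from collections import Counter
from typing import Any, Dict


def class_counts(coco: Dict[str, Any]) -> Counter:
    """Count annotations per category name in one COCO split."""
    id_to_name = {c["id"]: c["name"] for c in coco.get("categories", [])}
    return Counter(id_to_name[a["category_id"]] for a in coco.get("annotations", []))


def orphan_annotations(coco: Dict[str, Any]) -> int:
    """Number of annotations whose image_id is absent from the split's image list."""
    image_ids = {im["id"] for im in coco.get("images", [])}
    return sum(1 for a in coco.get("annotations", []) if a["image_id"] not in image_ids)


# Hypothetical toy split standing in for a loaded train.json
toy_split = {
    "images": [{"id": 1}, {"id": 2}],
    "categories": [{"id": 1, "name": "Front"}, {"id": 2, "name": "Side"}],
    "annotations": [
        {"id": 1, "image_id": 1, "category_id": 1},
        {"id": 2, "image_id": 1, "category_id": 2},
        {"id": 3, "image_id": 2, "category_id": 1},
    ],
}
counts_by_class = class_counts(toy_split)
print(dict(counts_by_class))
print(orphan_annotations(toy_split))
```

A class that is missing from, or heavily under-represented in, the validation or test counts is a sign that the split ratios should be adjusted or more images annotated.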
