## Adding COCO Data Using Custom Generator

In [None]:
import glob
import json
import os
import zipfile

import cv2
import gdown
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

from luxonis_ml.data import LuxonisDataset, LuxonisLoader, DatasetIterator

In [None]:
# Create the dataset

# The name is kept in a variable because the dataset is reopened by name
# further down, before the generator output is added.
dataset_name = "coco_test"
# NOTE(review): `delete_existing=True` presumably discards any dataset already
# stored under this name so the notebook starts from a clean state — confirm
# against the LuxonisDataset documentation.
dataset = LuxonisDataset(dataset_name, delete_existing=True)

### Download and extract data

In [None]:
# Location of the archive and where it is stored / unpacked locally.
url = "https://drive.google.com/uc?id=1XlvFK7aRmt8op6-hHkWVKIJQeDtOwoRT"
output_zip = "../data/COCO_people_subset.zip"
output_folder = "../data/"

# `makedirs(..., exist_ok=True)` also creates missing parent directories and
# does not fail when the folder already exists, so no separate existence
# check is needed.
os.makedirs(output_folder, exist_ok=True)

# Download only when the archive is not already present locally.
if not os.path.exists(output_zip):
    gdown.download(url, output_zip, quiet=False)

# Unpack the archive next to it; the context manager closes the file handle.
with zipfile.ZipFile(output_zip, "r") as zip_ref:
    zip_ref.extractall(output_folder)

### Converting from COCO People Subset  

`LuxonisDataset` expects a generator that yields a dictionary of data for each instance in the following format:  

- **file** (*str*): Path to the image file.  
- **annotation** (*Optional[dict]*): Contains annotations for a single instance:  
  - **class** (*str*): Object label (e.g., `"person"`).  
  - **boundingbox** (*dict*): Normalized bounding box (0-1 scale).  
    - **x, y** (*float*): Top-left coordinates.  
    - **w, h** (*float*): Width and height.  
  - **segmentation** (*dict*): Object mask in **polygon** or **RLE** format.  
    - **Polygon**:  
      - **height, width** (*int*): Image dimensions.  
      - **points** (*List[Tuple[float, float]]*): List of normalized `(x, y)` coordinates.  
    - **RLE**:  
      - **height, width** (*int*): Image dimensions.  
      - **counts** (*List[int]*): Run-length encoded mask.  
  - **instance_segmentation** (*dict*): Instance segmentation mask (same format as segmentation).  
    - **Polygon**:  
      - **height, width** (*int*): Image dimensions.  
      - **points** (*List[Tuple[float, float]]*): List of normalized `(x, y)` coordinates.  
    - **RLE**:  
      - **height, width** (*int*): Image dimensions.  
      - **counts** (*List[int]*): Run-length encoded mask. 
  - **keypoints** (*dict*): Ordered list of normalized keypoints.  
    - **keypoints** (*List[Tuple[float, float, int]]*): Each keypoint as `(x, y, visibility)`, where:   
      - `x, y` (*float*): Normalized keypoint coordinates.  
      - **visibility** (*int*):  
        - `0`: Not visible  
        - `1`: Occluded  
        - `2`: Fully visible 


> If you yield bounding boxes, keypoints, and instance segmentation masks separately for the same object, ensure that **instance_id** (*Optional[int]*) is included in each annotation type to maintain proper association.


In [None]:
def COCO_people_subset_generator() -> DatasetIterator:
    """Yield dataset records for the COCO people subset.

    Images are read from ``../data/person_val2017_subset`` and annotations
    from ``../data/person_keypoints_val2017.json``. Images are visited in
    numeric order of their file name.

    For every image that has an entry in the annotation file, the generator
    first yields ``{"file": path}`` and then one record per annotation of
    that image. Each annotation record has the class ``"person"``, an
    ``instance_id`` (the index of the annotation within its image), a
    normalized bounding box, and, when present in the source annotation,
    segmentation / instance segmentation (polygon or RLE) and keypoints.

    Images without an entry in the annotation file are skipped. Images that
    OpenCV cannot read are skipped with a warning.

    Yields:
        dict: A record with a ``"file"`` key and an optional ``"annotation"``
        key.
    """
    # Local import: only needed for the unreadable-image warning below.
    import warnings

    img_dir = "../data/person_val2017_subset"
    annot_file = "../data/person_keypoints_val2017.json"

    # File stems are parsed as integers so the images are processed in
    # numeric order.
    im_paths = sorted(
        glob.glob(os.path.join(img_dir, "*.jpg")),
        key=lambda p: int(os.path.splitext(os.path.basename(p))[0]),
    )

    with open(annot_file) as file:
        data = json.load(file)

    # Lookup tables: image metadata by file name, annotations by image id.
    img_dict = {img["file_name"]: img for img in data["images"]}
    ann_dict = {}
    for ann in data["annotations"]:
        ann_dict.setdefault(ann["image_id"], []).append(ann)

    for path in tqdm(im_paths):
        img = img_dict.get(os.path.basename(path))
        if img is None:
            # Image is not described in the annotation file.
            continue
        img_anns = ann_dict.get(img["id"], [])

        # The image is loaded only to obtain its size for normalization.
        im = cv2.imread(path)
        if im is None:
            # cv2.imread returns None instead of raising when the file
            # cannot be read or decoded.
            warnings.warn(f"Skipping unreadable image: {path}")
            continue
        height, width, _ = im.shape

        # First yield just the file, so the image is emitted even when it
        # has no annotations.
        yield {"file": path}

        for i, ann in enumerate(img_anns):
            annotation = {
                "class": "person",
                "instance_id": i,
            }

            # COCO boxes are [x, y, w, h] in pixels; normalize to 0-1.
            box_x, box_y, box_w, box_h = ann["bbox"]
            annotation["boundingbox"] = {
                "x": box_x / width,
                "y": box_y / height,
                "w": box_w / width,
                "h": box_h / height,
            }

            seg = ann["segmentation"]
            segmentation = None
            if isinstance(seg, list) and seg:  # polygon format
                # Each entry is a flat [x0, y0, x1, y1, ...] list; the points
                # of all entries are concatenated into one list.
                poly = []
                for part in seg:
                    poly.extend(
                        (px / width, py / height)
                        for px, py in np.array(part).reshape(-1, 2)
                    )
                segmentation = {
                    "height": height,
                    "width": width,
                    "points": poly,
                }
            elif isinstance(seg, dict):  # RLE format
                segmentation = {
                    "height": seg["size"][0],
                    "width": seg["size"][1],
                    "counts": seg["counts"],
                }
            if segmentation is not None:
                # The same mask is used for both segmentation tasks.
                annotation["segmentation"] = segmentation
                annotation["instance_segmentation"] = segmentation

            if "keypoints" in ann:
                keypoints = []
                for kp in np.array(ann["keypoints"]).reshape(-1, 3):
                    # Clip keypoints to image boundaries before normalizing.
                    kp_x = np.clip(kp[0], 0, width)
                    kp_y = np.clip(kp[1], 0, height)
                    keypoints.append(
                        (kp_x / width, kp_y / height, int(kp[2]))
                    )
                annotation["keypoints"] = {"keypoints": keypoints}

            # Yield the complete record with all annotations
            yield {"file": path, "annotation": annotation}

In [None]:
# Open the dataset by name (this time without `delete_existing`) and feed it
# every record produced by the generator.
dataset = LuxonisDataset(dataset_name)
dataset.add(COCO_people_subset_generator())

### Define Splits

In [None]:
# No manual split definitions are provided here, so the data is split randomly.
dataset.make_splits()

### Test Loader

In [None]:
# Iterate over the "train" view and draw every annotation type on the image.
loader = LuxonisLoader(dataset, view="train")
for image, ann in loader:
    # Fetch all annotation arrays up front; the classification labels are
    # looked up but not drawn.
    cls = ann["/classification"]
    box = ann["/boundingbox"]
    seg = ann["/segmentation"]
    kps = ann["/keypoints"]

    h, w, _ = image.shape

    # Boxes: columns 1-4 are read as normalized x, y, w, h and scaled to
    # pixels. NOTE(review): column 0 is presumably the class index — confirm.
    for b in box:
        top_left = (int(b[1] * w), int(b[2] * h))
        bottom_right = (int(b[1] * w + b[3] * w), int(b[2] * h + b[4] * h))
        cv2.rectangle(image, top_left, bottom_right, (255, 0, 0), 2)

    # Masks: paint every mask into channel 2 of an empty overlay, then blend
    # the overlay with the image at equal weights.
    mask_viz = np.zeros((h, w, 3), dtype=np.uint8)
    for mask in seg:
        mask_viz[mask == 1, 2] = 255
    image = cv2.addWeighted(image, 0.5, mask_viz, 0.5, 0)

    # Keypoints: each entry is reshaped to rows of 3 values; the first two
    # are scaled to pixel coordinates and drawn as small circles.
    for kp in kps:
        kp = kp.reshape(-1, 3)
        for k in kp:
            center = (int(k[0] * w), int(k[1] * h))
            cv2.circle(image, center, 2, (0, 255, 0), 2)

    plt.imshow(image)
    plt.axis("off")  # hide the axes around the picture
    plt.show()