# Import library

In [None]:
import json
from box import Box
import pandas as pd

# Label class

## `Label`

In [None]:
class Label:
    def __init__(self, path):
        self.path = path

## `AIHub`

In [None]:
class AIHub(Label):
    def __init__(
        self,
        scenario_id,
        camera_id,
        frame_id,
        root="/home/jongphago/project/ultralytics/datasets/aihub/sample/라벨링데이터/",
    ):
        path = (
            root
            + f"시나리오{scenario_id:02d}/카메라{camera_id:02d}/"
            + f"NIA_MTMDC_s{scenario_id:02d}_c{camera_id:02d}_"
            + "pm_sunny_summer_"
            + f"{frame_id:04d}.json"
        )
        super().__init__(path)

    pass

# DataFrame

## Label list

## Load label file(json)

In [None]:
from pathlib import Path

# '/home/jongphago/project/ultralytics/datasets/aihub/sample/원천데이터/RGB/시나리오01/카메라11.avi'
index = 7361
image_path = Path(f"out/test_{index + 1}.jpg")
label_path = Path(
    f"datasets/aihub/sample/converts/시나리오01/카메라11/NIA_MTMDC_s01_c11_pm_sunny_summer_{index}.txt"
)

print(image_path, label_path, sep='\n')

In [None]:
label_dict = {"scenario_id": 1, "camera_id": 7, "frame_id": 1000}

aihub = AIHub(**label_dict)
with open(aihub.path) as f:
    lines = json.load(f)
lines

## Key: `frames`

In [None]:
def get_frames(lines: dict) -> pd.DataFrame:
    # Convert to DataFrame [label, info]
    label = pd.DataFrame(label_dict, index=[0])
    info = pd.DataFrame(lines["info"])

    # Join [label, info] to `frames`
    frames = label.join(info)

    return frames


frames = get_frames(lines)
frames

## Key: `objects`

In [None]:
def get_objects(lines: dict) -> pd.DataFrame | None:
    def _flatten(x):
        return pd.Series(x[0])

    # Define objects dataframe
    objects = pd.DataFrame(lines["objects"])

    # Exception
    if objects.loc[0, "label"] == "void":
        return None

    # Flatten [position, attributes] columns
    position: pd.DataFrame = objects.position.apply(_flatten)
    attributes: pd.DataFrame = objects.attributes.apply(_flatten)

    # Join objects + [position, attributes]
    objects = objects.join(position)
    objects = objects.join(attributes)

    # Drop [position, attributes] from objects dataframe
    objects = objects.drop(["position", "attributes"], axis=1)

    return objects


objects = get_objects(lines)
objects

In [None]:
from pathlib import Path
import yaml

root = Path("/home/jongphago/project/ultralytics")
config_name = "aihub-sample"  # aihub-val
cfg_path = root / f"ultralytics/cfg/datasets/{config_name}.yaml"
with open(cfg_path, "r") as file:
    cfg = Box(yaml.safe_load(file))

In [None]:
def get_value_dict(cfg: Box) -> dict[str:int]:
    value_dict = {value: key for key, value in cfg.names.items()}
    return value_dict


value_dict = get_value_dict(cfg)

In [None]:
def xywh2ccwh(objects, shape=(1920, 1080), normalized=True):
    width, height = shape
    # top-left 수정
    objects.width.where(objects.x > 0, objects.width + objects.x, inplace=True)
    objects.height.where(objects.y > 0, objects.height + objects.y, inplace=True)
    objects.x.where(objects.x > 0, 0, inplace=True)
    objects.y.where(objects.y > 0, 0, inplace=True)
    # bottom-right 수정
    bottom_right_x = objects.x + objects.width
    bottom_right_y = objects.y + objects.height
    bottom_right_x.where(bottom_right_x < width, width, inplace=True)
    bottom_right_y.where(bottom_right_y < height, height, inplace=True)
    objects.width = bottom_right_x - objects.x
    objects.height = bottom_right_y - objects.y
    # center xy
    x = objects.x + objects.width / 2
    y = objects.y + objects.height / 2
    x.name, y.name = "x", "y"
    # width, height
    width = objects.width
    height = objects.height
    # normalized
    if normalized:
        x = x / width
        y = y / height
        width = objects.width / width
        height = objects.height / height
    # concatenate
    ccwh = pd.concat([x, y, width, height], axis=1)
    return ccwh


xywh2ccwh(objects, normalized=False)

In [None]:
import cv2
import matplotlib.pyplot as plt

im = cv2.imread(
    f"datasets/aihub/sample/frames/scenario{label_dict['scenario_id']:02d}/camera{label_dict['camera_id']:02d}/s{label_dict['scenario_id']:02d}_c{label_dict['camera_id']:02d}_f{label_dict['frame_id']}.png"
)
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
ccwh = xywh2ccwh(objects, normalized=False).astype(int)
for obj in ccwh.itertuples():
    tlxy = (obj.x - obj.width // 2, obj.y - obj.height // 2)
    brxy = (obj.x + obj.width // 2, obj.y + obj.height // 2)
    center = (obj.x, obj.y)
    cv2.line(im, tlxy, brxy, (255, 0, 255), 2)  # rect
    cv2.rectangle(im, tlxy, brxy, (255, 0, 255), 3)  # rect
    cv2.circle(im, tlxy, 10, (255, 255, 0), 2)  # top-left xy
    cv2.circle(im, brxy, 10, (0, 255, 255), 2)  # bottom-right xy
    cv2.circle(im, center, 10, (255, 0, 0), -1)  # center xy
plt.imshow(im)

In [None]:
shape = (1920, 1080)
width, height = shape


def xywh2ccwh(objects, shape=(1920, 1080)):
    width, height = shape
    norm_width = (
        objects.width.where((objects.x + objects.width) <= width, width - objects.x)
    )  # / width
    norm_height = (
        objects.height.where((objects.y + objects.height) <= height, height - objects.y)
    )  # / height
    norm_center_x = (objects.x + objects.width / 2)  # / width
    norm_center_y = (objects.y + objects.height / 2)  # / height
    norm_center_x.name, norm_center_y.name = "x", "y"
    ccwh = pd.concat([norm_center_x, norm_center_y, norm_width, norm_height], axis=1)
    return ccwh


xywh2ccwh(objects)

In [None]:
def ccwh2xyxy(objects, shape=(1920, 1080)):
    width, height = shape
    tlx = (objects.x - objects.width / 2) / width  # cx - 2/w
    tlx.where(tlx > 0, 0, inplace=True)
    tly = (objects.y - objects.height / 2) / height  # cy - 2/h
    tly.where(tly > 0, 0, inplace=True)
    brx = (objects.x + objects.width / 2) / width
    widths = objects.width.where(brx <= 1, brx - 1) / width
    bry = (objects.y + objects.height / 2) / height
    heights = objects.height.where(bry <= 1, bry - 1) / height
    tlx.name, tly.name = "tlx", "tly"
    return pd.concat([tlx, tly, widths, heights], axis=1)

In [None]:
ccwh2xyxy(objects)

In [None]:
def get_reorder(
    objects: pd.DataFrame,
    frames: pd.DataFrame,
    value_dict: dict[str:int],
) -> pd.DataFrame | None:
    if objects is None:
        return None
    class_col = (
        objects[["label"]]
        .apply(lambda x: value_dict[x[0]], axis=1)
        .to_frame(name="class")
    )
    norm_hor = objects[["x", "width"]] / frames.loc[0, "video_width"]
    norm_ver = objects[["y", "height"]] / frames.loc[0, "video_heigth"]
    joined = class_col.join([norm_hor, norm_ver])
    reorder = joined[["class", "x", "y", "width", "height"]]
    return reorder


reorder = get_reorder(objects, frames, value_dict)
reorder

```python
0,0.187161,0.494629,0.100182,0.492453
0,0.306682,0.486990,0.100817,0.55625
0,0.228916,0.368518,0.091270,0.424166
0,0.008020,0.709953,0.099026,0.737425
0,0.943958,0.517129,0.088682,0.421657
```

# 2024/03/29

In [None]:
import os
import yaml
import pandas as pd
from box import Box
from codes.config.pattern import (
    find,
    scenario_id_pattern,
    camera_id_pattern,
    frame_id_pattern,
)

In [None]:
videos_df = pd.read_csv("datasets/aihub/sample/videos.csv")
for row in videos_df.itertuples():
    break
video_dir = row.video_dir

In [None]:
yaml_path = "ultralytics/cfg/datasets/aihub-sample.yaml"
with open(yaml_path) as f:
    cfg: Box = Box(yaml.safe_load(f))  # dot repr.

In [None]:
scenario_id = find(scenario_id_pattern, video_dir)
camera_id = find(camera_id_pattern, video_dir)
frame_id = find(frame_id_pattern, aihub.path)

In [None]:
cfg.path = Path(cfg.path)
cfg.lbl = cfg.path / cfg.lbl  # ex. aihub/sample/라벨링데이터
cfg.cvt = cfg.path / cfg.cvt  # ex. aihub/sample/converts
sub_dir = f"scenario{scenario_id}/camera{camera_id}"  # row.video_dir
cfg.converts_dir = cfg.cvt / sub_dir
cfg.labels = cfg.path / cfg.labels  # ex. aihub/sample/labels
cfg.labels_dir = cfg.labels / row.task

In [None]:
raw = Path(  # json
    "/home/jongphago/project/ultralytics/datasets/"
    + "aihub/2.Validation/라벨링데이터/"
    + "시나리오42/카메라01/"
    + "NIA_MTMDC_s42_c01_am_sunny_fall_0001.json"
)
src = (  # txt
    "/home/jongphago/project/ultralytics/datasets/"
    + "aihub/2.Validation/converts/"
    + "시나리오01/카메라07"
    + "NIA_MTMDC_s42_c01_am_sunny_fall_0001.txt"
)
dst = (  # link
    "/home/jongphago/project/ultralytics/datasets"
    + "/aihub/2.Validation/labels/val"
    + "s42_c01_f0001.txt"
)

In [None]:
def raw2cvt(raw, cfg):
    cvt_name = f"{raw.stem}.txt"
    cvt = cfg.path / cfg.cvt / row.video_dir / cvt_name
    return cvt

cvt = raw2cvt(raw, cfg)

In [None]:
def cvt2dst(cvt, cfg):
    scenario_id_pattern = r"(?<=시나리오)\d{2}"
    camera_id_pattern = r"(?<=카메라)\d{2}"
    frame_id_pattern = r"_(\d{4})."

    scenario_id = find(scenario_id_pattern, str(cvt))
    camera_id = find(camera_id_pattern, str(cvt))
    frame_id = find(frame_id_pattern, str(cvt))
    dst_name = f"s{scenario_id}_c{camera_id}_f{frame_id}.txt"

    dst = Path(cfg.path) / cfg.val / dst_name
    return dst

dst = cvt2dst(cvt, cfg)

# Write txt to convert

In [None]:
from pathlib import Path
import yaml
import pandas as pd
from box import Box

In [None]:
os.makedirs(cvt.parent, exist_ok=True)

In [None]:
def convert(reorder, cvt_path) -> None:
    if reorder is not None:
        reorder.to_csv(cvt_path, header=False, index=False)
    else:
        with open(cvt_path, "w") as f:
            f.write("")
    return None


convert(reorder, cvt)

In [None]:
# config file (.yaml)
def load_config_from_yaml(fp: str | Path) -> Box:
    with open(fp) as f:
        cfg = Box(yaml.safe_load(f))
    return cfg


cfg_yaml_path = "ultralytics/cfg/datasets/aihub-sample.yaml"
cfg: Box = load_config_from_yaml(cfg_yaml_path)

In [None]:
# sub directory list (.csv)
def get_videos(cfg: Box) -> map:
    """AI is creating summary for get_videos

    Args:
        cfg (Box): directory information

    Returns:
        map: iterable pandas dataframe

    Examples:
    cfg = load_config_from_yaml(cfg_yaml_path)
    rows = get_videos(cfg)
    for row in rows:
        row.task, row.video_dir
    """
    videos_df: pd.DataFrame = pd.read_csv(Path(cfg.path) / cfg.csv)
    rows: map = videos_df.itertuples()
    return rows


rows = get_videos(cfg)
for row in rows:
    print(row.video_dir)

In [None]:
# traverse directory (.json)
def get_json_labels(cfg, sub_dir) -> list[Path]:
    raw_label_path = Path(cfg.path) / cfg.lbl / sub_dir
    label_paths = []
    for dirs, _, file_names in os.walk(raw_label_path):
        parent = Path(dirs)
        for file_name in sorted(file_names):
            label_paths.append(parent / file_name)
    return label_paths


json_labels = get_json_labels(cfg, row.video_dir)
for json_label in json_labels:
    assert json_label.exists()

In [None]:
rows = get_videos(cfg)
for row in rows:
    json_labels = get_json_labels(cfg, row.video_dir)

In [None]:
json_label = json_labels[2000]
with open(json_label) as f:
    lines = json.load(f)

In [None]:
frames = get_frames(lines)
objects = get_objects(lines)
reorder = get_reorder(objects, frames, value_dict)

# Create symbolic link file

In [None]:
label_dir = Path(cfg.path) / cfg.labels / row.task
os.makedirs(label_dir, exist_ok=True)

In [None]:
srcs = []
for directory, _, file_names in os.walk(cvt.parent):
    root = Path(directory)
    for file_name in file_names:
        srcs.append(root / file_name)
srcs

In [None]:
for src in srcs:
    dst = cvt2dst(src, cfg)
    print(dst)
    if not os.path.exists(dst):
        os.symlink(src, dst)