In [1]:
import json
import cv2
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple

In [15]:
# Ustawienia
JSON_PATH = "dataset/label.json"
VIDEO_PATH = "dataset/traffic.mp4"
OUTPUT_VIDEO = "dataset/result.mp4"
OUTPUT_CSV = "dataset/results.csv"
LINE_Y = 0

In [17]:
class BottomCenterTracker:
    def __init__(self, max_distance: int = 80, ttl: int = 20):
        self.objects   = {}
        self.ttl_map   = {}
        self.next_id   = 0
        self.max_dist  = max_distance
        self.ttl       = ttl
        self.crossed   = set()
        self.prev_cy   = {}

    def _match(self, centers):
        ids   = list(self.objects.keys())
        match = [-1] * len(centers)

        for i, c in enumerate(centers):
            best_id, best_d = None, self.max_dist + 1
            for obj_id in ids:
                d = np.linalg.norm(np.array(c) - np.array(self.objects[obj_id]))
                if d < best_d:
                    best_id, best_d = obj_id, d
            if best_id is not None:
                match[i] = best_id
                ids.remove(best_id)
        return match

    def update(self, detections):

        bottom_centers = [((x1 + x2)//2, y2) for (_, x1, y1, x2, y2) in detections]
        matched        = self._match(bottom_centers)
        updated        = []

        for i, center in enumerate(bottom_centers):
            obj_id = matched[i]
            if obj_id == -1:
                obj_id = self.next_id
                self.next_id += 1
            self.objects[obj_id] = center
            self.ttl_map[obj_id] = 0
            matched[i] = obj_id

        for obj_id in list(self.ttl_map.keys()):
            self.ttl_map[obj_id] += 1
            if self.ttl_map[obj_id] > self.ttl:
                self.objects.pop(obj_id, None)
                self.ttl_map.pop(obj_id, None)
                self.prev_cy.pop(obj_id, None)

        for i, (cls, x1, y1, x2, y2) in enumerate(detections):
            obj_id = matched[i]
            _, cy  = bottom_centers[i]
            prev_y = self.prev_cy.get(obj_id)
            crossed = False

            if prev_y is not None:
                if (prev_y < LINE_Y <= cy) or (prev_y > LINE_Y >= cy):
                    if obj_id not in self.crossed:
                        crossed = True
                        self.crossed.add(obj_id)

            self.prev_cy[obj_id] = cy
            updated.append((obj_id, cls, x1, y1, x2, y2, crossed))

        return updated

In [43]:
# --- NOWA WERSJA interpolate_annotations_adjusted ---------------------------
def interpolate_annotations_adjusted(json_path: str,
                                     width: int,
                                     height: int
) -> Dict[int, List[Tuple[str, int, int, int, int]]]:
    """
    Ładuje Label Studio JSON i zwraca:
        { frame_idx (0‑based) : [(label, x1, y1, x2, y2), ...] }
    * Obsługuje wartości zarówno 0‑1, jak i 0‑100 %.
    * Koryguje klatkę (LS startuje od 1 → OpenCV od 0).
    * Opcjonalnie cofa bbox o ~10 % w X i 20 % w Y, żeby
      bottom‑center lepiej trafiał w tylny zderzak.
    * Wypełnia dziury liniową interpolacją.
    """

    def to_px(pct_or_frac: float, dim: int) -> int:
        """Zamienia 0‑1 lub 0‑100 % na px, zaokrąglając do int."""
        frac = pct_or_frac if pct_or_frac <= 1 else pct_or_frac / 100.0
        return int(round(frac * dim))

    with open(json_path, "r") as f:
        data = json.load(f)

    annotations: Dict[int, List[Tuple[str, int, int, int, int]]] = {}

    for item in data:
        for ann in item.get("annotations", []):
            for result in ann.get("result", []):
                label     = result.get("value", {}).get("labels", ["car"])[0]
                sequence  = result.get("value", {}).get("sequence", [])
                prev_f, prev_box = None, None

                for entry in sequence:
                    if not entry.get("enabled", True):
                        continue

                    frame = entry["frame"] - 1          # 0‑based!
                    x     = to_px(entry["x"],      width)
                    y     = to_px(entry["y"],      height)
                    w     = to_px(entry["width"],  width)
                    h     = to_px(entry["height"], height)

                    # lekkie przesunięcie bbox‑a “w tył”
                    x -= int(0.10 * w)
                    y -= int(0.20 * h)

                    x1, y1 = x, y
                    x2, y2 = x + w, y + h
                    box    = (label, x1, y1, x2, y2)

                    # interpolacja brakujących klatek
                    if prev_f is not None and frame > prev_f + 1:
                        gap = frame - prev_f
                        for g in range(1, gap):
                            alpha = g / gap
                            interp = (
                                label,
                                int(prev_box[1] * (1 - alpha) + x1 * alpha),
                                int(prev_box[2] * (1 - alpha) + y1 * alpha),
                                int(prev_box[3] * (1 - alpha) + x2 * alpha),
                                int(prev_box[4] * (1 - alpha) + y2 * alpha),
                            )
                            annotations.setdefault(prev_f + g, []).append(interp)

                    annotations.setdefault(frame, []).append(box)
                    prev_f, prev_box = frame, box

    return annotations


In [45]:
def main():
    cap = cv2.VideoCapture(VIDEO_PATH)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    annotations = interpolate_annotations_adjusted(JSON_PATH, width, height)
    tracker = BottomCenterTracker()

    out = cv2.VideoWriter(OUTPUT_VIDEO, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    results = []
    frame_idx = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
            
        cv2.line(frame, (0, LINE_Y), (width, LINE_Y), (0, 0, 255), 1)

        detections = annotations.get(frame_idx, [])
        tracked = tracker.update(detections)

        for obj_id, cls, x1, y1, x2, y2, crossed in tracked:
            color = (0, 255, 0) if not crossed else (0, 0, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{cls}-{obj_id}", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            results.append((frame_idx, obj_id, cls, x1, y1, x2, y2, crossed))


        cv2.putText(frame, f"Przekroczylo linie: {len(tracker.crossed)}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

        out.write(frame)
        frame_idx += 1

    cap.release()
    out.release()

    df = pd.DataFrame(results, columns=["frame", "id", "class", "x1", "y1", "x2", "y2", "crossed"])
    df.to_csv(OUTPUT_CSV, index=False)

main()