In [1]:
# VIDEO SEGMENTATION DEMO (YOLOv8-seg)
# Kernel: Python (adas)

from ultralytics import YOLO
import cv2
from pathlib import Path
import numpy as np
import time

# -------- CONFIG --------
BEST_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")  # your model
INPUT_VIDEO  = Path(r"E:/projects/riding_bike.mp4")  # <-- your video here
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike.mp4")
DEVICE = "cuda"  # or "cpu"

# -------- CHECKS --------
if not BEST_WEIGHTS.exists():
    raise SystemExit("best.pt not found at: " + str(BEST_WEIGHTS))
if not INPUT_VIDEO.exists():
    raise SystemExit("Input video not found: " + str(INPUT_VIDEO))

print("Loading model:", BEST_WEIGHTS)
model = YOLO(str(BEST_WEIGHTS))

# -------- VIDEO SETUP --------
cap = cv2.VideoCapture(str(INPUT_VIDEO))
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS)

print(f"Video: {W}x{H}, {FPS} FPS")

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))

# colormap for segmentation
cmap = np.random.RandomState(0).randint(0,255,(256,3),dtype=np.uint8)

# -------- PROCESS LOOP --------
frame_id = 0
t0 = time.time()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # run segmentation
    results = model.predict(frame, device=DEVICE, imgsz=640, conf=0.25, verbose=False)
    r = results[0]

    # extract mask (semantic)
    mask = None
    try:
        if hasattr(r, "masks") and r.masks is not None:
            mdata = r.masks.data if hasattr(r.masks, "data") else r.masks.numpy()
            arr = np.array(mdata)
            if arr.ndim == 3:
                mask = np.argmax(arr, axis=0).astype(np.int32)
            elif arr.ndim == 2:
                mask = arr.astype(np.int32)
    except Exception:
        mask = None

    if mask is None:
        out.write(frame)
        continue

    # resize mask to original frame
    mask = cv2.resize(mask.astype(np.int32), (W, H), interpolation=cv2.INTER_NEAREST)

    # colorize + overlay
    mask_mod = np.mod(mask, 256).astype(np.uint8)
    color_mask = cmap[mask_mod]
    overlay = (0.5 * frame + 0.5 * color_mask).astype(np.uint8)

    out.write(overlay)

    frame_id += 1
    if frame_id % 50 == 0:
        print("Processed:", frame_id, "frames")

t1 = time.time()
cap.release()
out.release()

print("\nDONE!")
print("Saved output video to:", OUTPUT_VIDEO)
print(f"Processed {frame_id} frames in {(t1 - t0):.2f}s ({(t1 - t0)/max(1,frame_id):.3f} sec/frame).")


Loading model: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\weights\best.pt
Video: 1280x720, 29.97002997002997 FPS

DONE!
Saved output video to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike.mp4
Processed 0 frames in 25.37s (25.373 sec/frame).


In [2]:
# Fixed VIDEO SEGMENTATION DEMO (YOLOv8-seg) — handles BGR→RGB, robust mask extraction, always counts frames
# Kernel: Python (adas)

from ultralytics import YOLO
import cv2
from pathlib import Path
import numpy as np
import time
import sys

# -------- CONFIG --------
BEST_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")
INPUT_VIDEO  = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_fixed.mp4")
DEVICE = "cuda"  # or "cpu"

# -------- CHECKS --------
if not BEST_WEIGHTS.exists():
    raise SystemExit("best.pt not found at: " + str(BEST_WEIGHTS))
if not INPUT_VIDEO.exists():
    raise SystemExit("Input video not found: " + str(INPUT_VIDEO))

print("Loading model:", BEST_WEIGHTS)
model = YOLO(str(BEST_WEIGHTS))

# -------- VIDEO SETUP --------
cap = cv2.VideoCapture(str(INPUT_VIDEO))
if not cap.isOpened():
    raise SystemExit("Failed to open video with OpenCV: " + str(INPUT_VIDEO))

W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0

print(f"Video: {W}x{H}, {FPS:.2f} FPS")

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))

# colormap for segmentation
cmap = np.random.RandomState(0).randint(0,255,(256,3),dtype=np.uint8)

# -------- PROCESS LOOP --------
frame_id = 0
t0 = time.time()
print_interval = 100

while True:
    ret, frame_bgr = cap.read()
    if not ret:
        break

    # convert to RGB for the model
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

    # run segmentation (pass RGB array)
    try:
        results = model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=0.25, verbose=False)
    except Exception as ex:
        # if predict fails for arrays, try passing frame path fallback (slower)
        print(f"[WARN] model.predict on array failed at frame {frame_id}: {ex}", file=sys.stderr)
        results = None

    mask = None
    if results:
        r = results[0]
        # robust extraction attempts
        try:
            if hasattr(r, "masks") and r.masks is not None:
                # prefer r.masks.data if available
                mdata = r.masks.data if hasattr(r.masks, "data") else getattr(r.masks, "numpy", lambda: r.masks)()
                arr = np.array(mdata)
                if arr.ndim == 3:
                    # instance masks N,H,W -> convert to semantic-like via argmax
                    mask = np.argmax(arr, axis=0).astype(np.int32)
                elif arr.ndim == 2:
                    mask = arr.astype(np.int32)
                else:
                    mask = np.squeeze(arr).astype(np.int32)
        except Exception:
            mask = None

    # If mask still None, we will write original frame (but still count it)
    if mask is None:
        out_frame = frame_bgr  # write original BGR frame
    else:
        # resize mask to original frame size
        if mask.shape[:2] != (H, W):
            mask = cv2.resize(mask.astype(np.int32), (W, H), interpolation=cv2.INTER_NEAREST)
        mask_mod = np.mod(mask, 256).astype(np.uint8)
        color_mask = cmap[mask_mod]
        # overlay color mask over original BGR frame -> need BGR for writer
        overlay_rgb = (0.5 * frame_rgb + 0.5 * color_mask).astype(np.uint8)
        out_frame = cv2.cvtColor(overlay_rgb, cv2.COLOR_RGB2BGR)

    out.write(out_frame)

    frame_id += 1
    if frame_id % print_interval == 0:
        print(f"Processed {frame_id} frames...")

# clean up
t1 = time.time()
cap.release()
out.release()

print("\nDONE!")
print("Saved output video to:", OUTPUT_VIDEO)
print(f"Processed {frame_id} frames in {(t1 - t0):.2f}s ({(t1 - t0)/max(1,frame_id):.3f} sec/frame).")


Loading model: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\weights\best.pt
Video: 1280x720, 29.97 FPS


  arr = np.array(mdata)


Processed 100 frames...
Processed 200 frames...
Processed 300 frames...
Processed 400 frames...
Processed 500 frames...
Processed 600 frames...
Processed 700 frames...
Processed 800 frames...

DONE!
Saved output video to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_fixed.mp4
Processed 870 frames in 25.01s (0.029 sec/frame).


In [3]:
# Quick import test — run inside your notebook (kernel: Python (adas)) or in Anaconda Prompt (adas active)
try:
    import ultralytics, cv2, numpy as np
    from deep_sort_realtime.deepsort_tracker import DeepSort
    print("ultralytics", ultralytics.__version__)
    print("cv2", cv2.__version__)
    print("deep_sort_realtime import OK")
except Exception as e:
    print("Import failed:", e)


ultralytics 8.3.228
cv2 4.12.0
deep_sort_realtime import OK


In [6]:
# Robust Detection + DeepSORT tracking + segmentation overlay (fixed color/type issues)
# Kernel: Python (adas)

from pathlib import Path
import cv2, time, numpy as np, math, sys
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# ----------------- CONFIG -----------------
DETECT_WEIGHTS = "yolov8n.pt"
SEG_WEIGHTS    = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")
INPUT_VIDEO    = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO   = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_track_final.mp4")
DEVICE         = "cuda"
CONF_THRESH    = 0.3

# ----------------- Sanity checks -----------------
if not INPUT_VIDEO.exists():
    raise SystemExit("Input video not found: " + str(INPUT_VIDEO))

# ----------------- Load models -----------------
det_model = YOLO(DETECT_WEIGHTS)
seg_model = None
if SEG_WEIGHTS is not None and SEG_WEIGHTS.exists():
    seg_model = YOLO(str(SEG_WEIGHTS))
    print("Segmentation overlay ENABLED")
else:
    print("Segmentation overlay DISABLED")

# ----------------- Tracker -----------------
tracker = DeepSort(max_age=30)

# ----------------- Video IO -----------------
cap = cv2.VideoCapture(str(INPUT_VIDEO))
if not cap.isOpened():
    raise SystemExit("Failed to open video: " + str(INPUT_VIDEO))
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))

# Prepare palette as list of Python tuples (int,int,int) to avoid NumPy / dtype issues
_palette_np = (np.random.RandomState(2).randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in _palette_np ]

last_centroid = {}
frame_id = 0; t0 = time.time()

print("Starting detection+tracking (final)...")
while True:
    ret, frame_bgr = cap.read()
    if not ret:
        break
    t_frame = time.time()
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

    # segmentation (optional)
    seg_mask = None
    if seg_model is not None:
        try:
            rseg = seg_model.predict(frame_rgb, device=DEVICE, imgsz=640, verbose=False)[0]
            if hasattr(rseg, "masks") and rseg.masks is not None:
                arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())
                seg_mask = np.argmax(arr, axis=0).astype(np.int32) if arr.ndim==3 else arr.astype(np.int32)
        except Exception:
            seg_mask = None

    # detection -> DeepSort format: ([x1,y1,x2,y2], score, class_name)
    detections_for_tracker = []
    try:
        r = det_model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=CONF_THRESH, verbose=False)[0]
        if hasattr(r, "boxes") and r.boxes is not None:
            xyxy = r.boxes.xyxy.cpu().numpy() if hasattr(r.boxes, "xyxy") else np.array(r.boxes.xyxy)
            confs = r.boxes.conf.cpu().numpy() if hasattr(r.boxes, "conf") else np.array(r.boxes.conf)
            clsids = r.boxes.cls.cpu().numpy() if hasattr(r.boxes, "cls") else np.array(r.boxes.cls)
            for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                detections_for_tracker.append(([float(x1), float(y1), float(x2), float(y2)], float(conf), str(int(clsid))))
    except Exception as ex:
        print("[WARN] detection error:", ex, file=sys.stderr)
        detections_for_tracker = []

    # update tracker (pass BGR frame for appearance)
    tracks = tracker.update_tracks(detections_for_tracker, frame=frame_bgr)

    # draw tracks
    for tr in tracks:
        try:
            if not tr.is_confirmed():
                continue
        except Exception:
            # some tracker versions don't have is_confirmed method; assume confirmed
            pass

        # ensure track id is an int
        try:
            tid_raw = tr.track_id
            tid = int(tid_raw)
        except Exception:
            # fallback if track_id is non-numeric
            tid = abs(hash(str(tr.track_id))) % 10000

        # get bbox (prefer ltrb)
        try:
            ltrb = tr.to_ltrb()
        except Exception:
            ltrb = tr.to_tlbr()
        x1,y1,x2,y2 = map(int, ltrb)

        # safe color selection (palette is list)
        color = PALETTE[tid % len(PALETTE)]

        cv2.rectangle(frame_bgr, (x1,y1), (x2,y2), color, 2)

        cx = int((x1 + x2) / 2)
        cy = int((y1 + y2) / 2)
        speed_text = ""
        if tid in last_centroid:
            px, py, t_prev = last_centroid[tid]
            dt = max(1e-6, t_frame - t_prev)
            dist = math.hypot(cx - px, cy - py)
            speed = dist / dt
            speed_text = f"{int(speed)} px/s"
        last_centroid[tid] = (cx, cy, t_frame)

        label = f"ID:{tid} {speed_text}"
        # draw label background for readability
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame_bgr, (bx1, by1), (bx2, by2), color, -1)
        cv2.putText(frame_bgr, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

    # overlay segmentation if present
    frame_out = frame_bgr
    if seg_mask is not None:
        if seg_mask.shape[:2] != (H, W):
            seg_mask = cv2.resize(seg_mask.astype(np.int32), (W, H), interpolation=cv2.INTER_NEAREST)
        seg_col = (np.random.RandomState(0).randint(0,255,(256,3))).astype(np.uint8)
        color_mask = seg_col[np.mod(seg_mask.astype(np.int32),256)]
        frame_out = (0.4 * frame_bgr.astype(np.float32) + 0.6 * color_mask.astype(np.float32)).astype(np.uint8)

    out.write(frame_out)
    frame_id += 1

# cleanup
cap.release()
out.release()
t1 = time.time()
print("Done. Saved to:", OUTPUT_VIDEO)
print("Frames:", frame_id, "Time:", (t1 - t0), "FPS:", frame_id / max(1, (t1 - t0)))


Segmentation overlay ENABLED
Starting detection+tracking (final)...


  arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())


Done. Saved to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_track_final.mp4
Frames: 870 Time: 62.04981756210327 FPS: 14.020992070270804


In [8]:
# Calibration using OpenCV mouse clicks (works in Jupyter)
# Click exactly TWO points in the window, then press 'q'

import cv2
import math

VIDEO_PATH = r"E:/projects/riding_bike.mp4"

# Read first frame
cap = cv2.VideoCapture(VIDEO_PATH)
ret, frame = cap.read()
cap.release()

if not ret:
    raise SystemExit("Could not read video.")

points = []

def click_event(event, x, y, flags, param):
    global points
    if event == cv2.EVENT_LBUTTONDOWN:
        points.append((x, y))
        print(f"Selected point: {(x,y)}")

# Show frame
cv2.namedWindow("Calibration")
cv2.setMouseCallback("Calibration", click_event)

print("Click TWO points on the frame, then press 'q' to continue.")

while True:
    cv2.imshow("Calibration", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()

if len(points) != 2:
    raise SystemExit("ERROR: You must click exactly TWO points.")

p1, p2 = points
pixel_dist = math.hypot(p2[0]-p1[0], p2[1]-p1[1])
print("Pixel distance =", pixel_dist)

real_m = float(input("Enter real distance between the two points (meters): "))

px_per_m = pixel_dist / real_m
print("Calibration OK: ", px_per_m, "pixels per meter")


Click TWO points on the frame, then press 'q' to continue.
Selected point: (330, 288)
Selected point: (545, 460)
Pixel distance = 275.3343422096125


Enter real distance between the two points (meters):  5


Calibration OK:  55.066868441922495 pixels per meter


In [9]:
# Calibrated tracking -> outputs KM/H overlay + CSV
# Kernel: Python (adas)

from pathlib import Path
import cv2, time, math, csv, numpy as np, sys
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# -------- CONFIG (edit if needed) --------
INPUT_VIDEO  = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_track_kmh_final.mp4")
TRACKS_CSV   = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final.csv")

DETECT_WEIGHTS = "yolov8n.pt"
SEG_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")  # set to None to disable
DEVICE = "cuda"   # or "cpu"
CONF_THRESH = 0.3

# Use the px_per_m you measured
PX_PER_M = 55.066868441922495   # <-- from your calibration (pixels per meter)

# -------- Sanity checks --------
if not INPUT_VIDEO.exists():
    raise SystemExit("Input video missing: " + str(INPUT_VIDEO))

# -------- Load models --------
det_model = YOLO(DETECT_WEIGHTS)
seg_model = None
if SEG_WEIGHTS is not None and SEG_WEIGHTS.exists():
    seg_model = YOLO(str(SEG_WEIGHTS))
    print("Seg overlay ENABLED using:", SEG_WEIGHTS)
else:
    print("Seg overlay DISABLED")

# -------- Tracker setup --------
tracker = DeepSort(max_age=30)

# -------- Video IO --------
cap = cv2.VideoCapture(str(INPUT_VIDEO))
if not cap.isOpened():
    raise SystemExit("Failed to open video: " + str(INPUT_VIDEO))
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")  # use H.264 (avc1) for better Windows compatibility
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))

# color palette
rng = np.random.RandomState(2)
_palette_np = (rng.randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in _palette_np ]

# bookkeeping
last_centroid = {}   # tid -> (cx, cy, t)
csv_rows = []
frame_id = 0
t_start = time.time()

print("Running calibrated tracker -> KM/H overlay. This may take some minutes depending on GPU/CPU.")

while True:
    ret, frame_bgr = cap.read()
    if not ret:
        break
    t_frame = time.time()
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

    # 1) optional segmentation (fast enough on gpu; skip if you prefer)
    seg_mask = None
    if seg_model is not None:
        try:
            rseg = seg_model.predict(frame_rgb, device=DEVICE, imgsz=640, verbose=False)[0]
            if hasattr(rseg, "masks") and rseg.masks is not None:
                arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())
                seg_mask = np.argmax(arr, axis=0).astype(np.int32) if arr.ndim==3 else arr.astype(np.int32)
        except Exception:
            seg_mask = None

    # 2) detection -> list of ([x1,y1,x2,y2], score, class_name)
    detections_for_tracker = []
    try:
        r = det_model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=CONF_THRESH, verbose=False)[0]
        if hasattr(r, "boxes") and r.boxes is not None:
            xyxy = r.boxes.xyxy.cpu().numpy() if hasattr(r.boxes, "xyxy") else np.array(r.boxes.xyxy)
            confs = r.boxes.conf.cpu().numpy() if hasattr(r.boxes, "conf") else np.array(r.boxes.conf)
            clsids = r.boxes.cls.cpu().numpy() if hasattr(r.boxes, "cls") else np.array(r.boxes.cls)
            for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                detections_for_tracker.append(([float(x1), float(y1), float(x2), float(y2)], float(conf), str(int(clsid))))
    except Exception as ex:
        print("Detection warning:", ex, file=sys.stderr)
        detections_for_tracker = []

    # 3) update tracker
    tracks = tracker.update_tracks(detections_for_tracker, frame=frame_bgr)

    # 4) draw tracked boxes + compute speed (km/h) and record CSV rows
    for tr in tracks:
        try:
            if hasattr(tr, "is_confirmed") and (not tr.is_confirmed()):
                continue
        except Exception:
            pass

        # normalize track id to int
        try:
            tid = int(tr.track_id)
        except Exception:
            tid = abs(hash(str(tr.track_id))) % 10000

        # bbox
        try:
            ltrb = tr.to_ltrb()
        except Exception:
            ltrb = tr.to_tlbr()
        x1,y1,x2,y2 = map(int, ltrb)
        cx = (x1 + x2) / 2.0
        cy = (y1 + y2) / 2.0

        # compute pixel speed and convert to km/h
        speed_px_s = 0.0
        speed_m_s = 0.0
        speed_kmh = 0.0
        if tid in last_centroid:
            px, py, t_prev = last_centroid[tid]
            dt = max(1e-6, t_frame - t_prev)
            dist_px = math.hypot(cx - px, cy - py)
            speed_px_s = dist_px / dt
            speed_m_s = (dist_px / PX_PER_M) / dt
            speed_kmh = speed_m_s * 3.6
        last_centroid[tid] = (cx, cy, t_frame)

        # draw bbox + label
        color = PALETTE[tid % len(PALETTE)]
        cv2.rectangle(frame_bgr, (x1,y1), (x2,y2), color, 2)
        label = f"ID:{tid} {speed_kmh:.1f} km/h"
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame_bgr, (bx1, by1), (bx2, by2), color, -1)
        cv2.putText(frame_bgr, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

        # append CSV row
        csv_rows.append({
            "frame": frame_id,
            "time": t_frame - t_start,
            "track_id": tid,
            "x1": x1, "y1": y1, "x2": x2, "y2": y2,
            "cx": cx, "cy": cy,
            "speed_px_s": speed_px_s,
            "speed_m_s": speed_m_s,
            "speed_kmh": speed_kmh
        })

    # 5) overlay segmentation if present
    frame_out = frame_bgr
    if seg_mask is not None:
        if seg_mask.shape[:2] != (H, W):
            seg_mask = cv2.resize(seg_mask.astype(np.int32), (W, H), interpolation=cv2.INTER_NEAREST)
        seg_col = (np.random.RandomState(0).randint(0,255,(256,3))).astype(np.uint8)
        color_mask = seg_col[np.mod(seg_mask.astype(np.int32),256)]
        frame_out = (0.4 * frame_bgr.astype(np.float32) + 0.6 * color_mask.astype(np.float32)).astype(np.uint8)

    out.write(frame_out)
    frame_id += 1

# finalize
cap.release()
out.release()

# save CSV
keys = ["frame","time","track_id","x1","y1","x2","y2","cx","cy","speed_px_s","speed_m_s","speed_kmh"]
with open(TRACKS_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in csv_rows:
        writer.writerow({k: r.get(k, 0) for k in keys})

t_end = time.time()
print("Done. Saved video:", OUTPUT_VIDEO)
print("Saved CSV:", TRACKS_CSV)
print(f"Frames: {frame_id}, Total time: {t_end - t_start:.1f}s, Achieved FPS: {frame_id / max(1, (t_end - t_start)):.2f}")


Seg overlay ENABLED using: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\weights\best.pt


  self.model.load_state_dict(torch.load(model_wts_path))


Running calibrated tracker -> KM/H overlay. This may take some minutes depending on GPU/CPU.


  arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())


Done. Saved video: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_track_kmh_final.mp4
Saved CSV: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tracks_kmh_final.csv
Frames: 870, Total time: 50.6s, Achieved FPS: 17.21


In [10]:
# Relative-TTC overlay (pixels) + optional audible beep
# Kernel: Python (adas)

from pathlib import Path
import cv2, time, csv, math, numpy as np, sys
import collections

# ---------- CONFIG ----------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
TRACKS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final.csv")  # input CSV
OUT_VIDEO  = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_ttc.mp4")
OUT_CSV    = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final_with_ttc.csv")

TTC_THRESHOLD = 3.0     # seconds -> alarm when TTC < threshold
BEEP_ON_ALARM = True    # Windows only (uses winsound). Set False to disable sound.
ALARM_COOLDOWN = 2.0    # seconds between beeps per tracked object

# ---------- Sanity ----------
if not VIDEO_PATH.exists():
    raise SystemExit("Video not found: " + str(VIDEO_PATH))
if not TRACKS_CSV.exists():
    raise SystemExit("CSV not found: " + str(TRACKS_CSV))

# ---------- Load CSV into frame-indexed structure ----------
frame_index = collections.defaultdict(list)
all_rows = []
with open(TRACKS_CSV, newline='') as f:
    reader = csv.DictReader(f)
    for r in reader:
        # convert numeric fields
        r2 = dict(r)
        r2['frame'] = int(float(r['frame']))
        r2['track_id'] = int(float(r['track_id']))
        r2['x1'] = int(float(r['x1'])); r2['y1'] = int(float(r['y1']))
        r2['x2'] = int(float(r['x2'])); r2['y2'] = int(float(r['y2']))
        r2['speed_px_s'] = float(r.get('speed_px_s', 0.0))
        frame_index[r2['frame']].append(r2)
        all_rows.append(r2)

print(f"Loaded {len(all_rows)} track rows across {len(frame_index)} frames.")

# ---------- Video IO setup ----------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video: " + str(VIDEO_PATH))
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")   # H.264 (Windows friendly)
out = cv2.VideoWriter(str(OUT_VIDEO), fourcc, FPS, (W, H))

# color palette
rng = np.random.RandomState(7)
palette = (rng.randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in palette ]

# alarm state per track
last_alarm_time = {}   # track_id -> last alarm timestamp

# output CSV writer setup (we will append ttc + alarm)
out_rows = []
keys = None

frame_id = 0
t_start = time.time()
print("Processing video and computing TTC...")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # find rows for this frame
    rows = frame_index.get(frame_id, [])

    # for each detection compute TTC
    for r in rows:
        tid = r['track_id']
        x1,y1,x2,y2 = r['x1'], r['y1'], r['x2'], r['y2']
        speed_px_s = r.get('speed_px_s', 0.0)

        # estimate distance in pixels (vertical distance from box bottom to bottom of image)
        dist_px = max(0.0, (H - y2))

        # compute TTC in seconds (handle very small speeds)
        if speed_px_s > 1e-3:
            ttc = dist_px / speed_px_s
        else:
            ttc = float('inf')

        alarm = False
        now = time.time()
        if ttc != float('inf') and ttc <= TTC_THRESHOLD:
            # check cooldown so we don't beep every frame for same object
            last = last_alarm_time.get(tid, -9999)
            if (now - last) >= ALARM_COOLDOWN:
                alarm = True
                last_alarm_time[tid] = now
                # beep (Windows)
                if BEEP_ON_ALARM:
                    try:
                        import winsound
                        # short beep 600Hz for 180ms
                        winsound.Beep(600, 180)
                    except Exception:
                        # not Windows or winsound unavailable — ignore
                        pass

        # draw overlays on frame
        color = PALETTE[tid % len(PALETTE)]
        if alarm:
            box_color = (0,0,255)   # red for alarm
        else:
            box_color = color
        cv2.rectangle(frame, (x1,y1), (x2,y2), box_color, 2)

        # label: ID + TTC or 'inf'
        label = f"ID:{tid} "
        if ttc == float('inf'):
            label += "TTC: inf"
        else:
            label += f"TTC: {ttc:.1f}s"
            if alarm:
                label += " !!"

        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame, (bx1, by1), (bx2, by2), box_color, -1)
        cv2.putText(frame, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

        # store output row
        out_row = dict(r)
        out_row['ttc_s'] = ttc
        out_row['alarm'] = int(alarm)
        out_rows.append(out_row)
        if keys is None:
            keys = list(out_row.keys())

    out.write(frame)
    frame_id += 1

# finish
cap.release()
out.release()

# save CSV
if keys is None:
    keys = ["frame","track_id","x1","y1","x2","y2","speed_px_s","ttc_s","alarm"]
with open(OUT_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in out_rows:
        writer.writerow(r)

t_end = time.time()
print("Done.")
print("Saved TTC video:", OUT_VIDEO)
print("Saved TTC CSV :", OUT_CSV)
print(f"Frames: {frame_id}, Total time: {t_end - t_start:.1f}s, Achieved FPS: {frame_id / max(1, (t_end - t_start)):.2f}")
print(f"Alarms triggered for tracks: {list(last_alarm_time.keys())}")


Loaded 5468 track rows across 868 frames.
Processing video and computing TTC...
Done.
Saved TTC video: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_ttc.mp4
Saved TTC CSV : C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tracks_kmh_final_with_ttc.csv
Frames: 870, Total time: 44.1s, Achieved FPS: 19.73
Alarms triggered for tracks: [1, 4, 10, 14, 15, 19, 18, 28, 33, 37, 32, 48, 54, 65, 72, 71, 79, 67, 81, 80, 90, 86, 97, 100, 105, 109, 118, 119, 122, 123, 132, 129, 130, 145, 152, 158, 144, 171, 178, 182, 186, 189, 193, 194, 197, 204, 212, 216, 219, 220, 230, 234, 238, 239, 244, 245, 246, 248, 253, 259, 270, 274, 272, 284, 288, 290, 291, 294, 305, 307, 309, 312]


In [11]:
# TTC refinement: smoothing + consecutive-frame confirmation + min box size filter
# Kernel: Python (adas)

from pathlib import Path
import csv, time, math, collections, numpy as np, cv2, winsound, sys

# ---------- CONFIG ----------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
IN_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final.csv")  # input you created earlier
OUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_ttc_confirmed.mp4")
OUT_CSV   = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_ttc_confirmed.csv")

TTC_THRESHOLD = 3.0          # seconds (same as before)
MIN_CONSECUTIVE = 3          # require TTC below threshold for this many frames to confirm alarm
SMOOTH_WINDOW = 3            # moving average window (in frames) for speed_px_s
MIN_BOX_AREA = 400           # ignore detections with bbox area < this (pixels^2)
ALARM_COOLDOWN = 2.0         # seconds between beeps per track
BEEP_ON_ALARM = True         # True to play beep (Windows)

# ---------- load CSV into frame-index structure ----------
if not IN_CSV.exists():
    raise SystemExit(f"Input CSV not found: {IN_CSV}")

frame_index = collections.defaultdict(list)
all_rows = []
with open(IN_CSV, newline='') as f:
    reader = csv.DictReader(f)
    for r in reader:
        fr = int(float(r['frame']))
        tid = int(float(r['track_id']))
        x1 = int(float(r['x1'])); y1 = int(float(r['y1']))
        x2 = int(float(r['x2'])); y2 = int(float(r['y2']))
        sp = float(r.get('speed_px_s', 0.0))
        entry = {"frame": fr, "track_id": tid, "x1": x1, "y1": y1, "x2": x2, "y2": y2, "speed_px_s": sp}
        frame_index[fr].append(entry)
        all_rows.append(entry)

print(f"Loaded {len(all_rows)} rows for {len(frame_index)} frames.")

# ---------- Build per-track time-ordered lists ----------
tracks = collections.defaultdict(list)
for r in all_rows:
    tracks[r['track_id']].append((r['frame'], r['speed_px_s'], r['x1'], r['y1'], r['x2'], r['y2']))

# sort and compute smoothed speed per frame per track
smoothed = collections.defaultdict(dict)  # smoothed[track_id][frame] = smoothed_speed_px_s
for tid, triplets in tracks.items():
    triplets_sorted = sorted(triplets, key=lambda x: x[0])
    speeds = [t[1] for t in triplets_sorted]
    frames = [t[0] for t in triplets_sorted]
    # moving average
    k = SMOOTH_WINDOW
    if k <= 1:
        sm = speeds
    else:
        sm = np.convolve(speeds, np.ones(k)/k, mode='same')
    for f, s in zip(frames, sm):
        smoothed[tid][f] = float(s)

# ---------- iterate video, compute TTC with smoothed speeds and confirm alarms ----------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")
out = cv2.VideoWriter(str(OUT_VIDEO), fourcc, FPS, (W, H))

# per-track state for consecutive below-threshold
consec_below = collections.defaultdict(int)    # tid -> count
confirmed_alarm_state = collections.defaultdict(bool)  # tid -> True if currently alarmed (persist until reset)
last_alarm_time = collections.defaultdict(lambda: -9999)
rows_out = []

frame_id = 0
t0 = time.time()
print("Running refined TTC confirmation...")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    rows = frame_index.get(frame_id, [])
    for r in rows:
        tid = r['track_id']
        x1,y1,x2,y2 = r['x1'], r['y1'], r['x2'], r['y2']
        w = max(0, x2 - x1); h = max(0, y2 - y1)
        area = w * h
        if area < MIN_BOX_AREA:
            # ignore small boxes
            ttc = float('inf')
            alarm = 0
            consec_below[tid] = 0
        else:
            sp = smoothed.get(tid, {}).get(frame_id, r['speed_px_s'])
            dist_px = max(0.0, (H - y2))
            ttc = dist_px / sp if sp > 1e-3 else float('inf')
            if ttc <= TTC_THRESHOLD:
                consec_below[tid] += 1
            else:
                consec_below[tid] = 0

            if consec_below[tid] >= MIN_CONSECUTIVE and not confirmed_alarm_state.get(tid, False):
                # confirm new alarm
                confirmed_alarm_state[tid] = True
                alarm = 1
                now = time.time()
                last_alarm_time[tid] = now
                if BEEP_ON_ALARM:
                    try:
                        winsound.Beep(700, 180)
                    except Exception:
                        pass
            elif confirmed_alarm_state.get(tid, False):
                # keep alarmed until TTC rises above threshold for MIN_CONSECUTIVE frames
                if ttc > TTC_THRESHOLD:
                    # count up a "clear" counter by reusing consec_below as negative counter
                    consec_below[tid] = 0
                    confirmed_alarm_state[tid] = False
                    alarm = 0
                else:
                    alarm = 1
            else:
                alarm = 0

        # draw box
        color = (0,0,255) if alarm else (int(100), int(220), int(100))
        cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
        label = f"ID:{tid} TTC:{(ttc if ttc!=float('inf') else 9999):.1f}s"
        if alarm:
            label += " !!"
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame, (bx1, by1), (bx2, by2), color, -1)
        cv2.putText(frame, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

        out_row = dict(r)
        out_row['ttc_s'] = ttc
        out_row['alarm_confirmed'] = int(alarm)
        rows_out.append(out_row)

    out.write(frame)
    frame_id += 1

cap.release()
out.release()

# save CSV
keys = ["frame","track_id","x1","y1","x2","y2","speed_px_s","ttc_s","alarm_confirmed"]
with open(OUT_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in rows_out:
        writer.writerow({k: r.get(k, 0) for k in keys})

t1 = time.time()
print("Done.")
print("Saved refined TTC video to:", OUT_VIDEO)
print("Saved refined TTC CSV to:", OUT_CSV)
print(f"Frames: {frame_id}, Total time: {t1 - t0:.1f}s, Achieved FPS: {frame_id / max(1, (t1 - t0)):.2f}")


Loaded 5468 rows for 868 frames.
Running refined TTC confirmation...
Done.
Saved refined TTC video to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_ttc_confirmed.mp4
Saved refined TTC CSV to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tracks_kmh_ttc_confirmed.csv
Frames: 870, Total time: 27.2s, Achieved FPS: 31.93


In [19]:
# Improved lane detection with dynamic bike-hood exclusion (test cell)
# Kernel: Python (adas)
import cv2, numpy as np, math
from pathlib import Path

VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")

def refined_lane_detection_v2(frame, seg_mask=None,
                               roi_top_frac=0.62,
                               center_search_w_frac=0.5,   # width fraction to search for hood
                               center_search_h_frac=0.45,  # height frac from bottom to search
                               dark_thresh=90,             # gray threshold to call "dark" pixels
                               min_dark_area=2000,         # min area to consider as bike hood
                               canny_low=50, canny_high=150,
                               hough_rho=1, hough_theta=np.pi/180, hough_thresh=30,
                               min_line_length=40, max_line_gap=20,
                               slope_thresh_min=0.35):
    """
    Returns left_line, right_line, lane_center, debug dict
    Each line is (x1,y1,x2,y2) or None. debug contains images to visualize.
    """
    H, W = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5,5), 0)

    # ROI lower part
    top_y = int(H * roi_top_frac)
    mask_roi = np.zeros_like(gray)
    poly = np.array([[(0,H),(0,top_y),(W,top_y),(W,H)]], dtype=np.int32)
    cv2.fillPoly(mask_roi, poly, 255)

    # detect dark large region inside center-bottom search window (bike hood / hands)
    sw = int(W * center_search_w_frac)
    sh = int(H * center_search_h_frac)
    sx1 = max(0, (W - sw)//2)
    sx2 = min(W, sx1 + sw)
    sy1 = max(top_y, H - sh)   # search lower portion only
    sy2 = H
    search_patch = gray[sy1:sy2, sx1:sx2]
    # threshold dark pixels
    _, dark_mask = cv2.threshold(search_patch, dark_thresh, 255, cv2.THRESH_BINARY_INV)
    # morphological close to combine regions
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15,15))
    dark_mask = cv2.morphologyEx(dark_mask, cv2.MORPH_CLOSE, kernel)
    # find contours and pick largest by area
    cnts, _ = cv2.findContours(dark_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    ex_mask = np.zeros_like(gray)
    hood_box = None
    if cnts:
        cnts = sorted(cnts, key=lambda c: cv2.contourArea(c), reverse=True)
        for c in cnts:
            area = cv2.contourArea(c)
            if area < min_dark_area:
                continue
            # bounding box in search_patch coords -> convert to full frame coords
            x,y,w,h = cv2.boundingRect(c)
            X1, Y1 = sx1 + x, sy1 + y
            X2, Y2 = X1 + w, Y1 + h
            # expand a bit
            pad_x = int(0.06 * W)
            pad_y = int(0.06 * H)
            X1 = max(0, X1 - pad_x); Y1 = max(0, Y1 - pad_y)
            X2 = min(W, X2 + pad_x); Y2 = min(H, Y2 + pad_y)
            hood_box = (X1, Y1, X2, Y2)
            cv2.rectangle(ex_mask, (X1, Y1), (X2, Y2), 255, -1)
            break

    # edges from Canny inside ROI
    edges = cv2.Canny(blur, canny_low, canny_high)
    edges_roi = cv2.bitwise_and(edges, mask_roi)
    # apply segmentation mask if available (assumes road>0)
    if seg_mask is not None:
        try:
            road_mask = (seg_mask > 0).astype(np.uint8) * 255
            edges_roi = cv2.bitwise_and(edges_roi, road_mask)
        except Exception:
            pass

    # zero out found hood region (exclude it)
    if hood_box is not None:
        X1, Y1, X2, Y2 = hood_box
        edges_roi[Y1:Y2, X1:X2] = 0

    # small morphological closing to reduce noise
    ker2 = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5))
    edges_roi = cv2.morphologyEx(edges_roi, cv2.MORPH_CLOSE, ker2)

    # Hough
    raw = cv2.HoughLinesP(edges_roi, hough_rho, hough_theta, hough_thresh,
                          minLineLength=min_line_length, maxLineGap=max_line_gap)
    candidates = []
    if raw is not None:
        for l in raw:
            x1,y1,x2,y2 = l[0]
            if x2 == x1:
                slope = float('inf')
            else:
                slope = (y2-y1)/(x2-x1)
            if abs(slope) < slope_thresh_min:
                continue
            # discard lines mostly inside hood_box area
            if hood_box is not None:
                hx1, hy1, hx2, hy2 = hood_box
                midx = (x1+x2)//2; midy = (y1+y2)//2
                if hx1 <= midx <= hx2 and hy1 <= midy <= hy2:
                    continue
            candidates.append((x1,y1,x2,y2,slope))

    # split to left / right
    left_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s < 0 ]
    right_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s > 0 ]

    def aggregate(group):
        if not group:
            return None
        xs1 = np.array([g[0] for g in group]); ys1 = np.array([g[1] for g in group])
        xs2 = np.array([g[2] for g in group]); ys2 = np.array([g[3] for g in group])
        return (int(np.median(xs1)), int(np.median(ys1)), int(np.median(xs2)), int(np.median(ys2)))

    left = aggregate(left_lines)
    right = aggregate(right_lines)
    # lane center
    if left is None and right is None:
        lane_center = None
    elif left is None:
        lx1,ly1,lx2,ly2 = right
        lane_center = ((lx1 - W//4, ly1), (lx2 - W//4, ly2))
    elif right is None:
        rx1,ry1,rx2,ry2 = left
        lane_center = ((rx1 + W//4, ry1), (rx2 + W//4, ry2))
    else:
        lx1,ly1,lx2,ly2 = left; rx1,ry1,rx2,ry2 = right
        lane_center = ((int((lx1+rx1)/2), int((ly1+ry1)/2)), (int((lx2+rx2)/2), int((ly2+ry2)/2)))

    debug = {
        "edges_roi": edges_roi,
        "hood_box": hood_box,
        "raw_candidates": candidates,
        "left_lines": left_lines,
        "right_lines": right_lines
    }
    return left, right, lane_center, debug

# ---------------- single-frame test & visualization ----------------
cap = cv2.VideoCapture(str(VIDEO_PATH))
ret, frame0 = cap.read()
cap.release()
if not ret:
    raise SystemExit("Cannot read video frame for test.")

left, right, center, debug = refined_lane_detection_v2(frame0, seg_mask=None)

vis = frame0.copy()
# visualize hood box (if any)
if debug.get("hood_box") is not None:
    x1,y1,x2,y2 = debug["hood_box"]
    cv2.rectangle(vis, (x1,y1), (x2,y2), (0,0,255), 2)
# visualize edges
edges_bgr = cv2.cvtColor(debug["edges_roi"], cv2.COLOR_GRAY2BGR)
# draw candidate raw lines in yellow
for (x1,y1,x2,y2,s) in debug["raw_candidates"]:
    cv2.line(vis, (x1,y1), (x2,y2), (0,255,255), 1)
# draw aggregated left/right/center
if left is not None:
    cv2.line(vis, (left[0],left[1]), (left[2],left[3]), (0,255,0), 3)
if right is not None:
    cv2.line(vis, (right[0],right[1]), (right[2],right[3]), (0,255,0), 3)
if center is not None:
    (c1,c2) = center
    cv2.line(vis, (c1[0],c1[1]), (c2[0],c2[1]), (255,0,0), 3)

# show combined visualization side-by-side with edges
combined = np.hstack([cv2.resize(vis, (vis.shape[1]//1, vis.shape[0]//1)), cv2.resize(edges_bgr, (vis.shape[1]//1, vis.shape[0]//1))])
# If running in notebook, use cv2.imshow; otherwise save to file
try:
    cv2.imshow("lane test (L) + edges (R). Press any key to close", combined)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
except Exception:
    outp = Path("lane_test_debug.png")
    cv2.imwrite(str(outp), combined)
    print("Saved debug image to:", outp)


In [20]:
# Final ADAS pipeline with robust lane detection (dynamic hood exclusion)
# Kernel: Python (adas)
# Outputs:
#   - annotated video: output_riding_bike_adas_refined.mp4
#   - events CSV: adas_events_refined.csv

from pathlib import Path
import cv2, time, math, csv, numpy as np, collections, sys
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# ---------------- CONFIG ----------------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_adas_refined.mp4")
EVENTS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/adas_events_refined.csv")

DETECT_WEIGHTS = "yolov8n.pt"
SEG_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")  # optional
TRACKS_CSV_HOMO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final_homography.csv")
TRACKS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final.csv")

DEVICE = "cuda"
CONF_THRESH = 0.3

# lane detection / hood detection params (tweak if needed)
ROI_TOP_FRAC = 0.62
CENTER_SEARCH_W_FRAC = 0.5
CENTER_SEARCH_H_FRAC = 0.45
DARK_THRESH = 90
MIN_DARK_AREA = 2000
CANNY_LOW = 50
CANNY_HIGH = 150
HOUGH_THRESH = 30
MIN_LINE_LENGTH = 40
MAX_LINE_GAP = 20
SLOPE_MIN = 0.35

# ADAS thresholds
TTC_THRESHOLD = 3.0         # seconds
DEPARTURE_THRESHOLD_PX = 60 # fallback pixels threshold for lane departure
ALARM_COOLDOWN = 2.0
BEEP_ON_ALARM = True        # set False if no sound desired

# ---------------- sanity checks ----------------
if not VIDEO_PATH.exists():
    raise SystemExit("Input video not found: " + str(VIDEO_PATH))

# ---------------- lane detection helper (robust v2) ----------------
def refined_lane_detection_v2(frame, seg_mask=None,
                               roi_top_frac=ROI_TOP_FRAC,
                               center_search_w_frac=CENTER_SEARCH_W_FRAC,
                               center_search_h_frac=CENTER_SEARCH_H_FRAC,
                               dark_thresh=DARK_THRESH,
                               min_dark_area=MIN_DARK_AREA,
                               canny_low=CANNY_LOW, canny_high=CANNY_HIGH,
                               hough_rho=1, hough_theta=np.pi/180, hough_thresh=HOUGH_THRESH,
                               min_line_length=MIN_LINE_LENGTH, max_line_gap=MAX_LINE_GAP,
                               slope_thresh_min=SLOPE_MIN):
    H, W = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5,5), 0)

    top_y = int(H * roi_top_frac)
    mask_roi = np.zeros_like(gray)
    poly = np.array([[(0,H),(0,top_y),(W,top_y),(W,H)]], dtype=np.int32)
    cv2.fillPoly(mask_roi, poly, 255)

    # search for dark hood region in bottom-center patch
    sw = int(W * center_search_w_frac)
    sh = int(H * center_search_h_frac)
    sx1 = max(0, (W - sw)//2)
    sx2 = min(W, sx1 + sw)
    sy1 = max(top_y, H - sh)
    sy2 = H
    search_patch = gray[sy1:sy2, sx1:sx2]
    _, dark_mask = cv2.threshold(search_patch, dark_thresh, 255, cv2.THRESH_BINARY_INV)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15,15))
    dark_mask = cv2.morphologyEx(dark_mask, cv2.MORPH_CLOSE, kernel)
    cnts, _ = cv2.findContours(dark_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    ex_mask = np.zeros_like(gray)
    hood_box = None
    if cnts:
        cnts = sorted(cnts, key=lambda c: cv2.contourArea(c), reverse=True)
        for c in cnts:
            area = cv2.contourArea(c)
            if area < min_dark_area:
                continue
            x,y,w,h = cv2.boundingRect(c)
            X1, Y1 = sx1 + x, sy1 + y
            X2, Y2 = X1 + w, Y1 + h
            pad_x = int(0.06 * W); pad_y = int(0.06 * H)
            X1 = max(0, X1 - pad_x); Y1 = max(0, Y1 - pad_y)
            X2 = min(W, X2 + pad_x); Y2 = min(H, Y2 + pad_y)
            hood_box = (X1, Y1, X2, Y2)
            cv2.rectangle(ex_mask, (X1, Y1), (X2, Y2), 255, -1)
            break

    # edges in ROI, optionally masked by seg_mask
    edges = cv2.Canny(blur, canny_low, canny_high)
    edges_roi = cv2.bitwise_and(edges, mask_roi)
    if seg_mask is not None:
        try:
            road_mask = (seg_mask > 0).astype(np.uint8) * 255
            edges_roi = cv2.bitwise_and(edges_roi, road_mask)
        except Exception:
            pass

    # exclude hood area
    if hood_box is not None:
        X1,Y1,X2,Y2 = hood_box
        edges_roi[Y1:Y2, X1:X2] = 0

    ker2 = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5))
    edges_roi = cv2.morphologyEx(edges_roi, cv2.MORPH_CLOSE, ker2)

    raw = cv2.HoughLinesP(edges_roi, hough_rho, hough_theta, hough_thresh,
                          minLineLength=min_line_length, maxLineGap=max_line_gap)
    candidates = []
    if raw is not None:
        for l in raw:
            x1,y1,x2,y2 = l[0]
            if x2 == x1:
                slope = float('inf')
            else:
                slope = (y2-y1)/(x2-x1)
            if abs(slope) < slope_thresh_min:
                continue
            if hood_box is not None:
                hx1,hy1,hx2,hy2 = hood_box
                midx = (x1+x2)//2; midy = (y1+y2)//2
                if hx1 <= midx <= hx2 and hy1 <= midy <= hy2:
                    continue
            candidates.append((x1,y1,x2,y2,slope))

    left_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s < 0 ]
    right_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s > 0 ]

    def aggregate(group):
        if not group:
            return None
        xs1 = np.array([g[0] for g in group]); ys1 = np.array([g[1] for g in group])
        xs2 = np.array([g[2] for g in group]); ys2 = np.array([g[3] for g in group])
        return (int(np.median(xs1)), int(np.median(ys1)), int(np.median(xs2)), int(np.median(ys2)))

    left = aggregate(left_lines)
    right = aggregate(right_lines)
    if left is None and right is None:
        lane_center = None
    elif left is None:
        lx1,ly1,lx2,ly2 = right
        lane_center = ((lx1 - W//4, ly1), (lx2 - W//4, ly2))
    elif right is None:
        rx1,ry1,rx2,ry2 = left
        lane_center = ((rx1 + W//4, ry1), (rx2 + W//4, ry2))
    else:
        lx1,ly1,lx2,ly2 = left; rx1,ry1,rx2,ry2 = right
        lane_center = ((int((lx1+rx1)/2), int((ly1+ry1)/2)), (int((lx2+rx2)/2), int((ly2+ry2)/2)))

    debug = {"edges_roi": edges_roi, "hood_box": hood_box, "candidates": candidates, "left_lines": left_lines, "right_lines": right_lines}
    return left, right, lane_center, debug

# ---------------- Load models & CSVs ----------------
print("Loading models...")
det_model = YOLO(DETECT_WEIGHTS)
seg_model = None
if SEG_WEIGHTS is not None and SEG_WEIGHTS.exists():
    seg_model = YOLO(str(SEG_WEIGHTS))
    print("Segmentation model loaded.")
else:
    print("Segmentation model not found - segmentation overlay disabled.")

use_homography = False
homography_rows_by_frame = {}
if TRACKS_CSV_HOMO.exists():
    with open(TRACKS_CSV_HOMO, newline='') as f:
        import csv
        reader = csv.DictReader(f)
        for r in reader:
            fr = int(float(r['frame']))
            homography_rows_by_frame.setdefault(fr, []).append(r)
    use_homography = True
    print("Using homography CSV for metric speeds/TTC:", TRACKS_CSV_HOMO)
elif TRACKS_CSV.exists():
    with open(TRACKS_CSV, newline='') as f:
        import csv
        rows = [r for r in csv.DictReader(f)]
    rows_by_frame = collections.defaultdict(list)
    for r in rows:
        rows_by_frame[int(float(r['frame']))].append(r)
    print("Using pixel-speed CSV for data:", TRACKS_CSV)
else:
    rows_by_frame = {}

# ---------------- Video IO & tracker ----------------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))
tracker = DeepSort(max_age=30)

palette = (np.random.RandomState(2).randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in palette ]

# ---------------- ADAS loop ----------------
events = []
last_alarm_time = collections.defaultdict(lambda: -9999)
last_centroid = {}
frame_id = 0
t_start = time.time()
print("Starting refined ADAS processing...")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    t_frame = time.time()
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # segmentation overlay (fast on GPU; optional)
    seg_mask = None
    if seg_model is not None:
        try:
            rseg = seg_model.predict(frame_rgb, device=DEVICE, imgsz=640, verbose=False)[0]
            if hasattr(rseg, "masks") and rseg.masks is not None:
                arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())
                if arr.ndim == 3:
                    seg_mask = np.argmax(arr, axis=0).astype(np.int32)
                else:
                    seg_mask = arr.astype(np.int32)
        except Exception:
            seg_mask = None

    # lane detection (refined)
    left_line, right_line, lane_center, debug = refined_lane_detection_v2(frame, seg_mask=seg_mask)

    # optional: draw detected lane features
    frame_proc = frame.copy()
    if left_line is not None:
        cv2.line(frame_proc, (left_line[0],left_line[1]), (left_line[2],left_line[3]), (0,255,0), 3)
    if right_line is not None:
        cv2.line(frame_proc, (right_line[0],right_line[1]), (right_line[2],right_line[3]), (0,255,0), 3)
    if lane_center is not None:
        (c1,c2) = lane_center
        cv2.line(frame_proc, (c1[0],c1[1]), (c2[0],c2[1]), (255,0,0), 3)
    if debug.get("hood_box") is not None:
        X1,Y1,X2,Y2 = debug["hood_box"]
        cv2.rectangle(frame_proc, (X1,Y1), (X2,Y2), (0,0,255), 2)

    # detection + tracker input
    detections_for_tracker = []
    try:
        rdet = det_model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=CONF_THRESH, verbose=False)[0]
        if hasattr(rdet, "boxes") and rdet.boxes is not None:
            xyxy = rdet.boxes.xyxy.cpu().numpy() if hasattr(rdet.boxes, "xyxy") else np.array(rdet.boxes.xyxy)
            confs = rdet.boxes.conf.cpu().numpy() if hasattr(rdet.boxes, "conf") else np.array(rdet.boxes.conf)
            clsids = rdet.boxes.cls.cpu().numpy() if hasattr(rdet.boxes, "cls") else np.array(rdet.boxes.cls)
            for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                detections_for_tracker.append(([float(x1), float(y1), float(x2), float(y2)], float(conf), str(int(clsid))))
    except Exception as e:
        print("Detection error:", e, file=sys.stderr)

    tracks = tracker.update_tracks(detections_for_tracker, frame=frame_proc)

    # draw tracks and compute ADAS signals
    for tr in tracks:
        try:
            if hasattr(tr, "is_confirmed") and (not tr.is_confirmed()):
                continue
        except Exception:
            pass
        try:
            tid = int(tr.track_id)
        except Exception:
            tid = abs(hash(str(tr.track_id))) % 10000
        try:
            ltrb = tr.to_ltrb()
        except Exception:
            ltrb = tr.to_tlbr()
        x1,y1,x2,y2 = map(int, ltrb)
        cx = (x1+x2)//2; cy = y2
        color = PALETTE[tid % len(PALETTE)]

        # pixel speed fallback
        speed_px_s = 0.0
        speed_kmh_fallback = 0.0
        if tid in last_centroid:
            px,py,t_prev = last_centroid[tid]
            dt = max(1e-6, t_frame - t_prev)
            dist_px = math.hypot(cx - px, cy - py)
            speed_px_s = dist_px / dt
            speed_kmh_fallback = (speed_px_s) * 0.036
        last_centroid[tid] = (cx,cy,t_frame)

        # get metric speed and TTC if homography CSV available
        metric_speed_kmh = None
        metric_ttc = None
        if use_homography:
            fr_rows = homography_rows_by_frame.get(frame_id, [])
            match = None
            for r in fr_rows:
                if int(float(r['track_id'])) == tid:
                    match = r; break
            if match is not None:
                metric_speed_kmh = float(match.get('speed_kmh', 0.0))
                metric_ttc = float(match.get('ttc_s', float('inf')))
        else:
            metric_ttc = (H - y2) / (speed_px_s + 1e-6) if speed_px_s>1e-6 else float('inf')

        # lane departure: compute lateral px distance to lane_center
        lane_departure = False
        lateral_dist_px = None
        if lane_center is not None:
            (lx1,ly1),(lx2,ly2) = lane_center
            pxp, pyp = cx, cy
            num = abs((lx2-lx1)*(ly1-pyp) - (lx1-pxp)*(ly2-ly1))
            den = math.hypot(lx2-lx1, ly2-ly1) + 1e-6
            lateral_dist_px = num/den
            lane_departure = lateral_dist_px > DEPARTURE_THRESHOLD_PX

        # alarm decision
        alarm = (metric_ttc is not None and metric_ttc <= TTC_THRESHOLD)
        if alarm:
            now = time.time()
            last = last_alarm_time.get(tid, -9999)
            if (now - last) >= ALARM_COOLDOWN:
                last_alarm_time[tid] = now
                if BEEP_ON_ALARM:
                    try:
                        import winsound
                        winsound.Beep(750, 160)
                    except Exception:
                        pass

        # draw
        box_color = (0,0,255) if alarm or lane_departure else color
        cv2.rectangle(frame_proc, (x1,y1), (x2,y2), box_color, 2)
        label_parts = [f"ID:{tid}"]
        if metric_speed_kmh is not None:
            label_parts.append(f"{metric_speed_kmh:.1f}km/h")
        else:
            label_parts.append(f"{speed_kmh_fallback:.0f}px/s")
        if metric_ttc==float('inf'):
            label_parts.append("TTC:inf")
        else:
            label_parts.append(f"TTC:{metric_ttc:.1f}s")
        if lane_departure:
            label_parts.append("LANE-DEP")
        label = " ".join(label_parts)
        # draw label
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame_proc, (bx1,by1), (bx2,by2), box_color, -1)
        cv2.putText(frame_proc, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

        events.append({
            "frame": frame_id, "time": t_frame - t_start, "track_id": tid,
            "x1": x1, "y1": y1, "x2": x2, "y2": y2,
            "speed_px_s": speed_px_s, "metric_speed_kmh": metric_speed_kmh,
            "ttc_s": metric_ttc, "lane_dep": int(lane_departure)
        })

    # HUD summary
    summary = f"Frame:{frame_id} Objects:{len(tracks)}"
    cv2.putText(frame_proc, summary, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

    out.write(frame_proc)
    frame_id += 1

# cleanup
cap.release()
out.release()

# save events CSV
keys = ["frame","time","track_id","x1","y1","x2","y2","speed_px_s","metric_speed_kmh","ttc_s","lane_dep"]
with open(EVENTS_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in events:
        writer.writerow({k: r.get(k, None) for k in keys})

print("DONE.")
print("Refined ADAS video saved to:", OUTPUT_VIDEO)
print("Events CSV saved to:", EVENTS_CSV)


Loading models...
Segmentation model loaded.
Using pixel-speed CSV for data: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tracks_kmh_final.csv
Starting refined ADAS processing...


  arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())


DONE.
Refined ADAS video saved to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_adas_refined.mp4
Events CSV saved to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\adas_events_refined.csv


In [21]:
# Full ADAS pipeline (single cell)
# - Uses segmentation to remove hood/driver from lane detection
# - Robust lane detection, detection+DeepSORT tracking
# - Metric TTC if homography CSV available, else relative TTC
# - Outputs video and events CSV
#
# Kernel: Python (adas)
# Edit paths below if needed, then run.

from pathlib import Path
import cv2, time, math, csv, numpy as np, collections, sys
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# ---------------- CONFIG ----------------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_adas_segmask.mp4")
EVENTS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/adas_events_segmask.csv")

DETECT_WEIGHTS = "yolov8n.pt"
SEG_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")  # segmentation model (optional)
TRACKS_CSV_HOMO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final_homography.csv")
TRACKS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tracks_kmh_final.csv")

DEVICE = "cuda"   # or "cpu"
CONF_THRESH = 0.3

# lane detection params
ROI_TOP_FRAC = 0.62
CENTER_SEARCH_W_FRAC = 0.5
CENTER_SEARCH_H_FRAC = 0.45
DARK_THRESH = 90
MIN_DARK_AREA = 2000
CANNY_LOW = 50
CANNY_HIGH = 150
HOUGH_THRESH = 30
MIN_LINE_LENGTH = 40
MAX_LINE_GAP = 20
SLOPE_MIN = 0.35

# ADAS thresholds
TTC_THRESHOLD = 3.0         # seconds
DEPARTURE_THRESHOLD_PX = 60 # fallback pixels threshold for lane departure
ALARM_COOLDOWN = 2.0
BEEP_ON_ALARM = True

# segmentation mapping: which segmentation class ids mean "drivable/road"?
# Based on your dataset earlier, index 0 corresponded to 'drivable'. Adjust if different.
ROAD_CLASSES = {0}

# ---------------- Sanity checks ----------------
if not VIDEO_PATH.exists():
    raise SystemExit("Input video not found: " + str(VIDEO_PATH))

# ---------------- Helpers ----------------
def lane_mask_from_seg(seg_mask, road_classes=ROAD_CLASSES):
    """Return binary road mask (255 road, 0 else) from seg_mask (H,W ints)."""
    if seg_mask is None:
        return None
    try:
        mask = np.zeros_like(seg_mask, dtype=np.uint8)
        for rc in road_classes:
            mask[seg_mask == rc] = 255
        return mask
    except Exception:
        return None

def refined_lane_detection_with_seg(frame, seg_mask=None,
                                    roi_top_frac=ROI_TOP_FRAC,
                                    center_search_w_frac=CENTER_SEARCH_W_FRAC,
                                    center_search_h_frac=CENTER_SEARCH_H_FRAC,
                                    dark_thresh=DARK_THRESH, min_dark_area=MIN_DARK_AREA,
                                    canny_low=CANNY_LOW, canny_high=CANNY_HIGH,
                                    hough_rho=1, hough_theta=np.pi/180, hough_thresh=HOUGH_THRESH,
                                    min_line_length=MIN_LINE_LENGTH, max_line_gap=MAX_LINE_GAP,
                                    slope_thresh_min=SLOPE_MIN):
    """Return left_line, right_line, lane_center, debug dict."""
    H, W = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5,5), 0)

    top_y = int(H * roi_top_frac)
    # ROI mask (lower portion)
    mask_roi = np.zeros_like(gray)
    poly = np.array([[(0,H),(0,top_y),(W,top_y),(W,H)]], dtype=np.int32)
    cv2.fillPoly(mask_roi, poly, 255)

    # search for dark hood region in bottom-center patch (fallback)
    sw = int(W * center_search_w_frac)
    sh = int(H * center_search_h_frac)
    sx1 = max(0, (W - sw)//2)
    sx2 = min(W, sx1 + sw)
    sy1 = max(top_y, H - sh)
    sy2 = H
    search_patch = gray[sy1:sy2, sx1:sx2]
    _, dark_mask = cv2.threshold(search_patch, dark_thresh, 255, cv2.THRESH_BINARY_INV)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15,15))
    dark_mask = cv2.morphologyEx(dark_mask, cv2.MORPH_CLOSE, kernel)
    cnts, _ = cv2.findContours(dark_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    ex_mask = np.zeros_like(gray)
    hood_box = None
    if cnts:
        cnts = sorted(cnts, key=lambda c: cv2.contourArea(c), reverse=True)
        for c in cnts:
            area = cv2.contourArea(c)
            if area < min_dark_area:
                continue
            x,y,w,h = cv2.boundingRect(c)
            X1, Y1 = sx1 + x, sy1 + y
            X2, Y2 = X1 + w, Y1 + h
            pad_x = int(0.06 * W); pad_y = int(0.06 * H)
            X1 = max(0, X1 - pad_x); Y1 = max(0, Y1 - pad_y)
            X2 = min(W, X2 + pad_x); Y2 = min(H, Y2 + pad_y)
            hood_box = (X1, Y1, X2, Y2)
            cv2.rectangle(ex_mask, (X1, Y1), (X2, Y2), 255, -1)
            break

    # edges in ROI
    edges = cv2.Canny(blur, canny_low, canny_high)
    edges_roi = cv2.bitwise_and(edges, mask_roi)

    # segmentation-based road mask if available -> keep only edges on road
    road_mask = lane_mask_from_seg(seg_mask)
    if road_mask is not None:
        if road_mask.shape != edges_roi.shape:
            road_mask = cv2.resize(road_mask, (edges_roi.shape[1], edges_roi.shape[0]), interpolation=cv2.INTER_NEAREST)
        edges_roi = cv2.bitwise_and(edges_roi, road_mask)

    # exclude detected hood region (if any)
    if hood_box is not None:
        X1, Y1, X2, Y2 = hood_box
        edges_roi[Y1:Y2, X1:X2] = 0

    # small morphological closing
    ker2 = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5))
    edges_roi = cv2.morphologyEx(edges_roi, cv2.MORPH_CLOSE, ker2)

    # Hough
    raw = cv2.HoughLinesP(edges_roi, hough_rho, hough_theta, hough_thresh,
                          minLineLength=min_line_length, maxLineGap=max_line_gap)
    candidates = []
    if raw is not None:
        for l in raw:
            x1,y1,x2,y2 = l[0]
            if x2 == x1:
                slope = float('inf')
            else:
                slope = (y2-y1)/(x2-x1)
            if abs(slope) < slope_thresh_min:
                continue
            # discard lines mostly inside hood_box area
            if hood_box is not None:
                hx1,hy1,hx2,hy2 = hood_box
                midx = (x1+x2)//2; midy = (y1+y2)//2
                if hx1 <= midx <= hx2 and hy1 <= midy <= hy2:
                    continue
            candidates.append((x1,y1,x2,y2,slope))

    left_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s < 0 ]
    right_lines = [ (x1,y1,x2,y2) for (x1,y1,x2,y2,s) in candidates if s > 0 ]

    def aggregate(group):
        if not group:
            return None
        xs1 = np.array([g[0] for g in group]); ys1 = np.array([g[1] for g in group])
        xs2 = np.array([g[2] for g in group]); ys2 = np.array([g[3] for g in group])
        return (int(np.median(xs1)), int(np.median(ys1)), int(np.median(xs2)), int(np.median(ys2)))

    left = aggregate(left_lines)
    right = aggregate(right_lines)

    if left is None and right is None:
        lane_center = None
    elif left is None:
        lx1,ly1,lx2,ly2 = right
        lane_center = ((lx1 - W//4, ly1), (lx2 - W//4, ly2))
    elif right is None:
        rx1,ry1,rx2,ry2 = left
        lane_center = ((rx1 + W//4, ry1), (rx2 + W//4, ry2))
    else:
        lx1,ly1,lx2,ly2 = left; rx1,ry1,rx2,ry2 = right
        lane_center = ((int((lx1+rx1)/2), int((ly1+ry1)/2)), (int((lx2+rx2)/2), int((ly2+ry2)/2)))

    debug = {"edges_roi": edges_roi, "hood_box": hood_box, "candidates": candidates, "left_lines": left_lines, "right_lines": right_lines, "road_mask": road_mask}
    return left, right, lane_center, debug

# ---------------- Load models ----------------
print("Loading models...")
det_model = YOLO(DETECT_WEIGHTS)
seg_model = None
if SEG_WEIGHTS is not None and SEG_WEIGHTS.exists():
    try:
        seg_model = YOLO(str(SEG_WEIGHTS))
        print("Segmentation model loaded.")
    except Exception as e:
        print("Segmentation load failed:", e)
else:
    print("Segmentation model not found - will use dark-region fallback only.")

# ---------------- load homography CSV if exists ----------------
use_homography = False
homography_rows_by_frame = {}
if TRACKS_CSV_HOMO.exists():
    with open(TRACKS_CSV_HOMO, newline='') as f:
        reader = csv.DictReader(f)
        for r in reader:
            fr = int(float(r['frame']))
            homography_rows_by_frame.setdefault(fr, []).append(r)
    use_homography = True
    print("Using homography CSV for metric speeds/TTC:", TRACKS_CSV_HOMO)
elif TRACKS_CSV.exists():
    with open(TRACKS_CSV, newline='') as f:
        rows = [r for r in csv.DictReader(f)]
    rows_by_frame = collections.defaultdict(list)
    for r in rows:
        rows_by_frame[int(float(r['frame']))].append(r)
    print("Using pixel-speed CSV for data:", TRACKS_CSV)
else:
    rows_by_frame = {}
    print("No CSV found: metric data unavailable; will compute relative metrics.")

# ---------------- Video IO & tracker ----------------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))
tracker = DeepSort(max_age=30)

palette = (np.random.RandomState(2).randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in palette ]

# ---------------- ADAS loop ----------------
events = []
last_alarm_time = collections.defaultdict(lambda: -9999)
last_centroid = {}
frame_id = 0
t_start = time.time()
print("Starting ADAS processing with segmentation-based hood removal...")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    t_frame = time.time()
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # segmentation overlay & seg_mask (we need seg_mask to get road mask)
    seg_mask = None
    if seg_model is not None:
        try:
            rseg = seg_model.predict(frame_rgb, device=DEVICE, imgsz=640, verbose=False)[0]
            if hasattr(rseg, "masks") and rseg.masks is not None:
                arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())
                # if arr is (C,H,W) where channels are class maps, choose argmax; else arr is label map
                if arr.ndim == 3:
                    seg_mask = np.argmax(arr, axis=0).astype(np.int32)
                else:
                    seg_mask = arr.astype(np.int32)
        except Exception:
            seg_mask = None

    # lane detection with seg mask to remove hood reliably
    left_line, right_line, lane_center, debug = refined_lane_detection_with_seg(frame, seg_mask=seg_mask)

    # draw lane visualization
    frame_proc = frame.copy()
    if left_line is not None:
        cv2.line(frame_proc, (left_line[0],left_line[1]), (left_line[2],left_line[3]), (0,255,0), 3)
    if right_line is not None:
        cv2.line(frame_proc, (right_line[0],right_line[1]), (right_line[2],right_line[3]), (0,255,0), 3)
    if lane_center is not None:
        (c1,c2) = lane_center
        cv2.line(frame_proc, (c1[0],c1[1]), (c2[0],c2[1]), (255,0,0), 3)
    if debug.get("hood_box") is not None:
        X1,Y1,X2,Y2 = debug["hood_box"]
        cv2.rectangle(frame_proc, (X1,Y1), (X2,Y2), (0,0,255), 2)

    # detection -> tracker
    detections_for_tracker = []
    try:
        rdet = det_model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=CONF_THRESH, verbose=False)[0]
        if hasattr(rdet, "boxes") and rdet.boxes is not None:
            xyxy = rdet.boxes.xyxy.cpu().numpy() if hasattr(rdet.boxes, "xyxy") else np.array(rdet.boxes.xyxy)
            confs = rdet.boxes.conf.cpu().numpy() if hasattr(rdet.boxes, "conf") else np.array(rdet.boxes.conf)
            clsids = rdet.boxes.cls.cpu().numpy() if hasattr(rdet.boxes, "cls") else np.array(rdet.boxes.cls)
            for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                detections_for_tracker.append(([float(x1), float(y1), float(x2), float(y2)], float(conf), str(int(clsid))))
    except Exception as e:
        print("Detection error:", e, file=sys.stderr)

    tracks = tracker.update_tracks(detections_for_tracker, frame=frame_proc)

    # draw tracks & ADAS logic
    for tr in tracks:
        try:
            if hasattr(tr, "is_confirmed") and (not tr.is_confirmed()):
                continue
        except Exception:
            pass
        try:
            tid = int(tr.track_id)
        except Exception:
            tid = abs(hash(str(tr.track_id))) % 10000
        try:
            ltrb = tr.to_ltrb()
        except Exception:
            ltrb = tr.to_tlbr()
        x1,y1,x2,y2 = map(int, ltrb)
        cx = (x1+x2)//2; cy = y2
        color = PALETTE[tid % len(PALETTE)]

        # pixel speed fallback
        speed_px_s = 0.0
        speed_kmh_fallback = 0.0
        if tid in last_centroid:
            px,py,t_prev = last_centroid[tid]
            dt = max(1e-6, t_frame - t_prev)
            dist_px = math.hypot(cx - px, cy - py)
            speed_px_s = dist_px / dt
            speed_kmh_fallback = (speed_px_s) * 0.036
        last_centroid[tid] = (cx,cy,t_frame)

        # metric speed & TTC from homography CSV if available
        metric_speed_kmh = None
        metric_ttc = None
        if use_homography:
            fr_rows = homography_rows_by_frame.get(frame_id, [])
            match = None
            for r in fr_rows:
                if int(float(r['track_id'])) == tid:
                    match = r; break
            if match is not None:
                metric_speed_kmh = float(match.get('speed_kmh', 0.0))
                metric_ttc = float(match.get('ttc_s', float('inf')))
        else:
            metric_ttc = (H - y2) / (speed_px_s + 1e-6) if speed_px_s>1e-6 else float('inf')

        # lane departure px distance to lane_center
        lane_departure = False
        lateral_dist_px = None
        if lane_center is not None:
            (lx1,ly1),(lx2,ly2) = lane_center
            pxp, pyp = cx, cy
            num = abs((lx2-lx1)*(ly1-pyp) - (lx1-pxp)*(ly2-ly1))
            den = math.hypot(lx2-lx1, ly2-ly1) + 1e-6
            lateral_dist_px = num/den
            lane_departure = lateral_dist_px > DEPARTURE_THRESHOLD_PX

        # alarm decision
        alarm = (metric_ttc is not None and metric_ttc <= TTC_THRESHOLD)
        if alarm:
            now = time.time()
            last = last_alarm_time.get(tid, -9999)
            if (now - last) >= ALARM_COOLDOWN:
                last_alarm_time[tid] = now
                if BEEP_ON_ALARM:
                    try:
                        import winsound
                        winsound.Beep(750, 160)
                    except Exception:
                        pass

        # draw
        box_color = (0,0,255) if alarm or lane_departure else color
        cv2.rectangle(frame_proc, (x1,y1), (x2,y2), box_color, 2)
        label_parts = [f"ID:{tid}"]
        if metric_speed_kmh is not None:
            label_parts.append(f"{metric_speed_kmh:.1f}km/h")
        else:
            label_parts.append(f"{speed_kmh_fallback:.0f}px/s")
        if metric_ttc==float('inf'):
            label_parts.append("TTC:inf")
        else:
            label_parts.append(f"TTC:{metric_ttc:.1f}s")
        if lane_departure:
            label_parts.append("LANE-DEP")
        label = " ".join(label_parts)
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        bx1, by1 = x1, max(0, y1 - th - 6)
        bx2, by2 = x1 + tw + 6, by1 + th + 6
        cv2.rectangle(frame_proc, (bx1,by1), (bx2,by2), box_color, -1)
        cv2.putText(frame_proc, label, (x1+3, by2-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

        events.append({
            "frame": frame_id, "time": t_frame - t_start, "track_id": tid,
            "x1": x1, "y1": y1, "x2": x2, "y2": y2,
            "speed_px_s": speed_px_s, "metric_speed_kmh": metric_speed_kmh,
            "ttc_s": metric_ttc, "lane_dep": int(lane_departure)
        })

    # HUD summary
    summary = f"Frame:{frame_id} Objects:{len(tracks)}"
    cv2.putText(frame_proc, summary, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

    out.write(frame_proc)
    frame_id += 1

# cleanup
cap.release()
out.release()

# save events CSV
keys = ["frame","time","track_id","x1","y1","x2","y2","speed_px_s","metric_speed_kmh","ttc_s","lane_dep"]
with open(EVENTS_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in events:
        writer.writerow({k: r.get(k, None) for k in keys})

print("DONE.")
print("ADAS video saved to:", OUTPUT_VIDEO)
print("Events CSV saved to:", EVENTS_CSV)


Loading models...
Segmentation model loaded.
Using pixel-speed CSV for data: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tracks_kmh_final.csv
Starting ADAS processing with segmentation-based hood removal...


  arr = np.array(rseg.masks.data if hasattr(rseg.masks, "data") else rseg.masks.numpy())


DONE.
ADAS video saved to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_adas_segmask.mp4
Events CSV saved to: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\adas_events_segmask.csv


In [22]:
# ADAS + Traffic Light Detection (integrated)
# Kernel: Python (adas)
# Outputs: video with TLD overlays and events CSV including traffic_light_state
#
# Paste and run. Adjust PATHS if needed.

from pathlib import Path
import cv2, time, math, csv, numpy as np, collections, sys
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# ---------------- CONFIG ----------------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
OUTPUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_riding_bike_adas_tld.mp4")
EVENTS_CSV = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/adas_events_tld.csv")

DETECT_WEIGHTS = "yolov8n.pt"   # detection model (COCO may contain traffic light class)
SEG_WEIGHTS = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/weights/best.pt")  # optional
USE_MODEL_TL_CLASS_ID = 9  # COCO traffic light class id (if model trained on COCO or similar)

DEVICE = "cuda"
CONF_THRESH = 0.25

# TLD params (upper ROI)
TL_ROI_TOP_FRAC = 0.0    # top of ROI (as fraction of H)
TL_ROI_BOTTOM_FRAC = 0.45  # bottom of ROI (upper 45% of frame)
MIN_TL_AREA = 20         # tiny blobs ignored

# color thresholds (HSV) for fallback detection
# red uses two ranges (low and high hue)
RED_RANGE1 = ((0, 80, 50), (10, 255, 255))
RED_RANGE2 = ((160, 80, 50), (179, 255, 255))
YELLOW_RANGE = ((15, 100, 100), (35, 255, 255))
GREEN_RANGE = ((40, 60, 60), (90, 255, 255))

# other ADAS params (kept modest)
TTC_THRESHOLD = 3.0
DEPARTURE_THRESHOLD_PX = 60
ALARM_COOLDOWN = 2.0
BEEP_ON_ALARM = True

# ---------------- sanity ----------------
if not VIDEO_PATH.exists():
    raise SystemExit("Video not found: " + str(VIDEO_PATH))

# ---------------- load models ----------------
print("Loading YOLO model...")
det_model = YOLO(DETECT_WEIGHTS)

seg_model = None
if SEG_WEIGHTS is not None and SEG_WEIGHTS.exists():
    try:
        seg_model = YOLO(str(SEG_WEIGHTS))
        print("Segmentation model loaded.")
    except Exception as e:
        print("Segmentation load failed:", e)

# ---------------- helper: color-based traffic light detector (fallback) ----------------
def detect_tl_color_fallback(frame, tl_roi_top_frac=TL_ROI_TOP_FRAC, tl_roi_bottom_frac=TL_ROI_BOTTOM_FRAC):
    """Return best (state, bbox) or (None, None). state in {'red','yellow','green'}"""
    H,W = frame.shape[:2]
    y1 = int(H * tl_roi_top_frac)
    y2 = int(H * tl_roi_bottom_frac)
    roi = frame[y1:y2, 0:W]
    if roi.size == 0:
        return None, None
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    def mask_from_range(rng):
        lower, upper = rng
        lower = np.array(lower, dtype=np.uint8); upper = np.array(upper, dtype=np.uint8)
        return cv2.inRange(hsv, lower, upper)

    # red (two ranges)
    m1 = mask_from_range(RED_RANGE1)
    m2 = mask_from_range(RED_RANGE2)
    red_mask = cv2.bitwise_or(m1, m2)
    yellow_mask = mask_from_range(YELLOW_RANGE)
    green_mask = mask_from_range(GREEN_RANGE)

    # morphological cleanup
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))
    for m in (red_mask, yellow_mask, green_mask):
        cv2.morphologyEx(m, cv2.MORPH_OPEN, kernel, dst=m)

    def find_largest(mask):
        cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not cnts:
            return None
        cnts = sorted(cnts, key=cv2.contourArea, reverse=True)
        c = cnts[0]
        a = cv2.contourArea(c)
        if a < MIN_TL_AREA:
            return None
        x,y,w,h = cv2.boundingRect(c)
        # convert to full-frame coords
        return (x, y + y1, x+w, y + y1 + h, a)

    # check red, yellow, green in priority order red->yellow->green
    red_box = find_largest(red_mask)
    if red_box is not None:
        return "red", red_box
    yellow_box = find_largest(yellow_mask)
    if yellow_box is not None:
        return "yellow", yellow_box
    green_box = find_largest(green_mask)
    if green_box is not None:
        return "green", green_box
    return None, None

# ---------------- video IO & tracker ----------------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")  # H.264
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, FPS, (W, H))
tracker = DeepSort(max_age=30)

palette = (np.random.RandomState(2).randint(0,255,(256,3))).astype(np.int32)
PALETTE = [ (int(c[0]), int(c[1]), int(c[2])) for c in palette ]

# ---------------- main loop ----------------
events = []
last_alarm_time = collections.defaultdict(lambda: -9999)
last_centroid = {}
frame_id = 0
t_start = time.time()

# Determine whether model likely has traffic-light class by running a small probe (first frame)
probe_has_tl_class = False
try:
    # load first frame to probe
    cap_probe = cv2.VideoCapture(str(VIDEO_PATH))
    retp, fp = cap_probe.read()
    cap_probe.release()
    if retp:
        rprobe = det_model.predict(cv2.cvtColor(fp, cv2.COLOR_BGR2RGB), device=DEVICE, imgsz=640, conf=0.4, verbose=False)[0]
        if hasattr(rprobe, "boxes") and rprobe.boxes is not None and hasattr(rprobe, "names"):
            # check whether the names mapping includes 'traffic light' or class id 9 exists
            names_map = rprobe.names if hasattr(rprobe, "names") else {}
            # Look for name containing 'traffic' or id=9 known
            if any(("traffic" in str(n).lower() or str(n).lower()=="traffic light" or i==USE_MODEL_TL_CLASS_ID) for i,n in names_map.items()):
                probe_has_tl_class = True
except Exception:
    probe_has_tl_class = False

print("Traffic-light class from model present?" , probe_has_tl_class)

print("Starting ADAS + TLD processing...")
while True:
    ret, frame = cap.read()
    if not ret:
        break
    t_frame = time.time()
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Run detection (YOLO) once per frame (we'll reuse for TLD if model supports class)
    detections_for_tracker = []
    try:
        rdet = det_model.predict(frame_rgb, device=DEVICE, imgsz=640, conf=CONF_THRESH, verbose=False)[0]
        # collect boxes for tracker
        if hasattr(rdet, "boxes") and rdet.boxes is not None:
            xyxy = rdet.boxes.xyxy.cpu().numpy() if hasattr(rdet.boxes, "xyxy") else np.array(rdet.boxes.xyxy)
            confs = rdet.boxes.conf.cpu().numpy() if hasattr(rdet.boxes, "conf") else np.array(rdet.boxes.conf)
            clsids = rdet.boxes.cls.cpu().numpy() if hasattr(rdet.boxes, "cls") else np.array(rdet.boxes.cls)
            for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                detections_for_tracker.append(([float(x1), float(y1), float(x2), float(y2)], float(conf), str(int(clsid))))
    except Exception as e:
        print("Detection error:", e, file=sys.stderr)
        rdet = None

    # Track
    tracks = tracker.update_tracks(detections_for_tracker, frame=frame)

    # Traffic light detection: prefer model-based if available
    tl_state = None
    tl_box = None
    try:
        if probe_has_tl_class and rdet is not None:
            # parse rdet boxes and names
            names = rdet.names if hasattr(rdet, "names") else {}
            # iterate predicted boxes and find traffic-light class (either by name or id)
            if hasattr(rdet, "boxes") and rdet.boxes is not None:
                xyxy = rdet.boxes.xyxy.cpu().numpy() if hasattr(rdet.boxes, "xyxy") else np.array(rdet.boxes.xyxy)
                confs = rdet.boxes.conf.cpu().numpy() if hasattr(rdet.boxes, "conf") else np.array(rdet.boxes.conf)
                clsids = rdet.boxes.cls.cpu().numpy() if hasattr(rdet.boxes, "cls") else np.array(rdet.boxes.cls)
                best_conf = 0.0
                for (x1,y1,x2,y2), conf, clsid in zip(xyxy, confs, clsids):
                    clsid = int(clsid)
                    name = str(names.get(clsid, "")).lower() if names is not None else ""
                    if ("traffic" in name) or (clsid == USE_MODEL_TL_CLASS_ID):
                        if conf > best_conf:
                            best_conf = float(conf)
                            tl_state = "detected"   # model may not give color; we optionally classify color below by cropping
                            tl_box = (int(x1), int(y1), int(x2), int(y2))
                # if we found a TL box, try color classification inside it
                if tl_box is not None:
                    x1,y1,x2,y2 = tl_box
                    crop = frame[y1:y2, x1:x2]
                    # classify dominant color using HSV mean
                    if crop.size > 0:
                        hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
                        h_mean = hsv[...,0].mean()
                        s_mean = hsv[...,1].mean()
                        v_mean = hsv[...,2].mean()
                        # heuristic thresholds
                        if ( (h_mean < 15 or h_mean>160) and s_mean>80 and v_mean>50):
                            tl_state = "red"
                        elif (15 <= h_mean <= 40 and s_mean>90):
                            tl_state = "yellow"
                        elif (40 < h_mean <= 100 and s_mean>60):
                            tl_state = "green"
                        else:
                            tl_state = "unknown"
        # model-based not available or no TL box found -> fallback to color-based ROI search
        if tl_state is None:
            st, box = detect_tl_color_fallback(frame)
            if st is not None:
                tl_state, tl_box = st, box[:4]
    except Exception as e:
        # any error in TL detection -> fallback color detect
        tl_state = None; tl_box = None
        try:
            st, box = detect_tl_color_fallback(frame)
            if st is not None:
                tl_state, tl_box = st, box[:4]
        except Exception:
            pass

    # draw TL overlay if found
    frame_proc = frame.copy()
    if tl_box is not None:
        x1,y1,x2,y2 = map(int, tl_box)
        color = (0,0,255) if tl_state=="red" else (0,255,255) if tl_state=="yellow" else (0,255,0) if tl_state=="green" else (200,200,200)
        cv2.rectangle(frame_proc, (x1,y1), (x2,y2), color, 2)
        txt = f"TL:{tl_state}"
        cv2.putText(frame_proc, txt, (x1, max(0,y1-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # draw tracked objects and keep ADAS logging (similar to before but simplified)
    for tr in tracks:
        try:
            if hasattr(tr, "is_confirmed") and (not tr.is_confirmed()):
                continue
        except Exception:
            pass
        try:
            tid = int(tr.track_id)
        except Exception:
            tid = abs(hash(str(tr.track_id))) % 10000
        try:
            ltrb = tr.to_ltrb()
        except Exception:
            ltrb = tr.to_tlbr()
        x1,y1,x2,y2 = map(int, ltrb)
        cx = (x1+x2)//2; cy = y2
        color = PALETTE[tid % len(PALETTE)]

        # compute pixel speed fallback
        speed_px_s = 0.0
        if tid in last_centroid:
            px,py,t_prev = last_centroid[tid]
            dt = max(1e-6, t_frame - t_prev)
            dist_px = math.hypot(cx - px, cy - py)
            speed_px_s = dist_px / dt
        last_centroid[tid] = (cx,cy,t_frame)

        # draw box & label
        cv2.rectangle(frame_proc, (x1,y1), (x2,y2), color, 2)
        cv2.putText(frame_proc, f"ID:{tid}", (x1, max(0,y1-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        events.append({
            "frame": frame_id, "time": t_frame - t_start, "track_id": tid,
            "x1": x1, "y1": y1, "x2": x2, "y2": y2,
            "speed_px_s": speed_px_s, "traffic_light_state": tl_state
        })

    # HUD: show current TL state at top-left
    hud_text = f"TL: {tl_state if tl_state is not None else 'none'}  Frame:{frame_id}"
    cv2.putText(frame_proc, hud_text, (10,25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

    out.write(frame_proc)
    frame_id += 1

# cleanup
cap.release()
out.release()

# save events csv (traffic_light_state included)
keys = ["frame","time","track_id","x1","y1","x2","y2","speed_px_s","traffic_light_state"]
with open(EVENTS_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=keys)
    writer.writeheader()
    for r in events:
        writer.writerow({k: r.get(k, None) for k in keys})

print("DONE.")
print("Output video:", OUTPUT_VIDEO)
print("Events CSV:", EVENTS_CSV)


Loading YOLO model...
Segmentation model loaded.
Traffic-light class from model present? True
Starting ADAS + TLD processing...
DONE.
Output video: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_riding_bike_adas_tld.mp4
Events CSV: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\adas_events_tld.csv


In [23]:
# Traffic light color detector (robust HSV + blob filtering)
# Kernel: Python (adas)
from pathlib import Path
import cv2, numpy as np, math

VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
DEBUG_OUT = Path("tl_color_debug.png")

# ---------- PARAMETERS (tune if necessary) ----------
TL_ROI_TOP_FRAC = 0.0        # start of ROI (0 = top of frame)
TL_ROI_BOTTOM_FRAC = 0.45    # bottom of ROI (upper 45% of frame)
MIN_TL_AREA = 40             # min area (in pixels) to consider a light blob
MAX_TL_AREA = 20000          # max area to ignore huge regions
ASPECT_RATIO_MAX = 2.5       # discard overly wide blobs
CIRCULARITY_MIN = 0.3        # solidity/shape measure (lower->allow more shapes)

# HSV ranges (you can tune to your environment)
RED_RANGE1 = ((0, 90, 60), (10, 255, 255))
RED_RANGE2 = ((160, 90, 60), (179, 255, 255))
YELLOW_RANGE = ((15, 100, 100), (40, 255, 255))
GREEN_RANGE = ((40, 60, 60), (100, 255, 255))

# morphological kernel
KERNEL = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))

# ---------- detector function ----------
def detect_traffic_light_color(frame,
                               tl_roi_top_frac=TL_ROI_TOP_FRAC,
                               tl_roi_bottom_frac=TL_ROI_BOTTOM_FRAC,
                               min_area=MIN_TL_AREA):
    """
    Return (state, bbox, score, debug_masks)
      - state: 'red' | 'yellow' | 'green' | None
      - bbox: (x1,y1,x2,y2) of detected light in full-frame coords or None
      - score: heuristic score (area or combined) for ranking
      - debug_masks: dict of masks for visualization
    """
    H, W = frame.shape[:2]
    y1 = int(H * tl_roi_top_frac)
    y2 = int(H * tl_roi_bottom_frac)
    roi = frame[y1:y2, 0:W].copy()
    if roi.size == 0:
        return None, None, 0.0, {}

    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    def mask_from_range(rng):
        low, high = rng
        low = np.array(low, dtype=np.uint8); high = np.array(high, dtype=np.uint8)
        m = cv2.inRange(hsv, low, high)
        m = cv2.morphologyEx(m, cv2.MORPH_OPEN, KERNEL)
        m = cv2.morphologyEx(m, cv2.MORPH_CLOSE, KERNEL)
        return m

    masks = {}
    masks['red1'] = mask_from_range(RED_RANGE1)
    masks['red2'] = mask_from_range(RED_RANGE2)
    masks['red'] = cv2.bitwise_or(masks['red1'], masks['red2'])
    masks['yellow'] = mask_from_range(YELLOW_RANGE)
    masks['green'] = mask_from_range(GREEN_RANGE)

    # helper to find candidate blobs in a mask
    def find_best_blob(mask):
        cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        best = None  # (area, bbox, circularity)
        for c in cnts:
            area = cv2.contourArea(c)
            if area < min_area or area > MAX_TL_AREA:
                continue
            x,y,w,h = cv2.boundingRect(c)
            aspect = float(w)/float(h+1e-6)
            if aspect > ASPECT_RATIO_MAX:  # too wide, not a light
                continue
            # circularity / solidity measure
            hull = cv2.convexHull(c)
            hull_area = cv2.contourArea(hull) if len(hull) > 2 else area
            circularity = float(area) / (hull_area + 1e-6)
            # score prefers larger area and decent circularity
            score = area * (0.5 + 0.5 * min(1.0, circularity / (CIRCULARITY_MIN+1e-6)))
            if best is None or score > best[0]:
                best = (score, (x,y,w,h), circularity, area)
        return best

    cand_red = find_best_blob(masks['red'])
    cand_yellow = find_best_blob(masks['yellow'])
    cand_green = find_best_blob(masks['green'])

    # Priority: red > yellow > green (you can change)
    cands = []
    if cand_red is not None:
        cands.append(('red',) + cand_red)
    if cand_yellow is not None:
        cands.append(('yellow',) + cand_yellow)
    if cand_green is not None:
        cands.append(('green',) + cand_green)

    # If no candidates, return None
    if not cands:
        return None, None, 0.0, masks

    # choose best by score (but if close, prefer red then yellow)
    # sort by score desc
    cands.sort(key=lambda x: x[1], reverse=True)
    # enforced priority: if red exists with reasonable score choose it
    names = [c[0] for c in cands]
    # priority selection
    for preferred in ['red','yellow','green']:
        for cand in cands:
            if cand[0] == preferred:
                _, score, bbox, circ, area = cand
                x,y,w,h = bbox
                # convert bbox coords to full frame
                fx1, fy1, fx2, fy2 = x, y + y1, x + w, y + h + y1
                return preferred, (fx1, fy1, fx2, fy2), float(score), masks

    # fallback pick best
    pref = cands[0]
    _, score, bbox, circ, area = pref
    x,y,w,h = bbox
    fx1, fy1, fx2, fy2 = x, y + y1, x + w, y + h + y1
    return pref[0], (fx1, fy1, fx2, fy2), float(score), masks

# ---------- quick test on first frame ----------
cap = cv2.VideoCapture(str(VIDEO_PATH))
ret, frame0 = cap.read()
cap.release()
if not ret:
    raise SystemExit("Cannot read video frame.")

state, bbox, score, debug_masks = detect_traffic_light_color(frame0)
print("Detected:", state, " bbox:", bbox, " score:", score)

# draw debug image
vis = frame0.copy()
H, W = frame0.shape[:2]
# draw ROI rect
y1 = int(H * TL_ROI_TOP_FRAC); y2 = int(H * TL_ROI_BOTTOM_FRAC)
cv2.rectangle(vis, (0,y1), (W,y2), (255,255,0), 2)

# overlay masks (small thumbnails on right)
thumb_h = 120
thumbs = []
for k in ['red','yellow','green']:
    m = debug_masks.get(k)
    if m is None:
        m = np.zeros((y2-y1, W), dtype=np.uint8)
    m_color = cv2.cvtColor(m, cv2.COLOR_GRAY2BGR)
    m_small = cv2.resize(m_color, (int(W*0.25), thumb_h))
    thumbs.append(m_small)
# stack thumbs vertically
stacked = np.vstack(thumbs)
# paste on right side
h_s, w_s = stacked.shape[:2]
vis[10:10+h_s, -w_s-10:-10] = stacked

# draw bbox and state
if bbox is not None:
    x1,y1b,x2,y2b = bbox
    color = (0,0,255) if state=='red' else (0,255,255) if state=='yellow' else (0,255,0)
    cv2.rectangle(vis, (x1,y1b), (x2,y2b), color, 2)
    cv2.putText(vis, f"TL:{state} s={score:.1f}", (x1, max(0,y1b-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

# save debug image
cv2.imwrite(str(DEBUG_OUT), vis)
print("Debug image saved to:", DEBUG_OUT)


Detected: red  bbox: (362, 191, 392, 216)  score: 378.0
Debug image saved to: tl_color_debug.png


In [24]:
# Run Traffic-Light color detector over full video -> annotated video + CSV
# Kernel: Python (adas)

from pathlib import Path
import cv2, time, csv, numpy as np, math, collections
from datetime import datetime

# ---------- Paths & params ----------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
OUT_VIDEO = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_tl_color_full.mp4")
OUT_CSV   = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tl_events.csv")

# re-use detector from debug cell (copy/paste detector code here)
TL_ROI_TOP_FRAC = 0.0
TL_ROI_BOTTOM_FRAC = 0.45
MIN_TL_AREA = 40
MAX_TL_AREA = 20000
ASPECT_RATIO_MAX = 2.5
CIRCULARITY_MIN = 0.3
KERNEL = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))

RED_RANGE1 = ((0, 90, 60), (10, 255, 255))
RED_RANGE2 = ((160, 90, 60), (179, 255, 255))
YELLOW_RANGE = ((15, 100, 100), (40, 255, 255))
GREEN_RANGE = ((40, 60, 60), (100, 255, 255))

BEEP_ON_RED = True
ALARM_COOLDOWN = 2.0  # seconds between beeps

UPSAMPLE_FOR_CLASS = True  # set False to skip crop upsample refinement
UPSAMPLE_SCALE = 2         # upsample factor for small boxes before re-classify
MIN_AREA_FOR_UPSAMPLE = 60

# ---------- helper functions (same as debug) ----------
def mask_from_range_hsv(hsv, rng):
    low, high = rng
    low = np.array(low, dtype=np.uint8); high = np.array(high, dtype=np.uint8)
    m = cv2.inRange(hsv, low, high)
    m = cv2.morphologyEx(m, cv2.MORPH_OPEN, KERNEL)
    m = cv2.morphologyEx(m, cv2.MORPH_CLOSE, KERNEL)
    return m

def find_best_blob(mask, min_area=MIN_TL_AREA):
    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    best = None
    for c in cnts:
        area = cv2.contourArea(c)
        if area < min_area or area > MAX_TL_AREA:
            continue
        x,y,w,h = cv2.boundingRect(c)
        aspect = float(w)/float(h+1e-6)
        if aspect > ASPECT_RATIO_MAX:
            continue
        hull = cv2.convexHull(c)
        hull_area = cv2.contourArea(hull) if len(hull) > 2 else area
        circularity = float(area) / (hull_area + 1e-6)
        score = area * (0.5 + 0.5 * min(1.0, circularity / (CIRCULARITY_MIN+1e-6)))
        if best is None or score > best[0]:
            best = (score, (x,y,w,h), circularity, area)
    return best

def classify_crop_by_hsv(crop):
    """Return 'red'|'yellow'|'green'|None and score"""
    if crop is None or crop.size==0:
        return None, 0.0
    hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
    red1 = mask_from_range_hsv(hsv, RED_RANGE1)
    red2 = mask_from_range_hsv(hsv, RED_RANGE2)
    red = cv2.bitwise_or(red1, red2)
    yellow = mask_from_range_hsv(hsv, YELLOW_RANGE)
    green = mask_from_range_hsv(hsv, GREEN_RANGE)
    br = find_best_blob(red)
    by = find_best_blob(yellow)
    bg = find_best_blob(green)
    cands = []
    if br is not None:
        cands.append(('red',) + br)
    if by is not None:
        cands.append(('yellow',) + by)
    if bg is not None:
        cands.append(('green',) + bg)
    if not cands:
        return None, 0.0
    # prefer red>yellow>green if close; otherwise best score
    cands.sort(key=lambda x: x[1], reverse=True)
    for pref in ('red','yellow','green'):
        for cand in cands:
            if cand[0] == pref:
                return cand[0], float(cand[1])
    # fallback
    return cands[0][0], float(cands[0][1])

def detect_traffic_light_color_frame(frame):
    H, W = frame.shape[:2]
    y1 = int(H * TL_ROI_TOP_FRAC); y2 = int(H * TL_ROI_BOTTOM_FRAC)
    roi = frame[y1:y2, 0:W].copy()
    if roi.size == 0:
        return None, None, 0.0, {}
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    masks = {}
    masks['red1'] = mask_from_range_hsv(hsv, RED_RANGE1)
    masks['red2'] = mask_from_range_hsv(hsv, RED_RANGE2)
    masks['red'] = cv2.bitwise_or(masks['red1'], masks['red2'])
    masks['yellow'] = mask_from_range_hsv(hsv, YELLOW_RANGE)
    masks['green'] = mask_from_range_hsv(hsv, GREEN_RANGE)

    cand_red = find_best_blob(masks['red'])
    cand_yellow = find_best_blob(masks['yellow'])
    cand_green = find_best_blob(masks['green'])

    cands = []
    if cand_red is not None:
        cands.append(('red',) + cand_red)
    if cand_yellow is not None:
        cands.append(('yellow',) + cand_yellow)
    if cand_green is not None:
        cands.append(('green',) + cand_green)

    if not cands:
        return None, None, 0.0, masks

    cands.sort(key=lambda x: x[1], reverse=True)
    # priority red>yellow>green
    for pref in ('red','yellow','green'):
        for cand in cands:
            if cand[0] == pref:
                _, score, bbox, circ, area = cand
                x,y,w,h = bbox
                fx1, fy1, fx2, fy2 = x, y + y1, x + w, y + h + y1
                return pref, (fx1, fy1, fx2, fy2), float(score), masks

    # fallback
    pref = cands[0]
    _, score, bbox, circ, area = pref
    x,y,w,h = bbox
    fx1, fy1, fx2, fy2 = x, y + y1, x + w, y + h + y1
    return pref, (fx1, fy1, fx2, fy2), float(score), masks

# ---------- setup video IO ----------
cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Failed to open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")
out = cv2.VideoWriter(str(OUT_VIDEO), fourcc, FPS, (W, H))

frame_id = 0
events = []
last_beep = -9999

print("Processing TL color detection over full video...")
t0 = time.time()
while True:
    ret, frame = cap.read()
    if not ret:
        break
    t_frame = time.time()
    state, bbox, score, masks = detect_traffic_light_color_frame(frame)

    # optional upsample reclassification (helps tiny lights)
    if UPSAMPLE_FOR_CLASS and bbox is not None:
        x1,y1,x2,y2 = map(int, bbox)
        w = x2 - x1; h = y2 - y1
        if w*h < MIN_AREA_FOR_UPSAMPLE:
            # upsample ROI a bit and reclassify
            pad = 4
            xa = max(0, x1-pad); ya = max(0, y1-pad); xb = min(W, x2+pad); yb = min(H, y2+pad)
            crop = frame[ya:yb, xa:xb].copy()
            if crop.size > 0:
                crop_up = cv2.resize(crop, (int(crop.shape[1]*UPSAMPLE_SCALE), int(crop.shape[0]*UPSAMPLE_SCALE)), interpolation=cv2.INTER_CUBIC)
                st2, sc2 = classify_crop_by_hsv(crop_up)
                if st2 is not None:
                    # if reclass says same or higher score prefer it
                    state = st2

    # draw on frame
    vis = frame.copy()
    if bbox is not None:
        x1,y1,x2,y2 = map(int, bbox)
        col = (0,0,255) if state=='red' else (0,255,255) if state=='yellow' else (0,255,0) if state=='green' else (200,200,200)
        cv2.rectangle(vis, (x1,y1), (x2,y2), col, 2)
        cv2.putText(vis, f"TL:{state}", (x1, max(0,y1-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, col, 2)
    # HUD
    hud = f"TL:{state if state else 'none'}  Frame:{frame_id}"
    cv2.putText(vis, hud, (10,25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
    out.write(vis)

    # beep on red with cooldown
    if state == 'red' and BEEP_ON_RED:
        now = time.time()
        if (now - last_beep) >= ALARM_COOLDOWN:
            last_beep = now
            try:
                import winsound
                winsound.Beep(700, 200)
            except Exception:
                pass

    events.append({
        "frame": frame_id,
        "time_s": frame_id / FPS,
        "state": state,
        "bbox": bbox,
        "score": score
    })

    frame_id += 1

cap.release()
out.release()

# ---------- save CSV ----------
with open(OUT_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["frame","time_s","state","bbox","score"])
    writer.writeheader()
    for r in events:
        writer.writerow({"frame": r["frame"], "time_s": r["time_s"], "state": r["state"], "bbox": str(r["bbox"]) if r["bbox"] else "", "score": r["score"]})

print("Done. Processed frames:", frame_id, "Elapsed:", time.time()-t0)
print("Output video:", OUT_VIDEO)
print("Events CSV:", OUT_CSV)


Processing TL color detection over full video...
Done. Processed frames: 870 Elapsed: 11.5126314163208
Output video: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_tl_color_full.mp4
Events CSV: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tl_events.csv


In [27]:
# Robust Traffic Light Detection with temporal confirmation
# Kernel: Python (adas)
# Outputs:
#   - annotated video: output_tld_confirmed.mp4
#   - events CSV: tld_confirmed_events.csv

from pathlib import Path
import cv2, numpy as np, math, time, csv, sys, collections

# ---------------- CONFIG (edit paths if needed) ----------------
VIDEO_PATH = Path(r"E:/projects/riding_bike.mp4")
OUT_VIDEO  = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/output_tld_confirmed.mp4")
OUT_CSV    = Path(r"C:/Users/Dell/projects/idas_lite_seg/run_yolov8n_seg2/tld_confirmed_events.csv")

# TL detection params
UPPER_ROI_BOTTOM_FRAC = 0.45   # analyze only top 45% of frame
MIN_AREA = 40                  # min blob area
MAX_AREA = 1000                # max blob area (traffic lights are small)
ASPECT_MIN, ASPECT_MAX = 0.4, 2.0
CIRCULARITY_MIN = 0.25
IOU_MATCH_THRESH = 0.35        # IoU threshold to match detections across frames
N_CONS_CONFIRM = 3             # require this many consecutive frames to confirm a TL
COOLDOWN_SECS = 2.0            # beep cooldown
BEEP_ON_RED = True

# HSV ranges (tune if needed)
RED_RANGE1 = ((0, 120, 80), (10, 255, 255))
RED_RANGE2 = ((160, 120, 80), (179, 255, 255))
YELLOW_RANGE = ((15, 120, 120), (40, 255, 255))
GREEN_RANGE = ((40, 80, 80), (90, 255, 255))

# morphological kernel
KERNEL = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))

# ------------- utility helpers -------------
def iou(boxA, boxB):
    # boxes (x1,y1,x2,y2)
    xA = max(boxA[0], boxB[0]); yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2]); yB = min(boxA[3], boxB[3])
    interW = max(0, xB - xA); interH = max(0, yB - yA)
    interArea = interW * interH
    areaA = max(0, boxA[2]-boxA[0]) * max(0, boxA[3]-boxA[1])
    areaB = max(0, boxB[2]-boxB[0]) * max(0, boxB[3]-boxB[1])
    union = areaA + areaB - interArea + 1e-6
    return interArea/union

def mask_from_range_hsv(hsv, rng):
    low, high = rng
    low = np.array(low, dtype=np.uint8); high = np.array(high, dtype=np.uint8)
    m = cv2.inRange(hsv, low, high)
    m = cv2.morphologyEx(m, cv2.MORPH_OPEN, KERNEL)
    m = cv2.morphologyEx(m, cv2.MORPH_CLOSE, KERNEL)
    return m

def find_color_blobs(hsv, color_ranges):
    # returns list of candidate boxes (x,y,w,h,area,mask)
    mask = None
    if isinstance(color_ranges, tuple) and len(color_ranges)==2 and all(isinstance(x, tuple) for x in color_ranges):
        # single range
        mask = mask_from_range_hsv(hsv, color_ranges)
    elif isinstance(color_ranges, (list,tuple)) and len(color_ranges)>1:
        mask = None
        for rng in color_ranges:
            m = mask_from_range_hsv(hsv, rng)
            mask = m if mask is None else cv2.bitwise_or(mask, m)
    else:
        mask = np.zeros(hsv.shape[:2], dtype=np.uint8)
    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cands = []
    for c in cnts:
        area = cv2.contourArea(c)
        if area < MIN_AREA or area > MAX_AREA:
            continue
        x,y,w,h = cv2.boundingRect(c)
        aspect = w/(h+1e-6)
        if aspect < ASPECT_MIN or aspect > ASPECT_MAX:
            continue
        perim = cv2.arcLength(c, True)
        if perim <= 0: continue
        circularity = 4*math.pi*(area/(perim*perim))
        if circularity < CIRCULARITY_MIN:
            continue
        cands.append((x,y,w,h,area,mask))
    return cands

def detect_tl_shapes(frame):
    """Return list of (state, bbox) in full-frame coords. bbox = (x1,y1,x2,y2)."""
    H,W = frame.shape[:2]
    y1 = 0; y2 = int(H * UPPER_ROI_BOTTOM_FRAC)
    roi = frame[y1:y2, :]
    if roi.size == 0:
        return []

    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    # find colored blobs
    red_cands = find_color_blobs(hsv, [RED_RANGE1, RED_RANGE2])
    yellow_cands = find_color_blobs(hsv, YELLOW_RANGE)
    green_cands = find_color_blobs(hsv, GREEN_RANGE)

    # convert to (state, bbox)
    res = []
    for (x,y,w,h,area,mask) in red_cands:
        res.append(("red", (x, y + y1, x+w, y + y1 + h)))
    for (x,y,w,h,area,mask) in yellow_cands:
        res.append(("yellow", (x, y + y1, x+w, y + y1 + h)))
    for (x,y,w,h,area,mask) in green_cands:
        res.append(("green", (x, y + y1, x+w, y + y1 + h)))

    # now cluster by X proximity (vertical alignment). Real TL often has vertical stack -> same x center
    clustered = []
    used = [False]*len(res)
    for i, (state_i, box_i) in enumerate(res):
        if used[i]: continue
        xi_center = (box_i[0] + box_i[2])/2
        cluster = [(state_i, box_i)]
        used[i] = True
        for j, (state_j, box_j) in enumerate(res):
            if used[j]: continue
            xj_center = (box_j[0] + box_j[2])/2
            if abs(xi_center - xj_center) < max(20, (box_i[2]-box_i[0])*0.6):  # similar x
                cluster.append((state_j, box_j))
                used[j] = True
        # cluster -> choose highest priority state among cluster members:
        # priority red>yellow>green
        priority = {"red":3,"yellow":2,"green":1}
        cluster.sort(key=lambda x: priority.get(x[0],0), reverse=True)
        clustered.append(cluster[0])  # representative
    return clustered

# ------------- Temporal confirmation tracker for TLs -------------
class TLConfirmTracker:
    def __init__(self, iou_thresh=IOU_MATCH_THRESH, required_cons=N_CONS_CONFIRM):
        self.iou_thresh = iou_thresh
        self.required_cons = required_cons
        self.entries = []  # list of dicts: {box, state, count, last_seen_time}
        self.confirmed = []  # confirmed entries (same dict but flagged)
    def update(self, detections):
        """
        detections: list of (state, box)
        returns: list of confirmed entries (state, box) newly confirmed this frame
        """
        now = time.time()
        matched_idx = set()
        # try to match detections to existing entries
        for state, box in detections:
            best_i = None; best_iou = 0.0
            for i, e in enumerate(self.entries):
                iouv = iou(box, e['box'])
                if iouv > best_iou:
                    best_i = i; best_iou = iouv
            if best_i is not None and best_iou >= self.iou_thresh:
                e = self.entries[best_i]
                # state match increases confidence, mismatch reduces/overwrites
                if e['state'] == state:
                    e['count'] += 1
                else:
                    # different state observed for same box -> reset count and adopt new state
                    e['state'] = state
                    e['count'] = 1
                e['box'] = box
                e['last_seen'] = now
                matched_idx.add(best_i)
            else:
                # new entry
                self.entries.append({'state':state, 'box':box, 'count':1, 'last_seen':now, 'confirmed':False})
        # age out stale entries (not seen for a while)
        keep = []
        for e in self.entries:
            if now - e['last_seen'] > 1.0:   # 1s stale threshold
                # don't keep if stale and not confirmed
                if e.get('confirmed', False):
                    keep.append(e)
                else:
                    # drop
                    pass
            else:
                keep.append(e)
        self.entries = keep
        # check confirm
        newly_confirmed = []
        for e in self.entries:
            if not e.get('confirmed', False) and e['count'] >= self.required_cons:
                e['confirmed'] = True
                newly_confirmed.append((e['state'], e['box']))
                self.confirmed.append(e)
        return newly_confirmed
    def get_confirmed(self):
        # return list of confirmed entries current
        return [(e['state'], e['box']) for e in self.confirmed]

# ---------------- Main processing: apply detector + temporal confirmation ----------------
if not VIDEO_PATH.exists():
    raise SystemExit("Video not found: " + str(VIDEO_PATH))

cap = cv2.VideoCapture(str(VIDEO_PATH))
if not cap.isOpened():
    raise SystemExit("Cannot open video.")
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H = int(cap.get(cap.get if False else cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"avc1")
out = cv2.VideoWriter(str(OUT_VIDEO), fourcc, FPS, (W, H))

tracker_tl = TLConfirmTracker()
events = []
last_beep = -9999
frame_id = 0
t0 = time.time()
print("Starting TL detection + confirmation pass...")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    detections = detect_tl_shapes(frame)  # returns list of (state, box)
    # update confirmation tracker
    newly_confirmed = tracker_tl.update(detections)
    confirmed_now = tracker_tl.get_confirmed()

    # draw detections (unconfirmed) faintly
    for st, box in detections:
        x1,y1,x2,y2 = map(int, box)
        cv2.rectangle(frame, (x1,y1), (x2,y2), (200,200,200), 1)
        cv2.putText(frame, f"{st}", (x1, max(0,y1-8)), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (200,200,200), 1)

    # draw confirmed TLs with strong color
    for st, box in confirmed_now:
        x1,y1,x2,y2 = map(int, box)
        color = (0,0,255) if st=='red' else (0,255,255) if st=='yellow' else (0,255,0)
        cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
        cv2.putText(frame, f"TL:{st}", (x1, max(0,y1-8)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        # beep on red (confirmed)
        if st == 'red' and BEEP_ON_RED:
            now = time.time()
            if now - last_beep >= COOLDOWN_SECS:
                last_beep = now
                try:
                    import winsound
                    winsound.Beep(700, 180)
                except Exception:
                    pass
        # log event
        events.append({"frame":frame_id, "time_s":frame_id/FPS, "state":st, "bbox":str(box)})

    # HUD summary
    confirmed_states = ", ".join([f"{s}" for s,_ in confirmed_now]) if confirmed_now else "none"
    cv2.putText(frame, f"Confirmed TLs: {confirmed_states}", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

    out.write(frame)
    frame_id += 1

cap.release()
out.release()

# save CSV
with open(OUT_CSV, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["frame","time_s","state","bbox"])
    writer.writeheader()
    for r in events:
        writer.writerow(r)

print("Done. Frames processed:", frame_id, "Output video:", OUT_VIDEO, "Events CSV:", OUT_CSV)


Starting TL detection + confirmation pass...
Done. Frames processed: 870 Output video: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\output_tld_confirmed.mp4 Events CSV: C:\Users\Dell\projects\idas_lite_seg\run_yolov8n_seg2\tld_confirmed_events.csv
