In [None]:
import cv2
import numpy as np
import csv
import time
import os

# HSV thresholds used by get_mask(): each entry maps a range name to a
# (lower, upper) pair of 3-tuples, presumably ordered (H, S, V) to match the
# cv2.COLOR_BGR2HSV images they are applied to.
# Red is split into "red1" and "red2" (low and high hue bands); get_mask()
# ORs the two together when asked for "red", so there is no plain "red" key.
# NOTE(review): "yellow" ends at hue 35 and "green" starts at hue 35; if the
# bounds are inclusive, a pixel with hue exactly 35 can match both -- confirm
# this overlap is intended.
HSV_RANGES = {
    "red1": ((0, 100, 70), (8, 255, 255)),
    "red2": ((170, 100, 70), (180, 255, 255)),
    "yellow": ((15, 100, 100), (35, 255, 255)),
    "green": ((35, 60, 60), (90, 255, 255)),
}

def get_mask(hsv, color):
    """Return a cleaned binary mask of the pixels in *hsv* that match *color*.

    Args:
        hsv: Image already converted to HSV.
        color: ``"red"`` (built from the ``"red1"`` and ``"red2"`` entries of
            ``HSV_RANGES``) or any other key of ``HSV_RANGES``.

    Returns:
        The thresholded mask after one morphological opening followed by one
        dilation, both with a 5x5 elliptical kernel.

    Raises:
        KeyError: If *color* is neither ``"red"`` nor a key of ``HSV_RANGES``.
    """
    if color != "red":
        lower, upper = HSV_RANGES[color]
        raw = cv2.inRange(hsv, np.array(lower), np.array(upper))
    else:
        # Red is stored as two separate hue bands; combine both.
        (lower_a, upper_a) = HSV_RANGES["red1"]
        (lower_b, upper_b) = HSV_RANGES["red2"]
        band_a = cv2.inRange(hsv, np.array(lower_a), np.array(upper_a))
        band_b = cv2.inRange(hsv, np.array(lower_b), np.array(upper_b))
        raw = cv2.bitwise_or(band_a, band_b)

    # Opening drops small speckles; the dilation then grows what survived.
    ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    opened = cv2.morphologyEx(raw, cv2.MORPH_OPEN, ellipse, iterations=1)
    return cv2.morphologyEx(opened, cv2.MORPH_DILATE, ellipse, iterations=1)

def contour_is_valid(cnt, min_area=80):
    """Decide whether contour *cnt* is plausible enough to keep as a light.

    A contour passes only if all of the following hold:
      * its area is at least *min_area*;
      * its bounding-box width/height ratio lies in [0.2, 1.6];
      * its perimeter is non-zero;
      * its circularity, 4*pi*area / perimeter**2, is at least 0.03.

    Args:
        cnt: A contour as produced by ``cv2.findContours``.
        min_area: Minimum contour area to accept.

    Returns:
        True if the contour passes every check, otherwise False.
    """
    area = cv2.contourArea(cnt)
    if area < min_area:
        return False

    _, _, box_w, box_h = cv2.boundingRect(cnt)
    # A zero-height box gets ratio 0, which the range check below rejects.
    ratio = float(box_w) / box_h if box_h > 0 else 0
    if not (0.2 <= ratio <= 1.6):
        return False

    perimeter = cv2.arcLength(cnt, True)
    if perimeter == 0:
        return False

    roundness = 4 * np.pi * (area / (perimeter * perimeter))
    return not (roundness < 0.03)

def find_light_candidates(mask, min_area=80):
    """Collect bounding boxes of the acceptable external contours in *mask*.

    Args:
        mask: Binary mask to search for contours.
        min_area: Minimum contour area, forwarded to ``contour_is_valid``.

    Returns:
        A list of ``(x, y, w, h, area)`` tuples, one per contour accepted by
        ``contour_is_valid``, in the order ``cv2.findContours`` reported them.
    """
    found, _hierarchy = cv2.findContours(
        mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    return [
        (*cv2.boundingRect(contour), cv2.contourArea(contour))
        for contour in found
        if contour_is_valid(contour, min_area=min_area)
    ]

def merge_boxes(boxes, gap=5):
    """Merge boxes that overlap or lie within *gap* pixels of each other.

    Two rectangles are considered "near" when, on both axes, the signed
    overlap ``min(right/bottom edges) - max(left/top edges)`` is greater than
    ``-gap``. Near rectangles are replaced by their common bounding box, and
    merging repeats until no two remaining rectangles are near, so the result
    does not depend on which boxes happen to be adjacent in x-order.

    Args:
        boxes: Iterable of ``(x, y, w, h, area)`` tuples.
        gap: Maximum separation (in pixels, exclusive) at which two boxes are
            still merged. Defaults to 5.

    Returns:
        A list of ``(x, y, w, h, area)`` tuples ordered by ascending ``x``.
        ``x, y, w, h`` are ints; ``area`` is the largest area among the boxes
        that were merged into that rectangle.
    """
    if not boxes:
        return []

    def near(a, b):
        overlap_x = min(a[2], b[2]) - max(a[0], b[0])
        overlap_y = min(a[3], b[3]) - max(a[1], b[1])
        return overlap_x > -gap and overlap_y > -gap

    # Work in corner form [x1, y1, x2, y2, area], left-most first.
    rects = sorted(
        ([x, y, x + w, y + h, a] for (x, y, w, h, a) in boxes),
        key=lambda r: r[0],
    )

    # Invariant: rectangles in `clusters` are pairwise not near each other.
    clusters = []
    for rect in rects:
        absorbed = True
        while absorbed:
            # Re-scan after every merge: the grown rectangle may now reach
            # clusters it did not touch before.
            absorbed = False
            for idx, other in enumerate(clusters):
                if near(rect, other):
                    rect = [
                        min(rect[0], other[0]),
                        min(rect[1], other[1]),
                        max(rect[2], other[2]),
                        max(rect[3], other[3]),
                        max(rect[4], other[4]),
                    ]
                    del clusters[idx]
                    absorbed = True
                    break
        clusters.append(rect)

    clusters.sort(key=lambda r: r[0])
    return [
        (int(x1), int(y1), int(x2 - x1), int(y2 - y1), a)
        for (x1, y1, x2, y2, a) in clusters
    ]

def process_video(video_path, output_path="annotated_out.mp4", min_area=80):
    """Annotate traffic-light colour detections in a video and save the result.

    Every frame is blurred, converted to HSV, and searched for red, yellow and
    green candidates. Each merged detection is drawn as a labelled rectangle,
    and a "State: <colour>" banner is drawn in the top-left corner. The frame
    state is the first colour with at least one detection, checked in the
    order red, yellow, green; it is "none" when nothing is detected.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated video to write (``mp4v`` codec).
        min_area: Minimum contour area for a detection to be kept.

    Raises:
        RuntimeError: If the input video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Cannot open video {video_path}")

    colors = ("red", "yellow", "green")  # also the state priority order
    box_colors = {"red": (0, 0, 255), "yellow": (0, 255, 255), "green": (0, 255, 0)}  # BGR

    writer = None
    try:
        # Fall back to 20 fps when the container reports 0.
        fps = cap.get(cv2.CAP_PROP_FPS) or 20.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            blur = cv2.GaussianBlur(frame, (5, 5), 0)
            hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)

            merged = {
                c: merge_boxes(find_light_candidates(get_mask(hsv, c), min_area))
                for c in colors
            }

            label_for_frame = next((c for c in colors if merged[c]), "none")

            for color, found in merged.items():
                box_col = box_colors[color]
                for (x, y, w, h, _area) in found:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), box_col, 2)
                    cv2.putText(frame, color, (x, y - 6),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_col, 2)

            # Solid black banner behind the state text so it stays readable.
            cv2.rectangle(frame, (0, 0), (220, 36), (0, 0, 0), -1)
            cv2.putText(frame, f"State: {label_for_frame}", (6, 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

            writer.write(frame)
    finally:
        # Release both handles even if a frame fails mid-stream; otherwise
        # the capture stays open and the output file is never finalised.
        cap.release()
        if writer is not None:
            writer.release()

    print(f"[info] Annotated video saved: {output_path}")

# Run the annotator on the two demo clips: (input video, annotated output file).
for src_video, dst_video in (
    ("/kaggle/input/demovid/vecteezy_pov-to-the-stoplight-traffic-safety-sign-light-in-phuket-old_20925708.mp4",
     "out1.mp4"),
    ("/kaggle/input/demovid/vecteezy_road-traffic-lights-at-night-in-time-lapse_16179747.mov",
     "out2.mp4"),
):
    process_video(src_video, output_path=dst_video)


[info] Annotated video saved: out1.mp4
[info] Annotated video saved: out2.mp4


In [9]:
from IPython.display import HTML

# `output_path` exists only as a parameter of process_video(); none of the
# cells shown here define it at notebook scope, so referencing it would raise
# NameError on a fresh kernel. List the files written by the process_video()
# calls explicitly and render one download link per file.
annotated_videos = ("out1.mp4", "out2.mp4")

HTML("<br>".join(
    f'<a href="{path}" download>Click here to download annotated video ({path})</a>'
    for path in annotated_videos
))
