In [None]:
import cv2
import os
import time
import json
from datetime import datetime
from collections import defaultdict

In [None]:
# Config
LATEST_JSON = r"path\where\the\detected results\are\stored"  #loaction of parsed JSON file, generated by class_filter
video_path_file = r"any\directory"  #in which the location of the video file is written by the module
VIDEO_SOURCE = 0
print(f"[DEBUG] Using video source: {VIDEO_SOURCE}")

In [None]:
if os.path.isfile(video_path_file):
    try:
        with open(video_path_file, "r") as f:
            path = f.read().strip()
            VIDEO_SOURCE = int(path) if path.isdigit() else path
    except Exception:
        pass

In [None]:
WINDOW_NAME = "Traffic Overlay"
POLL_INTERVAL = 1
GREEN_DURATION = 30  # Green for 30 seconds after which the signals revert to red.

In [None]:
per_class_thresholds = {             # Customizable
    "truck": 3,
    "bus": 4,
    "car": 5,
    "motorcycle": 25
}
total_threshold = None              #In case of heavy traffic, can be set for better processing.

In [None]:
# Overlay appearance 
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 2.0
THICKNESS = 3
POS = (50, 120)
COUNT_POS = (50, 220)
BG_RECT_PADDING = 10
GREEN_COLOR = (0, 200, 0)
RED_COLOR = (0, 0, 200)
TEXT_BG_COLOR = (0, 0, 0)
TEXT_BG_ALPHA = 0.5

In [None]:
# JSON reader
def read_counts_from_json(path, retries=3, retry_delay=0.05):
    counts = defaultdict(int)
    if not os.path.isfile(path):
        print("[DEBUG] JSON file not found")
        return counts
    for _ in range(retries):
        try:
            with open(path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
            raw = data.get("counts", {}) if isinstance(data, dict) else {}
            for k, v in raw.items():
                try:
                    counts[str(k).lower()] = int(v)
                except Exception:
                    try:
                        counts[str(k).lower()] = int(float(v))
                    except Exception:
                        counts[str(k).lower()] = 0
            print(f"[DEBUG] JSON contents: {dict(counts)}")
            return counts
        except (json.JSONDecodeError, ValueError) as e:
            print(f"[DEBUG] JSON decode error: {e}")
            time.sleep(retry_delay)
            continue
        except Exception as e:
            print(f"[DEBUG] Unexpected error reading JSON: {e}")
            return counts
    return counts

In [None]:
# Decision logic
def should_turn_green(counts, per_class_thresholds=None, total_threshold=None):
    if total_threshold is not None:
        total = sum(counts.values())
        return (total >= total_threshold, f"total={total}")
    if per_class_thresholds:
        for cls, thr in per_class_thresholds.items():
            c = counts.get(cls.lower(), 0)
            if c >= thr:
                return (True, f"{cls}={c}")
    return (False, "")

In [None]:
# Overlay helper
def draw_text_with_bg(frame, text, pos, font, scale, thickness, text_color, bg_color, alpha=0.5, padding=8):
    (w, h), baseline = cv2.getTextSize(text, font, scale, thickness)
    x, y = pos
    rect_tl = (x - padding, y - h - padding)
    rect_br = (x + w + padding, y + baseline + padding)
    overlay = frame.copy()
    cv2.rectangle(overlay, rect_tl, rect_br, bg_color, -1)
    cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)
    cv2.putText(frame, text, (x, y), font, scale, text_color, thickness, cv2.LINE_AA)

In [None]:
# Main function
def main():
    cap = cv2.VideoCapture(VIDEO_SOURCE, cv2.CAP_ANY)
    if not cap.isOpened():
        print(f"[ERROR] Cannot open video source: {VIDEO_SOURCE}")
        return

    green_active = False
    green_end_time = 0
    cooldown_active = False
    cooldown_end_time = 0
    last_read_time = 0
    last_counts = defaultdict(int)
    last_reason = ""

    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)

    try:
        while True:
            ret, frame = cap.read()
            print(f"[DEBUG] Frame read at {time.time()} -> ret={ret}")            #For Debugging
            if not ret:
                if isinstance(VIDEO_SOURCE, str):
                    break
                else:
                    time.sleep(0.1)
                    continue

            now = time.time()

            if cooldown_active and now < cooldown_end_time:                                       
                pass
            elif now - last_read_time >= POLL_INTERVAL:
                counts = read_counts_from_json(LATEST_JSON)
                counts = {k.lower(): int(v) for k, v in counts.items()}
                last_counts.clear()
                last_counts.update(counts)

                triggered, reason = should_turn_green(last_counts, per_class_thresholds, total_threshold)
                last_read_time = now
                last_reason = reason

                print(f"[DEBUG] Decision: GREEN={triggered}, reason={reason}")

                if triggered and not green_active:
                    green_active = True
                    green_end_time = now + GREEN_DURATION
                    cooldown_active = True
                    cooldown_end_time = now + GREEN_DURATION
                    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] GREEN triggered by {reason} -> until {datetime.fromtimestamp(green_end_time)}")

            if cooldown_active and now >= cooldown_end_time:
                cooldown_active = False

            if green_active and now >= green_end_time:
                green_active = False
                last_reason = ""

            signal_text = f"GREEN ({int(green_end_time - now)}s)" if green_active else "RED"
            signal_color = GREEN_COLOR if green_active else RED_COLOR
            draw_text_with_bg(frame, signal_text, POS, FONT, FONT_SCALE, THICKNESS, signal_color, TEXT_BG_COLOR, TEXT_BG_ALPHA, BG_RECT_PADDING)

            keys = per_class_thresholds.keys()
            counts_summary = "  ".join([f"{k}:{last_counts.get(k.lower(), 0)}" for k in keys]) or "no detections"
            draw_text_with_bg(frame, counts_summary, COUNT_POS, FONT, 0.9, 2, (255,255,255), TEXT_BG_COLOR, 0.5, 8)

            if last_reason:
                draw_text_with_bg(frame, f"Triggered by: {last_reason}", (POS[0], POS[1]+140), FONT, 0.8, 2, (230,230,230), TEXT_BG_COLOR, 0.5, 6)

            cv2.imshow(WINDOW_NAME, frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break

            time.sleep(0.01)

    except KeyboardInterrupt:
        pass
    finally:
        cap.release()
        cv2.destroyAllWindows()

In [None]:
if __name__ == "__main__":

    main()