In [None]:
# Imports and Configurations

import os
import argparse
import time

import cv2
import numpy as np
from ultralytics import YOLO

# Weights file handed to YOLO() when the model is loaded.
MODEL_PATH = "yolov8n.pt"
# Width every frame is resized to in process_video; the height is derived
# from the source aspect ratio.
TARGET_WIDTH = 640

# Weights of the two risk terms used in process_video:
#   risk = W_TRASH * trash_score + W_FLOW * (1 - flow_norm)
W_TRASH = 0.8
W_FLOW = 0.2
# Risk bands used by risk_level: risk < LOW_THR -> "LOW",
# risk < HIGH_THR -> "MODERATE", otherwise "HIGH".
LOW_THR = 0.25
HIGH_THR = 0.50
# Mean flow magnitude that maps to a normalized flow of 1.0 in normalize_flow
# (larger magnitudes are clipped to 1.0).
MAX_FLOW_MAG = 5.0

In [None]:
# Helper Functions

def compute_trash_score(yolo_result, frame_shape):
    """Return the summed detection-box area as a fraction of the frame area.

    Box areas are added up independently, so overlapping boxes are counted
    more than once; the final ratio is clipped to the range [0, 1].

    Args:
        yolo_result: Detection result exposing a ``boxes`` attribute. Each
            box must provide ``xyxy[0]`` as a tensor-like object holding
            ``(x1, y1, x2, y2)``. ``boxes`` may be ``None`` or empty.
        frame_shape: Shape of the frame the boxes refer to, either
            ``(height, width)`` or ``(height, width, channels)``.

    Returns:
        float: Covered-area ratio in [0, 1]; 0.0 when the frame has no area
        or there are no boxes.
    """
    # Only height and width are needed, so both grayscale (h, w) and
    # colour (h, w, c) shapes are accepted.
    h, w = frame_shape[:2]
    frame_area = float(w * h)
    if frame_area <= 0:
        return 0.0

    boxes = yolo_result.boxes
    if boxes is None:
        return 0.0

    total_box_area = 0.0
    for box in boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
        # Degenerate / inverted boxes contribute zero instead of negative area.
        bw = max(0.0, float(x2 - x1))
        bh = max(0.0, float(y2 - y1))
        total_box_area += bw * bh

    return float(np.clip(total_box_area / frame_area, 0.0, 1.0))


def compute_flow_mag(prev_gray, gray):
    """Return the mean Farneback optical-flow magnitude between two frames.

    Args:
        prev_gray: Previous frame, or ``None`` when there is no earlier frame.
        gray: Current frame, passed to OpenCV together with ``prev_gray``.

    Returns:
        float: Mean per-pixel flow magnitude, or 0.0 when ``prev_gray`` is
        ``None``.
    """
    if prev_gray is not None:
        farneback_params = dict(
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )
        displacement = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None, **farneback_params
        )

        # Channel 0 holds the x displacement, channel 1 the y displacement;
        # only the magnitude part of the polar form is needed.
        dx = displacement[..., 0]
        dy = displacement[..., 1]
        magnitude = cv2.cartToPolar(dx, dy)[0]
        return float(np.mean(magnitude))

    # Nothing to compare against yet.
    return 0.0


def normalize_flow(flow_mag, max_flow=MAX_FLOW_MAG):
    """Scale a flow magnitude by ``max_flow`` and clip the result to [0, 1].

    Args:
        flow_mag: Flow magnitude to normalize.
        max_flow: Magnitude that maps to 1.0; defaults to ``MAX_FLOW_MAG``.

    Returns:
        float: ``flow_mag / max_flow`` clipped to the range [0, 1].
    """
    ratio = flow_mag / max_flow
    bounded = np.clip(ratio, 0.0, 1.0)
    return float(bounded)


def risk_level(risk):
    """Map a risk score to a label and the colour used to draw it.

    Args:
        risk: Risk score to classify.

    Returns:
        tuple: ``(label, color)`` where ``label`` is ``"LOW"``,
        ``"MODERATE"`` or ``"HIGH"`` and ``color`` is a 3-tuple passed to
        ``cv2.putText`` by the caller.
    """
    # Bands are checked in ascending order; the first upper bound that the
    # score falls below wins.
    bands = (
        (LOW_THR, "LOW", (0, 255, 0)),
        (HIGH_THR, "MODERATE", (0, 255, 255)),
    )
    for upper_bound, label, color in bands:
        if risk < upper_bound:
            return label, color

    return "HIGH", (0, 0, 255)

In [None]:
# Load Model

# Build the detector once at notebook level; the resulting `model` object is
# passed to process_video in a later cell.
print("[INFO] Loading YOLOv8 model...")
model = YOLO(MODEL_PATH)

In [None]:
# Video Processing Function

def process_video(video_path, model):
    """Annotate a video with detections, flow and a risk score, and save it.

    Every frame is resized to ``TARGET_WIDTH`` (height follows the source
    aspect ratio), passed to ``model``, scored with ``compute_trash_score``
    and ``compute_flow_mag``, overlaid with boxes and text, and written to
    ``outputs/<input name>_output.mp4``.

    Args:
        video_path: Path of the input video.
        model: Callable detector, invoked as
            ``model(frame, imgsz=640, conf=0.05, verbose=False)``; the first
            item of its return value must expose a ``boxes`` attribute.

    Returns:
        None. If the input cannot be opened, reports an invalid frame size,
        or the output writer cannot be opened, an ``[ERROR]`` message is
        printed and nothing is processed.
    """
    os.makedirs("outputs", exist_ok=True)
    base = os.path.basename(video_path)
    name, _ = os.path.splitext(base)
    out_path = os.path.join("outputs", f"{name}_output.mp4")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(f"[ERROR] Cannot open video: {video_path}")
        return

    out = None
    try:
        # A reported FPS of 0 falls back to 25.
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Without valid dimensions the aspect-ratio computation below would
        # divide by zero or yield an unusable output size.
        if src_w <= 0 or src_h <= 0:
            print(f"[ERROR] Invalid frame size {src_w}x{src_h} for video: {video_path}")
            return

        target_w = TARGET_WIDTH
        target_h = max(1, int(src_h * (target_w / src_w)))

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(out_path, fourcc, src_fps, (target_w, target_h))
        if not out.isOpened():
            print(f"[ERROR] Cannot open video writer: {out_path}")
            return

        print(f"[INFO] Processing {video_path} ({src_w}x{src_h} -> {target_w}x{target_h})")
        print(f"[INFO] Output will be saved to {out_path}")

        prev_gray = None
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1

            frame = cv2.resize(frame, (target_w, target_h))

            # Take the grayscale copy BEFORE drawing anything, so the
            # detection rectangles do not contaminate the optical flow.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # ---- YOLO INFERENCE (with FPS) ----
            start_time = time.perf_counter()
            results = model(frame, imgsz=640, conf=0.05, verbose=False)
            inference_time = time.perf_counter() - start_time
            fps = 1.0 / inference_time if inference_time > 0 else 0.0

            r = results[0]

            trash_score = compute_trash_score(r, frame.shape)
            trash_pct = trash_score * 100.0

            flow_mag = compute_flow_mag(prev_gray, gray)
            prev_gray = gray
            flow_norm = normalize_flow(flow_mag)

            # More box coverage raises the risk; more motion lowers it.
            risk = W_TRASH * trash_score + W_FLOW * (1.0 - flow_norm)
            level, color = risk_level(risk)

            # ---- OVERLAY ----
            boxes = r.boxes if r.boxes is not None else []
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            cv2.putText(frame, f"TrashArea: {trash_pct:.2f}%", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(frame, f"FlowMag: {flow_mag:.2f} (norm {flow_norm:.2f})", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(frame, f"Risk: {risk:.2f} [{level}]", (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
            cv2.putText(frame, f"FPS: {fps:.2f}", (10, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(frame, f"Frame {frame_idx}", (10, target_h - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)

            out.write(frame)
    finally:
        # Release both handles even when a frame fails mid-way.
        cap.release()
        if out is not None:
            out.release()

    # Reported only after the writer has been released.
    print("[DONE] Saved:", out_path)

In [None]:
# NOTE(review): "[]" looks like a placeholder rather than a real file path;
# set it to the input video before running. If the path cannot be opened,
# process_video prints an error and returns without producing output.
video_path = "[]"
process_video(video_path, model)