In [1]:
!pip install ultralytics opencv-python numpy


Collecting ultralytics
  Obtaining dependency information for ultralytics from https://files.pythonhosted.org/packages/71/b0/f138123e3d8038b0c9e0c6986497d1e023ec20d0d5c8d1e30d77ef9a79b9/ultralytics-8.3.243-py3-none-any.whl.metadata
  Downloading ultralytics-8.3.243-py3-none-any.whl.metadata (37 kB)
Collecting polars>=0.20.0 (from ultralytics)
  Obtaining dependency information for polars>=0.20.0 from https://files.pythonhosted.org/packages/f6/c6/36a1b874036b49893ecae0ac44a2f63d1a76e6212631a5b2f50a86e0e8af/polars-1.36.1-py3-none-any.whl.metadata
  Downloading polars-1.36.1-py3-none-any.whl.metadata (10 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Obtaining dependency information for ultralytics-thop>=2.0.18 from https://files.pythonhosted.org/packages/7f/c7/fb42228bb05473d248c110218ffb8b1ad2f76728ed8699856e5af21112ad/ultralytics_thop-2.0.18-py3-none-any.whl.metadata
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Collecting polars-runtime-32==1.36

In [2]:
import os
import cv2
import time
import numpy as np
from ultralytics import YOLO


Creating new Ultralytics Settings v0.0.6 file  
View Ultralytics Settings with 'yolo settings' or at 'C:\Users\surya\AppData\Roaming\Ultralytics\settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [22]:
import json
from datetime import datetime

In [None]:
# Load YOLOv8 CNN model
model = YOLO("yolov8n.pt")  # lightweight, fast

PERSON_CLASS = 0
VEHICLE_CLASSES = [2, 3, 5, 7]  # car, motorcycle, bus, truck

def estimate_speed(prev_center, curr_center, time_diff):
    if prev_center is None or time_diff == 0:
        return 0
    dist = np.linalg.norm(np.array(curr_center) - np.array(prev_center))
    return dist / time_diff

def iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interArea = max(0, xB - xA) * max(0, yB - yA)
    boxAArea = (boxA[2]-boxA[0]) * (boxA[3]-boxA[1])
    boxBArea = (boxB[2]-boxB[0]) * (boxB[3]-boxB[1])

    return interArea / float(boxAArea + boxBArea - interArea + 1e-6)

def process_video(video_path):
    OUTPUT_DIR = r"E:\citysense360\outputs\cctv"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    video_name = os.path.basename(video_path).replace(".mp4", "")
    output_video_path = os.path.join(OUTPUT_DIR, f"{video_name}_annotated.mp4")

    cap = cv2.VideoCapture(video_path)

    prev_centers = {}
    stationary_time = {}
    prev_time = time.time()

    out = None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Initialize video writer AFTER first frame
        if out is None:
            out = cv2.VideoWriter(
                output_video_path,
                cv2.VideoWriter_fourcc(*"mp4v"),
                20,
                (frame.shape[1], frame.shape[0])
            )

        results = model(frame, verbose=False)[0]
        boxes = results.boxes

        people_count = 0
        vehicle_count = 0
        vehicle_boxes = []

        curr_time = time.time()
        time_diff = curr_time - prev_time
        prev_time = curr_time

        for box in boxes:
            cls = int(box.cls[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

            # People detection
            if cls == PERSON_CLASS:
                people_count += 1
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Vehicle detection
            if cls in VEHICLE_CLASSES:
                vehicle_count += 1
                vehicle_boxes.append((x1, y1, x2, y2))
                cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

                speed = estimate_speed(prev_centers.get((cx, cy)), (cx, cy), time_diff)

                # Overspeeding
                if speed > 30:
                    cv2.putText(frame, "Overspeeding", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                # Illegal parking
                if speed < 2:
                    stationary_time[(cx, cy)] = stationary_time.get((cx, cy), 0) + time_diff
                    if stationary_time[(cx, cy)] > 5:
                        cv2.putText(frame, "Illegal Parking", (x1, y2 + 20),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
                else:
                    stationary_time[(cx, cy)] = 0

                prev_centers[(cx, cy)] = (cx, cy)

        # Crowd detection
        if people_count > 15:
            cv2.putText(frame, "CROWD DETECTED", (30, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Accident inference
        for i in range(len(vehicle_boxes)):
            for j in range(i + 1, len(vehicle_boxes)):
                if iou(vehicle_boxes[i], vehicle_boxes[j]) > 0.3:
                    cv2.putText(frame, "POSSIBLE ACCIDENT", (30, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Overlay counts
        cv2.putText(frame, f"People: {people_count}", (20, frame.shape[0] - 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        cv2.putText(frame, f"Vehicles: {vehicle_count}", (20, frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)

        # WRITE FRAME
        out.write(frame)

    cap.release()
    out.release()

    print(f" Annotated video saved at:\n{output_video_path}")
    event = {
        "timestamp": datetime.now().isoformat(),
        "source": os.path.basename(video_path),
        "people_count": people_count,
        "vehicle_count": vehicle_count,
        "alerts": []
    }

    if people_count > 15:
        event["alerts"].append("CROWD")
    if speed > 30:
        event["alerts"].append("OVERSPEED")
    if stationary_time.get((cx,cy),0) > 5:
        event["alerts"].append("ILLEGAL_PARKING")
    if accident_detected:
        event["alerts"].append("ACCIDENT")

    with open(r"E:\citysense360\outputs\cctv_events.json", "a") as f:
        f.write(json.dumps(event) + "\n")

def process_image(image_path):
    OUTPUT_DIR = r"E:\citysense360\outputs\cctv_images"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    image_name = os.path.basename(image_path)
    output_path = os.path.join(OUTPUT_DIR, f"annotated_{image_name}")

    img = cv2.imread(image_path)
    if img is None:
        raise ValueError("Could not read image")

    results = model(img, verbose=False)[0]
    boxes = results.boxes

    people_count = 0
    vehicle_count = 0

    for box in boxes:
        cls = int(box.cls[0])
        x1, y1, x2, y2 = map(int, box.xyxy[0])

        if cls == PERSON_CLASS:
            people_count += 1
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

        if cls in VEHICLE_CLASSES:
            vehicle_count += 1
            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)

    # Crowd detection (image-level)
    if people_count > 15:
        cv2.putText(img, "CROWD DETECTED", (30, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

    cv2.putText(img, f"People: {people_count}", (20, img.shape[0] - 40),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    cv2.putText(img, f"Vehicles: {vehicle_count}", (20, img.shape[0] - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)

    cv2.imwrite(output_path, img)
    print(f"Annotated image saved at:\n{output_path}")
    event = {
        "timestamp": datetime.now().isoformat(),
        "source": "CCTV_IMAGE",
        "people_count": people_count,
        "vehicle_count": vehicle_count,
        "alerts": []
    }
    if people_count > 15:
        event["alerts"].append("CROWD")

    with open(r"E:\citysense360\outputs\cctv_events.json", "a") as f:
        f.write(json.dumps(event) + "\n")



[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 9.4MB/s 0.7s0.6s<0.0s.6s


In [18]:
video_path = r"E:\citysense360\data\cctv\highway_traffic\video\cctv052x2004080516x01641.avi"
process_video(video_path)


✅ Annotated video saved at:
E:\citysense360\outputs\cctv\cctv052x2004080516x01641.avi_annotated.mp4


In [21]:
image_path = r"E:\citysense360\data\cctv\accident_footage\test\Accident\acc1 (7).jpg"
process_image(image_path)


✅ Annotated image saved at:
E:\citysense360\outputs\cctv_images\annotated_acc1 (7).jpg
