In [2]:
import cv2
from ultralytics import YOLO
from collections import defaultdict

# Call model
model = YOLO("yolo11n.pt")


# Print Total number of classes
class_list = model.names
print(class_list)


# open the video files
cap = cv2.VideoCapture("4.mp4")


# Define line positions for counting
line_y_red = 350 # Red line position
line_y_blue = line_y_red + 50 # Blue line position

# Variables to store counting and tracking information
counted_ids_red_to_blue = set()
counted_ids_blue_to_red = set()

# Dictionaries to count objects by class for each direction
count_red_to_blue = defaultdict(int) # Moving downwords
count_blue_to_red = defaultdict(int) # Moving upwards

# State dictionaries to track which line was crossed first
crossed_red_first = {}
crossed_blue_first = {}


# Loop through video frames

while cap.isOpened():
    print("1")
    ret, frame = cap.read()
    if not ret:
        break

    # Run YOLO tracking on the frame
    results = model.track(frame, persist = True, # bytetrack.yaml
                          tracker="bytetrack.yaml", # botsort.yaml
                           save = True)
    print(results)


    # Ensure results are not empty
    if results[0].boxes.data is not None:

      # Get the detected boxes, their class indices, and track IDs
      boxes = results[0].boxes.xyxy.cpu()
      track_ids = results[0].boxes.id.int().cpu().tolist()
      class_indices = results[0].boxes.cls.int().cpu().tolist()
      confidence = results[0].boxes.conf.cpu()

      # Draw the line of the each frame
      cv2.line(frame, (70, line_y_red), (1200, line_y_red ), (0, 0, 255), 3)
      cv2.putText(frame, "Red Line", (20, line_y_red - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

      cv2.line(frame, (70, line_y_blue), (1200, line_y_blue), (255, 0, 0), 3)
      cv2.putText(frame, "Blue Line", (20, line_y_blue - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

      # Loop through each detected object
      for box, track_id, class_idx, conf in zip(boxes, track_ids, class_indices, confidence):
        x1, y1, x2, y2 = map(int, box)

        cx = (x1 + x2) // 2 # Calculate the center point
        cy = (y1 + y2) // 2


        # Get the class name using the class index
        class_name = class_list[class_idx]

        # Draw a dot at the center and display the tracking ID and class name
        cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)
        cv2.putText(frame, f"ID: {track_id} {class_name} {confidence[-1]:.2f}", (x1, y1 - 10),
        cv2.FONT_HERSHEY_COMPLEX, 0.6, (0, 255, 255), 2)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)


        # check if the object crosse the red line
        if line_y_red - 3 <= cy <= line_y_red + 3:
          # Record that the object crossed the red line
          if track_id not in crossed_red_first:
            crossed_red_first[track_id] = True

        # check if the object crosses the blue line
        if line_y_blue - 3 <= cy <= line_y_blue + 3:
          # Record that the object crossed the blue line
          if track_id not in crossed_blue_first:
            crossed_blue_first[track_id] = True


        # Counting logic for downward direction (red -> blue)
        if track_id in crossed_red_first and track_id not in counted_ids_red_to_blue:
          if line_y_blue -5 <= cy <= line_y_blue + 5:
            count_red_to_blue[class_name] += 1
            counted_ids_red_to_blue.add(track_id)

        # Counting logic for upward direction (blue -> red)
        if track_id in crossed_blue_first and track_id not in counted_ids_blue_to_red:
          if line_y_red -5 <= cy <= line_y_red + 5:
            count_blue_to_red[class_name] += 1
            counted_ids_blue_to_red.add(track_id)

    # Display the counts on the frame
    y_offset = 30
    for class_name, count in count_red_to_blue.items():
      cv2.putText(frame, f"{class_name} (Down): {count}", (10, y_offset),
                  cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)
      y_offset += 30

    y_offset += 20 # Add spacing for upward counts
    for class_name, count in count_blue_to_red.items():
      cv2.putText(frame, f"{class_name} (Up): {count}", (10, y_offset),
                 cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0 ,255), 2, cv2.LINE_AA)
      y_offset += 30

     


    # Show the frame
    cv2.imshow("YOLO Object Tracking & Counting", frame)

    # Exit loop if 'ESC' key is pressed
    if cv2.waitKey(1) & 0xFF == 27:
        break

# Release resources
cap.release()
cv2.destroyAllWindows()


ModuleNotFoundError: No module named 'ultralytics'

In [None]:
count_blue_to_red

defaultdict(int, {})

In [None]:
count_red_to_blue

defaultdict(int, {'car': 9})

In [None]:
import cv2
import math
from ultralytics import YOLO
from collections import defaultdict

# Load YOLO model
model = YOLO("yolo11n.pt")

# Class list
class_list = model.names
print("Classes:", class_list)

# Open video file
cap = cv2.VideoCapture("4.mp4")
fps = cap.get(cv2.CAP_PROP_FPS)

# Pixel-to-meter calibration (adjust based on your camera setup)
PIXEL_TO_METER = 0.05  # 1 pixel = 0.05 m (example)

# Define line positions for counting
line_y_red = 350
line_y_blue = line_y_red + 50

# Tracking dictionaries
counted_ids_red_to_blue = set()
counted_ids_blue_to_red = set()
count_red_to_blue = defaultdict(int)
count_blue_to_red = defaultdict(int)
crossed_red_first = {}
crossed_blue_first = {}

# Speed tracking
prev_positions = {}  # {track_id: (cx, cy)}
speeds = {}          # {track_id: current_speed_kmh}

# Storage for results
results_list = []    # [[frame, id1, id2, speed1, speed2, distance], ...]

frame_no = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    frame_no += 1

    # Run YOLO tracker
    results = model.track(frame, persist=True, tracker="bytetrack.yaml")

    if results[0].boxes.data is not None:
        boxes = results[0].boxes.xyxy.cpu()
        track_ids = results[0].boxes.id.int().cpu().tolist()
        class_indices = results[0].boxes.cls.int().cpu().tolist()
        confidence = results[0].boxes.conf.cpu()

        vehicle_positions = []

        # Draw counting lines
        cv2.line(frame, (70, line_y_red), (1200, line_y_red), (0, 0, 255), 3)
        cv2.putText(frame, "Red Line", (20, line_y_red - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        cv2.line(frame, (70, line_y_blue), (1200, line_y_blue), (255, 0, 0), 3)
        cv2.putText(frame, "Blue Line", (20, line_y_blue - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        # Process each detected vehicle
        for box, track_id, class_idx, conf in zip(boxes, track_ids, class_indices, confidence):
            x1, y1, x2, y2 = map(int, box)
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2
            class_name = class_list[class_idx]

            # --- Speed calculation ---
            if track_id in prev_positions:
                px, py = prev_positions[track_id]
                dist_pixels = math.sqrt((cx - px)**2 + (cy - py)**2)
                dist_meters = dist_pixels * PIXEL_TO_METER
                speed_mps = dist_meters * fps
                speed_kmh = speed_mps * 3.6
            else:
                speed_kmh = 0.0

            prev_positions[track_id] = (cx, cy)
            speeds[track_id] = speed_kmh

            vehicle_positions.append((track_id, cx, cy, speed_kmh, class_name))

            # Draw detections
            cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)
            cv2.putText(frame, f"ID:{track_id} {class_name} {speed_kmh:.1f} km/h",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.6, (0, 255, 255), 2)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # --- Counting logic ---
            if line_y_red - 3 <= cy <= line_y_red + 3:
                if track_id not in crossed_red_first:
                    crossed_red_first[track_id] = True

            if line_y_blue - 3 <= cy <= line_y_blue + 3:
                if track_id not in crossed_blue_first:
                    crossed_blue_first[track_id] = True

            if track_id in crossed_red_first and track_id not in counted_ids_red_to_blue:
                if line_y_blue - 5 <= cy <= line_y_blue + 5:
                    count_red_to_blue[class_name] += 1
                    counted_ids_red_to_blue.add(track_id)

            if track_id in crossed_blue_first and track_id not in counted_ids_blue_to_red:
                if line_y_red - 5 <= cy <= line_y_red + 5:
                    count_blue_to_red[class_name] += 1
                    counted_ids_blue_to_red.add(track_id)

            # Hard braking threshold (20 km/h drop ek frame se dusre frame me)
            HARD_BRAKE_THRESHOLD = 20  

            # Initialize counter before loop
            hard_braking_events = 0  

            # Inside object loop
            if track_id in speeds:  
                prev_speed = speeds[track_id]  
                if prev_speed - speed_kmh > HARD_BRAKE_THRESHOLD:  
                    hard_braking_events += 1  


        # --- Distance calculation between vehicles ---
        for i in range(len(vehicle_positions)):
            for j in range(i + 1, len(vehicle_positions)):
                id1, x1, y1, v1, cname1 = vehicle_positions[i]
                id2, x2, y2, v2, cname2 = vehicle_positions[j]
                dist_pixels = math.sqrt((x1 - x2)**2 + (y1 - y2)**2)
                dist_meters = dist_pixels * PIXEL_TO_METER

                # Save results
                results_list.append([frame_no, id1, id2, v1, v2, dist_meters])

                tailgating_events = 0
                if dist_meters < 10:  # 10 meter se kam distance = tailgating
                    tailgating_events += 1
                    cv2.line(frame, (x1, y1), (x2, y2), (255, 255, 0), 2)
                    cv2.putText(frame, f"{dist_meters:.1f} m",
                    ((x1 + x2)//2, (y1 + y2)//2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                    (200, 255, 0), 2)


                # Draw lines between close vehicles
                if dist_meters < 10:  # threshold = 10 meters
                    cv2.line(frame, (x1, y1), (x2, y2), (255, 255, 0), 2)
                    cv2.putText(frame, f"{dist_meters:.1f} m",
                                ((x1 + x2)//2, (y1 + y2)//2),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                                (200, 255, 0), 2)

    # Display counts
    y_offset = 30
    for class_name, count in count_red_to_blue.items():
        cv2.putText(frame, f"{class_name} (Down): {count}", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        y_offset += 30

    y_offset += 20
    for class_name, count in count_blue_to_red.items():
        cv2.putText(frame, f"{class_name} (Up): {count}", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        y_offset += 30

    # Show frame
    cv2.imshow("YOLO Tracking with Speed & Distance", frame)

    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()

# Print results for verification
print("Results List (first 10 entries):")
for r in results_list[:10]:
    print(r)


Classes: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68

In [None]:
# Print results for verification
# [frame_no, vehicle_id1, vehicle_id2, speed1_kmh, speed2_kmh, distance_m]
print("Results List (first 20 entries):")
for r in results_list[:20]:
    print(r)

Results List (first 20 entries):
[1, 1, 2, 0.0, 0.0, 13.156462290448752]
[1, 1, 3, 0.0, 0.0, 19.95100248107849]
[1, 1, 4, 0.0, 0.0, 16.945205811674285]
[1, 1, 5, 0.0, 0.0, 14.637024970942697]
[1, 1, 6, 0.0, 0.0, 22.828600482727804]
[1, 1, 7, 0.0, 0.0, 34.40744832154805]
[1, 1, 8, 0.0, 0.0, 26.474940981992763]
[1, 1, 9, 0.0, 0.0, 26.487733009829288]
[1, 1, 10, 0.0, 0.0, 39.52192429525668]
[1, 1, 11, 0.0, 0.0, 30.491187251401023]
[1, 2, 3, 0.0, 0.0, 9.104119946485767]
[1, 2, 4, 0.0, 0.0, 19.628104849933933]
[1, 2, 5, 0.0, 0.0, 10.20024509509453]
[1, 2, 6, 0.0, 0.0, 10.925314640778087]
[1, 2, 7, 0.0, 0.0, 22.004658597669724]
[1, 2, 8, 0.0, 0.0, 14.057204558517316]
[1, 2, 9, 0.0, 0.0, 14.058893982102575]
[1, 2, 10, 0.0, 0.0, 27.02267566322773]
[1, 2, 11, 0.0, 0.0, 17.971366113904644]
[1, 3, 4, 0.0, 0.0, 18.29077636405847]


In [None]:
# Print results for verification
# [frame_no, vehicle_id1, vehicle_id2, speed1_kmh, speed2_kmh, distance_m]
print("Results List (Last 20 entries):")
for r in results_list[:-20:-1]:
    print(r)

Results List (Last 20 entries):
[403, 231, 287, 9.0, 6.3639610306789285, 4.0388736053508785]
[403, 285, 287, 0.0, 6.3639610306789285, 9.737812896128165]
[403, 285, 231, 0.0, 9.0, 13.413146536141324]
[403, 148, 287, 24.23324163210527, 6.3639610306789285, 20.523279465036772]
[403, 148, 231, 24.23324163210527, 9.0, 17.84383647089381]
[403, 148, 285, 24.23324163210527, 0.0, 30.0341472327749]
[403, 272, 287, 6.3639610306789285, 6.3639610306789285, 5.01422975141746]
[403, 272, 231, 6.3639610306789285, 9.0, 8.85353036929337]
[403, 272, 285, 6.3639610306789285, 0.0, 4.759464255564906]
[403, 272, 148, 6.3639610306789285, 24.23324163210527, 25.297677759035512]
[403, 117, 287, 30.186917696247168, 6.3639610306789285, 45.29127951383136]
[403, 117, 231, 30.186917696247168, 9.0, 42.12247025044947]
[403, 117, 285, 30.186917696247168, 0.0, 54.95730160770269]
[403, 117, 148, 30.186917696247168, 24.23324163210527, 25.075585735930478]
[403, 117, 272, 30.186917696247168, 6.3639610306789285, 50.199128478490

In [None]:
print(len(results_list))

7883


In [None]:
count_blue_to_red

defaultdict(int, {})

In [None]:
count_red_to_blue

defaultdict(int, {'car': 9})

In [None]:
car = count_red_to_blue['car']
print(car)   

motorcycle = count_red_to_blue['motorcycle']
print(motorcycle)

truck = count_red_to_blue['truck']
print(truck)

ambulance=count_red_to_blue['ambulance']
print(ambulance)

bus=count_red_to_blue['bus']
print(bus)


actualcount={'car':car,'motorcycle':motorcycle,'truck':truck,'ambulance':ambulance}
print(actualcount)

9
0
0
0
0
{'car': 9, 'motorcycle': 0, 'truck': 0, 'ambulance': 0}


In [None]:
# ===============================
# Comprehensive Priority Score (CPS) Calculator
# ===============================

# -------------------------
# Tuning knobs (adjustable)
# -------------------------
# Percentage contributions (calibrated based on discussion)
TRAFFIC_CONTRIB = 0.65  # Part 1: Traffic Score
SAFETY_CONTRIB = 0.12   # Part 2: Safety Penalty
GREEN_WAVE_CONTRIB = 0.23  # Part 3: Green Wave Bonus

# Base weights for events
HARD_BRAKING_POINTS = 2
TAILGATING_POINTS = 1

# Max scale for ETA
MAX_ETA_SCALE = 6

# -------------------------
# Traffic class weights
# -------------------------
CLASS_WEIGHTS = {
    "ambulance": 12,
    "bus": 3.5,
    "truck": 2.5,
    "car": 1,
    "bike": 0.3
}

# -------------------------
# Part 1: Traffic Score
# -------------------------
carscore=1*car
ambulancescore=12*ambulance
bikescore=0.3*motorcycle
truckscore=2.5*truck
busscore=3.5*bus

traffic_score=carscore+bikescore+ambulancescore+truckscore+busscore

print("---- Traffic Score Calculator ----")

print("The traffic score is",traffic_score)


---- Traffic Score Calculator ----
The traffic score is 9.0


In [None]:
hard_brakes = []
prev_speeds = {}
brake_state = {}   # vehicle_id -> True agar gaadi abhi stop hai

for frame_no, id1, id2, v1, v2, dist in results_list:
    for vid, v in [(id1, v1), (id2, v2)]:

        # Agar pehle se speed stored hai
        if vid in prev_speeds:
            # Case: Pehle chal rahi thi (>0), abhi 0 → Hard Brake (sirf first stop par count)
            if prev_speeds[vid] > 0 and v == 0 and not brake_state.get(vid, False):
                hard_brakes.append((frame_no, vid, prev_speeds[vid]))
                brake_state[vid] = True   # abhi stop state me hai

            # Agar gaadi dobara chalne lagi toh reset stop-state
            elif v > 0:
                brake_state[vid] = False

        prev_speeds[vid] = v

# Duplicate hard brakes hatao (same vehicle ke multiple 0 ek hi stop me gaye hon)
unique_hard_brakes = {}
for frame_no, vid, speed in hard_brakes:
    if vid not in unique_hard_brakes:
        unique_hard_brakes[vid] = (frame_no, speed)

hard_braking_count = len(unique_hard_brakes)
print("Hard Braking Count:", hard_braking_count)


Hard Braking Count: 12


In [None]:
TAILGATE_DISTANCE = 10.0   # meters
MIN_SPEED = 20.0           # km/h
MIN_CONSEC_FRAMES = 5      # kitne consecutive frames me hona chahiye

pair_states = {}   # (id1,id2) -> consecutive frame counter
tailgates = []

for frame_no, id1, id2, v1, v2, dist in results_list:
    pair = tuple(sorted((id1, id2)))
    if dist <= TAILGATE_DISTANCE and v1 >= MIN_SPEED and v2 >= MIN_SPEED:
        # increase streak
        pair_states[pair] = pair_states.get(pair, 0) + 1
        # jab streak threshold paar kare to ek event count karo
        if pair_states[pair] == MIN_CONSEC_FRAMES:
            tailgates.append((frame_no, pair[0], pair[1], dist))
    else:
        pair_states[pair] = 0  # streak reset

tailgating_count = len(tailgates)
print("Tailgating Count:", tailgating_count)


Tailgating Count: 5
