In [2]:
import argparse
from collections import defaultdict, deque
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

# Define SOURCE and TARGET constants
SOURCE = np.array(((800, 410), (1125, 410), (1920, 850), (0, 850)))

TARGET_WIDTH = 32
TARGET_HEIGHT = 140

TARGET = np.array(
    [
        [0, 0],
        [TARGET_WIDTH - 1, 0],
        [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
        [0, TARGET_HEIGHT - 1],
    ]
)

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

if __name__ == "__main__":
    video_info = sv.VideoInfo.from_video_path(video_path='./asset/m6-motorway-trim.mp4')
    video_info.fps = 25
    
    model = YOLO("yolov8n.pt")

    byte_track = sv.ByteTrack(
        frame_rate=video_info.fps, track_activation_threshold=0.3
    )

    thickness = sv.calculate_optimal_line_thickness(
        resolution_wh=video_info.resolution_wh
    )
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

    box_annotator = sv.BoxAnnotator(thickness=thickness)
    trace_annotator = sv.TraceAnnotator(
        thickness=thickness,
        trace_length=video_info.fps * 2,
        position=sv.Position.BOTTOM_CENTER,
    )

    # Two label annotators for top-left and bottom-center
    label_annotator_top_left = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.TOP_LEFT,
    )

    label_annotator_bottom = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.BOTTOM_CENTER,
    )

    frame_generator = sv.get_video_frames_generator(source_path='./asset/m6-motorway-trim.mp4')

    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))
    
    # Vehicle counts by type
    vehicle_counts = defaultdict(int)

    # Set to track unique tracker IDs
    processed_tracker_ids = set()

    with sv.VideoSink('./asset/m6-motorway-trim-result.mp4', video_info) as sink:
        with open('./asset/speed_results.txt', 'w') as f:  
            for frame in frame_generator:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)]
                detections = detections.with_nms(threshold=0.6)
                detections = byte_track.update_with_detections(detections=detections)

                points = detections.get_anchors_coordinates(
                    anchor=sv.Position.BOTTOM_CENTER
                )
                points = view_transformer.transform_points(points=points).astype(int)

                # Two label lists
                top_left_labels = []
                bottom_labels = []

                for tracker_id, [_, y], class_id in zip(detections.tracker_id, points, detections.class_id):
                    coordinates[tracker_id].append(y)
                    vehicle_type = CLASS_NAMES.get(class_id, "unknown")

                    top_left_labels.append(vehicle_type)  # Label for top-left

                    if vehicle_type != "unknown" and tracker_id not in processed_tracker_ids:
                        # Increment the count for this vehicle type only once per tracker_id
                        vehicle_counts[vehicle_type] += 1
                        processed_tracker_ids.add(tracker_id)  # Mark tracker_id as processed

                    if len(coordinates[tracker_id]) < video_info.fps / 2:
                        bottom_labels.append(f"#{tracker_id}")
                    else:
                        coordinate_start = coordinates[tracker_id][-1]
                        coordinate_end = coordinates[tracker_id][0]
                        distance = abs(coordinate_start - coordinate_end)
                        time = len(coordinates[tracker_id]) / video_info.fps
                        speed = distance / time * 3.6
                        bottom_labels.append(f"#{tracker_id} {int(speed)} km/h")
                        
                        f.write(f"Tracker ID: {tracker_id}, Type: {vehicle_type}, Speed: {int(speed)} km/h\n")

                annotated_frame = frame.copy()
                annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
                annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)

                # Apply both label sets
                annotated_frame = label_annotator_top_left.annotate(
                    scene=annotated_frame, detections=detections, labels=top_left_labels
                )
                annotated_frame = label_annotator_bottom.annotate(
                    scene=annotated_frame, detections=detections, labels=bottom_labels
                )

                sink.write_frame(annotated_frame)
                cv2.imshow("frame", annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        print("\nVehicle Counts:")
        for vehicle, count in vehicle_counts.items():
            print(f"{vehicle}: {count}")

        cv2.destroyAllWindows()



0: 384x640 11 cars, 1 bus, 1 train, 2 trucks, 228.5ms
Speed: 55.7ms preprocess, 228.5ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 1 train, 2 trucks, 172.5ms
Speed: 8.8ms preprocess, 172.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 1 train, 2 trucks, 136.8ms
Speed: 3.5ms preprocess, 136.8ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 1 train, 2 trucks, 128.5ms
Speed: 3.6ms preprocess, 128.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 1 train, 1 truck, 106.2ms
Speed: 3.0ms preprocess, 106.2ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 train, 2 trucks, 121.9ms
Speed: 4.4ms preprocess, 121.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 train, 1 truck, 102.7ms
Speed: 4.0ms preprocess, 102.7ms inference

In [12]:
import argparse
from collections import defaultdict, deque
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

# Define SOURCE and TARGET constants
SOURCE = np.array(((432,  24), (488, 709), (887, 592), (578,  17)))

TARGET_WIDTH = 5
TARGET_HEIGHT = 130

TARGET = np.array(
    [
        [0, 0],
        [TARGET_WIDTH - 1, 0],
        [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
        [0, TARGET_HEIGHT - 1],
    ]
)

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

if __name__ == "__main__":
    video_info = sv.VideoInfo.from_video_path(video_path='./asset/videoplayback.mp4')
    video_info.fps = 25
    
    model = YOLO("yolo11n.pt")

    byte_track = sv.ByteTrack(
        frame_rate=video_info.fps, track_activation_threshold=0.3
    )

    thickness = sv.calculate_optimal_line_thickness(
        resolution_wh=video_info.resolution_wh
    )
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

    box_annotator = sv.BoxAnnotator(thickness=thickness)
    trace_annotator = sv.TraceAnnotator(
        thickness=thickness,
        trace_length=video_info.fps * 2,
        position=sv.Position.BOTTOM_CENTER,
    )

    # Two label annotators for top-left and bottom-center
    label_annotator_top_left = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.TOP_LEFT,
    )

    label_annotator_bottom = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.BOTTOM_CENTER,
    )

    frame_generator = sv.get_video_frames_generator(source_path='./asset/videoplayback.mp4')

    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))
    
    # Vehicle counts by type
    vehicle_counts = defaultdict(int)

    # Set to track unique tracker IDs
    processed_tracker_ids = set()

    with sv.VideoSink('./asset/Merge-way-full.mp4-result.mp4', video_info) as sink:
        with open('./asset/speed_resultss.txt', 'w') as f:  
            for frame in frame_generator:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)]
                detections = detections.with_nms(threshold=0.6)
                detections = byte_track.update_with_detections(detections=detections)

                points = detections.get_anchors_coordinates(
                    anchor=sv.Position.BOTTOM_CENTER
                )
                points = view_transformer.transform_points(points=points).astype(int)

                # Two label lists
                top_left_labels = []
                bottom_labels = []

                for tracker_id, [_, y], class_id in zip(detections.tracker_id, points, detections.class_id):
                    coordinates[tracker_id].append(y)
                    vehicle_type = CLASS_NAMES.get(class_id, "unknown")

                    top_left_labels.append(vehicle_type)  # Label for top-left

                    if vehicle_type != "unknown" and tracker_id not in processed_tracker_ids:
                        # Increment the count for this vehicle type only once per tracker_id
                        vehicle_counts[vehicle_type] += 1
                        processed_tracker_ids.add(tracker_id)  # Mark tracker_id as processed

                    if len(coordinates[tracker_id]) < video_info.fps / 2:
                        bottom_labels.append(f"#{tracker_id}")
                    else:
                        coordinate_start = coordinates[tracker_id][-1]
                        coordinate_end = coordinates[tracker_id][0]
                        distance = abs(coordinate_start - coordinate_end)
                        time = len(coordinates[tracker_id]) / video_info.fps
                        speed = distance / time * 3.6
                        bottom_labels.append(f"#{tracker_id} {int(speed)} km/h")
                        
                        # Write to file only if tracker_id is new (not processed before)
                        f.write(f"Tracker ID: {tracker_id}, Type: {vehicle_type}, Speed: {int(speed)} km/h\n")

                annotated_frame = frame.copy()
                annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
                annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)

                # Apply both label sets
                annotated_frame = label_annotator_top_left.annotate(
                    scene=annotated_frame, detections=detections, labels=top_left_labels
                )
                annotated_frame = label_annotator_bottom.annotate(
                    scene=annotated_frame, detections=detections, labels=bottom_labels
                )

                sink.write_frame(annotated_frame)
                cv2.imshow("frame", annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        print("\nVehicle Counts:")
        for vehicle, count in vehicle_counts.items():
            print(f"{vehicle}: {count}")

        cv2.destroyAllWindows()




0: 384x640 8 cars, 109.6ms
Speed: 7.5ms preprocess, 109.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 91.4ms
Speed: 3.0ms preprocess, 91.4ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 94.3ms
Speed: 2.0ms preprocess, 94.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 76.3ms
Speed: 6.3ms preprocess, 76.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 72.2ms
Speed: 1.0ms preprocess, 72.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 cars, 61.1ms
Speed: 2.0ms preprocess, 61.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 64.7ms
Speed: 1.0ms preprocess, 64.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 66.3ms
Speed: 2.0ms preprocess, 66.3ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 38

In [14]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
import csv

# Define SOURCE and TARGET constants
SOURCE = np.array(((432,  24), (488, 709), (887, 592), (578,  17)))

TARGET_WIDTH = 5
TARGET_HEIGHT = 130

TARGET = np.array(
    [
        [0, 0],
        [TARGET_WIDTH - 1, 0],
        [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
        [0, TARGET_HEIGHT - 1],
    ]
)

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

if __name__ == "__main__":
    video_info = sv.VideoInfo.from_video_path(video_path='./asset/videoplayback.mp4')
    video_info.fps = 25

    model = YOLO("yolo11n.pt")

    byte_track = sv.ByteTrack(
        frame_rate=video_info.fps, track_activation_threshold=0.3
    )

    thickness = sv.calculate_optimal_line_thickness(
        resolution_wh=video_info.resolution_wh
    )
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

    box_annotator = sv.BoxAnnotator(thickness=thickness)
    trace_annotator = sv.TraceAnnotator(
        thickness=thickness,
        trace_length=video_info.fps * 2,
        position=sv.Position.BOTTOM_CENTER,
    )

    label_annotator_top_left = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.TOP_LEFT,
    )

    label_annotator_bottom = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.BOTTOM_CENTER,
    )

    frame_generator = sv.get_video_frames_generator(source_path='./asset/videoplayback.mp4')

    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    vehicle_counts = defaultdict(int)
    processed_tracker_ids = set()

    with open('./asset/tracking_results.csv', mode='a', newline='') as csvfile:
        fieldnames = ["tracker_id", "vehicle_type"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        if csvfile.tell() == 0:
            writer.writeheader()

        try:
            with sv.VideoSink('./asset/TrackingOnlyResult.mp4', video_info) as sink:
                tracker_types = {}  # Moved outside the loop to persist vehicle types

                for frame in frame_generator:
                    result = model(frame)[0]
                    detections = sv.Detections.from_ultralytics(result)
                    detections = detections[detections.confidence > 0.5]
                    detections = detections[polygon_zone.trigger(detections)]
                    detections = detections.with_nms(threshold=0.6)
                    detections = byte_track.update_with_detections(detections=detections)

                    points = detections.get_anchors_coordinates(
                        anchor=sv.Position.BOTTOM_CENTER
                    )
                    points = view_transformer.transform_points(points=points).astype(int)

                    top_left_labels = []
                    bottom_labels = []

                    for tracker_id, point, class_id in zip(detections.tracker_id, points, detections.class_id):
                        # Removed coordinates tracking and speed calculation
                        if tracker_id not in tracker_types:
                            tracker_types[tracker_id] = CLASS_NAMES.get(class_id, "unknown")

                        vehicle_type = tracker_types[tracker_id]

                        top_left_labels.append(vehicle_type)
                        bottom_labels.append(f"#{tracker_id}")

                        if vehicle_type != "unknown" and tracker_id not in processed_tracker_ids:
                            vehicle_counts[vehicle_type] += 1
                            writer.writerow({"tracker_id": tracker_id, "vehicle_type": vehicle_type})
                            csvfile.flush()
                            processed_tracker_ids.add(tracker_id)

                    annotated_frame = trace_annotator.annotate(scene=frame.copy(), detections=detections)
                    annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)

                    annotated_frame = label_annotator_top_left.annotate(
                        scene=annotated_frame, detections=detections, labels=top_left_labels
                    )
                    annotated_frame = label_annotator_bottom.annotate(
                        scene=annotated_frame, detections=detections, labels=bottom_labels
                    )

                    sink.write_frame(annotated_frame)
                    cv2.imshow("Tracking Only", annotated_frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break

        except Exception as e:
            print(f"Error: {e}")

        finally:
            csvfile.flush()
            print("Tracking data has been saved.")

        cv2.destroyAllWindows()
        print("Vehicle Counts:")
        for vehicle, count in vehicle_counts.items():
            print(f"{vehicle}: {count}")



0: 384x640 8 cars, 59.6ms
Speed: 2.0ms preprocess, 59.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 110.6ms
Speed: 3.0ms preprocess, 110.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 92.3ms
Speed: 5.0ms preprocess, 92.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 69.5ms
Speed: 2.0ms preprocess, 69.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 67.5ms
Speed: 2.0ms preprocess, 67.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 cars, 67.5ms
Speed: 1.1ms preprocess, 67.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 60.5ms
Speed: 3.0ms preprocess, 60.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 67.1ms
Speed: 2.0ms preprocess, 67.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 38

In [16]:
import argparse
from collections import defaultdict, deque
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
import csv

# Define SOURCE and TARGET constants
SOURCE = np.array(((422, 10), (535, 649), (801, 665), (594, 16)))

TARGET_WIDTH = 5
TARGET_HEIGHT = 130

TARGET = np.array(
    [
        [0, 0],
        [TARGET_WIDTH - 1, 0],
        [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
        [0, TARGET_HEIGHT - 1],
    ]
)

# Define Stop Zone (Before the giveaway sign)
STOP_ZONE = np.array(
    [
        (540, 307),
        (735, 310),
        (746, 557),
        (490, 555),
    ]
)

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

if __name__ == "__main__":
    video_info = sv.VideoInfo.from_video_path(video_path='./asset/videoplayback.mp4')
    video_info.fps = 25

    model = YOLO("yolo11n.pt")

    byte_track = sv.ByteTrack(
        frame_rate=video_info.fps, track_activation_threshold=0.3
    )

    thickness = sv.calculate_optimal_line_thickness(
        resolution_wh=video_info.resolution_wh
    )
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

    box_annotator = sv.BoxAnnotator(thickness=thickness)
    trace_annotator = sv.TraceAnnotator(
        thickness=thickness,
        trace_length=video_info.fps * 2,
        position=sv.Position.BOTTOM_CENTER,
    )

    label_annotator_top_left = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.TOP_LEFT,
    )

    label_annotator_bottom = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.BOTTOM_CENTER,
    )

    frame_generator = sv.get_video_frames_generator(source_path='./asset/videoplayback.mp4')

    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE)  # Define the stop zone
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    vehicle_counts = defaultdict(int)
    processed_tracker_ids = set()
    stopped_vehicles = defaultdict(int)  # Track how long a vehicle has been stopped

    # Dictionary to store the latest status for each tracker ID
    tracker_status = {}

    with open('./asset/tracking_results.csv', mode='w', newline='') as csvfile:
        fieldnames = ["tracker_id", "vehicle_type", "status"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()  # Write the header once

        try:
            with sv.VideoSink('./asset/TrackingWithStopResult.mp4', video_info) as sink:
                tracker_types = {}  # Moved outside the loop to persist vehicle types

                for frame in frame_generator:
                    result = model(frame)[0]
                    detections = sv.Detections.from_ultralytics(result)
                    detections = detections[detections.confidence > 0.4]
                    detections = detections[polygon_zone.trigger(detections)]
                    detections = detections.with_nms(threshold=0.6)
                    detections = byte_track.update_with_detections(detections=detections)

                    # Get both original and transformed anchor points
                    anchor_points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                    transformed_points = view_transformer.transform_points(points=anchor_points).astype(int)

                    top_left_labels = []
                    bottom_labels = []

                    for tracker_id, orig_point, trans_point, class_id in zip(
                        detections.tracker_id, anchor_points, transformed_points, detections.class_id
                    ):
                        if tracker_id not in tracker_types:
                            tracker_types[tracker_id] = CLASS_NAMES.get(class_id, "unknown")

                        vehicle_type = tracker_types[tracker_id]
                        status = "moving"

                        # Check if this specific vehicle is inside the stop zone using original point
                        pt = tuple(map(float, orig_point))  # ensure it's (x, y) and float
                        if cv2.pointPolygonTest(stop_zone.polygon.astype(np.float32), pt, False) >= 0:

                            stopped_vehicles[tracker_id] += 1

                            if stopped_vehicles[tracker_id] > video_info.fps * 2:
                                status = "stopped"
                            elif stopped_vehicles[tracker_id] > video_info.fps:
                                status = "slower"
                        else:
                            stopped_vehicles[tracker_id] = 0

                        # Update tracker status in memory
                        if tracker_id not in tracker_status or (
                            tracker_status[tracker_id]["status"] != "stopped" and
                            (status == "stopped" or (status == "slower" and tracker_status[tracker_id]["status"] == "moving"))
                        ):
                            tracker_status[tracker_id] = {"vehicle_type": vehicle_type, "status": status}

                            # Overwrite the CSV file with updated tracker statuses
                            csvfile.seek(0)  # Move to the start of the file
                            csvfile.truncate()  # Clear the file
                            writer.writeheader()  # Re-write the header
                            for tid, data in tracker_status.items():
                                writer.writerow({"tracker_id": tid, "vehicle_type": data["vehicle_type"], "status": data["status"]})
                            csvfile.flush()

                        # Update labels
                        if status == "stopped":
                            top_left_labels.append(f"{vehicle_type} stopped")
                        elif status == "slower":
                            top_left_labels.append(f"{vehicle_type} slower")
                        else:
                            top_left_labels.append(vehicle_type)

                        bottom_labels.append(f"#{tracker_id}")

                    # Ensure labels match the number of detections
                    while len(top_left_labels) < len(detections):
                        top_left_labels.append("")
                    while len(bottom_labels) < len(detections):
                        bottom_labels.append("")

                    annotated_frame = trace_annotator.annotate(scene=frame.copy(), detections=detections)
                    annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)

                    annotated_frame = label_annotator_top_left.annotate(
                        scene=annotated_frame, detections=detections, labels=top_left_labels
                    )
                    annotated_frame = label_annotator_bottom.annotate(
                        scene=annotated_frame, detections=detections, labels=bottom_labels
                    )

                    # Draw stop zone polygon (optional but useful)
                    cv2.polylines(annotated_frame, [STOP_ZONE], isClosed=True, color=(0, 255, 255), thickness=2)

                    sink.write_frame(annotated_frame)
                    cv2.imshow("Tracking with Stop", annotated_frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break

        except Exception as e:
            print(f"Error: {e}")

        finally:
            csvfile.flush()
            print("Tracking data has been saved.")

        cv2.destroyAllWindows()
        print("Vehicle Counts:")
        for vehicle, count in vehicle_counts.items():
            print(f"{vehicle}: {count}")



0: 384x640 8 cars, 76.3ms
Speed: 3.0ms preprocess, 76.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 124.2ms
Speed: 4.8ms preprocess, 124.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 107.4ms
Speed: 3.1ms preprocess, 107.4ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 77.9ms
Speed: 2.0ms preprocess, 77.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 72.6ms
Speed: 1.0ms preprocess, 72.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 cars, 79.4ms
Speed: 2.0ms preprocess, 79.4ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 76.1ms
Speed: 4.5ms preprocess, 76.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 80.3ms
Speed: 2.0ms preprocess, 80.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict

# Path to the saved speed data file
file_path = './asset/speed_resultss.txt'

# Dictionary to store speed data per vehicle type
vehicle_speeds = defaultdict(list)
vehicle_counts = defaultdict(int)

# Read speed data from the file
with open(file_path, 'r') as f:
    for line in f:
        parts = line.strip().split(',')
        if len(parts) == 3:
            vehicle_type = parts[1].split(":")[1].strip()
            speed = int(parts[2].split(":")[1].strip().split()[0])  # Extract speed value
            vehicle_speeds[vehicle_type].append(speed)
            vehicle_counts[vehicle_type] += 1  # Count vehicles per type

# Compute and print statistics
print("\nVehicle Type Counts:")
for vehicle, count in vehicle_counts.items():
    print(f"{vehicle}: {count}")

print("\nSpeed Statistics by Vehicle Type:")
for vehicle, speeds in vehicle_speeds.items():
    avg_speed = np.mean(speeds)
    max_speed = np.max(speeds)
    min_speed = np.min(speeds)
    print(f"{vehicle} -> Avg: {avg_speed:.2f} km/h, Max: {max_speed} km/h, Min: {min_speed} km/h")

# Plot Speed Distribution by Vehicle Type
plt.figure(figsize=(10, 5))
for vehicle, speeds in vehicle_speeds.items():
    plt.hist(speeds, bins=10, alpha=0.6, label=vehicle)

plt.xlabel('Speed (km/h)')
plt.ylabel('Frequency')
plt.title('Speed Distribution by Vehicle Type')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
import cv2
import numpy as np
from collections import defaultdict, Counter
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
MOVEMENT_THRESHOLD = 5  # Pixels movement threshold for stationary detection
FRAME_BUFFER = 10  # Number of frames to track position history

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 25

    model = YOLO(MODEL_PATH)
    tracker = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=0.3)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Annotators
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh)
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    position_history = defaultdict(list)  # Store recent positions for each track
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(detections.tracker_id, anchor_pts, transformed_pts, detections.class_id):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    # Update position history
                    position_history[track_id].append(orig_pt)
                    if len(position_history[track_id]) > FRAME_BUFFER:
                        position_history[track_id].pop(0)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        # Check movement
                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            movements = [
                                np.linalg.norm(position_history[track_id][i] - position_history[track_id][i-1])
                                for i in range(1, len(position_history[track_id]))
                            ]
                            avg_movement = np.mean(movements) if movements else 0
                            
                            if avg_movement < MOVEMENT_THRESHOLD:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        position_history[track_id].clear()
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV
                csvfile.seek(0)
                csvfile.truncate()
                writer.writeheader()
                for tid, data in stop_zone_history.items():
                    writer.writerow({"tracker_id": tid, **data})
                csvfile.flush()

                # Update count CSV
                count_csvfile.seek(0)
                count_csvfile.truncate()
                count_writer.writeheader()
                for v_type, count in vehicle_type_counter.items():
                    count_writer.writerow({"vehicle_type": v_type, "count": count})
                count_csvfile.flush()

                # Padding labels
                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                # Annotate and display
                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POINT ---------- #
if __name__ == "__main__":
    main()


0: 384x640 9 cars, 1 truck, 70.1ms
Speed: 1.0ms preprocess, 70.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 100.0ms
Speed: 1.0ms preprocess, 100.0ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 truck, 78.3ms
Speed: 2.0ms preprocess, 78.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 71.2ms
Speed: 1.0ms preprocess, 71.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 58.4ms
Speed: 1.0ms preprocess, 58.4ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 68.8ms
Speed: 1.0ms preprocess, 68.8ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 72.4ms
Speed: 2.0ms preprocess, 72.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 82.2ms
Speed: 2.0ms preprocess, 8

In [None]:
import cv2
import numpy as np
from collections import defaultdict, Counter
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(540, 307), (735, 310), (746, 557), (490, 555)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
STABILITY_THRESHOLD = 10  # Max pixel change in bbox center/size for stationary
FRAME_BUFFER = 10  # Number of frames to track bbox history

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 25

    model = YOLO(MODEL_PATH)
    tracker = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=0.3)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Annotators
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh)
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    bbox_history = defaultdict(list)  # Store recent bounding box data (center, size)
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id, bbox in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id, detections.xyxy
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    # Calculate bounding box center and size
                    x1, y1, x2, y2 = bbox
                    center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
                    size = np.array([x2 - x1, y2 - y1])
                    bbox_data = np.concatenate([center, size])

                    # Update bbox history
                    bbox_history[track_id].append(bbox_data)
                    if len(bbox_history[track_id]) > FRAME_BUFFER:
                        bbox_history[track_id].pop(0)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        # Check bounding box stability
                        if len(bbox_history[track_id]) >= FRAME_BUFFER:
                            bbox_array = np.array(bbox_history[track_id])
                            max_changes = np.max(np.abs(bbox_array - bbox_array[0]), axis=0)
                            if np.all(max_changes < STABILITY_THRESHOLD):
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        bbox_history[track_id].clear()
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV
                csvfile.seek(0)
                csvfile.truncate()
                writer.writeheader()
                for tid, data in stop_zone_history.items():
                    writer.writerow({"tracker_id": tid, **data})
                csvfile.flush()

                # Update count CSV
                count_csvfile.seek(0)
                count_csvfile.truncate()
                count_writer.writeheader()
                for v_type, count in vehicle_type_counter.items():
                    count_writer.writerow({"vehicle_type": v_type, "count": count})
                count_csvfile.flush()

                # Padding labels
                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                # Annotate and display
                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POINT ---------- #
if __name__ == "__main__":
    main()


0: 384x640 9 cars, 1 truck, 242.0ms
Speed: 6.5ms preprocess, 242.0ms inference, 12.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 167.3ms
Speed: 4.5ms preprocess, 167.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 truck, 178.1ms
Speed: 4.5ms preprocess, 178.1ms inference, 2.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 162.6ms
Speed: 4.0ms preprocess, 162.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 153.5ms
Speed: 4.0ms preprocess, 153.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 164.4ms
Speed: 4.0ms preprocess, 164.4ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 144.7ms
Speed: 6.0ms preprocess, 144.7ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 142.4ms
Speed: 2.8ms

In [None]:
import cv2
import numpy as np
from collections import defaultdict, Counter
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 3.0  # Pixels per frame for stationary detection
FRAME_BUFFER = 10  # Number of frames to track position history

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    model = YOLO(MODEL_PATH)
    tracker = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=0.3)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Annotators
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh)
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    position_history = defaultdict(list)  # Store recent positions for each track
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    # Update position history
                    position_history[track_id].append(orig_pt)
                    if len(position_history[track_id]) > FRAME_BUFFER:
                        position_history[track_id].pop(0)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        # Check velocity
                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            displacements = [
                                np.linalg.norm(position_history[track_id][i] - position_history[track_id][i-1])
                                for i in range(1, len(position_history[track_id]))
                            ]
                            avg_velocity = np.mean(displacements) if displacements else 0
                            
                            if avg_velocity < VELOCITY_THRESHOLD:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        position_history[track_id].clear()
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV
                csvfile.seek(0)
                csvfile.truncate()
                writer.writeheader()
                for tid, data in stop_zone_history.items():
                    writer.writerow({"tracker_id": tid, **data})
                csvfile.flush()

                # Update count CSV
                count_csvfile.seek(0)
                count_csvfile.truncate()
                count_writer.writeheader()
                for v_type, count in vehicle_type_counter.items():
                    count_writer.writerow({"vehicle_type": v_type, "count": count})
                count_csvfile.flush()

                # Padding labels
                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                # Annotate and display
                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POINT (Using) ---------- #
if __name__ == "__main__":
    main()
    

YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs


TypeError: cannot unpack non-iterable NoneType object

In [3]:
import cv2
import numpy as np
from collections import defaultdict, Counter, deque
from ultralytics import YOLO
import supervision as sv
import csv
import os
import time

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 3.0  # Base pixels per frame for stationary detection
FRAME_BUFFER = 20  # Increased for stable velocity estimates
EMA_ALPHA = 0.2  # Smoothing factor for exponential moving average
MAX_DISPLACEMENT = 50.0  # Cap for outlier displacements (pixels)
CONFIDENCE_THRESHOLD = 0.4  # Tighter detection confidence

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# Vehicle-type specific velocity thresholds (relative to base)
VELOCITY_THRESHOLD_MODIFIERS = {
    "car": 1.0,
    "motorcycle": 1.2,  # Allow slightly faster movement
    "bus": 0.8,        # Stricter for larger vehicles
    "truck": 0.8
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

def read_csv_to_dict(filepath, fieldnames):
    csv_dict = {}
    try:
        with open(filepath, mode='r', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                csv_dict[row['tracker_id']] = row
    except FileNotFoundError:
        initialize_csv(filepath, fieldnames)
    return csv_dict

def write_csv_from_dict(filepath, csv_dict, fieldnames):
    with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for row in csv_dict.values():
            writer.writerow(row)

# ---------- MAIN PIPELINE ---------- #

def main():
    # Initialize performance counters
    frame_count = 0
    start_time = time.time()

    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    # Load model with optimized settings
    model = YOLO(MODEL_PATH)
    model.fuse()  # Fuse conv and bn for faster inference

    # Initialize tracker with optimized parameters
    tracker = sv.ByteTrack(
        frame_rate=video_info.fps,
        track_activation_threshold=0.3,
        lost_track_buffer=30,
        minimum_matching_threshold=0.7
    )

    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Precompute annotators with reduced complexity
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh) // 2
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh) * 0.8
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(
            thickness=thickness,
            trace_length=video_info.fps,
            position=sv.Position.BOTTOM_CENTER
        ),
        'label_top': sv.LabelAnnotator(
            text_scale=text_scale,
            text_thickness=thickness,
            text_position=sv.Position.TOP_LEFT
        ),
        'label_bottom': sv.LabelAnnotator(
            text_scale=text_scale,
            text_thickness=thickness,
            text_position=sv.Position.BOTTOM_CENTER
        )
    }

    # Precompute zones
    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    # Tracking state
    tracker_types = {}
    position_history = defaultdict(lambda: deque(maxlen=FRAME_BUFFER))  # Efficient fixed-size buffer
    smoothed_velocity = defaultdict(lambda: 0.0)  # Smoothed velocity
    status_cache = {}
    compliance_set = set()
    counted_ids = set()
    vehicle_type_counter = Counter()
    entry_frames = {}  # Track entry frame for each vehicle

    # Initialize CSVs
    csv_fieldnames = ["tracker_id", "vehicle_type", "status", "compliance", "frame_number", "entry_frame"]
    initialize_csv(OUTPUT_CSV_PATH, csv_fieldnames)
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                frame_count += 1

                # Run detection with optimized parameters
                result = model(frame, imgsz=640, conf=CONFIDENCE_THRESHOLD, iou=0.5, verbose=False)[0]
                detections = sv.Detections.from_ultralytics(result)

                # Apply filters
                detections = detections[detections.confidence > CONFIDENCE_THRESHOLD]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                # Read current CSV state
                csv_dict = read_csv_to_dict(OUTPUT_CSV_PATH, csv_fieldnames)

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = status_cache.get(track_id, "moving")  # Default to previous or moving
                    compliance = 0

                    # Update position history
                    position_history[track_id].append(orig_pt)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)
                        if track_id not in entry_frames:
                            entry_frames[track_id] = frame_count

                        # Check velocity only with sufficient history
                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            # Vectorized velocity calculation
                            pos_array = np.array(list(position_history[track_id]))
                            displacements = np.linalg.norm(
                                pos_array[1:] - pos_array[:-1], axis=1
                            )
                            displacements = np.minimum(displacements, MAX_DISPLACEMENT)  # Cap outliers
                            current_velocity = np.mean(displacements) if displacements.size > 0 else 0
                            # Apply EMA
                            smoothed_velocity[track_id] = (
                                EMA_ALPHA * current_velocity + (1 - EMA_ALPHA) * smoothed_velocity[track_id]
                            )

                            # Adjust threshold by vehicle type
                            threshold = VELOCITY_THRESHOLD * VELOCITY_THRESHOLD_MODIFIERS.get(vehicle_type, 1.0)
                            if smoothed_velocity[track_id] < threshold:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)
                            else:
                                status = "moving"
                                if track_id in compliance_set:
                                    compliance_set.remove(track_id)
                    else:
                        position_history[track_id].clear()
                        smoothed_velocity[track_id] = 0.0
                        status = "moving"
                        if track_id in compliance_set:
                            compliance_set.remove(track_id)
                        if track_id in entry_frames:
                            del entry_frames[track_id]

                    # Update status cache and CSV if status changed
                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                        # Update or create CSV row
                        if track_id not in csv_dict:
                            csv_dict[track_id] = {
                                "tracker_id": track_id,
                                "vehicle_type": vehicle_type,
                                "status": status,
                                "compliance": str(compliance),
                                "frame_number": str(frame_count),
                                "entry_frame": str(entry_frames.get(track_id, frame_count))
                            }
                        else:
                            csv_dict[track_id].update({
                                "status": status,
                                "compliance": str(compliance),
                                "frame_number": str(frame_count)
                            })

                        # Write updated CSV
                        write_csv_from_dict(OUTPUT_CSV_PATH, csv_dict, csv_fieldnames)

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update count CSV less frequently
                if frame_count % 5 == 0:
                    count_csvfile.seek(0)
                    count_csvfile.truncate()
                    count_writer.writeheader()
                    for v_type, count in vehicle_type_counter.items():
                        count_writer.writerow({"vehicle_type": v_type, "count": count})
                    count_csvfile.flush()

                # Annotate frame
                annotated = frame.copy()
                if len(detections) > 0:
                    annotated = annotators['trace'].annotate(scene=annotated, detections=detections)
                    annotated = annotators['box'].annotate(scene=annotated, detections=detections)
                    annotated = annotators['label_top'].annotate(scene=annotated, detections=detections, labels=top_labels)
                    annotated = annotators['label_bottom'].annotate(scene=annotated, detections=detections, labels=bottom_labels)

                # Draw zones
                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)

                # Display FPS
                fps = frame_count / (time.time() - start_time) if frame_count > 1 else 0
                cv2.putText(annotated, f"FPS: {fps:.1f}", (10, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                # Write to output
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        count_csvfile.close()
        cv2.destroyAllWindows()
        print(f"[INFO] Processed {frame_count} frames in {time.time() - start_time:.2f} seconds")

# ---------- ENTRY POINT ---------- #
if __name__ == "__main__":
    main()

YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs


TypeError: cannot unpack non-iterable NoneType object

In [5]:
import cv2
import numpy as np
from collections import defaultdict, Counter, deque
from ultralytics import YOLO
import supervision as sv
import csv
import os
import time

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 3.0  # Base pixels per frame for stationary detection
FRAME_BUFFER = 20  # Increased for stable velocity estimates
EMA_ALPHA = 0.2  # Smoothing factor for exponential moving average
MAX_DISPLACEMENT = 50.0  # Cap for outlier displacements (pixels)
CONFIDENCE_THRESHOLD = 0.4  # Tighter detection confidence

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# Vehicle-type specific velocity thresholds (relative to base)
VELOCITY_THRESHOLD_MODIFIERS = {
    "car": 1.0,
    "motorcycle": 1.2,  # Allow slightly faster movement
    "bus": 0.8,        # Stricter for larger vehicles
    "truck": 0.8
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

def read_csv_to_dict(filepath, fieldnames):
    csv_dict = {}
    try:
        with open(filepath, mode='r', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                csv_dict[row['tracker_id']] = row
    except FileNotFoundError:
        initialize_csv(filepath, fieldnames)
    return csv_dict

def write_csv_from_dict(filepath, csv_dict, fieldnames):
    with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for row in csv_dict.values():
            writer.writerow(row)

# ---------- MAIN PIPELINE ---------- #

def main():
    # Initialize performance counters
    frame_count = 0
    start_time = time.time()

    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    # Load model with optimized settings
    model = YOLO(MODEL_PATH)
    model.fuse()  # Fuse conv and bn for faster inference

    # Initialize tracker with optimized parameters
    tracker = sv.ByteTrack(
        frame_rate=video_info.fps,
        track_activation_threshold=0.3,
        lost_track_buffer=30,
        minimum_matching_threshold=0.7
    )

    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Precompute annotators with reduced complexity
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh) // 2
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh) * 0.8
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(
            thickness=thickness,
            trace_length=video_info.fps,
            position=sv.Position.BOTTOM_CENTER
        ),
        'label_top': sv.LabelAnnotator(
            text_scale=text_scale,
            text_thickness=thickness,
            text_position=sv.Position.TOP_LEFT
        ),
        'label_bottom': sv.LabelAnnotator(
            text_scale=text_scale,
            text_thickness=thickness,
            text_position=sv.Position.BOTTOM_CENTER
        )
    }

    # Precompute zones
    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    # Tracking state
    tracker_types = {}
    position_history = defaultdict(lambda: deque(maxlen=FRAME_BUFFER))  # Efficient fixed-size buffer
    smoothed_velocity = defaultdict(lambda: 0.0)  # Smoothed velocity
    status_cache = {}
    compliance_set = set()
    counted_ids = set()
    vehicle_type_counter = Counter()
    entry_frames = {}  # Track entry frame for each vehicle

    # Initialize CSVs
    csv_fieldnames = ["tracker_id", "vehicle_type", "status", "compliance", "frame_number", "entry_frame"]
    initialize_csv(OUTPUT_CSV_PATH, csv_fieldnames)
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                frame_count += 1

                # Run detection with optimized parameters
                result = model(frame, imgsz=640, conf=CONFIDENCE_THRESHOLD, iou=0.5, verbose=False)[0]
                detections = sv.Detections.from_ultralytics(result)

                # Apply filters
                detections = detections[detections.confidence > CONFIDENCE_THRESHOLD]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                # Read current CSV state
                csv_dict = read_csv_to_dict(OUTPUT_CSV_PATH, csv_fieldnames)

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = status_cache.get(track_id, "moving")  # Default to previous or moving
                    compliance = 0

                    # Update position history
                    position_history[track_id].append(orig_pt)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)
                        if track_id not in entry_frames:
                            entry_frames[track_id] = frame_count

                        # Check velocity only with sufficient history
                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            # Vectorized velocity calculation
                            pos_array = np.array(list(position_history[track_id]))
                            displacements = np.linalg.norm(
                                pos_array[1:] - pos_array[:-1], axis=1
                            )
                            displacements = np.minimum(displacements, MAX_DISPLACEMENT)  # Cap outliers
                            current_velocity = np.mean(displacements) if displacements.size > 0 else 0
                            # Apply EMA
                            smoothed_velocity[track_id] = (
                                EMA_ALPHA * current_velocity + (1 - EMA_ALPHA) * smoothed_velocity[track_id]
                            )

                            # Adjust threshold by vehicle type
                            threshold = VELOCITY_THRESHOLD * VELOCITY_THRESHOLD_MODIFIERS.get(vehicle_type, 1.0)
                            if smoothed_velocity[track_id] < threshold:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)
                            else:
                                status = "moving"
                                if track_id in compliance_set:
                                    compliance_set.remove(track_id)
                    else:
                        position_history[track_id].clear()
                        smoothed_velocity[track_id] = 0.0
                        status = "moving"
                        if track_id in compliance_set:
                            compliance_set.remove(track_id)
                        if track_id in entry_frames:
                            del entry_frames[track_id]

                    # Update status cache and CSV if status changed
                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                        # Update or create CSV row
                        if track_id not in csv_dict:
                            csv_dict[track_id] = {
                                "tracker_id": track_id,
                                "vehicle_type": vehicle_type,
                                "status": status,
                                "compliance": str(compliance),
                                "frame_number": str(frame_count),
                                "entry_frame": str(entry_frames.get(track_id, frame_count))
                            }
                        else:
                            csv_dict[track_id].update({
                                "status": status,
                                "compliance": str(compliance),
                                "frame_number": str(frame_count)
                            })

                        # Write updated CSV
                        write_csv_from_dict(OUTPUT_CSV_PATH, csv_dict, csv_fieldnames)

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update count CSV less frequently
                if frame_count % 5 == 0:
                    count_csvfile.seek(0)
                    count_csvfile.truncate()
                    count_writer.writeheader()
                    for v_type, count in vehicle_type_counter.items():
                        count_writer.writerow({"vehicle_type": v_type, "count": count})
                    count_csvfile.flush()

                # Annotate frame
                annotated = frame.copy()
                if len(detections) > 0:
                    annotated = annotators['trace'].annotate(scene=annotated, detections=detections)
                    annotated = annotators['box'].annotate(scene=annotated, detections=detections)
                    annotated = annotators['label_top'].annotate(scene=annotated, detections=detections, labels=top_labels)
                    annotated = annotators['label_bottom'].annotate(scene=annotated, detections=detections, labels=bottom_labels)

                # Draw zones
                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)

                # Display FPS
                fps = frame_count / (time.time() - start_time) if frame_count > 1 else 0
                cv2.putText(annotated, f"FPS: {fps:.1f}", (10, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                # Write to output
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        count_csvfile.close()
        cv2.destroyAllWindows()
        print(f"[INFO] Processed {frame_count} frames in {time.time() - start_time:.2f} seconds")

# ---------- ENTRY POINT ---------- #
if __name__ == "__main__":
    main()

YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs


TypeError: cannot unpack non-iterable NoneType object

In [14]:
import cv2
import numpy as np
import time
from collections import defaultdict, Counter, deque
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 3.0
FRAME_BUFFER = 20
CSV_UPDATE_INTERVAL = 5

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    model = YOLO(MODEL_PATH)
    model.fuse()  # Optimized inference
    tracker = sv.ByteTrack(frame_rate=video_info.fps)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    thickness = 1
    text_scale = 0.4
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    position_history = defaultdict(lambda: deque(maxlen=FRAME_BUFFER))
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    frame_idx = 0
    start_time = time.time()
    prev_fps_time = start_time

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                frame_idx += 1
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    position_history[track_id].append(orig_pt)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            displacements = np.array([
                                np.linalg.norm(position_history[track_id][i] - position_history[track_id][i - 1])
                                for i in range(1, len(position_history[track_id]))
                            ])
                            weights = np.linspace(1, 2, len(displacements))
                            avg_velocity = np.average(displacements, weights=weights)

                            if avg_velocity < VELOCITY_THRESHOLD:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        position_history[track_id].clear()
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV every few frames
                if frame_idx % CSV_UPDATE_INTERVAL == 0:
                    csvfile.seek(0)
                    csvfile.truncate()
                    writer.writeheader()
                    for tid, data in stop_zone_history.items():
                        writer.writerow({"tracker_id": tid, **data})
                    csvfile.flush()

                    count_csvfile.seek(0)
                    count_csvfile.truncate()
                    count_writer.writeheader()
                    for v_type, count in vehicle_type_counter.items():
                        count_writer.writerow({"vehicle_type": v_type, "count": count})
                    count_csvfile.flush()

                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if frame_idx % 30 == 0:
                    now = time.time()
                    fps = 30 / (now - prev_fps_time)
                    prev_fps_time = now
                    print(f"[INFO] FPS: {fps:.2f}")

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        end_time = time.time()
        total_time = end_time - start_time
        avg_fps = frame_idx / total_time
        print(f"[INFO] Total Time: {total_time:.2f}s, Frames: {frame_idx}, Avg FPS: {avg_fps:.2f}")
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POINT (Using *)---------- #
if __name__ == "__main__":
    main()


YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs

0: 384x640 9 cars, 1 truck, 100.6ms
Speed: 4.0ms preprocess, 100.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 112.5ms
Speed: 3.0ms preprocess, 112.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 truck, 98.4ms
Speed: 2.0ms preprocess, 98.4ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 74.5ms
Speed: 2.0ms preprocess, 74.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 91.9ms
Speed: 3.0ms preprocess, 91.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 104.3ms
Speed: 2.0ms preprocess, 104.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 111.5ms
Speed: 2.0ms preprocess, 111.5ms inference, 2.0ms postprocess per image

In [1]:
import cv2
import numpy as np
import time
from collections import defaultdict, Counter, deque
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 3.0
FRAME_BUFFER = 20
CSV_UPDATE_INTERVAL = 5

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    model = YOLO(MODEL_PATH)
    model.fuse()
    tracker = sv.ByteTrack(frame_rate=video_info.fps)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    thickness = 1
    text_scale = 0.4
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    position_history = defaultdict(lambda: deque(maxlen=FRAME_BUFFER))
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()
    entry_times = {}
    reaction_times = {}

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance", "reaction_time"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    frame_idx = 0
    start_time = time.time()
    prev_fps_time = start_time

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                frame_idx += 1
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    position_history[track_id].append(orig_pt)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        if track_id not in entry_times:
                            entry_times[track_id] = time.time()

                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            displacements = np.array([
                                np.linalg.norm(position_history[track_id][i] - position_history[track_id][i - 1])
                                for i in range(1, len(position_history[track_id]))
                            ])
                            weights = np.linspace(1, 2, len(displacements))
                            avg_velocity = np.average(displacements, weights=weights)

                            if avg_velocity < VELOCITY_THRESHOLD:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                                if track_id not in reaction_times:
                                    reaction_times[track_id] = round(time.time() - entry_times[track_id], 2)

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        position_history[track_id].clear()
                        if track_id in entry_times and track_id not in reaction_times:
                            reaction_times[track_id] = None  # exited without stopping
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV every few frames
                if frame_idx % CSV_UPDATE_INTERVAL == 0:
                    csvfile.seek(0)
                    csvfile.truncate()
                    writer.writeheader()
                    for tid, data in stop_zone_history.items():
                        writer.writerow({
                            "tracker_id": tid,
                            **data,
                            "reaction_time": reaction_times.get(tid)
                        })
                    csvfile.flush()

                    count_csvfile.seek(0)
                    count_csvfile.truncate()
                    count_writer.writeheader()
                    for v_type, count in vehicle_type_counter.items():
                        count_writer.writerow({"vehicle_type": v_type, "count": count})
                    count_csvfile.flush()

                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if frame_idx % 30 == 0:
                    now = time.time()
                    fps = 30 / (now - prev_fps_time)
                    prev_fps_time = now
                    print(f"[INFO] FPS: {fps:.2f}")

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        end_time = time.time()
        total_time = end_time - start_time
        avg_fps = frame_idx / total_time
        print(f"[INFO] Total Time: {total_time:.2f}s, Frames: {frame_idx}, Avg FPS: {avg_fps:.2f}")
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POIN **T ---------- #
if __name__ == "__main__":
    main()


YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs

0: 384x640 9 cars, 1 truck, 167.2ms
Speed: 13.2ms preprocess, 167.2ms inference, 22.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 110.7ms
Speed: 11.3ms preprocess, 110.7ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 truck, 103.2ms
Speed: 3.5ms preprocess, 103.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 97.0ms
Speed: 2.1ms preprocess, 97.0ms inference, 7.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 96.2ms
Speed: 2.8ms preprocess, 96.2ms inference, 6.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 112.7ms
Speed: 2.1ms preprocess, 112.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 89.5ms
Speed: 4.3ms preprocess, 89.5ms inference, 2.0ms postprocess per im

In [None]:
import cv2
import numpy as np
import time
from collections import defaultdict, Counter, deque
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(509, 203), (705, 189), (784, 700), (461, 690)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130
VELOCITY_THRESHOLD = 2.0
FRAME_BUFFER = 20
CSV_UPDATE_INTERVAL = 5

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 30

    model = YOLO(MODEL_PATH)
    model.fuse()
    tracker = sv.ByteTrack(frame_rate=video_info.fps)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    thickness = 1
    text_scale = 0.4
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=1, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    position_history = defaultdict(lambda: deque(maxlen=FRAME_BUFFER))
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()
    entry_times = {}
    reaction_times = {}
    csv_updates = {}  # Track how many times we've updated CSV for each ID

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance", "reaction_time"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    frame_idx = 0
    start_time = time.time()
    prev_fps_time = start_time

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                frame_idx += 1
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(
                    detections.tracker_id, anchor_pts, transformed_pts, detections.class_id
                ):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    position_history[track_id].append(orig_pt)

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        if track_id not in entry_times:
                            entry_times[track_id] = time.time()
                            # First CSV update - entry into zone
                            stop_zone_history[track_id] = {
                                "vehicle_type": vehicle_type,
                                "status": "entered",
                                "compliance": 0,
                                "reaction_time": None
                            }
                            csv_updates[track_id] = 1  # Mark first update

                        if len(position_history[track_id]) >= FRAME_BUFFER:
                            displacements = np.array([
                                np.linalg.norm(position_history[track_id][i] - position_history[track_id][i - 1])
                                for i in range(1, len(position_history[track_id]))
                            ])
                            weights = np.linspace(1, 2, len(displacements))
                            avg_velocity = np.average(displacements, weights=weights)

                            if avg_velocity < VELOCITY_THRESHOLD:
                                status, compliance = "stationary", 1
                                compliance_set.add(track_id)

                                if track_id not in reaction_times:
                                    reaction_times[track_id] = round(time.time() - entry_times[track_id], 2)
                                    # Second CSV update - became stationary
                                    if csv_updates.get(track_id, 0) < 2:
                                        stop_zone_history[track_id] = {
                                            "vehicle_type": vehicle_type,
                                            "status": status,
                                            "compliance": compliance,
                                            "reaction_time": reaction_times[track_id]
                                        }
                                        csv_updates[track_id] = 2  # Mark second update
                    else:
                        position_history[track_id].clear()
                        if track_id in entry_times and track_id not in reaction_times:
                            reaction_times[track_id] = None  # exited without stopping

                    status_cache[track_id] = status
                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV every few frames
                if frame_idx % CSV_UPDATE_INTERVAL == 0:
                    csvfile.seek(0)
                    csvfile.truncate()
                    writer.writeheader()
                    for tid, data in stop_zone_history.items():
                        writer.writerow({
                            "tracker_id": tid,
                            **data
                        })
                    csvfile.flush()

                    count_csvfile.seek(0)
                    count_csvfile.truncate()
                    count_writer.writeheader()
                    for v_type, count in vehicle_type_counter.items():
                        count_writer.writerow({"vehicle_type": v_type, "count": count})
                    count_csvfile.flush()

                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if frame_idx % 30 == 0:
                    now = time.time()
                    fps = 30 / (now - prev_fps_time)
                    prev_fps_time = now
                    print(f"[INFO] FPS: {fps:.2f}")

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        end_time = time.time()
        total_time = end_time - start_time
        avg_fps = frame_idx / total_time
        print(f"[INFO] Total Time: {total_time:.2f}s, Frames: {frame_idx}, Avg FPS: {avg_fps:.2f}")
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

if __name__ == "__main__": #used
    main()

YOLO11n summary (fused): 238 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs

0: 384x640 9 cars, 1 truck, 80.9ms
Speed: 3.0ms preprocess, 80.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 1 truck, 117.9ms
Speed: 2.0ms preprocess, 117.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 truck, 107.5ms
Speed: 3.5ms preprocess, 107.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 88.0ms
Speed: 2.0ms preprocess, 88.0ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 truck, 80.7ms
Speed: 2.0ms preprocess, 80.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 83.1ms
Speed: 2.0ms preprocess, 83.1ms inference, 2.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 truck, 82.6ms
Speed: 2.5ms preprocess, 82.6ms inference, 1.0ms postprocess per image at 

In [None]:
import cv2
import numpy as np
from collections import defaultdict, Counter
from ultralytics import YOLO
import supervision as sv
import csv
import os

# ---------- CONFIGURATION ---------- #

VIDEO_PATH = './asset/videoplayback.mp4'
OUTPUT_VIDEO_PATH = './asset/TrackingWithStopResult.mp4'
OUTPUT_CSV_PATH = './asset/tracking_results.csv'
COUNT_CSV_PATH = './asset/vehicle_count.csv'
MODEL_PATH = 'yolo11n.pt'

SOURCE_POLYGON = np.array([
    (422, 10),   # Top-left
    (594, 16),   # Top-right
    (801, 665),  # Bottom-right
    (535, 649)   # Bottom-left
])

STOP_ZONE_POLYGON = np.array([(540, 307), (735, 310), (746, 557), (490, 555)])
TARGET_WIDTH, TARGET_HEIGHT = 50, 130

CLASS_NAMES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# ---------- CLASSES ---------- #

class ViewTransformer:
    def __init__(self, source: np.ndarray, target_size: tuple[int, int]):
        target = np.array([
            [0, 0],
            [target_size[0] - 1, 0],
            [target_size[0] - 1, target_size[1] - 1],
            [0, target_size[1] - 1]
        ], dtype=np.float32)
        self.m = cv2.getPerspectiveTransform(source.astype(np.float32), target)

    def transform(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        return cv2.perspectiveTransform(points.reshape(-1, 1, 2).astype(np.float32), self.m).reshape(-1, 2)

# ---------- HELPERS ---------- #

def point_inside_polygon(point, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.float32), tuple(map(float, point)), False) >= 0

def initialize_csv(filepath, fieldnames):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    csvfile = open(filepath, mode='w', newline='')
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    return csvfile, writer

# ---------- MAIN PIPELINE ---------- #

def main():
    video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)
    video_info.fps = 25

    model = YOLO(MODEL_PATH)
    tracker = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=0.3)
    frame_gen = sv.get_video_frames_generator(source_path=VIDEO_PATH)

    # Annotators
    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh)
    annotators = {
        'box': sv.BoxAnnotator(thickness=thickness),
        'trace': sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER),
        'label_top': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.TOP_LEFT),
        'label_bottom': sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
    }

    polygon_zone = sv.PolygonZone(polygon=SOURCE_POLYGON)
    stop_zone = sv.PolygonZone(polygon=STOP_ZONE_POLYGON)
    transformer = ViewTransformer(SOURCE_POLYGON, (TARGET_WIDTH, TARGET_HEIGHT))

    tracker_types = {}
    stopped_frames = defaultdict(int)
    status_cache = {}
    compliance_set = set()
    stop_zone_history = {}
    counted_ids = set()
    vehicle_type_counter = Counter()

    csvfile, writer = initialize_csv(OUTPUT_CSV_PATH, ["tracker_id", "vehicle_type", "status", "compliance"])
    count_csvfile, count_writer = initialize_csv(COUNT_CSV_PATH, ["vehicle_type", "count"])

    try:
        with sv.VideoSink(OUTPUT_VIDEO_PATH, video_info) as sink:
            for frame in frame_gen:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                detections = detections[detections.confidence > 0.3]
                detections = detections[polygon_zone.trigger(detections)].with_nms(threshold=0.6)
                detections = tracker.update_with_detections(detections)

                anchor_pts = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                transformed_pts = transformer.transform(anchor_pts).astype(int)

                top_labels, bottom_labels = [], []

                for track_id, orig_pt, _, class_id in zip(detections.tracker_id, anchor_pts, transformed_pts, detections.class_id):
                    vehicle_type = tracker_types.setdefault(track_id, CLASS_NAMES.get(class_id, "unknown"))
                    status = "moving"
                    compliance = 0

                    if point_inside_polygon(orig_pt, STOP_ZONE_POLYGON):
                        if track_id not in counted_ids:
                            vehicle_type_counter[vehicle_type] += 1
                            counted_ids.add(track_id)

                        stopped_frames[track_id] += 1

                        if stopped_frames[track_id] > video_info.fps * 2:
                            status, compliance = "stopped", 1
                            compliance_set.add(track_id)
                        elif stopped_frames[track_id] > video_info.fps * 1.3:
                            status, compliance = "slow down", 1

                        stop_zone_history[track_id] = {
                            "vehicle_type": vehicle_type,
                            "status": status,
                            "compliance": compliance
                        }
                    else:
                        stopped_frames[track_id] = 0
                        if track_id not in compliance_set:
                            status = "moving"

                    if status_cache.get(track_id) != status:
                        status_cache[track_id] = status

                    top_labels.append(f"{vehicle_type} {status}" if status != "moving" else vehicle_type)
                    bottom_labels.append(f"#{track_id}")

                # Update tracking status CSV
                csvfile.seek(0)
                csvfile.truncate()
                writer.writeheader()
                for tid, data in stop_zone_history.items():
                    writer.writerow({"tracker_id": tid, **data})
                csvfile.flush()

                # Update count CSV
                count_csvfile.seek(0)
                count_csvfile.truncate()
                count_writer.writeheader()
                for v_type, count in vehicle_type_counter.items():
                    count_writer.writerow({"vehicle_type": v_type, "count": count})
                count_csvfile.flush()

                # Padding labels
                top_labels += [""] * (len(detections) - len(top_labels))
                bottom_labels += [""] * (len(detections) - len(bottom_labels))

                # Annotate and display
                annotated = annotators['trace'].annotate(scene=frame.copy(), detections=detections)
                annotated = annotators['box'].annotate(annotated, detections)
                annotated = annotators['label_top'].annotate(annotated, detections, top_labels)
                annotated = annotators['label_bottom'].annotate(annotated, detections, bottom_labels)

                cv2.polylines(annotated, [STOP_ZONE_POLYGON], True, (0, 255, 255), 2)
                sink.write_frame(annotated)
                cv2.imshow("Tracking with Stop", annotated)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

    except Exception as e:
        print(f"[ERROR] {e}")
    finally:
        csvfile.close()
        count_csvfile.close()
        cv2.destroyAllWindows()
        print("[INFO] Tracking and counting completed successfully.")

# ---------- ENTRY POINT ----------  (old)#
if __name__ == "__main__":
    main()
