In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import math
from sklearn.cluster import KMeans
model = YOLO('yolov8n.pt')

In [3]:
def get_vehicle_color(image, bbox):
    """
    Use KMeans clustering to determine the dominant color of a vehicle.
    """
    x1, y1, x2, y2 = [int(coord) for coord in bbox]
    vehicle_image = image[y1:y2, x1:x2]

    # Reshape image data for clustering
    data = vehicle_image.reshape(-1, 3)
    kmeans = KMeans(n_clusters=1, random_state=0).fit(data)
    dominant_color = kmeans.cluster_centers_[0]

    # Determine the color name based on the dominant RGB values
    color = "unknown"
    if dominant_color[2] > 100 and dominant_color[1] < 80 and dominant_color[0] < 80:
        color = "red"
    elif dominant_color[2] > 150 and dominant_color[1] > 150 and dominant_color[0] < 80:
        color = "yellow"
    elif dominant_color[2] < 80 and dominant_color[1] > 100 and dominant_color[0] < 80:
        color = "green"
    elif dominant_color[2] < 80 and dominant_color[1] < 80 and dominant_color[0] > 100:
        color = "blue"
    return color

In [4]:
def classify_vehicle_type(model, cls):
    """
    Classify the vehicle type based on the class ID.
    cls: class ID
    """
    class_name = model.names[int(cls)]
    if class_name in ['car', 'truck', 'bus', 'motorcycle']:
        return class_name
    return None

In [5]:
def check_in_out(center, line_in, line_out, vehicle_info, vehicle_id):
    """
    Check if the vehicle has crossed the in/out line.
    """
    if not vehicle_info[vehicle_id]["in"] and center[1] < line_in[1]:
        vehicle_info[vehicle_id]["in"] = True
    if not vehicle_info[vehicle_id]["out"] and center[1] > line_out[1]:
        vehicle_info[vehicle_id]["out"] = True

In [6]:
def calculate_speed(trajectory, fps, distance_per_pixel):
    """
    Calculate the vehicle speed based on the trajectory.
    trajectory: a list of points representing the vehicle trajectory
    fps: frame per second
    distance_per_pixel: the distance represented by each pixel
    """
    if len(trajectory) > 2:
        p1 = np.array(trajectory[-2])
        p2 = np.array(trajectory[-1])
        distance = np.linalg.norm(p2 - p1) * distance_per_pixel
        speed = distance * fps
        return speed
    return 0.0

In [7]:
def main(video_path, model, output_path='output.mp4'):
    # Read video file
    cap = cv2.VideoCapture(video_path)

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Define the codec and create VideoWriter object for output
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Specify the codec
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Dictionary to store information about vehicle color, type, ID, trajectory, and speed
    vehicle_info = defaultdict(lambda: {"color": None, "type": None, "trajectory": [], "in": False, "out": False, "speed": 0.0})

    # Define the position of in and out lines (x, y, x2, y2)
    line_in = (100, 200, 500, 200)  # Replace with desired positions
    line_out = (100, 300, 500, 300)

    # Unit conversion for speed calculation (adjust based on frame rate and actual scene size)
    distance_per_pixel = 0.05  # Example: each pixel represents 0.05 meters
    max_distance = 50  # Maximum distance for associating objects between frames

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Use YOLOv8 for vehicle detection
        results = model(frame)

        detected_centers = []
        current_frame_info = []

        for result in results:
            for bbox, confidence, cls in zip(result.boxes.xyxy, result.boxes.conf, result.boxes.cls):
                class_name = classify_vehicle_type(model, cls)

                if class_name:
                    # Calculate the center of the bounding box
                    bbox = bbox.cpu().numpy()
                    center = (int((bbox[0] + bbox[2]) / 2), int((bbox[1] + bbox[3]) / 2))
                    detected_centers.append(center)

                    # Find existing vehicle or create a new one based on distance
                    matched_id = None
                    for vehicle_id, info in vehicle_info.items():
                        if info["trajectory"]:
                            last_center = info["trajectory"][-1]
                            distance = np.linalg.norm(np.array(center) - np.array(last_center))
                            if distance < max_distance:
                                matched_id = vehicle_id
                                break

                    if matched_id is None:
                        # Create a new vehicle ID if no match found
                        matched_id = len(vehicle_info)

                    vehicle_info[matched_id]["trajectory"].append(center)

                    # Get vehicle color (record only once)
                    if vehicle_info[matched_id]["color"] is None:
                        vehicle_info[matched_id]["color"] = get_vehicle_color(frame, bbox)

                    # Get vehicle type (record only once)
                    if vehicle_info[matched_id]["type"] is None:
                        vehicle_info[matched_id]["type"] = class_name

                    # Store the current frame info for visualization
                    current_frame_info.append((bbox, matched_id, vehicle_info[matched_id]["color"], class_name))

                    # Check in and out lines
                    check_in_out(center, line_in, line_out, vehicle_info, matched_id)

        # Visualization of detection results and in/out lines
        for bbox, vehicle_id, color, class_name in current_frame_info:
            # Draw bounding box
            x1, y1, x2, y2 = [int(coord) for coord in bbox]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Display vehicle info
            trajectory = vehicle_info[vehicle_id]["trajectory"]
            speed = calculate_speed(trajectory, fps, distance_per_pixel)
            vehicle_info[vehicle_id]["speed"] = speed
            cv2.putText(frame, f"{color}, {class_name}, Speed: {speed:.2f} m/s", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw in and out lines
        cv2.line(frame, (line_in[0], line_in[1]), (line_in[2], line_in[3]), (0, 0, 255), 2)
        cv2.line(frame, (line_out[0], line_out[1]), (line_out[2], line_out[3]), (255, 0, 0), 2)

        # Write the processed frame to the output video
        out.write(frame)

    # Release video capture and writer
    cap.release()
    out.release()

    # Count the number of vehicles by color and type
    color_count = defaultdict(int)
    type_count = defaultdict(int)
    for info in vehicle_info.values():
        if info["color"]:
            color_count[info["color"]] += 1
        if info["type"]:
            type_count[info["type"]] += 1

    print("Vehicle Color Statistics:")
    for color, count in color_count.items():
        print(f"{color}: {count}")

    print("\nVehicle Type Statistics:")
    for v_type, count in type_count.items():
        print(f"{v_type}: {count}")

    print(f"\nProcessed video saved to: {output_path}")

In [8]:
# Run the main function with your video path
file_name = "I94-US20-35.1.mp4"
video_path = f"../data/{file_name}"

output_path = f"../demo/{file_name}_output.mp4"
main(video_path, model, output_path)



0: 384x640 2 cars, 1 bench, 49.2ms
Speed: 0.0ms preprocess, 49.2ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 1 truck, 46.5ms
Speed: 0.0ms preprocess, 46.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 1 truck, 1 bench, 29.7ms
Speed: 12.2ms preprocess, 29.7ms inference, 15.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 54.0ms
Speed: 15.7ms preprocess, 54.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 45.7ms
Speed: 2.2ms preprocess, 45.7ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 39.9ms
Speed: 0.0ms preprocess, 39.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 40.3ms
Speed: 0.0ms preprocess, 40.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 46.8ms
Speed: 0.0ms preprocess, 46.8ms inference, 0.0ms postprocess per i

In [10]:
from IPython.core.display import Video
Video(output_path)