In [1]:
import json
import csv
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO
import torch
import ast

In [2]:
# Load the YOLO weights stored at Models/zm.pt.
model1 = YOLO("Models/zm.pt")

# Pick the inference device in order of preference: Apple MPS, then CUDA,
# then plain CPU when no accelerator is reported as available.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model1.to(device)

In [3]:
def convert_defaultdict_to_dict(d):
    """Recursively turn nested (default)dicts into plain dicts.

    Dictionaries (including ``defaultdict`` instances, which are ``dict``
    subclasses) are rebuilt as ordinary ``dict`` objects, and lists are
    rebuilt element by element. String keys have every ``"-"`` replaced by a
    space; keys of any other type (e.g. integer ids or ``None``) are kept
    unchanged instead of raising ``AttributeError``.

    Args:
        d: Any object. Only dicts and lists are traversed; every other value
            is returned as-is.

    Returns:
        A structure with the same shape as ``d`` made of plain dicts and
        lists, with hyphens in string keys replaced by spaces.
    """
    def _normalise_key(key):
        # Only strings have .replace(); non-string keys must pass through.
        return key.replace("-", " ") if isinstance(key, str) else key

    if isinstance(d, dict):
        return {_normalise_key(k): convert_defaultdict_to_dict(v) for k, v in d.items()}
    if isinstance(d, list):
        return [convert_defaultdict_to_dict(item) for item in d]
    return d

In [4]:
def detect_track(video_path, regions_param, turning_pat_param, conf=0.7,
                 max_trail=180, csv_path='tracking_data.csv',
                 json_path='tracking_data.json', show=True):
    """Detect and track vehicles in a video and record the regions they enter.

    Every frame is passed to ``model1.track``. For each tracked detection the
    box centre is appended to that track's trail, the track is assigned a
    per-class display id (e.g. ``C1``, ``Tw3``), and the centre is tested
    against the configured rectangular regions. One CSV row is written per
    tracked detection per frame, and a per-track summary is written as JSON
    after the video ends.

    Args:
        video_path: Path/source handed to ``cv2.VideoCapture``.
        regions_param: Mapping of region name to a string holding a Python
            literal of the form ``((x1, y1), (x2, y2))`` (two opposite corners
            of an axis-aligned rectangle, in either order).
        turning_pat_param: Mapping whose keys are strings holding Python
            literals. It is parsed but not consulted by the tracking loop.
        conf: Confidence threshold forwarded to ``model1.track``.
        max_trail: Maximum number of centre points kept per track.
        csv_path: Destination of the per-detection CSV log.
        json_path: Destination of the per-track JSON summary.
        show: When true, draw annotations, display them in a "Tracking"
            window, and stop early if ``q`` is pressed. When false, no GUI
            call is made.

    Returns:
        dict mapping each track id (as a string, matching the JSON file) to a
        dict with the keys ``class_name``, ``start_region``, ``end_region``
        and ``class_id`` (hyphens in keys are replaced by spaces by
        ``convert_defaultdict_to_dict``).

    Raises:
        ValueError, SyntaxError: If a region value or a turning-pattern key is
            not a valid Python literal.
    """
    # Parse the parameters before acquiring any resource so that a malformed
    # literal cannot leak an open capture.
    regions = {key: ast.literal_eval(value) for key, value in regions_param.items()}
    # Not used by the tracking loop; parsing is kept so invalid keys still raise.
    patterns = {ast.literal_eval(key): value for key, value in turning_pat_param.items()}

    class_names_model1 = {0: 'Bicycle', 1: 'Bus', 2: 'Car', 3: 'LCV', 4: 'Three-Wheeler', 5: 'Truck', 6: 'Two-Wheeler'}

    class_colors = {
        'Bicycle': (255, 0, 0),
        'Bus': (0, 255, 0),
        'Car': (0, 0, 255),
        'LCV': (255, 255, 0),
        'Three-Wheeler': (255, 0, 255),
        'Truck': (0, 255, 255),
        'Two-Wheeler': (128, 0, 128)
    }

    prefix_map = {
        'Car': 'C',
        'LCV': 'L',
        'Truck': 'T',
        'Three-Wheeler': 'Tr',
        'Two-Wheeler': 'Tw',
        'Bus': 'Bs',
        'Bicycle': 'Bi'
    }

    track_history = defaultdict(list)      # track id -> recent centre points
    region_members = defaultdict(set)      # region name -> track ids that entered it
    class_id_counters = defaultdict(int)   # class name -> number of ids issued
    tracking_data = defaultdict(lambda: {"class_name": None, "start_region": None, "end_region": None})

    def point_in_region(point, region):
        """Return True when ``point`` lies inside the rectangle (edges included)."""
        x, y = point
        (x1, y1), (x2, y2) = region
        # The corners may be given in any order, so normalise with min/max.
        return min(x1, x2) <= x <= max(x1, x2) and min(y1, y2) <= y <= max(y1, y2)

    def update_regions(track_id, point):
        """Register the regions that ``point`` newly brings ``track_id`` into."""
        # Only the newest point is tested: older trail points were already
        # evaluated on the frame they were appended, so re-scanning the whole
        # trail every frame could not change the result.
        record = tracking_data[track_id]
        for region_name, region_coords in regions.items():
            if not point_in_region(point, region_coords):
                continue
            if track_id in region_members[region_name]:
                continue
            region_members[region_name].add(track_id)
            if not record["start_region"]:
                record["start_region"] = region_name
            # end_region is the region most recently entered for the first time.
            record["end_region"] = region_name

    def generate_class_id(class_name):
        """Return the next display id for ``class_name`` (prefix + running number)."""
        class_id_counters[class_name] += 1
        prefix = prefix_map.get(class_name, 'V')
        return f"{prefix}{class_id_counters[class_name]}"

    cap = cv2.VideoCapture(video_path)
    try:
        with open(csv_path, 'w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(['Track ID', 'Class ID', 'Class Name', 'Start Region', 'End Region'])

            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break

                # NOTE(review): persist=True keeps tracker state on the shared
                # model1 between frames; presumably that state also survives
                # between calls to this function - confirm whether the tracker
                # should be reset per video.
                results1 = model1.track(frame, persist=True, verbose=False, classes=[0, 1, 2, 3, 4, 5, 6], conf=conf)

                detections = []
                for result in results1 or ():
                    boxes = getattr(result, "boxes", None)
                    # Without tracker ids there is nothing to attribute the
                    # detections to; filing them all under a shared None key
                    # would merge unrelated objects into one track.
                    if boxes is None or boxes.id is None:
                        continue
                    detections.extend(zip(
                        boxes.xywh.cpu().tolist(),
                        boxes.id.int().cpu().tolist(),
                        boxes.cls.int().cpu().tolist(),
                    ))

                annotated_frame = frame.copy() if show else None
                for (x, y, w, h), track_id, cls in detections:
                    class_name = class_names_model1.get(cls, 'Unknown')
                    color = class_colors.get(class_name, (255, 255, 255))

                    record = tracking_data[track_id]
                    if not record["class_name"]:
                        record["class_name"] = class_name
                        record["class_id"] = generate_class_id(class_name)

                    # xywh boxes are centre-based, so (x, y) already is the
                    # centre; adding w/2 and h/2 would give the bottom-right corner.
                    track = track_history[track_id]
                    track.append((x, y))
                    if len(track) > max_trail:
                        track.pop(0)

                    update_regions(track_id, track[-1])

                    csv_writer.writerow([track_id, record["class_id"], class_name, record["start_region"], record["end_region"]])

                    if not show:
                        continue

                    for i in range(1, len(track)):
                        cv2.line(annotated_frame, tuple(map(int, track[i - 1])), tuple(map(int, track[i])), color, 2)

                    top_left = (int(x - w / 2), int(y - h / 2))
                    bottom_right = (int(x + w / 2), int(y + h / 2))
                    cv2.rectangle(annotated_frame, top_left, bottom_right, color, 2)
                    cv2.putText(annotated_frame, f'{record["class_id"]}', (top_left[0], top_left[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

                if show:
                    cv2.imshow("Tracking", annotated_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
    finally:
        # Release the capture and windows even if tracking or writing failed.
        cap.release()
        if show:
            cv2.destroyAllWindows()

    # Track ids are integers; stringify them so the key normalisation in
    # convert_defaultdict_to_dict (a str.replace) cannot fail and so the
    # returned mapping matches what json.dump writes.
    json_data = convert_defaultdict_to_dict(
        {str(track_id): record for track_id, record in tracking_data.items()}
    )
    with open(json_path, 'w') as json_file:
        json.dump(json_data, json_file, indent=4)

    return json_data
    