In [1]:

def analyze_video(filename,modelname,iou,conf,imsz):
    import sys
    import ultralytics
    import supervision as sv
    from ultralytics import YOLO
    import numpy as np
    import cv2
    import cvzone
    import vehicle_count
    import json

    import torch
    from torchvision import models, transforms
    from PIL import Image, ImageDraw, ImageFont
    import matplotlib.pyplot as plt


    # print("\n=======================================================================================")
    # print(f"Intiating traffic analysis for filename: {filename}")
    HOME = "/Users/matthew/Jupyter/Thesis"
    FILE_NAME = filename
    MODEL_NAME = modelname #"yolov9e"
    IOU=iou #0.8
    CONF=conf #0.3
    SOURCE_VIDEO_PATH = f"{HOME}/Dataset/{FILE_NAME}.MOV"
    TARGET_VIDEO_PATH = f"{HOME}/Outputs/FRCNN/{FILE_NAME}-iou0.5-final.mp4"
    TARGET_COUNT_PATH = f"{HOME}/Outputs/FRCNN/{FILE_NAME}-iou0.5-final.txt"
    vehicle_count.FILE_NAME = TARGET_COUNT_PATH

    # SETTINGS
    JSON_PATH = f"{HOME}/Dataset/Ground-Truth/{FILE_NAME}.json"
    with open(JSON_PATH) as json_file:
        json = json.load(json_file)
        lanes = json["lanes"]
        linezone = json["linezone"]
    vehicle_count.reset(TARGET_COUNT_PATH,json["vehicle_counts"])

    print("\n=======================================================================================")
    print(f"Intiating traffic analysis for {FILE_NAME}-{MODEL_NAME}-{imsz}-iou{IOU}-con{CONF}")

    # LINE_START_NORTH = sv.Point(50, 1200)
    # LINE_END_NORTH = sv.Point(1900, 1200)
    # LINE_START_SOUTH = sv.Point(1900, 1200)
    # LINE_END_SOUTH = sv.Point(3840-50, 1200)
    LINE_START_NORTH = sv.Point(10, 400)
    LINE_END_NORTH = sv.Point(633, 400)
    LINE_START_SOUTH = sv.Point(634, 400)
    LINE_END_SOUTH = sv.Point(1280-10, 400)
    sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

    selected_classes = [3, 4, 6, 8]
    CLASSES = [
        'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
        'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A',
        'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
        'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack',
        'umbrella', 'N/A', 'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
        'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
        'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass',
        'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich',
        'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
        'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
        'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
        'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A',
        'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
        'toothbrush', 'empty'
    ]

    # VIDEO PROCESSING
    # create BYTETracker instance
    sv.ByteTrack()
    byte_tracker = sv.ByteTrack(track_activation_threshold=0.25, lost_track_buffer=30, minimum_matching_threshold=0.8, frame_rate=60)
    # byte_tracker = sv.ByteTrack()
    byte_tracker.reset()
    # create VideoInfo instance
    video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
    # create frame generator
    generator = sv.get_video_frames_generator(SOURCE_VIDEO_PATH)
    # create LineZone instance, it is previously called LineCounter class
    line_zone_north = sv.LineZone(start=LINE_START_NORTH, end=LINE_END_NORTH, triggering_anchors=[sv.Position.BOTTOM_LEFT,sv.Position.BOTTOM_RIGHT,sv.Position.BOTTOM_CENTER])
    line_zone_south = sv.LineZone(start=LINE_START_SOUTH, end=LINE_END_SOUTH, triggering_anchors=[sv.Position.BOTTOM_LEFT,sv.Position.BOTTOM_RIGHT,sv.Position.BOTTOM_CENTER])
    # create instance of BoxAnnotator
    # box_annotator = sv.BoxAnnotator(thickness=4, text_thickness=4, text_scale=2)
    box_annotator = sv.BoundingBoxAnnotator()
    label_annotator = sv.LabelAnnotator(text_scale=0.5, text_padding=1, text_position=sv.Position.TOP_CENTER)
    # create instance of TraceAnnotator
    trace_annotator = sv.TraceAnnotator()
    # create LineZoneAnnotator instance, it is previously called LineCounterAnnotator class
    line_zone_annotator_north = sv.LineZoneAnnotator(text_scale=0.5, text_thickness=1, text_padding=2, display_out_count=False, custom_in_text="North")
    line_zone_annotator_south = sv.LineZoneAnnotator(text_scale=0.5, text_thickness=1, text_padding=2, display_in_count=False, custom_out_text="South")
    totalCountUp = []
    totalCountDown= []
    # define call back function to be used in video processing
    def callback(frame: np.ndarray, index:int) -> np.ndarray:

        # Load the pretrained Faster R-CNN model
        model = models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
        model.eval()
        models.detection.fasterrcnn_resnet50_fpn()

        # Transformation for the input image
        transform = transforms.Compose([
            transforms.ToTensor(),
        ])

        # Convert the frame to a PIL image and process it
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        image_tensor = transform(pil_image).unsqueeze(0)  # Add batch dimension

        # Perform detection
        with torch.no_grad():
            predictions = model(image_tensor)

        # print("################")
        # print(predictions)
        # print("################")
        detections = sv.Detections.from_transformers(predictions[0]).with_nms(threshold=0.5)
        detections = detections[np.isin(detections.class_id, selected_classes)]
        detections = byte_tracker.update_with_detections(detections)
        labels = [
            f"#{tracker_id} {CLASSES[class_id]} {confidence:0.2f}"
            # f"#{tracker_id} {class_id} {confidence:0.2f}"
            for confidence, class_id, tracker_id
            in zip(detections.confidence, detections.class_id, detections.tracker_id)
        ]
        annotated_frame = trace_annotator.annotate(
            scene=frame.copy(),
            detections=detections
        )
        annotated_frame = box_annotator.annotate(
            scene=annotated_frame, detections=detections
        )
        annotated_frame = label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )
        # update line counter
        crossed_north, _ = line_zone_north.trigger(detections)
        _, crossed_south = line_zone_south.trigger(detections)

        if np.any(crossed_north):
            detections_crossed_north = detections[crossed_north]
            for detection in (detections_crossed_north):
                xyxy, _, conf, cls_id, tracker_id, cls_name = detection
                cls_name = CLASSES[cls_id]
                x1, y1, x2, y2 = xyxy
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                w, h = x2 - x1, y2 - y1
                cx, cy = x1 + w // 2, y1 + h // 2
                if lanes["n1_start"][0] < cx < lanes["n1_end"][0] :
                    lane_id = "N1"
                elif lanes["n2_start"][0] < cx < lanes["n2_end"][0] :
                    lane_id = "N2"
                elif lanes["n3_start"][0] < cx < lanes["n3_end"][0] :
                    lane_id = "N3"
                else:
                    lane_id = "NX"
                is_count_success = vehicle_count.update(annotated_frame, index, x1, y1, h, w, tracker_id, cls_name, conf, lane_id)
                # if is_count_success != False:
                #     # print(f'id:{tracker_id} cls:{cls_name} lane: {lane_id}')
                #     crop = sv.crop_image(frame, xyxy)
                #     rs_image = sv.resize_image(
                #         image=crop, resolution_wh=(150, 150), keep_aspect_ratio=True
                #     )
                #     cv2.imwrite(f"{HOME}/Outputs/Loop/Nov04/detection-images/{filename}-{tracker_id}.jpg", rs_image)

        if np.any(crossed_south):
            detections_crossed_south = detections[crossed_south]
            for detection in (detections_crossed_south):
                xyxy, _, conf, cls_id, tracker_id, cls_name = detection
                cls_name = CLASSES[cls_id]
                x1, y1, x2, y2 = xyxy
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                w, h = x2 - x1, y2 - y1
                cx, cy = x1 + w // 2, y1 + h // 2
                if lanes["s1_start"][0] < cx < lanes["s1_end"][0] :
                    lane_id = "S1"
                elif lanes["s2_start"][0] < cx < lanes["s2_end"][0] :
                    lane_id = "S2"
                elif lanes["s3_start"][0] < cx < lanes["s3_end"][0] :
                    lane_id = "S3"
                else:
                    lane_id = "SX"
                is_count_success = vehicle_count.update(annotated_frame, index, x1, y1, h, w, tracker_id, cls_name, conf, lane_id)
                # if is_count_success != False:
                #     # print(f'id:{tracker_id} cls:{cls_name} lane: {lane_id}')
                #     crop = sv.crop_image(frame, xyxy)
                #     rs_image = sv.resize_image(
                #         image=crop, resolution_wh=(150, 150), keep_aspect_ratio=True
                #     )
                #     cv2.imwrite(f"{HOME}/Outputs/Loop/Nov04/detection-images/{filename}-{tracker_id}.jpg", rs_image)
        
        # # SAVE DETECTION IMAGE 
        # if is_count_success != False:
        #     print(f'id:{tracker_id} cls:{cls_name} lane: {lane_id}')
        #     crop = sv.crop_image(frame, [x1, y1, x2, y2])
        #     rs_image = sv.resize_image(
        #         image=frame, resolution_wh=(150, 150), keep_aspect_ratio=True
        #     )
        #     cv2.imwrite(f"{HOME}/Outputs/Loop/Nov04/detection-images/{filename}-{tracker_id}.jpg", rs_image)

        annotated_frame=line_zone_annotator_north.annotate(annotated_frame, line_counter=line_zone_north)
        annotated_frame=line_zone_annotator_south.annotate(annotated_frame, line_counter=line_zone_south)

        sv.draw_line(annotated_frame,start=sv.Point(lanes["n1_start"][0],lanes["n1_start"][1]),end=sv.Point(lanes["n1_end"][0],lanes["n1_end"][1]), color=sv.Color.BLUE)
        sv.draw_line(annotated_frame,start=sv.Point(lanes["n2_start"][0],lanes["n2_start"][1]),end=sv.Point(lanes["n2_end"][0],lanes["n2_end"][1]), color=sv.Color.RED)
        sv.draw_line(annotated_frame,start=sv.Point(lanes["n3_start"][0],lanes["n3_start"][1]),end=sv.Point(lanes["n3_end"][0],lanes["n3_end"][1]), color=sv.Color.BLUE)

        sv.draw_line(annotated_frame,start=sv.Point(lanes["s1_start"][0],lanes["s1_start"][1]),end=sv.Point(lanes["s1_end"][0],lanes["s1_end"][1]), color=sv.Color.BLUE)
        sv.draw_line(annotated_frame,start=sv.Point(lanes["s2_start"][0],lanes["s2_start"][1]),end=sv.Point(lanes["s2_end"][0],lanes["s2_end"][1]), color=sv.Color.RED)
        sv.draw_line(annotated_frame,start=sv.Point(lanes["s3_start"][0],lanes["s3_start"][1]),end=sv.Point(lanes["s3_end"][0],lanes["s3_end"][1]), color=sv.Color.BLUE)

        vehicle_count.progressbar(index,video_info.total_frames)

        # sv.plot_image(annotated_frame, (16,16))
        # cv2.imshow("Processing Video", annotated_frame)
        return annotated_frame

    # process the whole video
    sv.process_video(
        source_path = SOURCE_VIDEO_PATH,
        target_path = TARGET_VIDEO_PATH,
        callback=callback
    )

    vehicle_count.end()

#  Run multiple files in loop and record output
if __name__ == "__main__":
    models = [
        "FRCNN"
    ]
    ious = [0.5]
    cons = [0.4]
    # [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
    imszs = [
        # 480,
        # 640,
        # 960,
        1280
        ]
    video_files = [
        "DS1",
        "DS2",
        "DC1", 
        "DC2",
        "DR1",
        "DR2",
        "HR1",
        "HR2",
        "EC1", 
        "FG1",
        "NC1",
        "NC2",

        # "TEST",
        # "DS4",
        # "DS3",
        ]
    for video_file in video_files:
        for model in models:
            for imsz in imszs:
                for iou in ious:
                    for conf in cons:
                        analyze_video(video_file, model,iou,conf,imsz)





Intiating traffic analysis for DS1-FRCNN-1280-iou0.5-con0.4




 (2888/2889)	N: 184	S: 126	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for DS2-FRCNN-1280-iou0.5-con0.4
 (2890/2891)	N: 195	S: 132	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for DC1-FRCNN-1280-iou0.5-con0.4
 (2896/2897)	N: 140	S: 94	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for DC2-FRCNN-1280-iou0.5-con0.4
 (2886/2887)	N: 157	S: 125	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for DR1-FRCNN-1280-iou0.5-con0.4
 (2957/2958)	N: 136	S: 131	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for DR2-FRCNN-1280-iou0.5-con0.4
 (2954/2955)	N: 127	S: 135	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for HR1-FRCNN-1280-iou0.5-con0.4
 (3135/3136)	N: 233	S: 143	99.97%	▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░
Intiating traffic analysis for HR2-FRCNN-1280-