In [1]:
%matplotlib inline
import supervision as sv
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm
from ultralytics import YOLO
import plotly.express as px
import pandas as pd
import numpy as np
import shapely
import torch
import cv2
import os

In [2]:
HOME = os.getcwd()
print(f"{HOME = }")
print(f"{torch.cuda.is_available() = }")

HOME = 'c:\\Repos\\inz'
torch.cuda.is_available() = True


In [3]:
model = YOLO('yolov8n.pt')  # Load a pretrained YOLO model https://docs.ultralytics.com/usage/python/

In [4]:
video_path = r'C:\Repos\inz\src_vids\test_lagrange_23.mkv'
target_path = r'C:\Repos\inz\runs\test\test_polygon_zone_lagrange_23.mkv'

In [5]:
classes = {1:'bicycle', 2:'car', 3:'motorcycle', 5:'bus', 7:'truck'}

In [6]:
video_info = sv.VideoInfo.from_video_path(video_path)
video_info

VideoInfo(width=1920, height=1080, fps=30, total_frames=717)

# Draw polygon zones

In [11]:
def get_color(cmap="Pastel1", n_colors=9):
    """Color generator. Loops colors if end is reached.

    Args:

        cmap (str, optional): Cmap name. cmap from matplotlib.pyplot.get_cmap(cmap, n_colors). Defaults to 'Pastel1'.

        n_colors (int, optional): Number of colors. n_colors from matplotlib.pyplot.get_cmap(cmap, n_colors). Defaults to 9.

    Yields:

        list: List of r, g, b values. Eg.: (255, 255, 255)
    """
    cmap = plt.get_cmap(cmap, n_colors)
    all_colors = []
    for i in range(n_colors):
        all_colors.append([c * 255 for c in cmap(0 + (i / n_colors))])
    n = 0
    while True:
        yield all_colors[n % n_colors][:-1]
        n += 1


cap = cv2.VideoCapture(video_path)
status, first_frame = cap.read()
cap.release()
good_image = first_frame.copy()
color_gen = get_color()
color = next(color_gen)
zones = dict()
curr_zone_id = 0


def draw_zones(event, x, y, flags, param):
    global zones, curr_zone_id, good_image, first_frame, color
    # Add next point to current polygon zone
    if flags == cv2.EVENT_FLAG_CTRLKEY + cv2.EVENT_FLAG_LBUTTON:
        first_frame = good_image.copy()
        if curr_zone_id in zones:
            zones[curr_zone_id].append((x, y))
        else:
            zones[curr_zone_id] = [(x, y)]
        try:
            cv2.polylines(first_frame, np.array([zones[curr_zone_id]]), True, color, 5)
        except Exception as e:
            print(e)
    # Delete last added point from current polygon zone
    elif flags == cv2.EVENT_FLAG_CTRLKEY + cv2.EVENT_FLAG_RBUTTON:
        first_frame = good_image.copy()
        try:
            zones[curr_zone_id].pop(-1)
        except IndexError:
            zones.pop(curr_zone_id, None)
        except KeyError:
            pass
        try:
            cv2.polylines(first_frame, np.array([zones[curr_zone_id]]), True, color, 5)
        except Exception as e:
            print(e)
    # Save current polygon zone and move to next one
    elif flags == cv2.EVENT_FLAG_MBUTTON:
        good_image = first_frame
        curr_zone_id += 1
        color = next(color_gen)
    cv2.imshow("drawing polygon zones", first_frame)


cv2.namedWindow("drawing polygon zones")
cv2.setMouseCallback("drawing polygon zones", draw_zones)
while True:
    cv2.imshow("drawing polygon zones", first_frame)
    if cv2.waitKey(10) == 27:
        break
cv2.destroyAllWindows()

In [12]:
zones

{0: [(572, 679), (960, 516), (251, 453)],
 1: [(187, 978), (189, 978), (819, 748), (820, 748), (954, 1000), (953, 1000)],
 2: [(1791, 776),
  (1790, 776),
  (1789, 775),
  (1788, 774),
  (1675, 685),
  (1815, 596)],
 3: [(1323, 482), (1322, 481), (1265, 305), (1724, 318), (1745, 527)]}

In [None]:
# video_info = VideoInfo.from_video_path(video_path)
# print("TUTAJ: ", video_info)
# cap = cv2.VideoCapture(video_path)
# out = cv2.VideoWriter(target_path, cv2.VideoWriter_fourcc(*'DIVX'), video_info.fps, video_info.resolution_wh)
# frame_no = 0
# while cap.isOpened():
#     ret, frame = cap.read()
#     if not ret:
#         cv2.destroyAllWindows()
#         break

#     results = model(source=frame[440:,:], classes=[1, 2, 3, 5, 7], imgsz=640, conf=0.25, show=False)[0]  # (source=frame[440:,:]
#     detections = []
#     for r in results.boxes.data.tolist():
#         x1, y1, x2, y2, score, class_id = r
#         detections.append([int(x1 + 0.5), int(y1 + 0.5), int(x2 + 0.5), int(y2 + 0.5), score, int(class_id)])

#     # display current frame
#     cv2.imshow(f"frame {0}", frame)
#     if cv2.waitKey(30) & 0xFF == 27:
#         cv2.destroyAllWindows()
#         break
    
#     frame_no += 1
#     out.write(frame)

# cap.release()
# out.release()

In [None]:
# video_info = sv.VideoInfo.from_video_path(video_path)
# with sv.VideoSink(target_path, video_info) as s:
#     for frame in tqdm(sv.get_video_frames_generator(video_path), total=video_info.total_frames-2):
#         result = model(source=frame, classes=list(classes.keys()), imgsz=640, conf=0.25, show=False)[0]  # (source=frame[440:,:]

#         # detections = []
#         # for r in results.boxes.data.tolist():
#         #     x1, y1, x2, y2, score, class_id = r
#         #     detections.append([int(x1 + 0.5), int(y1 + 0.5), int(x2 + 0.5), int(y2 + 0.5), score, int(class_id)])
#         # for d in detections:
#         #     # if (d[1]+d[3])/2 < 440:
#         #     cv2.rectangle(frame, (d[0],d[1]), (d[2],d[3]), (0,0,255))

#         detections = sv.Detections.from_yolov8(result)
#         box_annotator = sv.BoxAnnotator(color=sv.ColorPalette.default(),  # sv.ColorPalette.default()
#                                         thickness=2,
#                                         text_color=sv.Color.black(),
#                                         text_scale=0.5,
#                                         text_thickness=1,
#                                         text_padding=5)
#         labels = [
#             f"{classes[class_id]} {confidence:0.2f}"
#             for _, _, confidence, class_id, _ in detections
#         ]
#         annotated_frame = box_annotator.annotate(
#             scene=frame,  # scene=frame.copy()
#             detections=detections,
#             labels=labels
#         )

#         s.write_frame(frame)

In [None]:
# Process Whole Video
results = model.track(source=video_path,
                      stream=True,  # stream=True should be used for long videos to avoid OOM errors
                      tracker='botsort.yaml',  # custombotsort.yaml
                      persist=True,
                      classes=list(classes.keys()),
                      imgsz=640,
                      conf=0.25,
                      iou=0.7,
                      project=r'C:\Repos\inz\runs',
                      name='hway_test_line',
                      save=True,
                      show=False,
                      line_width=3,
                      # save_txt=True,
                      # save_conf=True,
                      # agnostic_nms=False,
                      # visualize=False,
                      # save_crop=True,
                      )

In [None]:
# Process Whole Video and create DataFrame
LINE_START = Point(30, 800)
LINE_END = Point(1920-30, 800)
line_counter = LineZone(start=LINE_START, end=LINE_END)
line_annotator = LineZoneAnnotator(thickness=4, text_thickness=4, text_scale=2)

temp_table = []  # list with data for df creation

video_info = sv.VideoInfo.from_video_path(video_path)
with sv.VideoSink(target_path, video_info) as s:
    for frame, r in enumerate(results):
        # detections = sv.Detections.from_yolov8(r)
        detections = sv.Detections(xyxy=r.boxes.xyxy.cpu().numpy(),
                                    confidence=r.boxes.conf.cpu().numpy(),
                                    class_id=r.boxes.cls.cpu().numpy().astype(int),
                                    tracker_id=r.boxes.id.cpu().numpy().astype(int))

        # bboxes graphics settings
        box_annotator = sv.BoxAnnotator(color=sv.ColorPalette.default(),  # sv.ColorPalette.default()
                                        thickness=2,
                                        text_color=sv.Color.black(),
                                        text_scale=0.5,
                                        text_thickness=1,
                                        text_padding=5)
        labels = [
            f"id:{tracker_id} {classes[class_id]} {confidence:0.2f}"
            for _, _, confidence, class_id, tracker_id in detections
        ]

        # updating line counter
        # line_counter.update(detections=detections)
        line_counter.trigger(detections=detections)

        # annotating bboxes
        annotated_frame = box_annotator.annotate(
            scene=r.orig_img,  # scene=frame.copy()
            detections=detections,
            labels=labels
        )

        # annotating the line        
        # line_annotator.annotate(frame=r.orig_img, line_counter=line_counter)
        line_annotator.annotate(frame=annotated_frame, line_counter=line_counter)

        # write processed frame to video
        s.write_frame(r.orig_img)

        # append useful data to temp_table
        for row in r.boxes.data.cpu().tolist():
            x1, y1, x2, y2, id, conf, class_no = row
            temp_table.append([int(id), int(class_no), conf, frame, x1, y1, x2, y2])

# create df
df = pd.DataFrame(temp_table, columns=['id', 'class_no', 'confidence', 'frame', 'x1', 'y1', 'x2', 'y2'])

In [None]:
df

In [None]:
unique_classes = df['class_no'].unique()
frames = {class_no:[None]*len(unique_classes) for class_no in df['frame'].unique()}
for i, class_no in enumerate(unique_classes):
    series = df[df['class_no']==class_no]['frame'].value_counts()
    for frame, count in zip(series.index, series.values):
        frames[frame][i] = count
df_class_counts = pd.DataFrame.from_dict(frames, orient='index', columns=[classes[n] for n in unique_classes])  # , dtype={c:int for c in unique_classes}
df_class_counts