In [1]:
import numpy as np
import cv2
from pathlib import Path
import onnxruntime as ort
from ast import literal_eval

## Inputs

In [2]:
# Where the source clips are read from and where results are written.
vid_dir_path = Path("../input_videos")
out_dir_path = Path("./output_videos")

# Detector weights plus the two thresholds forwarded to the model wrapper.
model_path = Path("../yolo11s.onnx")
iou_tresh = 0.45   # boxes overlapping a kept box by more than this are suppressed
conf_tresh = 0.35  # detections scoring below this are ignored

# Class index -> label for the classes to keep; the keys act as the class filter.
label_d = dict(
    enumerate(
        (
            "bicycle",
            "car",
            "motorcycle",
            "airplane",
            "bus",
            "train",
            "truck",
            "boat",
        ),
        start=1,
    )
)

# Display options: preview window toggle and the colour used for boxes and text.
imshow = True
display_color = (0, 255, 0)

In [None]:
class ObjectBBox:
    """Axis-aligned detection box kept as corner coordinates.

    The box is given as centre + size and optionally rescaled per axis; the
    resulting corners (``x1, y1`` / ``x2, y2``) and ``area`` are stored
    together with the class identifier and confidence.
    """

    def __init__(self, class_id, conf, center_x, center_y, width, height, scale_width=1, scale_height=1):
        """Convert a centre/size box into scaled corner coordinates.

        Args:
            class_id: Identifier (or label) of the detected class.
            conf: Confidence score of the detection.
            center_x, center_y: Box centre before scaling.
            width, height: Box size before scaling.
            scale_width, scale_height: Factors applied to the x and y axes.
        """
        self.class_id = class_id
        self.conf = conf

        # Rescale centre and size first, then derive the corners from them.
        cx = center_x * scale_width
        cy = center_y * scale_height
        box_w = width * scale_width
        box_h = height * scale_height
        half_w = box_w * 0.5
        half_h = box_h * 0.5

        self.x1 = cx - half_w
        self.y1 = cy - half_h
        self.x2 = cx + half_w
        self.y2 = cy + half_h
        self.area = box_w * box_h

    def __str__(self):
        """Return ``"<class_id> (<conf>) (x1, y1) (x2, y2)"``."""
        top_left = (self.x1, self.y1)
        bottom_right = (self.x2, self.y2)
        return f"{self.class_id} ({self.conf}) {top_left} {bottom_right}"

def calc_iou(bbox1, bbox2):
    """Return the intersection-over-union of two boxes.

    Both arguments must expose ``x1``, ``y1``, ``x2``, ``y2`` and ``area``.
    The result is ``0`` when the boxes do not overlap (touching edges count
    as no overlap) or when the computed union is not positive.
    """
    # Extent of the overlap rectangle along each axis (non-positive => disjoint).
    overlap_w = min(bbox1.x2, bbox2.x2) - max(bbox1.x1, bbox2.x1)
    overlap_h = min(bbox1.y2, bbox2.y2) - max(bbox1.y1, bbox2.y1)
    if overlap_w <= 0 or overlap_h <= 0:
        return 0

    overlap_area = overlap_w * overlap_h
    combined_area = bbox1.area + bbox2.area - overlap_area
    return 0 if combined_area <= 0 else overlap_area / combined_area

class YOLOv11:
    """Run a YOLO11 detection model exported to ONNX through ONNX Runtime.

    ``detect`` resizes an image to the network input size, runs the session
    and converts the raw output into a list of ``ObjectBBox`` objects after
    confidence filtering, class filtering and greedy, class-agnostic
    non-max suppression.
    """

    # Network input resolution, interpreted as (height, width).
    input_size = np.array((640, 640))
    # Execution providers handed to ONNX Runtime, in order of preference.
    providers = ["CUDAExecutionProvider", "CoreMLExecutionProvider", "CPUExecutionProvider"]

    def __init__(self, model_path, min_conf, iou_thresh, valid_class_checker=None):
        """Load the ONNX model and read its class-name table.

        Args:
            model_path: Location of the ``.onnx`` file.
            min_conf: Minimum class score for a detection to be considered.
            iou_thresh: A box whose IoU with an already kept box exceeds this
                value is suppressed.
            valid_class_checker: Optional callable taking a class index and
                returning truthy for classes that should be reported.
                ``None`` accepts every class.

        Raises:
            AssertionError: If the model metadata has no ``names`` entry.
        """
        self.model_path = model_path
        self.ort_sess = ort.InferenceSession(
            model_path, providers=YOLOv11.providers
        )
        self.input_name = self.ort_sess.get_inputs()[0].name
        self.output_name = self.ort_sess.get_outputs()[0].name
        self.min_conf = min_conf
        self.iou_thresh = iou_thresh
        if valid_class_checker is None:
            valid_class_checker = lambda cls: True
        self.valid_class_checker = valid_class_checker

        # The class index -> name table is stored as a Python literal in the
        # model's custom metadata under the key "names".
        custom_metadata = self.ort_sess.get_modelmeta().custom_metadata_map
        assert "names" in custom_metadata, "Error: ONNX model does not contain 'names' metadata"
        self.class_name_map = literal_eval(custom_metadata["names"])

    def _preprocess_input(self, input_img):
        """Turn an HxWx3 uint8 BGR image into a 1x3xHxW float32 RGB tensor in [0, 1].

        The image is expected in OpenCV's BGR channel order (as returned by
        ``cv2.VideoCapture.read``); it is converted to RGB because that is the
        channel order YOLO models are trained with.
        """
        inp = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
        # cv2.resize wants a plain (width, height) tuple, while input_size is
        # stored as (height, width).
        net_h, net_w = (int(v) for v in YOLOv11.input_size)
        inp = cv2.resize(inp, (net_w, net_h))
        inp = inp.astype(np.float32) / 255.0
        inp = np.transpose(inp, (2, 0, 1))  # from HxWxC to CxHxW
        inp = np.expand_dims(inp, axis=0)
        return np.ascontiguousarray(inp)

    def _postprocess_output(self, raw_out, original_shape):
        """Filter raw detections and apply non-max suppression.

        Args:
            raw_out: Network output that squeezes to shape
                ``(4 + n_classes, n_det)``; rows 0-3 are passed to
                ``ObjectBBox`` as centre x, centre y, width and height.
            original_shape: ``(height, width)`` of the original image, used to
                scale boxes back from network-input coordinates.

        Returns:
            list[ObjectBBox]: Kept boxes, highest confidence first.

        Raises:
            AssertionError: If the output does not have the expected shape.
        """
        raw_out = np.squeeze(raw_out)
        if raw_out.ndim == 1:
            # A single detection loses its axis in the squeeze; restore it.
            raw_out = raw_out[:, None]
        assert raw_out.ndim == 2 and raw_out.shape[0] == 4 + len(self.class_name_map), f"Output not in valid shape {raw_out.shape}"
        n_det = raw_out.shape[1]
        assert n_det > 0, f"Output not in valid shape {raw_out.shape}"
        # (height, width) scale factors from network input back to the image.
        scale = np.array(original_shape) / YOLOv11.input_size

        # Best class and its score for every detection.
        class_scores = raw_out[4:, :]
        class_idx_l = np.argmax(class_scores, axis=0)
        max_conf_l = class_scores[class_idx_l, np.arange(n_det)]

        # Visit detections from the most to the least confident one.
        order_idx_l = np.argsort(-max_conf_l, kind="stable")

        # Build each candidate box exactly once; low-confidence and unwanted
        # classes are dropped here so the pairwise stage only sees survivors.
        candidate_l = []
        for idx in order_idx_l:
            conf = max_conf_l[idx]
            if not conf >= self.min_conf:
                # Sorted descending: every remaining score is too low as well.
                break
            cls = class_idx_l[idx]
            if not self.valid_class_checker(cls):
                continue
            lbl = self.class_name_map[int(cls)]
            candidate_l.append(
                ObjectBBox(lbl, conf, *raw_out[:4, idx], scale[1], scale[0])
            )

        # Greedy NMS: keep the best remaining box, suppress what overlaps it.
        suppressed_l = [False] * len(candidate_l)
        valid_bbox_l = []
        for i, bbox1 in enumerate(candidate_l):
            if suppressed_l[i]:
                continue
            valid_bbox_l.append(bbox1)
            for j in range(i + 1, len(candidate_l)):
                if not suppressed_l[j] and calc_iou(bbox1, candidate_l[j]) > self.iou_thresh:
                    suppressed_l[j] = True

        return valid_bbox_l

    def detect(self, image):
        """Detect objects in a single image.

        Args:
            image: ``np.uint8`` array that squeezes to HxWx3, in OpenCV BGR
                channel order.

        Returns:
            list[ObjectBBox] in original-image coordinates, or ``None`` when
            the session does not return exactly one output.

        Raises:
            AssertionError: If the image has the wrong dtype or layout.
        """
        image = np.squeeze(image)
        assert isinstance(image, np.ndarray) and image.dtype == np.uint8, "Not a valid image"
        shape = image.shape
        assert len(shape) == 3 and shape[-1] == 3, "Input is not in HxWxC formaat"
        original_shape = shape[:2]

        # Preprocess
        inp = self._preprocess_input(image)
        # Infer
        out_l = self.ort_sess.run([self.output_name], {self.input_name: inp})
        if len(out_l) != 1:
            print("Error: Infering YOLOv11 failed.")
            return None
        return self._postprocess_output(out_l[0], original_shape)

## Init

In [4]:
# Detector: only class indices that appear in `label_d` are reported.
model = YOLOv11(
    model_path=model_path,
    min_conf=conf_tresh,
    iou_thresh=iou_tresh,
    valid_class_checker=lambda class_idx: class_idx in label_d,
)

# Input clips: every .mp4 directly inside the input directory.
vid_path_l = [*vid_dir_path.glob("*.mp4")]
assert vid_path_l, "Videos not found"

# Output directory: created on first run, reused afterwards.
out_dir_path.mkdir(exist_ok=True)
assert out_dir_path.is_dir(), f"Error in creating out dir {out_dir_path}"

[0;93m2025-02-08 10:48:19.268221 [W:onnxruntime:, coreml_execution_provider.cc:115 GetCapability] CoreMLExecutionProvider::GetCapability, number of partitions supported by CoreML: 15 number of nodes in the graph: 320 number of nodes supported by CoreML: 304[m


In [5]:
# Annotate every input video with detections, save the result under
# `out_dir_path` and optionally preview it in a window ("q" skips to the next clip).
for vid_path in vid_path_l:
    # Pass plain strings: not every OpenCV build accepts pathlib.Path objects.
    cap = cv2.VideoCapture(str(vid_path))
    if not cap.isOpened():
        print(f"Error in reading {vid_path} video")
        cap.release()
        continue
    out_vid_p = out_dir_path / vid_path.name

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep fractional rates (e.g. 29.97) instead of truncating them; fall back
    # to a nominal 30 fps when the container does not report a rate.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(out_vid_p), fourcc, fps, (frame_width, frame_height))
    if not out.isOpened():
        print(f"Error in creating {out_vid_p} video")

    if imshow:
        # One preview window per video, created once rather than per frame.
        cv2.namedWindow(vid_path.name)

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # detect() returns None when inference fails; treat as "no boxes".
            bbox_l = model.detect(frame) or []

            for bbox in bbox_l:
                label = bbox.class_id
                conf = bbox.conf
                x1, y1, x2, y2 = (int(v) for v in (bbox.x1, bbox.y1, bbox.x2, bbox.y2))
                cv2.rectangle(frame, (x1, y1), (x2, y2), display_color, 2)
                # Clamp the label so it stays visible for boxes touching the top edge.
                cv2.putText(frame, f"{label}({conf:.2f})", (x1, max(y1 - 10, 15)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, display_color, 2)

            # Persist the annotated frame; without this the output file stays empty.
            out.write(frame)

            if imshow:
                cv2.imshow(vid_path.name, frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    finally:
        # Always release the reader and writer so the output file is finalized
        # even if processing is interrupted.
        cap.release()
        out.release()
        cv2.destroyAllWindows()

2025-02-08 10:48:22.369 Python[7956:134822] +[IMKClient subclass]: chose IMKClient_Modern
2025-02-08 10:48:22.369 Python[7956:134822] +[IMKInputSession subclass]: chose IMKInputSession_Modern


In [6]:
# Manual cleanup cell: closes any OpenCV windows that are still open, e.g.
# after the processing cell above was interrupted before its own cleanup ran.
cv2.destroyAllWindows()