In [1]:
import cv2
import torch
import os
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# Fix OpenMP crash
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [2]:
if gpu := torch.cuda.is_available():
    print(gpu and torch.cuda.get_device_name(0))

In [None]:
URL_STREAM = r"video/vietnam2.mp4"

def realtime_detection(stream_url):
    colors = {
        "car" : (255, 0, 0),
        "truck" : (255, 0, 0),
        "bus" : (255, 0, 0),
        "motorcycle" : (0, 255, 0),
        "bicycle" : (0, 255, 0),
        "person" : (0, 0, 255),
    }
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    model = YOLO('yolo11n.pt')
    model.to(device)

    # COCO label map from YOLO model
    label_map = model.names


    # Initialize Deep SORT
    tracker = DeepSort(max_age=30)

    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        print("Error opening video stream.")
        return

    while True:
        ret, frame = cap.read()
        if not ret or frame is None:
            print("Stream ended or invalid frame.")
            break

        results = model(frame)[0]
        detections = []

        for box in results.boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            label = label_map.get(f"{cls_id} {conf}", "unknown")



            xyxy = box.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]
            x1, y1, x2, y2 = xyxy
            w, h = x2 - x1, y2 - y1
            x_center, y_center = x1, y1

            # Format for Deep SORT: [center_x, center_y, width, height]
            detections.append(([x_center, y_center, w, h], conf, cls_id))

        # Update Deep SORT tracker
        try:
            tracks = tracker.update_tracks(detections, frame=frame)
        except Exception as e:
            print("Tracking error:", e)
            continue

        # Annotate frame
        for track in tracks:
            if not track.is_confirmed():
                continue

            track_id = track.track_id
            ltrb = track.to_ltrb()  # [left, top, right, bottom]
            x1, y1, x2, y2 = map(int, ltrb)
            print(track.__dict__)

            cls_id = track.det_class  # class ID from Deep SORT (optional)
            cls_conf = track.det_conf
            label = label_map.get(cls_id, 'unknown')
            if cls_conf == None:
                continue
            if label not in colors:
                continue

            label_text = f"{label} {track_id} {cls_conf:.2f}"

            # Draw box and label
            cv2.rectangle(frame, (x1, y1), (x2, y2), colors[label], 2)
            cv2.putText(frame, label_text, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, colors[label], 2)

        # Show result
        cv2.imshow('Deteksi CCTV Real-time - DAGO', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    realtime_detection(URL_STREAM)



0: 384x640 2 persons, 3 cars, 3 motorcycles, 49.1ms
Speed: 1.8ms preprocess, 49.1ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 3 cars, 3 motorcycles, 39.0ms
Speed: 1.4ms preprocess, 39.0ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 3 cars, 3 motorcycles, 38.2ms
Speed: 1.5ms preprocess, 38.2ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)
{'mean': array([     829.53,      771.46,     0.39641,      351.81,  -0.0089491,   0.0060678, -1.7551e-11,    0.044141]), 'covariance': array([[      241.1,           0,           0,           0,      99.659,           0,           0,           0],
       [          0,       241.1,           0,           0,           0,      99.659,           0,           0],
       [          0,           0,  0.00028756,           0,           0,           0,  2.8947e-10,           0],
       [          0,           0,           0,       241.1,           0,   