In [1]:
import cv2
from ultralytics import YOLO
import torch
from threading import Thread

def capture_video(rtsp_url, queue):
    cap = cv2.VideoCapture(rtsp_url)
    while True:
        success, img = cap.read()
        if not success:
            print("Failed to access the camera.")
            break
        queue.append(img)
    cap.release()



def video_detection():
    rtsp_url = 'rtsp://Test:Test@123@172.27.4.28'
    classNames = ['Halmet', 'Mask', 'NO-Halmet', 'NO-Mask', 'NO-Safety Vest', 'Person', 'Safety Cone',
                  'Safety Vest', 'machinery', 'vehicle']
    

    # model = YOLO("YOLO-Weights/ppe.pt") 
    model = YOLO("YOLO-Weights/ppe.pt").to('cuda') 

    frame_queue = []

    capture_thread = Thread(target=capture_video, args=(rtsp_url, frame_queue))
    capture_thread.daemon = True
    capture_thread.start()

    while True:
        if len(frame_queue) == 0:
            continue 
        
        img = frame_queue.pop(0)
        img_resized = cv2.resize(img, (640, 480))  # Reduce resolution for faster processing
        # img_tensor = torch.from_numpy(img_resized).permute(2, 0, 1).float().div(255.0).unsqueeze(0)
        img_tensor = torch.from_numpy(img_resized).permute(2, 0, 1).float().div(255.0).unsqueeze(0).to('cuda')
        results = model(img_tensor)

        for r in results:
            boxes = r.boxes
            for box in boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0]) 
                conf = round(float(box.conf[0]) * 100) / 100  
                cls = int(box.cls[0])
                class_name = classNames[cls]
                label = f'{class_name} {conf:.2f}'
                
                if class_name in ['Mask', 'Halmet', 'Safety Vest']:
                    color = (0, 255, 0)  
                elif class_name in ['NO-Halmet', 'NO-Mask', 'NO-Safety Vest']:
                    color = (0, 0, 255)  
                elif class_name in ['machinery', 'vehicle']:
                    color = (0, 149, 255)  
                else:
                    color = (85, 45, 255)  
                
                if conf > 0.5:  
                    cv2.rectangle(img_resized, (x1, y1), (x2, y2), color, 3)
                    t_size = cv2.getTextSize(label, 0, fontScale=1, thickness=2)[0]
                    c2 = (x1 + t_size[0], y1 - t_size[1] - 3)
                    cv2.rectangle(img_resized, (x1, y1), c2, color, -1, cv2.LINE_AA)  
                    cv2.putText(img_resized, label, (x1, y1 - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, [255, 255, 255], 2)

        cv2.imshow("Camera Feed", img_resized)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("Exiting...")
            break
    
    cv2.destroyAllWindows()



In [3]:
# Install the required packages for OpenCV GUI support

%pip install opencv-python-headless

# Run the video detection function
video_detection()

Note: you may need to restart the kernel to use updated packages.

0: 480x640 1 NO-Safety Vest, 1 Person, 71.1ms
Speed: 0.0ms preprocess, 71.1ms inference, 1.8ms postprocess per image at shape (1, 3, 480, 640)


error: OpenCV(4.11.0) /io/opencv/modules/highgui/src/window.cpp:1301: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvShowImage'
