In [18]:
import cv2
import numpy as np
import torch
import onnxruntime as ort
from pathlib import Path
import time
from queue import Queue
from threading import Thread

In [19]:
from ultralytics import YOLO

# Load the PyTorch model
model = YOLO(r"C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.pt")  # Provide path to your best.pt model

# Export to ONNX format
model.export(format="onnx", imgsz=640, dynamic=True)  # 640 is the input size, change if needed


  ckpt = torch.load(file, map_location="cpu")


Ultralytics YOLOv8.1.29 🚀 Python-3.10.10 torch-2.5.1+cu118 CPU (12th Gen Intel Core(TM) i9-12900HK)
Model summary (fused): 168 layers, 11129454 parameters, 0 gradients, 28.5 GFLOPs

[34m[1mPyTorch:[0m starting from 'C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.pt' with input shape (1, 3, 640, 640) BCHW and output shape(s) (1, 14, 8400) (21.5 MB)

[34m[1mONNX:[0m starting export with onnx 1.17.0 opset 19...
[34m[1mONNX:[0m export success ✅ 0.7s, saved as 'C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.onnx' (42.5 MB)

Export complete (2.5s)
Results saved to [1mC:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights[0m
Predict:         yolo predict task=detect model=C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.onnx imgsz=640  
Validate:        yolo val task=detect model=C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.onnx img

'C:\\Learning\\AIPC\\YoloV8\\data\\runs\\detect\\yolov8s_ppe_css_50_epochs\\weights\\best.onnx'

In [28]:
import cv2
import numpy as np
import onnxruntime as ort
import os
from ultralytics import YOLO

# Export model to ONNX format
def export_model_to_onnx(model_path, onnx_path, input_size=640):
    model = YOLO(r"C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.pt")  # Load the PyTorch model
    model.export(format="onnx", imgsz=input_size, dynamic=True)  # Export to ONNX format

# Class to handle YOLOv8 model inference using ONNX
class YOLOv8ONNX:
    def __init__(self, model_path, input_size=640):
        # Load the ONNX model using ONNX Runtime
        self.session = ort.InferenceSession(model_path, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name
        self.input_size = input_size

    def preprocess(self, frame):
        """Preprocess a single frame/image."""
        original_shape = frame.shape[:2]
        resized = cv2.resize(frame, (self.input_size, self.input_size))
        rgb_image = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        normalized = rgb_image / 255.0
        transposed = np.transpose(normalized, (2, 0, 1))  # HWC -> CHW format
        input_tensor = np.expand_dims(transposed, axis=0).astype(np.float32)
        return input_tensor, original_shape

    def run_inference(self, batch_tensor):
        """Run inference on the image."""
        outputs = self.session.run([self.output_name], {self.input_name: batch_tensor})
        return outputs

    def postprocess(self, outputs, original_shapes, conf_threshold=0.25):
        """Postprocess model outputs to extract bounding boxes."""
        all_boxes = []
        for idx, predictions in enumerate(outputs[0]):
            boxes = []
            for pred in predictions:
                conf = pred[4] * pred[5]
                if conf > conf_threshold:
                    x, y, w, h = pred[:4]
                    x1 = int((x - w / 2) * original_shapes[idx][1])
                    y1 = int((y - h / 2) * original_shapes[idx][0])
                    x2 = int((x + w / 2) * original_shapes[idx][1])
                    y2 = int((y + h / 2) * original_shapes[idx][0])
                    boxes.append((x1, y1, x2, y2, conf))
            all_boxes.append(boxes)
        return all_boxes

# Image processing function
def process_image(input_image_path, output_image_path, model_path, input_size=640, conf_threshold=0.25):
    # Initialize the ONNX model
    model = YOLOv8ONNX(model_path, input_size)

    # Read the image
    image = cv2.imread(input_image_path)
    if image is None:
        raise ValueError(f"Failed to load image from path: {input_image_path}")

    # Preprocess the image
    input_tensor, original_shape = model.preprocess(image)

    # Run inference
    outputs = model.run_inference(input_tensor)

    # Postprocess the results
    detections = model.postprocess(outputs, [original_shape], conf_threshold)

    # Draw bounding boxes on the image
    for x1, y1, x2, y2, conf in detections[0]:  # Only one image (batch size 1)
        x1, y1, x2, y2 = max(0, x1), max(0, y1), min(x2, image.shape[1]), min(y2, image.shape[0])
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, f"{conf:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Ensure the output path has the correct file name (add extension if missing)
    if not output_image_path.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        output_image_path += '.jpg'

    # Save the output image and check if it was successful
    if cv2.imwrite(output_image_path, image):
        print(f"Processed image saved to: {output_image_path}")
    else:
        print(f"Failed to save processed image to: {output_image_path}")

# Video processing function
def process_video(input_video_path, output_video_path, model_path, input_size=640, conf_threshold=0.25, batch_size=4):
    # Initialize the ONNX model
    model = YOLOv8ONNX(model_path, input_size)

    # Open video capture
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise ValueError(f"Failed to open video file: {input_video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_size = (frame_width, frame_height)

    # Test with MJPG codec (more universal)
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')  # 'MJPG' codec should work on most systems
    out = cv2.VideoWriter(output_video_path, fourcc, fps, frame_size)

    # Check if the VideoWriter was opened successfully
    if not out.isOpened():
        raise ValueError(f"Failed to open video writer for output file: {output_video_path}")

    frame_count = 0  # Track the number of frames processed
    frame_queue = []
    results_queue = []

    while True:
        ret, frame = cap.read()
        if not ret:
            print(f"End of video reached. Processed {frame_count} frames.")
            break

        frame_count += 1

        # Preprocess frame
        input_tensor, original_shape = model.preprocess(frame)

        # Run inference on a batch (in this case, we are processing one frame at a time)
        frame_queue.append(input_tensor)
        if len(frame_queue) == batch_size:
            batch_tensor = np.vstack(frame_queue)
            outputs = model.run_inference(batch_tensor)
            detections = model.postprocess(outputs, [original_shape] * batch_size, conf_threshold)
            for idx, det in enumerate(detections):
                results_queue.append((frame_queue[idx], det))
            frame_queue = []

        # Process results and write frames
        if results_queue:
            frame, detections = results_queue.pop(0)
            frame = np.squeeze(frame, axis=0)  # Remove batch dimension
            frame = np.transpose(frame, (1, 2, 0))  # (C, H, W) -> (H, W, C)
            frame = (frame * 255).astype(np.uint8)  # Denormalize

            # Convert from RGB to BGR before writing to video
            frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            # Draw bounding boxes on the frame
            for x1, y1, x2, y2, conf in detections:
                x1, y1, x2, y2 = max(0, x1), max(0, y1), min(x2, frame.shape[1]), min(y2, frame.shape[0])
                cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame_bgr, f"{conf:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # Write the frame to the output video
            out.write(frame_bgr)
        
    # Release video resources
    cap.release()
    out.release()
    print(f"Video processing completed. {frame_count} frames processed.")

# Process input based on file type (image or video)
def process_input(input_path, output_path, model_path, input_size=640, conf_threshold=0.25, batch_size=4):
    # Check if the input is a video or an image
    file_extension = os.path.splitext(input_path)[-1].lower()
    
    if file_extension in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
        # Process as an image
        process_image(input_path, output_path, model_path, input_size, conf_threshold)
    elif file_extension in ['.mp4', '.avi', '.mov', '.mkv', '.flv']:
        # Process as a video
        process_video(input_path, output_path, model_path, input_size, conf_threshold, batch_size)
    else:
        raise ValueError("Unsupported file format. Please upload an image or video.")

# Example usage
input_path = r"C:\Learning\AIPC\YoloV8\data\css-data\test\images\777_jpg.rf.92dc6945342410ced7ac93f3dfbff0c5.jpg"  # Replace with your input file path (image or video)
output_path = r"C:\Learning\AIPC\YoloV8\data\css-data\test\images\output.jpg"  # Output file path
onnx_model = r"C:\Learning\AIPC\YoloV8\data\runs\detect\yolov8s_ppe_css_50_epochs\weights\best.onnx"  # Path to the ONNX model

process_input(input_path, output_path, onnx_model)


Processed image saved to: C:\Learning\AIPC\YoloV8\data\css-data\test\images\output.jpg


In [1]:
import cv2

video_path = r"C:\Learning\AIPC\YoloV8\redZone\css-data\example_video.mp4" 
cap = cv2.VideoCapture(video_path)
if cap.isOpened():
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    print(f"FPS: {fps}, Width: {width}, Height: {height}")
cap.release()


FPS: 29.0, Width: 1280.0, Height: 720.0
