## Imports

In [1]:
from ultralytics import YOLO
import cv2 
from matplotlib import pyplot as plt
import time
import psutil

## Load models

In [2]:
# Load the best.pt weights produced by each 100-epoch training run
# NOTE(review): only the yolov8n person run has an extra "detect/" path segment — confirm this is intended
v8n_person = YOLO("models/yolo/train_runs/yolov8n_person_100epochs/detect/train/weights/best.pt")
v8n_mask = YOLO("models/yolo/train_runs/yolov8n_mask_100epochs/train/weights/best.pt")
v11n_person = YOLO("models/yolo/train_runs/yolo11n_person_100epochs/train/weights/best.pt")
v11n_mask = YOLO("models/yolo/train_runs/yolo11n_mask_100epochs/train/weights/best.pt")


# Map a readable label to every loaded model so later cells can iterate over all of them
model_dict = dict(
    v8n_person=v8n_person,
    v8n_mask=v8n_mask,
    v11n_person=v11n_person,
    v11n_mask=v11n_mask,
)

## Test if model was loaded correctly

In [3]:
# Function to plot result images
def matplot_image(results: list, model_name: str):
    """
    Show the annotated first prediction result with matplotlib.

    :param results: list of results from the model; only results[0] is rendered
    :param model_name: name of the model, used as the figure title
    """
    # plot() yields the annotated image in BGR (OpenCV format); matplotlib expects RGB
    annotated_bgr = results[0].plot()
    annotated_rgb = cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB)

    # Draw the image, label it with the model name and hide the axes for a cleaner display
    plt.imshow(annotated_rgb)
    plt.title(model_name)
    plt.axis("off")
    plt.show()


In [4]:
# Thresholds passed to every sanity-check prediction
conf = 0.3
iou = 0.3

# Smoke test: run each loaded model on the three sample images (0.jpg, 1.jpg, 2.jpg)
for model_name, model in model_dict.items():
    for i in range(3):
        image_path = f"data_sets/image_data/mask_person_test/{i}.jpg"
        results = model.predict(image_path, conf=conf, iou=iou, verbose=False)

        # Uncomment to display the annotated prediction
        #matplot_image(results, model_name)

## Get RAM usage

In [5]:
# Get RAM usage
def get_ram_usage():
    """
    Report how much memory the current Python process holds.

    :return: Resident Set Size (RSS) of this process in MB (bytes / 1024**2)
    """
    bytes_per_mb = 1024 ** 2

    # psutil.Process() without arguments refers to the running process
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / bytes_per_mb
    

## Put stats bar on frame

In [6]:
# Put stats bar on frame
def put_stats_bar(frame, stats_dict: dict, bar_height=30,font_scale=0.5, font_thickness=1):
    """
    Draw a filled black bar along the bottom edge of the frame and write the stats into it.

    The frame is drawn on in place; nothing is returned.

    :param frame: image array to draw on; only the first two entries of frame.shape
                  (height, width) are read, so colour and single-channel frames both work
    :param stats_dict: mapping of stat name -> numeric value; every value is rendered
                       with two decimals as "name: value"
    :param bar_height: height of the stats bar in pixels
    :param font_scale: font scale of the text
    :param font_thickness: font thickness of the text
    """
    # Build "name: value" entries; the two trailing spaces separate consecutive entries
    stats_text = "".join(f"{stat_name}: {stat:.2f}  " for stat_name, stat in stats_dict.items())

    # Slice instead of unpacking three values so a 2-D (grayscale) frame does not raise
    h, w = frame.shape[:2]

    # Filled (thickness -1) black rectangle covering the bottom bar_height rows
    cv2.rectangle(frame, (0, h - bar_height), (w, h), (0, 0, 0), -1)

    # Keep the text baseline inside the bar for any bar height (30 px -> 10 px above the bottom edge)
    baseline_offset = max(1, bar_height // 3)
    text_color = (255, 255, 255)  # White text
    cv2.putText(frame, stats_text, (10, h - baseline_offset), cv2.FONT_HERSHEY_SIMPLEX, font_scale, text_color, font_thickness)

## Put bounding boxes

In [7]:
def put_bounding_boxes(frame, result, model, colorcode=(0,255,0), color_static=True):
    """
    Draw every detected box of result[0] on the frame together with a "<class name> (<confidence>)" label.

    The frame is drawn on in place; nothing is returned.

    :param frame: image to draw on
    :param result: list of model results; only result[0].boxes (xyxy, cls, conf) is read
    :param model: model whose `names` mapping translates a class id into a label
    :param colorcode: colour used for every box when color_static is True, and as the
                      fallback for class ids without a dedicated colour otherwise
    :param color_static: if False, boxes of class 0, 1 and 2 get their own colours
    """
    # Colour per class id; tuples are in OpenCV's BGR channel order.
    # NOTE(review): (255, 165, 0) is orange in RGB but is passed to OpenCV as BGR — confirm the intended colour.
    class_colors = {
        0: (255, 165, 0),
        1: (0, 0, 255),
        2: (0, 255, 0),
    }

    boxes = result[0].boxes
    for xyxy, clss, conf in zip(boxes.xyxy, boxes.cls, boxes.conf):
        x1, y1, x2, y2 = map(int, xyxy)
        class_id = int(clss)
        text = f"{model.names[class_id]} ({conf:.2f})"

        # Pick the colour per box without overwriting `colorcode`, so an unmapped
        # class id falls back to the caller's colour instead of the previous box's colour
        if color_static:
            box_color = colorcode
        else:
            box_color = class_colors.get(class_id, colorcode)

        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
        # Label sits 10 px above the top-left corner of the box
        cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, box_color, 2)

## Put distance measurement line

In [8]:
def put_distance_line(frame, result, distance_threshold=150, avg_mask_box_size=25):
    """
    Draw a line with an estimated distance (in cm) between detections of neighbouring scale.

    Each box height in pixels is divided by avg_mask_box_size to get a pixels-per-cm
    scale. Boxes are sorted by that scale and every consecutive pair is connected by a
    line labelled with the centre-to-centre pixel distance divided by the pair's mean scale.
    The frame is drawn on in place; nothing is returned.

    :param frame: image to draw on
    :param result: list of model results; only result[0].boxes.xyxy is read
    :param distance_threshold: distances below this value (cm) are drawn in (0, 0, 255),
                               all others in (0, 165, 0)
    :param avg_mask_box_size: real-world height in cm assumed for one detection box;
                              must be non-zero
    """
    # One (pixels per cm, centre x, centre y) tuple per detection
    boxes = []
    for xyxy in result[0].boxes.xyxy:
        x1, y1, x2, y2 = map(int, xyxy)
        pxl_per_cm = (y2 - y1) / avg_mask_box_size
        boxes.append((pxl_per_cm, (x1 + x2) / 2, (y1 + y2) / 2))

    # Sort by scale so that each box is paired with its nearest neighbour in scale
    boxes_sort_scale = sorted(boxes, key=lambda box: box[0])
    for (scale_a, xa, ya), (scale_b, xb, yb) in zip(boxes_sort_scale, boxes_sort_scale[1:]):
        # Keep the mean scale as a float: truncating it to int turned every scale below
        # 1 px/cm into 0 (ZeroDivisionError) and skewed the distance for all other values
        avg_scale = (scale_a + scale_b) / 2
        if avg_scale <= 0:
            # Degenerate (zero-height) boxes give no usable scale; skip this pair
            continue

        # Euclidean pixel distance between the centres, converted to cm
        distance = ((xb - xa) ** 2 + (yb - ya) ** 2) ** 0.5 / avg_scale

        # Integer pixel coordinates for drawing
        pt1 = (int(xa), int(ya))
        pt2 = (int(xb), int(yb))
        pt_middle = (int((pt1[0] + pt2[0]) / 2), int((pt1[1] + pt2[1]) / 2))

        # Highlight pairs that are closer than the threshold
        colorcode = (0, 0, 255) if distance < distance_threshold else (0, 165, 0)

        cv2.line(frame, pt1, pt2, colorcode, 2)
        cv2.putText(frame, f"{distance:.2f} cm", pt_middle, cv2.FONT_HERSHEY_SIMPLEX, 1, colorcode, 2)

## Frame fetcher class with multithreading

In [9]:
import threading

class FrameFetcher:
    """
    Read frames from a cv2.VideoCapture in a background thread and expose the latest one.

    The capture is opened in __init__, one frame is read synchronously, and a daemon
    thread then keeps replacing `self.frame` with the newest successfully read frame.

    NOTE: `self.ret` is only overwritten on a successful read, so after the first good
    frame a later camera failure is not visible through get_frame(); callers keep
    receiving the last good frame.
    """

    # Pause between attempts after a failed read, so a closed or disconnected
    # camera does not make the background thread spin at full CPU
    READ_RETRY_DELAY = 0.01  # seconds

    def __init__(self, src=0):
        """
        Open the capture, read a first frame and start the background thread.

        :param src: source passed to cv2.VideoCapture (default 0)
        """
        self.cap = cv2.VideoCapture(src)
        self.cap.set(cv2.CAP_PROP_FPS, 30)  # request 30 FPS from the capture
        self.ret, self.frame = self.cap.read()
        self.stopped = False
        self.lock = threading.Lock()  # guards self.ret / self.frame

        # Start the background frame fetching thread
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()

    def update(self):
        """Continuously fetch frames in a separate thread until stop() is called"""
        while not self.stopped:
            ret, frame = self.cap.read()
            if ret:
                with self.lock:  # publish the new frame atomically
                    self.ret, self.frame = ret, frame
            else:
                # Failed reads keep the previous frame; back off briefly instead of busy-waiting
                time.sleep(self.READ_RETRY_DELAY)

    def get_frame(self):
        """
        Get the latest available frame

        :return: (ret, frame) of the most recent successful read. The frame is the
                 object held by the fetcher (no copy is made), so drawing on it in
                 place also changes what a later call returns until a newer frame arrives.
        """
        with self.lock:
            return self.ret, self.frame

    def stop(self):
        """Stop the background thread and release the camera"""
        self.stopped = True
        self.thread.join()  # wait for the loop to notice the flag before releasing the capture
        self.cap.release()

## Test Webcam as input

In [10]:
# Detection thresholds and prediction cadence
conf_tresh = 0.3
iou_tresh = 0.3
pred_framerate = 5  # run inference on every 5th frame; frames in between reuse the last result
model_mask = v11n_mask

# Initialize the threaded frame fetcher
fetcher = FrameFetcher(0)

# Start time and frame counter for the FPS estimate
start_time = time.time()
frame_count = 0
stats_dict = {}
result = None  # latest detections; None until the first inference has run

# try/finally guarantees the capture thread, camera and window are released
# even when an exception is raised inside the loop
try:
    while True:
        frame_fetch_time = time.time()    # Start time
        ret, frame = fetcher.get_frame()
        if not ret:
            break
        stats_dict["Frame fetch time"] = (time.time() - frame_fetch_time)*1000    # Elapsed ms

        frame_count += 1  # Count frames in the current FPS window
        if frame_count == 100:
            # Restart the FPS measurement window every 100 frames
            frame_count = 0
            start_time = time.time()

        # Run the mask model on the very first frame and then on every pred_framerate-th frame
        if result is None or frame_count % pred_framerate == 0:
            inf_time = time.time()    # Start time
            result = model_mask(frame, conf=conf_tresh, iou=iou_tresh, stream=True, verbose=False)
            result = list(result)   # stream=True yields a generator; consume it here so the timing is real
            inf_time = (time.time() - inf_time)*1000    # Elapsed ms
            stats_dict["Inference time"] = inf_time

        # Add bounding boxes to the frame
        box_time = time.time()    # Start time
        put_bounding_boxes(frame, result, model_mask, color_static=False)
        stats_dict["Annotation time"]=(time.time() - box_time)*1000    # Elapsed ms

        # Add distance line to the frame
        line_time = time.time()    # Start time
        put_distance_line(frame=frame, result=result, distance_threshold=150, avg_mask_box_size=25)
        stats_dict["Line time"]=(time.time() - line_time)*1000    # Elapsed ms

        # Frames per second within the current window (guard against a zero-length interval)
        elapsed_time = time.time() - start_time
        stats_dict["FPS"] = frame_count / elapsed_time if elapsed_time > 0 else 0.0
        stats_dict["PROP-FPS"] = fetcher.cap.get(cv2.CAP_PROP_FPS)

        # Add stats bar to the frame
        put_stats_bar(frame, stats_dict=stats_dict)

        # Show the frame first, then let waitKey process GUI events and poll the keyboard
        cv2.imshow("Detections", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):  # Quit with 'q'
            break
finally:
    fetcher.stop()
    cv2.destroyAllWindows()




2025-01-30 10:44:12.847 Python[63226:4917998] +[IMKClient subclass]: chose IMKClient_Modern
2025-01-30 10:44:12.847 Python[63226:4917998] +[IMKInputSession subclass]: chose IMKInputSession_Modern
