In [1]:
import time
from pprint import pprint
from concurrent.futures import ProcessPoolExecutor

import cv2
import torch
import mediapipe as mp

2023-12-11 20:22:57.666353: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-12-11 20:22:57.687053: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-12-11 20:22:57.687074: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-12-11 20:22:57.687548: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-11 20:22:57.691311: I tensorflow/core/platform/cpu_feature_guar

In [2]:
class PersonDetection:
    """Wrap a detection model so that it only reports people.

    The most recently supplied frame and the most recent inference result are
    kept on the instance (``frame`` / ``result``) so they can be inspected
    between calls.
    """

    def __init__(self, yolo_model):
        """Keep a reference to ``yolo_model`` and restrict it to class 0.

        The ``classes`` filter is written onto the model object that was
        passed in, so it is visible to every other holder of that object.
        """
        self.model = yolo_model
        # Only the "person" class is of interest
        self.model.classes = [0]

        self.frame = self.result = None

    def set_frame(self, frame):
        """Remember ``frame`` as the input for the next ``detect()`` call."""
        self.frame = frame

    def detect(self):
        """Run the model on the stored frame and keep its output in ``result``."""
        self.result = self.model(self.frame)

    def get_coordinate(self):
        """Return the first entry of the stored result's ``xyxy`` attribute."""
        boxes_per_image = self.result.xyxy
        return boxes_per_image[0]

    def debug(self):
        """Print the stored frame and the stored result."""
        print("\nDEBUG INFORMATION - Person Detection")
        for heading, value in (("FRAME", self.frame), ("RESULT", self.result)):
            print(heading)
            pprint(value)


In [3]:
class SinglePoseEstimation():
    """Run MediaPipe Pose on one frame at a time and expose its landmarks.

    Attributes:
        mp_drawing: ``mp.solutions.drawing_utils``, used to render landmarks.
        mp_pose: ``mp.solutions.pose``.
        pose: the ``Pose`` instance used for inference.
        frame: the frame most recently passed to ``set_frame``.
        result: output of the most recent ``estimate()`` call, or ``None``.
        landmarks: value returned by the most recent ``get_landmarks()`` call.
    """

    def __init__(self):
        """Create the MediaPipe Pose instance and clear all per-frame state."""
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose

        self.pose = self.mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

        self.frame = None
        self.result = None
        self.landmarks = None

    def set_frame(self, frame):
        """Remember ``frame`` as the input for the next ``estimate()`` call."""
        self.frame = frame

    def estimate(self):
        """Run pose estimation on the stored frame and keep the output.

        ``result`` is set to ``None`` when there is no frame or the frame has
        no pixels (e.g. a zero-area crop), instead of failing inside the
        colour conversion.
        """
        frame = self.frame
        if frame is None or getattr(frame, "size", None) == 0:
            self.result = None
            return

        # Convert BGR to RGB; cvtColor returns a new array, so the stored
        # frame itself is left untouched.
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Presumably a performance hint for MediaPipe (input marked read-only).
        image.flags.writeable = False

        # Process image
        self.result = self.pose.process(image)

    def get_landmarks(self):
        """Return the landmarks of the last estimate, or ``None`` if absent.

        ``None`` is returned (and stored in ``landmarks``) when ``estimate()``
        has not produced a result or the result carries no pose landmarks.
        Any other kind of error is allowed to propagate.
        """
        try:
            self.landmarks = self.result.pose_landmarks.landmark
        except AttributeError:
            # `result` or `result.pose_landmarks` is None: nothing detected.
            # Reset so a previous frame's landmarks are not left behind.
            self.landmarks = None
        return self.landmarks

    def get_annotated_frame(self):
        """Draw the last estimate's landmarks on the stored frame and return it.

        The stored frame is returned as-is when there is no result to draw.
        """
        if self.result is None:
            return self.frame

        # Render detection
        self.mp_drawing.draw_landmarks(
            self.frame,
            self.result.pose_landmarks,
            self.mp_pose.POSE_CONNECTIONS,
            self.mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
            self.mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2),
        )
        return self.frame
        

In [4]:
# Detection model: the 'yolov5s' entry point of the ultralytics/yolov5 hub repo
yolo_model = torch.hub.load(repo_or_dir='ultralytics/yolov5', model='yolov5s')

Using cache found in /home/muzakki/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2023-12-9 Python-3.9.15 torch-2.1.0+cu121 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


# ALL IN ONE PROCESS 

In [6]:
# Detect people in every video frame, then run pose estimation on each person
# crop in this same process and draw the skeleton back onto the frame.
person_detector = PersonDetection(yolo_model)
single_pose = SinglePoseEstimation()

cap = cv2.VideoCapture("sample/rollin720.mp4")
# Full-frame size in pixels, as reported by the capture
w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # perf_counter is monotonic and high-resolution, which suits interval timing
        start = time.perf_counter()

        person_detector.set_frame(frame)
        person_detector.detect()

        for person in person_detector.get_coordinate():
            # The whole detection row is cast to int, so `confidence` is
            # truncated as well; only the box corners are used below.
            x1, y1, x2, y2, confidence, _ = person.to(int)

            crop = frame[y1:y2, x1:x2]
            if crop.size == 0:
                # A zero-area box has no pixels to convert or estimate on
                continue
            cropped_frame = crop

            single_pose.set_frame(cropped_frame)
            single_pose.estimate()

            landmarks = single_pose.get_landmarks()
            if landmarks is not None:
                cropped_frame = single_pose.get_annotated_frame()
                frame[y1:y2, x1:x2] = cropped_frame

        cv2.imshow("webcam", frame)

        end = time.perf_counter()
        inference_time = end - start
        print(f"\rInference time : {round(inference_time, 3)} second | {round(inference_time*1000, 3)} ms", end='', flush=True)

        # Break the loop if 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture object and close the window, even if a frame failed
    cap.release()
    cv2.destroyAllWindows()


Inference time : 0.03 second | 30.297 mss

I0000 00:00:1702300984.845947   11857 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1702300984.892703   11986 gl_context.cc:344] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 510.108.03), renderer: NVIDIA GeForce RTX 3060/PCIe/SSE2
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


Inference time : 0.091 second | 90.585 mss

In [None]:
# Draw a rectangle around the face of the last processed person crop, using
# the ear and shoulder landmarks from the most recent pose estimate.
def _landmark_to_pixel(landmark, width, height):
    """Scale one normalized landmark to integer pixel coordinates."""
    return {
        "x": int(landmark.x * width),
        "y": int(landmark.y * height),
    }


if landmarks is None:
    print("No pose landmarks available for the last processed crop; nothing to draw.")
else:
    # The pose was estimated on `cropped_frame` and the rectangle is drawn on
    # `cropped_frame`, so the normalized landmark coordinates have to be scaled
    # by the crop size rather than by the full-frame `w` / `h`.
    crop_h, crop_w = cropped_frame.shape[:2]
    pose_landmark = single_pose.mp_pose.PoseLandmark

    left_ear = _landmark_to_pixel(landmarks[pose_landmark.LEFT_EAR.value], crop_w, crop_h)
    right_ear = _landmark_to_pixel(landmarks[pose_landmark.RIGHT_EAR.value], crop_w, crop_h)
    left_shoulder = _landmark_to_pixel(landmarks[pose_landmark.LEFT_SHOULDER.value], crop_w, crop_h)

    print((left_ear["x"], 0))
    print((right_ear["x"], left_shoulder["y"]))

    # Draw the rectangle on the face: from the top edge of the crop at the
    # left ear down to shoulder height at the right ear
    cv2.rectangle(cropped_frame, (left_ear["x"], 0), (right_ear["x"], left_shoulder["y"]), (255, 0, 0))

In [None]:
# NOTE(review): a bare `break` outside a loop is a SyntaxError, so this cell
# always fails. Presumably it is a deliberate stop so that "Run All" halts
# before the multiprocessing experiment that follows - confirm before removing.
break

# MULTIPROCESSING - ONLY MEDIAPIPE SKELETAL DETECTION

In [None]:
# Fresh detector and pose estimator for the multiprocessing experiment
person_detector = PersonDetection(yolo_model=yolo_model)
single_pose = SinglePoseEstimation()

In [None]:
# Pose-estimation task submitted to the process pool, one call per person crop
def detect_skeletal(cropped_frame):
    """Run pose estimation on one person crop and return its landmarks.

    The module-level ``single_pose`` estimator is used. When this function runs
    in a worker process, it is that process's copy of ``single_pose`` that gets
    updated, so the outcome has to travel back through the return value.

    Args:
        cropped_frame: image region containing a single detected person.

    Returns:
        A list of ``(x, y, z, visibility)`` tuples, one per landmark, or
        ``None`` when no pose landmarks were found. Plain tuples are used so
        the value can be pickled back to the submitting process.
    """
    single_pose.set_frame(cropped_frame)
    single_pose.estimate()

    pose_landmarks = single_pose.get_landmarks()
    if pose_landmarks is None:
        return None
    return [(lm.x, lm.y, lm.z, lm.visibility) for lm in pose_landmarks]

In [None]:
# Detect people in every video frame in this process, and hand each person
# crop to a pool of worker processes for pose estimation.
cap = cv2.VideoCapture("sample/rollin720.mp4")
try:
    # One pool for the whole video: creating and shutting down the worker
    # processes on every frame would be paid for (and timed) again each frame.
    with ProcessPoolExecutor(3) as executor:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # perf_counter is monotonic and high-resolution, which suits interval timing
            start = time.perf_counter()

            person_detector.set_frame(frame)
            person_detector.detect()

            futures = []
            for person in person_detector.get_coordinate():
                # The whole detection row is cast to int; only the box corners are used.
                x1, y1, x2, y2, confidence, _ = person.to(int)

                cropped_frame = frame[y1:y2, x1:x2]
                if cropped_frame.size == 0:
                    # A zero-area box has no pixels to estimate a pose on
                    continue

                # Submit tasks to the executor
                futures.append(executor.submit(detect_skeletal, cropped_frame))

            # Wait for all tasks to complete. Nothing is drawn from `results`
            # yet, so the frame is displayed without skeleton annotations.
            results = [future.result() for future in futures]

            cv2.imshow("webcam", frame)

            end = time.perf_counter()
            inference_time = end - start
            print(f"\rInference time : {round(inference_time, 3)} second | {round(inference_time*1000, 3)} ms", end='', flush=True)

            # Break the loop if 'q' key is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Release the capture object and close the window, even if a frame failed
    cap.release()
    cv2.destroyAllWindows()
