In [48]:
# Thêm autoreload vào để tự động reload lại module nếu có thay đổi code trong module
%load_ext autoreload
%autoreload 2

import mediapipe as mp
import numpy as np
import pandas as pd
import pickle
import cv2
from PIL import Image
import warnings
warnings.filterwarnings('ignore')
from joblib import parallel_backend

import os, sys
sys.path.append(os.path.abspath(".."))
from utils.training_storage_helper import load_model

os.environ['JOBLIB_VERBOSE'] = '0'

# Drawing helpers
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [49]:
# RF_model.pkl là sau điều chỉnh tham số
RF_model = load_model('../best_models/random_forest.pkl')

# Load input scaler
input_scaler = load_model("../best_models/input_scaler.pkl")

In [50]:
IMPORTANT_LMS = [
    "nose",
    "left_shoulder",
    "right_shoulder",
    "left_elbow",
    "right_elbow",
    "left_pinky",
    "right_pinky",
    "left_index",
    "right_index",
    "left_hip",
    "right_hip",
    "left_knee",
    "right_knee",
    "left_foot_index",
    "right_foot_index",
]

# Tạo các cột cho dữ liệu đầu vào
HEADERS = ["label"]
for landmark in IMPORTANT_LMS:
    for dim in ['x', 'y', 'z']:
        HEADERS.append(f"{landmark.lower()}_{dim}")

In [51]:
class PoseRotationHelper:
    def __init__(self):
        pass

    def rotate_keypoints(self, keypoints, origin_size = (612, 408)) -> np.array:
        left_shoulder = keypoints.landmark[11]
        right_shoulder = keypoints.landmark[12]
        left_hip = keypoints.landmark[23]
        right_hip = keypoints.landmark[24]

        center_shoulder = (left_shoulder.x + right_shoulder.x) / 2, (left_shoulder.y + right_shoulder.y) / 2
        center_hip = (left_hip.x + right_hip.x) / 2, (left_hip.y + right_hip.y) / 2

        center = (center_shoulder[0] + center_hip[0]) / 2, (center_shoulder[1] + center_hip[1]) / 2

        O_center_shoulder = (center_shoulder[0] - center[0], center_shoulder[1] - center[1])
        Oy = (0, -1)

        theta = self.calculate_phase_difference(O_center_shoulder, Oy)

        # rotate each key point
        for point in keypoints.landmark:
            point_rotated = self.rotate_point((point.x, point.y), center, theta, origin_size)
            point.x = point_rotated[0]
            point.y = point_rotated[1]

        return keypoints
    
    def calculate_phase_difference(self, OA: tuple, OB: tuple) -> float:
        """
        Calculate the phase difference (in degrees) of vector OB relative to vector OA.
        :param OA: A tuple representing the first vector (OA).
        :param OB: A tuple representing the second vector (OB).
        :return: Phase difference in degrees from 0 to 360.
        """
        # Calculate the dot product and magnitudes
        dot_product = OA[0] * OB[0] + OA[1] * OB[1]
        norm_OA = np.sqrt(OA[0] ** 2 + OA[1] ** 2)
        norm_OB = np.sqrt(OB[0] ** 2 + OB[1] ** 2)

        # Calculate the cosine of the angle
        cos_theta = dot_product / (norm_OA * norm_OB)
        
        # Ensure the value is within the valid range for arccos due to floating-point errors
        cos_theta = np.clip(cos_theta, -1, 1)
        
        # Calculate the angle in radians
        theta = np.arccos(cos_theta)
        
        # Calculate the cross product (only the z-component for 2D vectors)
        cross_product = OA[0] * OB[1] - OA[1] * OB[0]
        
        # Adjust the angle based on the direction
        if cross_product < 0:
            theta = 2 * np.pi - theta

        # Convert the angle to degrees
        theta_degrees = np.degrees(theta)

        return theta_degrees
    
    def rotate_point(self, point: tuple, center: tuple, angle: float, origin_size = (612, 408)) -> tuple:
        x, y = point
        cx, cy = center

        x_new = (x - cx) * np.cos(np.radians(angle)) * origin_size[0] - (y - cy) * np.sin(np.radians(angle)) * origin_size[1] + cx * origin_size[0]
        y_new = (x - cx) * np.sin(np.radians(angle)) * origin_size[0] + (y - cy) * np.cos(np.radians(angle)) * origin_size[1] + cy * origin_size[1]

        return x_new / origin_size[0], y_new / origin_size[1]

In [52]:
class PoseScalerHelper:
    def __init__(self, IMPORTANT_LMS):
        self.IMPORTANT_LMS = IMPORTANT_LMS

    def extract_and_recalculate_landmarks(self, pose_landmarks):
        # Prepare data for CSV
        columns_name = [] + [f"{landmark}_{axis}" for landmark in self.IMPORTANT_LMS for axis in ['x', 'y', 'z']]
        columns_value = []

        for id, landmark in enumerate(pose_landmarks):
            if mp_pose.PoseLandmark(id).name.lower() in self.IMPORTANT_LMS:
                # Adjust keypoint coordinates
                columns_value.extend([landmark.x, landmark.y, landmark.z])

        df_key_points = pd.DataFrame([columns_value], columns=columns_name)

        center = self.find_center_of_mass(df_key_points)
        shifting = (0 - center[0], 0 - center[1])

        for landmark in self.IMPORTANT_LMS:
            df_key_points[f"{landmark}_x"] += shifting[0]
            df_key_points[f"{landmark}_y"] += shifting[1]

        # Calculate the scale value based on the distance between nose and hip
        scale_value = self.calculate_scale_value(df_key_points)

        # Scale keypoints
        df_key_points = self.scale_keypoints(df_key_points, scale_value)

        return df_key_points
    
    def find_center_of_mass(self, df_key_points):
        left_hip = (df_key_points["left_hip_x"], df_key_points["left_hip_y"])
        right_hip = (df_key_points["right_hip_x"], df_key_points["right_hip_y"])
        
        # Find the center of the hips
        center_hip = ((left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2)

        # Get the sholder coordinates
        left_shoulder = (df_key_points["left_shoulder_x"], df_key_points["left_shoulder_y"])
        right_shoulder = (df_key_points["right_shoulder_x"], df_key_points["right_shoulder_y"])

        center_shoulder = ((left_shoulder[0] + right_shoulder[0]) / 2, (left_shoulder[1] + right_shoulder[1]) / 2)

        return (center_hip[0] + center_shoulder[0]) / 2, (center_hip[1] + center_shoulder[1]) / 2

    def calculate_scale_value(self, df_key_points):
        """Calculate the scale value based on nose and hip points."""
        left_hip = (df_key_points["left_hip_x"], df_key_points["left_hip_y"])
        right_hip = (df_key_points["right_hip_x"], df_key_points["right_hip_y"])
        
        # Find the center of the hips
        center_hip = ((left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2)

        # Get the sholder coordinates
        left_shoulder = (df_key_points["left_shoulder_x"], df_key_points["left_shoulder_y"])
        right_shoulder = (df_key_points["right_shoulder_x"], df_key_points["right_shoulder_y"])

        center_shoulder = ((left_shoulder[0] + right_shoulder[0]) / 2, (left_shoulder[1] + right_shoulder[1]) / 2)

        # Calculate the distance between nose and center of the hips
        distance = np.sqrt((center_hip[0] - center_shoulder[0])**2 + (center_hip[1] - center_shoulder[1])**2)

        # Scale value based on distance
        return 0.5 / distance

    def scale_keypoints(self, df_key_points, scale_value):
        """Scale all key points based on the calculated scale value."""
        for landmark in self.IMPORTANT_LMS:
            df_key_points[f"{landmark}_x"] *= scale_value
            df_key_points[f"{landmark}_y"] *= scale_value
        return df_key_points

In [53]:
def extract_and_recalculate_landmarks(pose_landmarks):
        """
        Tịnh tiến thân người vào giữa bức hình, đồng thời dời lại trục toạ độ
        """
        columns_name = []
        columns_value = []
        for id, landmark in enumerate(pose_landmarks):
            land_mark_name = mp.solutions.pose.PoseLandmark(id).name.lower()
            if land_mark_name not in IMPORTANT_LMS:
                continue
            
            columns_name += [
                f"{ land_mark_name }_x",
                f"{ land_mark_name }_y",
                f"{ land_mark_name }_z",
            ]

            # landmark.x, landmark.y là các giá trị trước khi dịch chuyển gốc toạ độ vào giữa bức hình
            # Do đó khi đưa gốc toạ độ về giữa bức hình thì phải trừ chúng cho 0.5
            columns_value += [
                landmark.x - 0.5,
                landmark.y - 0.5,
                landmark.z,
            ]

        df_key_points = pd.DataFrame([columns_value], columns=columns_name)

        # Lấy tọa độ hông trái và phải
        left_hip = (df_key_points["left_hip_x"], df_key_points["left_hip_y"])
        right_hip = (df_key_points["right_hip_x"], df_key_points["right_hip_y"])

        # Tìm điểm trung tâm của hông
        center_hip = ((left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2)

        # Lấy tọa độ của mũi
        nose = (df_key_points["nose_x"], df_key_points["nose_y"])

        # Khoảng cách giữa mũi và trung tâm hông
        distance = np.sqrt((center_hip[0] - nose[0])**2 + (center_hip[1] - nose[1])**2)

        # Tính toán scale value
        scale_value = 0.5 / distance

        # **Scale** tất cả các key points
        for column in df_key_points.columns:
            if "_x" in column or "_y" in column:
                df_key_points[column] = df_key_points[column] * scale_value

        return df_key_points

In [54]:
def square_for_image(image: np.array):
        # Nếu hình ảnh là một đối tượng NumPy (cv2 image)
        if isinstance(image, np.ndarray):
            # Chuyển đổi từ định dạng cv2 (NumPy array) sang PIL image để xử lý
            image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        else:
            # Nếu hình ảnh đã ở định dạng PIL thì không cần chuyển đổi
            image_pil = image

        # Lấy kích thước ảnh gốc
        original_size = image_pil.size

        # Kích thước mới sẽ là kích thước lớn nhất giữa chiều rộng và chiều cao
        max_width = max(original_size)
        new_size = (max_width, max_width)

        # Tạo một ảnh mới với nền đen (kích thước vuông)
        new_image = Image.new("RGB", new_size, (0, 0, 0))

        # Dán ảnh gốc vào ảnh mới với khoảng trống màu đen
        new_image.paste(image_pil, 
                        ((max_width - original_size[0]) // 2, (max_width - original_size[1]) // 2))

        # Chuyển ảnh mới từ định dạng PIL sang định dạng NumPy (cv2)
        new_image_cv2 = cv2.cvtColor(np.array(new_image), cv2.COLOR_RGB2BGR)

        return new_image_cv2, new_size

In [55]:
def process_image(image):
        """Load and pre-process the image."""
        image, new_size = square_for_image(image)
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB), new_size

In [56]:
pose_scaler_helper = PoseScalerHelper(IMPORTANT_LMS)
pose_rotation_helper = PoseRotationHelper()

In [None]:
def predict(image, prediction_probability_threshold=0.5) -> int:
        with mp_pose.Pose(
            static_image_mode=True, model_complexity=1, smooth_landmarks=True
        ) as pose:
            
            image, new_size = process_image(image)
            results = pose.process(image)

            if not results.pose_landmarks:
                raise Exception("No pose landmarks detected")

            image.flags.writeable = True

            # Cần khôi phục lại màu gốc của ảnh
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Get landmarks
            try:
                results.pose_landmarks = pose_rotation_helper.rotate_keypoints(results.pose_landmarks, new_size)

                # draw_landmarks(mp_drawing, mp_pose, image, results.pose_landmarks)

                key_points_df = pose_scaler_helper.extract_and_recalculate_landmarks(results.pose_landmarks.landmark)

                # Convert DataFrame to numpy array
                key_points = key_points_df.values.reshape(1, -1)  # Chuyển DataFrame thành mảng 2D với đúng kích thước

                # Scale input trước khi dự đoán
                X = input_scaler.transform(key_points)

                # Dự đoán
                predicted_class = RF_model.predict(X)[0]  # Dự đoán dựa trên mô hình và input đã được scale

                return int(predicted_class)
            except Exception as e:
                raise Exception(f"Error when predicting: {e}")

In [58]:
image = cv2.imread("../../data/3_procumbent/10.jpg")
predict(image)

[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s


2

In [62]:
for file in os.listdir("../../data/2_side"):
    try:
        image = cv2.imread(f"../../data/2_side/{file}")
        result = predict(image)
        if result != 1:
            print(f"File: {file} - Result: {result}")
            with open("result.txt", "a") as f:
                f.write(f"{file} {result}\n")
    except Exception as e:
        print(f"Error when processing file {file}: {e}")

[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Do

Error when processing file 132.jpg: No pose landmarks detected


[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Do

Error when processing file 52.jpg: No pose landmarks detected


[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Do

Error when processing file 664.jpg: No pose landmarks detected


[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Do

File: 78.jpg - Result: 0


[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 199 tasks      | elapsed:    0.0s


Error when processing file 99.jpg: No pose landmarks detected
