##### https://github.com/itzThillaiC/AI-Fitness-trainer

In [None]:
import mediapipe as mp
import argparse
import pandas as pd
import numpy as np
import cv2

#### Utils

In [None]:
mp_pose = mp.solutions.pose

# 임의의 관절 각도 계산
def calculate_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) -\
              np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)

    if angle > 180.0:
        angle = 360 - angle

    return angle
# 각 landmark 좌표 추출
def detection_body_part(landmarks, body_part_name):
    return [
        landmarks[mp_pose.PoseLandmark[body_part_name].value].x,
        landmarks[mp_pose.PoseLandmark[body_part_name].value].y,
        landmarks[mp_pose.PoseLandmark[body_part_name].value].visibility
    ]

def detection_body_parts(landmarks):
    body_parts = pd.DataFrame(columns=["body_part", "x", "y"])

    for i, lndmrk in enumerate(mp_pose.PoseLandmark):
        lndmrk = str(lndmrk).split(".")[1]
        cord = detection_body_part(landmarks, lndmrk)
        body_parts.loc[i] = lndmrk, cord[0], cord[1]

    return body_parts

def score_table(exercise, frame , counter, status):
    cv2.putText(frame, "Activity : " + exercise.replace("_", " "),
                (600, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2,
                cv2.LINE_AA)
    cv2.putText(frame, "Counter : " + str(counter), (600, 100),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
    cv2.putText(frame, "Status : " + str(status), (600, 135),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
    return frame

def score_table_plus(exercise, frame , counter, good, bad, status):
    cv2.putText(frame, "Activity : " + exercise.replace("_", " "),
                (570, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2,
                cv2.LINE_AA)
    cv2.putText(frame, "Counter : " + str(counter), (570, 100),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
    cv2.putText(frame, "Good : " + str(good) + "  Bad : " + str(bad), (570, 135),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
    cv2.putText(frame, "Status : " + str(status), (570, 170),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
    return frame

#### body part angle

In [None]:
# 각 관절마다의 각도 계산 함수 정의
class BodyPartAngle:
    def __init__(self, landmarks):
        self.landmarks = landmarks

    def angle_of_the_left_arm(self):
        l_shoulder = detection_body_part(self.landmarks, "LEFT_SHOULDER")
        l_elbow = detection_body_part(self.landmarks, "LEFT_ELBOW")
        l_wrist = detection_body_part(self.landmarks, "LEFT_WRIST")
        return calculate_angle(l_shoulder, l_elbow, l_wrist)

    def angle_of_the_right_arm(self):
        r_shoulder = detection_body_part(self.landmarks, "RIGHT_SHOULDER")
        r_elbow = detection_body_part(self.landmarks, "RIGHT_ELBOW")
        r_wrist = detection_body_part(self.landmarks, "RIGHT_WRIST")
        return calculate_angle(r_shoulder, r_elbow, r_wrist)

    def angle_of_the_left_leg(self):
        l_hip = detection_body_part(self.landmarks, "LEFT_HIP")
        l_knee = detection_body_part(self.landmarks, "LEFT_KNEE")
        l_ankle = detection_body_part(self.landmarks, "LEFT_ANKLE")
        return calculate_angle(l_hip, l_knee, l_ankle)

    def angle_of_the_right_leg(self):
        r_hip = detection_body_part(self.landmarks, "RIGHT_HIP")
        r_knee = detection_body_part(self.landmarks, "RIGHT_KNEE")
        r_ankle = detection_body_part(self.landmarks, "RIGHT_ANKLE")
        return calculate_angle(r_hip, r_knee, r_ankle)

    def angle_of_the_neck(self):
        r_shoulder = detection_body_part(self.landmarks, "RIGHT_SHOULDER")
        l_shoulder = detection_body_part(self.landmarks, "LEFT_SHOULDER")
        r_mouth = detection_body_part(self.landmarks, "MOUTH_RIGHT")
        l_mouth = detection_body_part(self.landmarks, "MOUTH_LEFT")
        r_hip = detection_body_part(self.landmarks, "RIGHT_HIP")
        l_hip = detection_body_part(self.landmarks, "LEFT_HIP")

        shoulder_avg = [(r_shoulder[0] + l_shoulder[0]) / 2,
                        (r_shoulder[1] + l_shoulder[1]) / 2]
        mouth_avg = [(r_mouth[0] + l_mouth[0]) / 2,
                     (r_mouth[1] + l_mouth[1]) / 2]
        hip_avg = [(r_hip[0] + l_hip[0]) / 2, (r_hip[1] + l_hip[1]) / 2]

        return abs(180 - calculate_angle(mouth_avg, shoulder_avg, hip_avg))

    def angle_of_the_abdomen(self):
        # calculate angle of the avg shoulder
        r_shoulder = detection_body_part(self.landmarks, "RIGHT_SHOULDER")
        l_shoulder = detection_body_part(self.landmarks, "LEFT_SHOULDER")
        shoulder_avg = [(r_shoulder[0] + l_shoulder[0]) / 2,
                        (r_shoulder[1] + l_shoulder[1]) / 2]

        # calculate angle of the avg hip
        r_hip = detection_body_part(self.landmarks, "RIGHT_HIP")
        l_hip = detection_body_part(self.landmarks, "LEFT_HIP")
        hip_avg = [(r_hip[0] + l_hip[0]) / 2, (r_hip[1] + l_hip[1]) / 2]

        # calculate angle of the avg knee
        r_knee = detection_body_part(self.landmarks, "RIGHT_KNEE")
        l_knee = detection_body_part(self.landmarks, "LEFT_KNEE")
        knee_avg = [(r_knee[0] + l_knee[0]) / 2, (r_knee[1] + l_knee[1]) / 2]

        return calculate_angle(shoulder_avg, hip_avg, knee_avg)

#### types_of_exercise

In [None]:
# 관절 각도의 변화로 카운트
# 시작자세 True
# 카운트 올라가는 시점은 무조건 False -> True
class TypeOfExercise(BodyPartAngle):

    def __init__(self, landmarks):
        super().__init__(landmarks)

    # 시작자세 수축 -> 카운트 올라가는 시점 True
    def push_up(self, counter, status):
        left_arm_angle = self.angle_of_the_left_arm()
        right_arm_angle = self.angle_of_the_right_arm()
        avg_arm_angle = (left_arm_angle + right_arm_angle) // 2
        if status:
            if avg_arm_angle < 100:
                status = False
        else:
            if avg_arm_angle > 160:
                counter += 1
                status = True

        return [counter, status]

    # 시작자세 이완 -> 카운트 올라가는 시점 True
    def barbell_low(self, counter, status):
        left_arm_angle = self.angle_of_the_left_arm()
        right_arm_angle = self.angle_of_the_right_arm()
        avg_arm_angle = (left_arm_angle + right_arm_angle) // 2
        if status:
            if avg_arm_angle < 120:
                status = False
        else:
            if avg_arm_angle > 160:
                counter += 1
                status = True

        return [counter, status]

    # 시작자세 이완 -> 카운트 올라가는 시점 True
    def overhead_press(self, counter, status):
        left_arm_angle = self.angle_of_the_left_arm()
        right_arm_angle = self.angle_of_the_right_arm()
        avg_arm_angle = (left_arm_angle + right_arm_angle) // 2
        if status:
            if avg_arm_angle > 160:
                status = False
        else:
            if avg_arm_angle < 100:
                counter += 1
                status = True

        return [counter, status]

    # 시작자세 이완 -> 카운트 올라가는 시점 True
    # 서서 시작하여 바벨 잡으려 숙임 & 시작부터 바벨 잡고 있는것 경우의 수 나눠야함 (일단 시작부터 잡고 있는 걸로 짜놓음)********** test 용으로 부적합
    def dead_lift(self, counter, status):
        angle = self.angle_of_the_abdomen()
        if status:
            if angle > 160:
                status = False
        else:
            if angle < 120:
                counter += 1
                status = True

        return [counter, status]

    # 시작자세 수축 -> 카운트 올라가는 시점 True
    def squat(self, counter, status):
        left_leg_angle = self.angle_of_the_left_leg()
        right_leg_angle = self.angle_of_the_right_leg()
        avg_leg_angle = (left_leg_angle + right_leg_angle) // 2
        if status:
            if avg_leg_angle < 100:
                status = False
        else:
            if avg_leg_angle > 160:
                counter += 1
                status = True

        return [counter, status]

    def dips(self, counter, status):
        left_arm_angle = self.angle_of_the_left_arm()
        right_arm_angle = self.angle_of_the_left_arm()
        avg_arm_angle = (left_arm_angle + right_arm_angle) // 2
        if status:
            if avg_arm_angle < 120:
                status = False
        else:
            if avg_arm_angle > 160:
                counter += 1
                status = True

        return [counter, status]

    def pull_up(self, counter, status):
        left_arm_angle = self.angle_of_the_left_arm()
        right_arm_angle = self.angle_of_the_left_arm()
        avg_arm_angle = (left_arm_angle + right_arm_angle) // 2
        if status:
            if avg_arm_angle < 100:
                status = False
        else:
            if avg_arm_angle > 160:
                counter += 1
                status = True

        return [counter, status]

    def walk(self, counter, status):
        right_knee = detection_body_part(self.landmarks, "RIGHT_KNEE")
        left_knee = detection_body_part(self.landmarks, "LEFT_KNEE")
        if status:
            if left_knee[0] > right_knee[0]:
                counter += 1
                status = False
        else:
            if left_knee[0] < right_knee[0]:
                counter += 1
                status = True

        return [counter, status]

    def sit_up(self, counter, status):
        angle = self.angle_of_the_abdomen()
        if status:
            if angle < 55:
                counter += 1
                status = False
        else:
            if angle > 105:
                status = True

        return [counter, status]

    def calculate_exercise(self, exercise_type, counter, status):
        if exercise_type == "push-up":
            counter, status = TypeOfExercise(self.landmarks).push_up(
                counter, status)
        elif exercise_type == "dips":
            counter, status = TypeOfExercise(self.landmarks).dips(
                counter, status)
        elif exercise_type == "pull-up":
            counter, status = TypeOfExercise(self.landmarks).pull_up(
                counter, status)
        elif exercise_type == "overhead-press":
            counter, status = TypeOfExercise(self.landmarks).overhead_press(
                counter, status)
        elif exercise_type == "barbell-low":
            counter, status = TypeOfExercise(self.landmarks).barbell_low(
                counter, status)
        elif exercise_type == "dead-lift":
            counter, status = TypeOfExercise(self.landmarks).dead_lift(
                counter, status)
        elif exercise_type == "squat":
            counter, status = TypeOfExercise(self.landmarks).squat(
                counter, status)
        elif exercise_type == "walk":
            counter, status = TypeOfExercise(self.landmarks).walk(
                counter, status)
        elif exercise_type == "sit-up":
            counter, status = TypeOfExercise(self.landmarks).sit_up(
                counter, status)

        return [counter, status]

#### 정확도 레이블 테스트용

In [None]:
test_label = [1, 0, 1, 1, 0]

#### 분류 모델 결과값 테스트용

In [None]:
exercise_type = "squat"

#### 전체 통합 코드

#### 분류 & 정확도 모델로부터 결과값 받아옴 -> skelton only + 운동정보 문구 동영상 저장

In [None]:
# video 파일 불러오기
video_source = "exercise/squat.mp4"

# skeleton only video 파일 저장하기
skeleton_output_path = 'skeleton/squat_plus.mp4'

cap = cv2.VideoCapture(video_source)

# 빈 프레임 규격 설정 (흰 배경)
frame_width = 800
frame_height = 480

# VideoWriter 설정
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(skeleton_output_path, fourcc, 30, (frame_width, frame_height))

# status 변화를 저장할 리스트
status_list = []

# 프레임 수 계산
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# setup mediapipe
mp_drawing_styles = mp.solutions.drawing_styles
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

with mp_pose.Pose(min_detection_confidence=0.5,
                  min_tracking_confidence=0.5) as pose:

    prev_status = True    # status 바뀔 때 동영상 시간 초 표시 위함
    prev_frame_time = 0

    counter = 0  # movement of exercise
    status = True  # state of move

    # 정확도 측정용 카운트
    good = 0
    bad = 0
    acc_i = 0

    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        # 불러온 동영상 frame setting
        frame = cv2.resize(frame, (frame_width, frame_height), interpolation=cv2.INTER_AREA)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # mediapipe 적용
        results = pose.process(frame)

        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        try:
            # count 계산
            landmarks = results.pose_landmarks.landmark
            counter, status = TypeOfExercise(landmarks).calculate_exercise(
                exercise_type, counter, status)

            # 정확도 label 1이면 good 이라고 설정
            if prev_status == False and status == True:
                if test_label[acc_i] == 1:
                    good += 1
                    acc_i += 1
                else:
                    bad += 1
                    acc_i += 1

            # prev_status = status

        except Exception as e:
            print(f'Error in row: {e}')

        # status 변화를 저장
        status_list.append(status)

        # 빈 프레임 생성 (흰 배경)
        blank_frame = np.ones((frame_height, frame_width, 3), np.uint8) * 255

        # RGB로 변환 (없으면 skeleton 품질 안좋아짐)
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Pose 적용
        results = pose.process(frame_rgb)

        if results.pose_landmarks is not None:

            # 화면에 정보 표시
            blank_frame = score_table_plus(exercise_type, blank_frame, counter, good, bad, status)

            # 스켈레톤 그리기
            mp_drawing.draw_landmarks(
                blank_frame,
                results.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
            )

            # output 영상에 추가
            out.write(blank_frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    out.release()
    cap.release()
    cv2.destroyAllWindows()