In [2]:
import os
import glob
import argparse
import cv2
import csv
from time import time
import numpy as np
import mediapipe as mp
from pose_classification_utils import (
    FullBodyPoseEmbedder,
    PoseClassifier,
    EMADictSmoothing,
    RepetitionCounter
)
from matplotlib import pyplot as plt
from scipy.spatial.distance import cosine

In [3]:
# Short aliases for the MediaPipe solution modules. `mp_pose` is the one the
# landmark-extraction functions in this notebook use.
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

In [4]:
def show_image(img, figsize=(10, 10)):
    """Render an OpenCV image in a new matplotlib figure.

    Args:
      img: Image array in BGR channel order; it is converted to RGB before
        being handed to matplotlib.
      figsize: (width, height) forwarded to `plt.figure`.
    """
    plt.figure(figsize=figsize)
    rgb_view = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_view)
    plt.show()

In [5]:
class PoseSample(object):
    """Plain record describing one reference pose.

    Attributes:
      name: Identifier of the sample.
      landmarks: Landmark coordinates of the sample.
      class_name: Name of the pose class the sample belongs to.
      embedding: Embedding computed from `landmarks`.
    """

    def __init__(self, name, landmarks, class_name, embedding):
        # Single tuple assignment; attribute creation order is unchanged.
        (self.name,
         self.landmarks,
         self.class_name,
         self.embedding) = (name, landmarks, class_name, embedding)

In [6]:
class PoseInterpreter(object):
    """Scores a pose against a database of reference pose samples.

    Reference samples are loaded once from per-class CSV files. Calling the
    instance with a landmark array returns how many of the nearest reference
    samples belong to each pose class, plus the distances of those samples.
    """

    def __init__(self,
                 pose_samples_folder,
                 pose_embedder,
                 file_extension='csv',
                 file_separator=',',
                 n_landmarks=33,
                 n_dimensions=3,
                 top_n_by_max_distance=30,
                 top_n_by_mean_distance=10,
                 axes_weights=(1., 1., 0.2)):
        """Loads the reference samples and stores the matching parameters.

        Args:
          pose_samples_folder: Folder holding one CSV file per pose class.
          pose_embedder: Callable mapping a landmark array to an embedding.
          file_extension: Extension of the sample files, with or without a
            leading dot.
          file_separator: Column delimiter used in the sample files.
          n_landmarks: Number of landmarks per pose.
          n_dimensions: Number of coordinates per landmark.
          top_n_by_max_distance: Samples kept after the max-distance stage.
          top_n_by_mean_distance: Samples kept after the mean-distance stage.
          axes_weights: Per-axis weights multiplied into the embedding
            differences; must broadcast against the embedding.
        """
        self._pose_embedder = pose_embedder
        self._n_landmarks = n_landmarks
        self._n_dimensions = n_dimensions
        self._top_n_by_max_distance = top_n_by_max_distance
        self._top_n_by_mean_distance = top_n_by_mean_distance
        self._axes_weights = axes_weights

        self._pose_samples = self._load_pose_samples(pose_samples_folder,
                                                     file_extension,
                                                     file_separator,
                                                     n_landmarks,
                                                     n_dimensions,
                                                     pose_embedder)

    def _load_pose_samples(self,
                           pose_samples_folder,
                           file_extension,
                           file_separator,
                           n_landmarks,
                           n_dimensions,
                           pose_embedder):
        """Loads pose samples from a given folder.

        Required folder structure:
          neutral_standing.csv
          pushups_down.csv
          pushups_up.csv
          squats_down.csv
          ...

        Required CSV structure:
          sample_00001,x1,y1,z1,x2,y2,z2,....
          sample_00002,x1,y1,z1,x2,y2,z2,....
          ...

        Files are read in sorted name order so that sample indices (as
        returned by `__call__`) are reproducible across machines.

        Returns:
          List of `PoseSample`, one per non-empty CSV row.
        """
        # Match on ".<ext>" so that names merely ending in the extension's
        # letters (e.g. "notes_csv") are not mistaken for sample files.
        suffix = '.' + file_extension.lstrip('.')
        file_names = sorted(
            name for name in os.listdir(pose_samples_folder)
            if name.endswith(suffix))

        expected_row_length = n_landmarks * n_dimensions + 1

        pose_samples = []
        for file_name in file_names:
            # Each file represents one pose class, named after the file.
            class_name = file_name[:-len(suffix)]

            # newline='' lets the csv module handle line endings itself.
            file_path = os.path.join(pose_samples_folder, file_name)
            with open(file_path, newline='') as csv_file:
                for row in csv.reader(csv_file, delimiter=file_separator):
                    if len(row) == 0:
                        continue
                    assert len(row) == expected_row_length, \
                        'Wrong number of values: {}'.format(len(row))
                    landmarks = np.array(row[1:], np.float32).reshape(
                        [n_landmarks, n_dimensions])
                    pose_samples.append(PoseSample(
                        name=row[0],
                        landmarks=landmarks,
                        class_name=class_name,
                        embedding=pose_embedder(landmarks),
                    ))

        return pose_samples

    def _distance(self, sample_embedding, candidate_embeddings, reduce_fn):
        """Smallest reduced, axis-weighted abs difference to any candidate."""
        return min(
            reduce_fn(np.abs(sample_embedding - candidate) * self._axes_weights)
            for candidate in candidate_embeddings
        )

    def __call__(self, pose_landmarks):
        """Classifies given pose.

        Classification is done in two stages:
          * First we pick top-N samples by MAX distance. It allows to remove
            samples that are almost the same as the given pose, but have a
            few joints bent in the other direction.
          * Then we pick top-N samples by MEAN distance. After outliers are
            removed on the previous step, we can pick samples that are
            closest on average.

        Both stages compare against the pose and its mirror image along the
        first axis, and keep the smaller of the two distances.

        Args:
          pose_landmarks: NumPy array of landmarks with shape
            (n_landmarks, n_dimensions).

        Returns:
          Tuple `(result, mean_dist_heap)`:
            * result: dict mapping class name to the number of nearest
              samples of that class, e.g.
              {'pushups_down': 8, 'pushups_up': 2}.
            * mean_dist_heap: list of `[mean_distance, sample_index]` for
              those nearest samples, sorted by ascending distance.
        """
        # Check that provided and target poses have the same shape.
        assert pose_landmarks.shape == (
            self._n_landmarks, self._n_dimensions), 'Unexpected shape: {}'.format(pose_landmarks.shape)

        # Mirror along the first axis, for any number of dimensions.
        mirror = np.ones(self._n_dimensions)
        mirror[0] = -1
        candidates = (
            self._pose_embedder(pose_landmarks),
            self._pose_embedder(pose_landmarks * mirror),
        )

        # Stage 1: filter by max distance to drop poses that match overall
        # but have one joint bent in a different direction.
        max_dist_heap = sorted(
            ([self._distance(sample.embedding, candidates, np.max), sample_idx]
             for sample_idx, sample in enumerate(self._pose_samples)),
            key=lambda entry: entry[0])[:self._top_n_by_max_distance]

        # Stage 2: among the survivors, rank by mean distance.
        mean_dist_heap = sorted(
            ([self._distance(self._pose_samples[sample_idx].embedding,
                             candidates, np.mean), sample_idx]
             for _, sample_idx in max_dist_heap),
            key=lambda entry: entry[0])[:self._top_n_by_mean_distance]

        # Collect results into map: (class_name -> n_samples)
        class_names = [
            self._pose_samples[sample_idx].class_name for _, sample_idx in mean_dist_heap]
        result = {class_name: class_names.count(
            class_name) for class_name in set(class_names)}

        return result, mean_dist_heap

In [7]:
exercise = 'squats'
pose_samples_folder = f'fitness_data/processed_train/{exercise}/csvs_out'

In [8]:
pose_embedder = FullBodyPoseEmbedder()

In [9]:
# Build the reference-pose database for the selected exercise.
pose_interpreter = PoseInterpreter(
    pose_samples_folder=pose_samples_folder,
    pose_embedder=pose_embedder,
    top_n_by_max_distance=30,
    top_n_by_mean_distance=10,
)

In [10]:
wrong = cv2.imread('test_images/wrong_squat.jpg')
right = cv2.imread('test_images/right_squat.jpg')

In [225]:
def get_landmarks(image, return_array=True):
    """Run MediaPipe Pose on a single BGR image.

    Args:
      image: Image array in BGR channel order (converted to RGB internally).
      return_array: If True, convert the detected landmarks to a float32
        array with one `[x, y, z]` row per landmark, where x and z are scaled
        by the frame width and y by the frame height. If False, return
        `result.pose_landmarks` unchanged.

    Returns:
      Tuple `(pose_landmarks, result)` where `result` is the raw output of
      `pose.process`. `pose_landmarks` is None when no pose was detected.
    """
    with mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=1) as pose:

        input_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        result = pose.process(image=input_frame)
        pose_landmarks = result.pose_landmarks

        # No detection: there is nothing to convert, so hand back None
        # instead of failing on `None.landmark`.
        if return_array and pose_landmarks is not None:
            frame_height, frame_width = input_frame.shape[0], input_frame.shape[1]
            pose_landmarks = np.array(
                [[lmk.x * frame_width, lmk.y * frame_height, lmk.z * frame_width]
                 for lmk in pose_landmarks.landmark], dtype=np.float32)

        return pose_landmarks, result

In [233]:
os.path.split("/python/fitness_data/pushups/push_down.csv")

('/python/fitness_data/pushups', 'push_down.csv')

In [226]:
wrong_landmarks = get_landmarks(wrong)
right_landmarks = get_landmarks(right, False)

In [231]:
wrong_landmarks[1].pose_landmarks

landmark {
  x: 0.4124770164489746
  y: 0.31489765644073486
  z: -0.320087194442749
  visibility: 0.9999226331710815
}
landmark {
  x: 0.41924816370010376
  y: 0.30059146881103516
  z: -0.28784626722335815
  visibility: 0.9999055862426758
}
landmark {
  x: 0.422321081161499
  y: 0.30218419432640076
  z: -0.2880152761936188
  visibility: 0.9999022483825684
}
landmark {
  x: 0.4250374436378479
  y: 0.3034431040287018
  z: -0.2882159650325775
  visibility: 0.9998973608016968
}
landmark {
  x: 0.4064311385154724
  y: 0.2938883602619171
  z: -0.3114550709724426
  visibility: 0.9998748302459717
}
landmark {
  x: 0.40081554651260376
  y: 0.29142987728118896
  z: -0.3114817440509796
  visibility: 0.9998674392700195
}
landmark {
  x: 0.3950314521789551
  y: 0.2893002927303314
  z: -0.31143712997436523
  visibility: 0.9998494386672974
}
landmark {
  x: 0.42086195945739746
  y: 0.31067144870758057
  z: -0.1298212707042694
  visibility: 0.9998821020126343
}
landmark {
  x: 0.37866589426994324
  y:

In [13]:
sample_landmarks = [s.landmarks for s in pose_interpreter._pose_samples]

In [16]:
# Pose-check guide: each entry lists pairs of joint names plus a criterion.
# In the cells below, each pair is averaged with
# pose_embedder._get_average_by_names and the two averages are then compared
# with pose_embedder._get_distance.
guide = [
    {
        'joints': [
            ['left_hip', 'right_hip'],
            ['left_ankle', 'right_ankle']
        ], 
        'criteria': 'distance'
    }  
]

In [17]:
pose_embedder._get_average_by_names(wrong_landmarks, guide[0]['joints'][0][0], guide[0]['joints'][0][1])

array([2.0829297e+02, 3.1005829e+02, 5.7956696e-02], dtype=float32)

In [18]:
pose_embedder._get_average_by_names(wrong_landmarks, guide[0]['joints'][0][0], guide[0]['joints'][0][1])

array([2.0829297e+02, 3.1005829e+02, 5.7956696e-02], dtype=float32)

In [19]:
pose_embedder._get_distance(
    pose_embedder._get_average_by_names(wrong_landmarks, guide[0]['joints'][0][0], guide[0]['joints'][0][1]),
    pose_embedder._get_average_by_names(wrong_landmarks, guide[0]['joints'][1][0], guide[0]['joints'][1][1]),
)

array([ 40.76059  , 139.75629  ,  10.3689995], dtype=float32)

In [20]:
pose_embedder._get_distance(
    pose_embedder._get_average_by_names(right_landmarks, guide[0]['joints'][0][0], guide[0]['joints'][0][1]),
    pose_embedder._get_average_by_names(right_landmarks, guide[0]['joints'][1][0], guide[0]['joints'][1][1]),
)

TypeError: 'NormalizedLandmarkList' object is not subscriptable

In [25]:
def get_streaming_landmarks(frames, return_array=True):
    """Run one MediaPipe Pose instance over a sequence of BGR frames.

    Args:
      frames: Iterable of image arrays in BGR channel order.
      return_array: If True, convert each frame's landmarks to a float32
        array with one `[x, y, z]` row per landmark, where x and z are scaled
        by the frame width and y by the frame height. If False, keep
        `result.pose_landmarks` unchanged.

    Returns:
      List with one entry per input frame, in order. An entry is None when
      no pose was detected in that frame, so indices stay aligned with
      `frames`.
    """
    video_pose_landmarks = []
    with mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=1) as pose:

        for image in frames:
            input_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            result = pose.process(image=input_frame)
            pose_landmarks = result.pose_landmarks

            # A frame without a detection must not abort the whole sequence.
            if return_array and pose_landmarks is not None:
                frame_height, frame_width = input_frame.shape[0], input_frame.shape[1]
                pose_landmarks = np.array(
                    [[lmk.x * frame_width, lmk.y * frame_height, lmk.z * frame_width]
                     for lmk in pose_landmarks.landmark], dtype=np.float32)

            video_pose_landmarks.append(pose_landmarks)

    return video_pose_landmarks

In [26]:
exercise = 'squats'
state = 'squats_down'
ref_dir = f'fitness_data/videos/{exercise}/{state}'

In [27]:
def _frame_index(path):
    """Return the integer between the last '_' of the file name and its first following '.'."""
    # Parse the file name only, so '_' or '.' in directory names cannot
    # leak into the frame number.
    return int(os.path.basename(path).rsplit('_', 1)[-1].split('.')[0])


# Reference frames ordered by their numeric suffix (not lexicographically).
image_paths = sorted(glob.glob(f"{ref_dir}/*.jpg"), key=_frame_index)

In [28]:
ref_images = [cv2.imread(img) for img in image_paths]

In [29]:
ref_landmarks = get_streaming_landmarks(ref_images)

In [32]:
# get_landmarks returns (landmarks, raw MediaPipe result); the embedding cells
# that follow need only the landmark array.
wrong_landmarks = get_landmarks(wrong)[0]
right_landmarks = get_landmarks(right)[0]

In [33]:
wrong_embedding = pose_embedder(wrong_landmarks)
# Embed the correct-form pose from its own landmarks (previously a copy-paste
# of `wrong_landmarks`, which made both embeddings identical).
right_embedding = pose_embedder(right_landmarks)

In [34]:
wrong_embedding2 = pose_embedder(ref_landmarks[-1])

In [35]:
flipped_wrong_embedding = pose_embedder(
            wrong_landmarks * np.array([-1, 1, 1]))

flipped_wrong_embedding2 = pose_embedder(
            ref_landmarks[-1] * np.array([-1, 1, 1]))

In [57]:
sample_embeddings = [s.embedding for s in pose_interpreter._pose_samples]

In [66]:
sample_embeddings[0].shape

(23, 3)

In [94]:
p1, p2, p3 = right_landmarks[[23, 25, 27]]

In [109]:
def compute_joint_angle(landmarks, joints):
    """Angle defined by three named joints.

    Args:
      landmarks: Indexable collection of landmark points, indexed by each
        name's position in `pose_embedder._landmark_names`.
      joints: Exactly three joint names, passed to `calculate_angle` in the
        given order.

    Returns:
      Whatever `calculate_angle` returns for the three looked-up points.
    """
    # NOTE(review): `calculate_angle` is not defined in the visible notebook
    # cells; confirm it is in scope before running.
    assert len(joints) == 3, 'Angle can only be calculated between 3 points'

    point_a, point_b, point_c = [
        landmarks[pose_embedder._landmark_names.index(joint_name)]
        for joint_name in joints
    ]
    return calculate_angle(point_a, point_b, point_c)

In [130]:
def check_squats(pose_state: str, landmarks: np.ndarray, max_knee_angle: float = 46):
    """Judge squat form for the given pose state.

    Only the 'squats_down' state is evaluated: the left and right
    hip-knee-ankle angles are averaged and the pose passes when that mean is
    at most `max_knee_angle`. Every other state passes without feedback.

    Args:
      pose_state: Name of the detected pose state, e.g. 'squats_down'.
      landmarks: Landmark collection accepted by `compute_joint_angle`.
      max_knee_angle: Largest mean knee angle still accepted, in the unit
        returned by `compute_joint_angle` (presumably degrees; confirm).

    Returns:
      Tuple `(is_ok, message)`; `message` is None for states that are not
      checked.
    """
    if pose_state != 'squats_down':
        return True, None

    knee_angle = np.mean([
        compute_joint_angle(landmarks, ['left_hip', 'left_knee', 'left_ankle']),
        compute_joint_angle(landmarks, ['right_hip', 'right_knee', 'right_ankle'])
    ])

    if knee_angle <= max_knee_angle:
        return True, 'Good Job'
    # Also reached when the angle is NaN, so callers always receive a tuple.
    return False, 'Bend your knees further'

In [132]:
a = {'up': 8, 'down': 2}

In [135]:
sorted(a.items(), key=lambda x: x[1], reverse=True)[0][0]

'up'

In [139]:
np.mean([2, 3]).round()

2.0

In [127]:
# Dispatch table: exercise name -> form-check function, called as
# pose_critic[exercise](pose_state, landmarks).
pose_critic = {
    'squats': check_squats
}

In [129]:
pose_critic['squats']('squats_up', right_landmarks)

37.930800000000005


(True, None)

In [103]:
compute_joint_angle(ref_landmarks[-1], ['left_hip', 'left_knee', 'left_ankle'])

47.8298

In [104]:
compute_joint_angle(ref_landmarks[-1], ['right_hip', 'right_knee', 'right_ankle'])

42.5727

In [102]:
(40 + 35) / 2

37.5

In [105]:
(47 + 42) / 2

44.5

In [36]:
right_embedding[11:13]

array([[ 14.567765,  41.479027,  23.049997],
       [ 13.665173,  55.32354 , -15.86788 ]], dtype=float32)

In [37]:
wrong_embedding2[11:13]

array([[ 21.088562,  43.21163 , -24.614782],
       [ 17.80828 ,  52.5922  , -55.031704]], dtype=float32)

In [54]:
cosine(right_embedding[11:13].reshape(-1), right_embedding[11:13].reshape(-1))

0.0

In [230]:
# Mean absolute difference over embedding rows 11-12, keeping the smaller of
# the comparisons against the "wrong" pose and its x-flipped version.
mean_dist = min(
    np.abs(right_embedding[11:13] - candidate[11:13]).mean()
    for candidate in (wrong_embedding, flipped_wrong_embedding)
)

In [231]:
mean_dist

7.9725227

In [210]:
min(pose_interpreter(ref_landmarks[-1])[1], key=lambda x: x[0])

[4.365752372188844, 8]

In [211]:
min(pose_interpreter(wrong_landmarks)[1], key=lambda x: x[0])

[4.995928324823795, 3]

In [165]:
pdr = cv2.imread('test_images/push_down.jpg')
puw = cv2.imread('test_images/push_up_wrong.jpg')

In [166]:
pdr_landmarks = get_landmarks(pdr)
puw_landmarks = get_landmarks(puw)

In [149]:
# Names of the 33 pose landmarks. A name's position in this list is used as
# the row index into a landmark array (see check_body_straight).
# NOTE(review): presumably mirrors pose_embedder._landmark_names and the
# MediaPipe Pose landmark order — confirm they stay in sync.
landmark_names = [
    'nose',
    'left_eye_inner', 'left_eye', 'left_eye_outer',
    'right_eye_inner', 'right_eye', 'right_eye_outer',
    'left_ear', 'right_ear',
    'mouth_left', 'mouth_right',
    'left_shoulder', 'right_shoulder',
    'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist',
    'left_pinky_1', 'right_pinky_1',
    'left_index_1', 'right_index_1',
    'left_thumb_2', 'right_thumb_2',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_ankle', 'right_ankle',
    'left_heel', 'right_heel',
    'left_foot_index', 'right_foot_index',
]

In [176]:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_percentage_error

In [222]:
def check_body_straight(landmarks):
    """Measure how far the shoulder-hip-knee-ankle joints deviate from a line.

    A straight line y = a*x + b is fitted through the x/y coordinates of the
    eight joints (z is ignored for now), and the mean absolute percentage
    error between the joints' y values and the fitted line is returned.

    Args:
      landmarks: Landmark collection indexed by each joint name's position in
        `landmark_names`; every entry provides at least x and y.

    Returns:
      The mean absolute percentage error of the line fit; lower means the
      joints lie closer to a single line.
    """
    body_line_joints = (
        'left_shoulder', 'right_shoulder',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    )

    joint_coords = np.array(
        [landmarks[landmark_names.index(joint_name)] for joint_name in body_line_joints])

    xs = joint_coords[:, :1]  # kept 2-D, as the regressor expects a feature matrix
    ys = joint_coords[:, 1]

    fitted_line = LinearRegression().fit(xs, ys)
    return mean_absolute_percentage_error(ys, fitted_line.predict(xs))

In [223]:
def check_pushup(img_path):
    """Score body straightness for a push-up photo.

    Args:
      img_path: Path of the image file to analyse.

    Returns:
      The value of `check_body_straight` for the pose found in the image.

    Raises:
      FileNotFoundError: If the image cannot be read.
      ValueError: If no pose is detected in the image.
    """
    image = cv2.imread(img_path)
    # cv2.imread signals failure by returning None rather than raising.
    if image is None:
        raise FileNotFoundError('Could not read image: {}'.format(img_path))

    landmarks = get_landmarks(image)
    # get_landmarks returns (landmarks, raw_result); keep only the landmarks.
    if isinstance(landmarks, tuple):
        landmarks = landmarks[0]
    if landmarks is None:
        raise ValueError('No pose detected in image: {}'.format(img_path))

    return check_body_straight(landmarks)

In [224]:
check_pushup('test_images/push_up_right.jpg')

0.020615766

LinearRegression()

0.027879482

In [148]:
pdr_landmarks

array([[ 287.50854 ,  423.53262 , -125.35507 ],
       [ 280.6787  ,  424.20044 , -147.28148 ],
       [ 281.39294 ,  424.55844 , -147.29375 ],
       [ 282.1515  ,  424.93127 , -147.34798 ],
       [ 277.65582 ,  421.40918 , -126.35826 ],
       [ 276.36295 ,  419.70807 , -126.35773 ],
       [ 275.19223 ,  418.01562 , -126.362045],
       [ 286.667   ,  417.3192  , -196.14108 ],
       [ 278.26483 ,  406.61313 , -103.23039 ],
       [ 295.76843 ,  419.1228  , -137.17993 ],
       [ 293.51373 ,  415.71304 , -110.369995],
       [ 341.2537  ,  395.54895 , -198.71936 ],
       [ 303.8901  ,  358.1392  ,  -40.97414 ],
       [ 410.18082 ,  412.12436 , -206.94943 ],
       [ 319.69418 ,  367.3406  ,   68.60828 ],
       [ 385.31116 ,  473.64368 , -196.00012 ],
       [ 295.9261  ,  406.83948 ,  150.81403 ],
       [ 380.2906  ,  484.17142 , -217.62086 ],
       [ 279.8606  ,  406.5147  ,  159.4475  ],
       [ 367.31592 ,  480.65408 , -209.86296 ],
       [ 283.07175 ,  412.0782  ,  145.2