In [10]:
import cv2
import numpy as np
import mediapipe as mp
import pandas as pd
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
import os
import re

In [11]:
# Load Model

model_path = r'C:\Users\kyles\Desktop\CS5100\Final_Project\pose_landmarker_heavy.task'
min_pose_detection_confidence = 0.5
min_pose_presence_confidence = 0.5
min_tracking_confidence = 0.5
num_poses = 1
last_timestamp_ms = 0
labels = [
    'Right Knee',
    'Left Knee',
    'Right Ankle',
    'Left Ankle',
    'Right Heel',
    'Left Heel',
    'Right Foot Index',
    'Left Foot Index'
]


In [12]:
# input model parameters 
base_options = python.BaseOptions(model_asset_path=model_path)
options = vision.PoseLandmarkerOptions(base_options=base_options,
                                       running_mode=vision.RunningMode.VIDEO,
                                       num_poses=num_poses,
                                       min_pose_detection_confidence=min_pose_detection_confidence,
                                       min_pose_presence_confidence=min_pose_presence_confidence,
                                       min_tracking_confidence=min_tracking_confidence,
                                       output_segmentation_masks=False)

In [13]:
# Functions to draw pose data

def draw_skeleton(detection_result):
    pose_landmarks_list = detection_result.pose_landmarks
    blank = np.zeros((500, 800, 3), np.uint8)

    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]

        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=landmark.x,
                y=landmark.y,
                z=landmark.z) for landmark in pose_landmarks
        ])
        mp.solutions.drawing_utils.draw_landmarks(
            blank,
            pose_landmarks_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            mp.solutions.drawing_styles.get_default_pose_landmarks_style())
    return blank


def draw_landmarks_on_image(rgb_image, detection_result):
    pose_landmarks_list = detection_result.pose_landmarks
    annotated_copy = np.copy(rgb_image)

    # Loop through the detected poses to visualize.
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]

        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=landmark.x,
                y=landmark.y,
                z=landmark.z) for landmark in pose_landmarks
        ])
        mp.solutions.drawing_utils.draw_landmarks(
            annotated_copy,
            pose_landmarks_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            mp.solutions.drawing_styles.get_default_pose_landmarks_style())
    return annotated_copy

In [14]:
# Functions for identifying good poses, extracting and reorganizing their data, and converting to DataFrame

def test_image_range(data, x, y, save):
    test_range = data[x:y]
    path = r'C:\Users\kyles\Desktop\CS5100\Final_Project\skeleton_data'
    for i in range(len(test_range)):
        # out_file = shortened + '_' + str(i) + '.jpg'
        test_output = draw_skeleton(test_range[i])
        if save:
            #cv2.imwrite(os.path.join(path, out_file), test_output)
            continue
        while True:
            cv2.imshow('result', test_output)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    cv2.destroyAllWindows()


def get_lower_half_data(pose_data, x, y):
    lower_half_data = []
    trimmed_data = pose_data[x:y]

    for i in range(len(trimmed_data)):
        for each in trimmed_data[i].pose_landmarks:
            lower_half_data.append(each[23::])
    return lower_half_data


def create_output(data_labels, data):
    output = []
    line = []

    for i in range(len(data)):
        for j in range(len(data_labels)):
            data_point = [data_labels[j], data[i][j]]
            line.append(data_point)
        output.append(line)
        line = []

    return output


def create_df(dataframe, columns):
    pattern = re.compile(r"\['(.*?)', \(x=(.*?), y=(.*?), z=(.*?), visibility=(.*?), presence=(.*?)\)\]")
    output_df = pd.DataFrame(columns=columns)

    for row in dataframe['Pose Data']:
        data = []
        row = str(row)
        new_row = row.replace('NormalizedLandmark', '')
        new_row = new_row[1:-1]
        matches = pattern.findall(new_row)

        for match in matches:
            body_part = match[0]
            measurements = tuple(map(float, match[1:]))
            data.append((body_part, measurements))

        data_df = pd.DataFrame([dict(data)])
        output_df = pd.concat([output_df, data_df])
    return output_df

def get_pose_output(video_draw_data, start_frame, end_frame):
    # Get 5 poses around kick. Start with pose directly before impact
    
    test_image_range(video_draw_data, start_frame, end_frame, False)
    
    # Extract data
    lower_output = get_lower_half_data(video_draw_data, start_frame, end_frame)
    final_output = create_output(labels, lower_output)
    
    return final_output

def write_arr_to_csv(data_with_times, save_path):
    cols = ['Right Knee', 'Left Knee', 'Right Ankle', 'Left Ankle', 'Right Heel', 'Left Heel', 'Right Foot Index', 'Left Foot Index']
    
    if os.path.isfile(formatted_data_path):
        existing_df = pd.read_csv(save_path)
        next_index = existing_df.tail(1)['Pose ID'].item() + 1
        header = False
    else:
        next_index = 0
        header = True
    
    df_output = pd.DataFrame(data_with_times, columns=['Time', 'Pose Data'])
    df_output.dropna()
    df_output = df_output.reset_index(drop=True)
    

    reformatted_df = create_df(df_output, cols)
    reformatted_df.reset_index(drop=True, inplace=True)
    reformatted_df['Pose ID'] = next_index

    # Uncomment below when saving data
    reformatted_df.to_csv(save_path, mode='a', header=header, index=False)
    

In [15]:
# Check for good pose estimation and capture data

def get_pose_data(file_path, flip):
    pose_data = []
    mp_data = []
    pose_draw_data = []
    time_stamps = []
    
    
    shortened = file_path[-9:-4]
    
    with vision.PoseLandmarker.create_from_options(options) as landmarker:
        cap = cv2.VideoCapture(file_path)
        
        while cap.isOpened():
            success, image = cap.read()
            if not success:
    
                break
    
            # When penalty is taken with left foot, uncomment below
            if flip:
                image = cv2.flip(image, 1)
            
            # Convert cv2 image to mediapipe image object
            mp_image = mp.Image(
                image_format=mp.ImageFormat.SRGB,
                data=cv2.cvtColor(image[400:1000, 500:1200], cv2.COLOR_BGR2RGB))
            timestamp_ms = int(cv2.getTickCount() / cv2.getTickFrequency() * 1000)
            result = landmarker.detect_for_video(mp_image, timestamp_ms)
            print(result.pose_world_landmarks)
            pose_data.append(result.pose_world_landmarks)
            annotated_image = draw_landmarks_on_image(mp_image.numpy_view(), result)
    
            if result.pose_world_landmarks:
                if result.pose_world_landmarks[0][0].z > -.3 or result.pose_world_landmarks[0][1].z > -.3:
                    # get frame_ms -- append to list
                    frame_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
                    time_stamps.append(frame_ms / 1000)
                    # draw frames
                    pose_draw_data.append(result)
                    mp_data.append(mp_image)
    
            cv2.imshow("MediaPipe Pose Landmark", cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))
    
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cv2.destroyAllWindows()
        cap.release()
        return pose_draw_data, time_stamps

In [16]:
new_path = r'C:\Users\kyles\Desktop\CS5100\Final_Project\Test_Data\fifa_data\fifa16.mp4'

go_ahead = 'n'


flip_status = input('Flip image? y or n')

if flip_status == 'y':
    video_data, time_stamps = get_pose_data(new_path, True)
else:
    video_data, time_stamps = get_pose_data(new_path, False)

while go_ahead == 'n':
    x = int(input('Enter start frame int: '))
    y = int(input('Enter end frame int: '))
    selected_data = get_pose_output(video_data, x, y)

    go_ahead = input('Proceed? y or n')
    if go_ahead == 'y':
        selected_times = time_stamps[x:y]
        output_with_times = list(zip(selected_times, selected_data))

        formatted_data_path = 'fifa_data2.csv'

        # uncomment to save to csv
        write_arr_to_csv(output_with_times, formatted_data_path)
    if go_ahead == 'skip':
        break


ValueError: invalid literal for int() with base 10: ''