In [None]:
#download dependencies
!wget -P . https://raw.githubusercontent.com/quietscientist/gma_score_prediction_from_video/refs/heads/main/utils/kinematics.py
!wget -P . https://raw.githubusercontent.com/quietscientist/gma_score_prediction_from_video/refs/heads/main/utils/circstat.py
!wget -P . https://raw.githubusercontent.com/quietscientist/gma_score_prediction_from_video/refs/heads/main/utils/processing.py
!wget -P . https://raw.githubusercontent.com/quietscientist/gma_score_prediction_from_video/refs/heads/main/utils/skeleton.py

# Download the example data, or upload your own: JSON annotations go in ./data/annotations and the per-video metadata in ./data/video_info.csv

In [None]:
%pip install scikit-video

import sys, os, cv2, glob, json, gc
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
import json
import numpy as np
import pandas as pd
import itertools
from itertools import chain
from moviepy.editor import VideoFileClip
import skvideo.io
from tqdm import tqdm
import circstat as CS
import scipy as sc
import math

from processing import *
from kinematics import *
from skeleton import *

In [None]:
# Format the JSON annotations as one pickled DataFrame per video, using the
# OpenPose keypoint order and body-part labels.
#
# Reads : ./data/annotations/*.json and ./data/video_info.csv
# Writes: ./pose_estimates/{dataset}_norm/<video>.pkl (one row per keypoint per frame)

OVERWRITE = True              # when False, videos that already have a .pkl are skipped
USE_CENTER_INSTANCE = False   # choose the instance with get_center_instance(...)
USE_BEST_INSTANCE = True      # choose the instance with get_best_instance(...); used only if the flag above is False

dataset = 'YT'
json_path = './data/annotations'
# Keep only regular *.json files: os.listdir also returns hidden files and
# sub-directories (e.g. notebook checkpoint folders), which would make
# open()/json.load() fail. Sorting makes `video_number` reproducible across runs.
json_files = sorted(
    entry for entry in os.listdir(json_path)
    if entry.lower().endswith('.json') and os.path.isfile(os.path.join(json_path, entry))
)
directory = './data'

save_path = f'./pose_estimates/{dataset}_norm'
os.makedirs(save_path, exist_ok=True)

# Keypoint index -> body-part label for the keypoints returned by
# convert_coco_to_openpose.
kp_mapping = {0:'Nose', 1:'Neck', 2:'RShoulder', 3:'RElbow', 4:'RWrist', 5:'LShoulder', 6:'LElbow',
              7:'LWrist', 8:'RHip', 9:'RKnee', 10:'RAnkle', 11:'LHip',
              12:'LKnee', 13:'LAnkle', 14:'REye', 15:'LEye', 16:'REar', 17:'LEar'}

# Column layout of every per-video DataFrame.
columns = ['video_number', 'video', 'bp', 'frame', 'x', 'y', 'c','fps', 'pixel_x', 'pixel_y', 'time', 'part_idx']
data = []  # not used in this cell; kept in case other cells reference it
vid_info = pd.read_csv('./data/video_info.csv')

for file_number, file in enumerate(tqdm(json_files)):
    file_path = os.path.join(json_path, file)
    # Video name = text before the first '.'; it has to match the 'video'
    # column of video_info.csv.
    fname = file.split('.')[0]

    if not OVERWRITE and os.path.exists(f'{save_path}/{fname}.pkl'):
        continue

    # Per-video metadata. A video that is missing from video_info.csv is
    # reported and skipped instead of aborting the whole batch with an
    # IndexError on `.values[0]`.
    info = vid_info[vid_info['video'] == fname]
    if info.empty:
        tqdm.write(f'Skipping {file}: no entry for video "{fname}" in ./data/video_info.csv')
        continue
    center_x = info['center_x'].values[0]
    center_y = info['center_y'].values[0]
    pixel_x = info['width'].values[0]
    pixel_y = info['height'].values[0]
    fps = info['fps'].values[0]

    # Load the annotations and release the file handle before processing.
    with open(file_path, 'r') as f:
        frames = json.load(f)

    interim = []
    for frame in frames:
        frame_id = frame['frame_id']
        # Frames without any detected instance contribute no rows.
        if 'instances' not in frame or len(frame['instances']) == 0:
            continue

        if USE_CENTER_INSTANCE:
            instance_id = get_center_instance(frame['instances'], center_x, center_y)
        elif USE_BEST_INSTANCE:
            instance_id = get_best_instance(frame['instances'])
        else:
            instance_id = 0  # fall back to the first listed instance

        keypoints = frame['instances'][instance_id]['keypoints']
        confidence = frame['instances'][instance_id]['keypoint_scores']
        keypoints, confidence = convert_coco_to_openpose(keypoints, confidence)

        # Frame timestamp (presumably seconds, if `fps` is frames per second).
        time = frame_id / fps

        # One output row per keypoint.
        for part_idx, (x, y) in enumerate(keypoints):
            bp = kp_mapping[part_idx]
            c = confidence[part_idx]

            row = [file_number, fname, bp, frame_id, x, y, c, fps, pixel_x, pixel_y, time, part_idx]
            interim.append(row)

    interim_df = pd.DataFrame(interim, columns=columns)
    interim_df.to_pickle(f'{save_path}/{fname}.pkl')

    # Free the per-video frame before moving on to the next file.
    del interim_df


100%|██████████| 19/19 [00:04<00:00,  4.22it/s]


In [None]:
# Concatenate every per-video pickle in `save_path` into one headerless CSV.
#
# The CSV lives in the same directory as the pickles, so two things are needed
# to make this cell safe to re-run:
#   * only *.pkl files are read (otherwise the CSV itself would be passed to
#     pd.read_pickle on the second run), and
#   * a CSV left over from a previous run is removed first (otherwise mode='a'
#     would append every row a second time).
combined_csv = f'{save_path}/pose_estimates_{dataset}.csv'
if os.path.exists(combined_csv):
    os.remove(combined_csv)

pkl_files = sorted(name for name in os.listdir(save_path) if name.endswith('.pkl'))

for pklfile in tqdm(pkl_files):

    interim_df = pd.read_pickle(f'{save_path}/{pklfile}')
    # header=False: rows only, the file is written without a header line.
    interim_df.to_csv(combined_csv, mode='a', header=False, index=False)

    # Release the frame before loading the next pickle.
    del interim_df

In [None]:
# Add the column header to the combined pose-estimate CSV, rewriting it in
# chunks so the whole file never has to be held in memory.
dataset = 'YT'

csv_path = f'{save_path}/pose_estimates_{dataset}.csv'
output_csv_path = f'{save_path}/pose_estimates_{dataset}_b.csv'  # temporary output file
chunksize = 1000  # Number of rows per chunk

# Define the new headers
new_headers = ['video_number', 'video', 'bp', 'frame', 'x', 'y', 'c', 'fps', 'pixel_x', 'pixel_y', 'time', 'part_idx']

# Check whether the file already starts with the header line, so that
# re-running this cell does not turn an existing header into a data row.
with open(csv_path, 'r') as f:
    first_line = f.readline().rstrip('\r\n')
has_header = first_line.split(',') == new_headers

# Read the CSV file in chunks. For a headerless file, header=None is required:
# without it pandas consumes the first data row as column names and that row
# is silently lost. header=0 replaces an existing header line with `names`.
chunk_iterator = pd.read_csv(
    csv_path,
    chunksize=chunksize,
    header=0 if has_header else None,
    names=new_headers,
)

# The first chunk creates the output file with the header; later chunks are
# appended without one.
wrote_output = False
for chunk in chunk_iterator:
    chunk.to_csv(
        output_csv_path,
        mode='a' if wrote_output else 'w',
        index=False,
        header=not wrote_output,
    )
    wrote_output = True

# Swap the rewritten file into place. os.replace overwrites the destination,
# so no intermediate "_x" copy or follow-up clean-up of temporary files is needed.
if wrote_output:
    os.replace(output_csv_path, csv_path)