# Impact Detection

In [1]:
# imports
import os
import json
import numpy as np
import pandas as pd

In [2]:
import sys
import numpy
# print arrays in full: with the threshold raised to sys.maxsize numpy no longer
# summarizes large arrays with "...", so every displayed array is shown completely
numpy.set_printoptions(threshold=sys.maxsize)

In [6]:
# preview the (frame index, path) pairs that load_pose_estimates builds for video 'j1'.
# The list is rebuilt here because json_paths is otherwise only a local variable
# inside load_pose_estimates, so a bare `json_paths` fails on a fresh kernel.
preview_dir = os.path.join('data', 'output_jsons', 'j1')
json_paths = [(int(json_file.split('_')[1]), os.path.join(preview_dir, json_file)) for json_file in sorted(os.listdir(preview_dir)) if '.json' in json_file]
json_paths

[(0, 'data/output_jsons/j1/j1_000000000000_keypoints.json'),
 (1, 'data/output_jsons/j1/j1_000000000001_keypoints.json'),
 (2, 'data/output_jsons/j1/j1_000000000002_keypoints.json'),
 (3, 'data/output_jsons/j1/j1_000000000003_keypoints.json'),
 (4, 'data/output_jsons/j1/j1_000000000004_keypoints.json'),
 (5, 'data/output_jsons/j1/j1_000000000005_keypoints.json'),
 (6, 'data/output_jsons/j1/j1_000000000006_keypoints.json'),
 (7, 'data/output_jsons/j1/j1_000000000007_keypoints.json'),
 (8, 'data/output_jsons/j1/j1_000000000008_keypoints.json'),
 (9, 'data/output_jsons/j1/j1_000000000009_keypoints.json'),
 (10, 'data/output_jsons/j1/j1_000000000010_keypoints.json'),
 (11, 'data/output_jsons/j1/j1_000000000011_keypoints.json'),
 (12, 'data/output_jsons/j1/j1_000000000012_keypoints.json'),
 (13, 'data/output_jsons/j1/j1_000000000013_keypoints.json'),
 (14, 'data/output_jsons/j1/j1_000000000014_keypoints.json'),
 (15, 'data/output_jsons/j1/j1_000000000015_keypoints.json'),
 (16, 'data/output

## Load Pose Estimates

In [4]:
# function to load pose estimates
def load_pose_estimates(load_dir, vid_name, fps):
    """Load the per-frame keypoint JSON files of one video into arrays.

    Files are read from ``os.path.join(load_dir, vid_name)``. Each file name is
    expected to look like ``<vid_name>_<frame number>_keypoints.json``; the
    frame number is used as the row index of the returned arrays, so frame
    numbers are assumed to run from 0 to n_frames - 1 without gaps.

    Parameters
    ----------
    load_dir : str
        Directory that contains one sub-directory per video.
    vid_name : str
        Name of the video sub-directory.
    fps : number
        Frame rate used to convert a frame number into a time stamp.

    Returns
    -------
    pose_x, pose_y, pose_c : np.ndarray, shape (n_frames, 18)
        x coordinate, y coordinate and confidence of each keypoint of the
        first person in every frame. Values equal to 0 are replaced by np.nan,
        and frames without any detected person are entirely np.nan.
    time_stamps : np.ndarray, shape (n_frames,)
        ``frame number / fps`` for every frame.
    """
    n_keypoints = 18
    vid_dir = os.path.join(load_dir, vid_name)

    json_paths = []
    for json_file in sorted(os.listdir(vid_dir)):
        if not json_file.endswith('.json'):
            continue
        # the frame number is the second-to-last '_'-separated field, which also
        # works when vid_name itself contains underscores
        frm = int(json_file.split('_')[-2])
        json_paths.append((frm, os.path.join(vid_dir, json_file)))
    n_frames = len(json_paths)

    # start from NaN (not uninitialized memory) so rows that are never filled
    # are reported as missing instead of containing garbage
    pose_x = np.full((n_frames, n_keypoints), np.nan)
    pose_y = np.full((n_frames, n_keypoints), np.nan)
    pose_c = np.full((n_frames, n_keypoints), np.nan)
    time_stamps = np.full(n_frames, np.nan)
    for frm, json_path in json_paths:
        with open(json_path, 'r') as f:
            json_dict = json.load(f)
        time_stamps[frm] = frm/fps

        people = json_dict['people']
        if not people:
            # nobody was detected in this frame: leave the row as NaN
            continue
        # keypoints are stored flat as [x0, y0, c0, x1, y1, c1, ...]
        keypoints = people[0]['pose_keypoints_2d']
        pose_x[frm] = keypoints[0::3]
        pose_y[frm] = keypoints[1::3]
        pose_c[frm] = keypoints[2::3]

    # mark missing points with np.nan
    pose_x[pose_x == 0] = np.nan
    pose_y[pose_y == 0] = np.nan
    pose_c[pose_c == 0] = np.nan

    return pose_x, pose_y, pose_c, time_stamps

In [5]:
# single-video run: read the pose estimates of video 'j1'
vid_name, fps = 'j1', 30
load_dir = os.path.join('data', 'output_jsons')

pose_x, pose_y, pose_c, time_stamps = load_pose_estimates(
    load_dir=load_dir,
    vid_name=vid_name,
    fps=fps,
)

# quick sanity check: shape of every returned array
tuple(arr.shape for arr in (pose_x, pose_y, pose_c, time_stamps))

((4606, 18), (4606, 18), (4606, 18), (4606,))

In [7]:
# scratch check: np.zeros with dtype=bool yields an all-False array
np.zeros((5), dtype=bool)

array([False, False, False, False, False])

## Detect Impact Points

Reference for the OpenPose output format (JSON layout and keypoint ordering): https://cmu-perceptual-computing-lab.github.io/openpose/web/html/doc/md_doc_02_output.html

In [18]:
# x/y trajectories of keypoint column 4 over all frames (4 is the first entry of
# the limb_indices list used further down) — presumably kept for interactive inspection
x = pose_x[:,4]
y = pose_y[:,4]

In [6]:
def first_order_derivative(x,y):
    """Frame-to-frame Euclidean displacement of one keypoint.

    Parameters
    ----------
    x, y : array-like of float, same length
        Keypoint coordinates per frame; np.nan marks a missing detection.

    Returns
    -------
    np.ndarray, shape (len(x),)
        Entry ``i`` is ``sqrt((x[i]-x[i-1])**2 + (y[i]-y[i-1])**2)``.
        Entry 0 is np.nan (no previous frame), and so is every entry whose
        current or previous coordinate is np.nan.

    Raises
    ------
    ValueError
        If ``x`` and ``y`` do not have the same length.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if len(x) != len(y):
        raise ValueError('x and y must have the same length')

    # Euclidean displacement of the keypoint; index 0 has no predecessor
    first_derivative = np.full(len(x), np.nan)
    # NaN never compares equal to anything (`value == np.nan` is always False),
    # so missing points are handled by letting NaN propagate through the
    # subtraction and the square root instead of testing for it explicitly.
    first_derivative[1:] = np.sqrt(np.diff(x)**2 + np.diff(y)**2)
    return first_derivative

In [20]:
# function to detect impact points
def detect_impact_points(pose_x, pose_y, pose_c, time_stamps, limb_indices, percentile=95):
    """Flag, per limb, the frames with the largest frame-to-frame displacement.

    For every keypoint column listed in ``limb_indices`` the displacement
    between consecutive frames is computed with ``first_order_derivative``.
    A frame is marked as an impact point when its displacement is greater than
    or equal to the ``percentile``-th percentile of that limb's non-NaN
    displacements.

    Parameters
    ----------
    pose_x, pose_y : np.ndarray, shape (n_frames, n_keypoints)
        Keypoint coordinates; np.nan where a point was not detected.
    pose_c : np.ndarray
        Confidence values. Only its length is checked; the values are not used.
    time_stamps : np.ndarray, shape (n_frames,)
        Time stamp per frame; returned unchanged.
    limb_indices : sequence of int
        Keypoint columns to analyse. May be empty.
    percentile : float, optional
        Percentile used as the displacement threshold (default 95, i.e. roughly
        the top 5% of displacements are flagged).

    Returns
    -------
    impact_points : np.ndarray of bool, shape (n_frames, len(limb_indices))
        True where the limb's displacement reaches the threshold. Frames with a
        NaN displacement (including the first frame) are always False.
    time_stamps : np.ndarray
        The ``time_stamps`` argument.
    """
    assert len(pose_x) == len(pose_y) and len(pose_x) == len(pose_c) and len(pose_x) == len(time_stamps)

    n_frames = len(time_stamps)
    # allocate the result up front so the shape is (n_frames, n_limbs) even when
    # limb_indices is empty
    impact_points = np.zeros((n_frames, len(limb_indices)), dtype=bool)

    # detect impact points for each limb
    for col, limb in enumerate(limb_indices):
        displacement = first_order_derivative(pose_x[:, limb], pose_y[:, limb])
        if np.isnan(displacement).all():
            # no usable displacement for this limb: nothing can be flagged
            continue

        threshold = np.nanpercentile(displacement, percentile)
        # comparisons against NaN evaluate to False, so undetected frames are
        # never flagged; errstate silences the warning some numpy versions emit
        with np.errstate(invalid='ignore'):
            impact_points[:, col] = displacement >= threshold

    return impact_points, time_stamps

# smoke test on the currently loaded video
limb_indices = [4, 7, 10, 13]  # hands and feet indices in COCO format
impact_points, time_stamps = detect_impact_points(
    pose_x=pose_x,
    pose_y=pose_y,
    pose_c=pose_c,
    time_stamps=time_stamps,
    limb_indices=limb_indices,
)
# shapes of both results as a quick sanity check
tuple(arr.shape for arr in (impact_points, time_stamps))

((4606, 4), (4606,))

In [21]:
# display the boolean impact mask (one row per frame, one column per limb)
impact_points

array([[False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
       [False, False, False, False],
 

In [22]:
# number of True entries over all frames and limbs
np.sum(impact_points)

902

In [5]:
# save results to file
save_dir = os.path.join('data', 'impact_points')
# exist_ok=True creates the directory when it is missing and does nothing when
# it already exists, avoiding a separate check-then-create step
os.makedirs(save_dir, exist_ok=True)

np.save(os.path.join(save_dir, f'{vid_name}_impact_points.npy'), impact_points)
np.save(os.path.join(save_dir, f'{vid_name}_time_stamps.npy'), time_stamps)

## Run on All Videos

In [6]:
# run impact detection for all videos
load_dir = os.path.join('data', 'output_jsons')
vid_info_df = pd.read_csv(os.path.join('video', 'video_info.csv'), index_col='vid_name')
limb_indices = [4, 7, 10, 13] # hands and feet indices in COCO format

save_dir = os.path.join('data', 'impact_points')
# creates the directory when missing, no-op when it already exists
os.makedirs(save_dir, exist_ok=True)

# iterate over all videos
for vid_name in vid_info_df.index:
    # the 'group' entry is excluded from processing
    if vid_name == 'group':
        continue

    print(10 * '-')
    print(vid_name)

    # load pose estimates; a single .loc[row, column] lookup reads the frame
    # rate directly instead of materializing the whole row first
    fps = vid_info_df.loc[vid_name, 'fps']
    pose_x, pose_y, pose_c, time_stamps = load_pose_estimates(load_dir, vid_name, fps)

    # detect and save impact points and timestamps
    impact_points, time_stamps = detect_impact_points(pose_x, pose_y, pose_c, time_stamps, limb_indices)
    np.save(os.path.join(save_dir, f'{vid_name}_impact_points.npy'), impact_points)
    np.save(os.path.join(save_dir, f'{vid_name}_time_stamps.npy'), time_stamps)

----------
a1
----------
a2
----------
e1
----------
e2
----------
e3
----------
j1
----------
j2
----------
u1
----------
u2
