In [1]:
import os
import pickle

import numpy as np
from tqdm.notebook import tqdm

from lared_dataset.constants import raw_pose_path, processed_pose_path
from lared_dataset.preprocess.pose.frame_poses import FramePoses
from lared_dataset.preprocess.pose.utils import (
    openpose_keypoints_to_text,
    interp_tracks,
    convert_to_upper_body,
    get_assigned_tracks,
    undo_track_interpolation,
    filter_joints_inplace_by_confidence,
    filter_joints_inplace_by_bone_length,
    reset_track_ids,
    filter_tracks_by_time,
    find_track_by_id)

In [2]:
def process_framewise_poses():
    for cam in [4]:
        source = os.path.join(raw_pose_path, f'cam{cam}')
        dest = os.path.join(processed_pose_path, f'cam{cam}.txt')
        openpose_keypoints_to_text(source, dest)

In [3]:
# process_framewise_poses()

In [4]:
def make_tracks_from_poses(poses_path):
    poses = FramePoses.from_openpose_text_fast(poses_path)
    poses = [j for i in poses for j in i]

    active_tracks = list()
    tracks = list()
    FramePoses.next_id = 0

    for i in tqdm(range(0, len(poses))):
        fp = FramePoses(poses[i], frame_num=i)
        fp.assign_tracks_hungarian(active_tracks, tracks)
    # add the tracks that are still active in the end
    tracks += active_tracks

    # sort by start time / frame
    tracks = sorted(tracks, key=lambda x: x['ini'])

    # remove the lower body
    convert_to_upper_body(tracks)

    # keep only tracks longer than 3s
    tracks = [t for t in tracks if len(t['poses']) > 3*30]

    # interpolate tracks
    interp_tracks(tracks)

    return tracks

def make_all_tracks():
    for cam in [2,3]:
        source_path = os.path.join(processed_pose_path, f'cam{cam}.txt')
        tracks = make_tracks_from_poses(source_path)
        dest_path = os.path.join(processed_pose_path, 'tracks', f'cam{cam}_3s.pkl')
        pickle.dump(tracks, open(dest_path, "wb"))

In [5]:
# make_all_tracks()

### Process manual track assignment labels

In [6]:
def assign_and_filter(tracks, track_assignments):
    tracks = get_assigned_tracks(tracks, track_assignments)

    tracks = undo_track_interpolation(tracks)

    filter_joints_inplace_by_confidence(tracks)

    filter_joints_inplace_by_bone_length(tracks)

    reset_track_ids(tracks)
    tracks = filter_tracks_by_time(tracks)
    
    return tracks

In [7]:
def cam2_corrections(tracks):
    ''' Correct manually identified mistakes in cam2 tracks
    '''
    track = find_track_by_id(tracks,138)
    tracks.remove(track)

    track = find_track_by_id(tracks,151)
    track['pid'] = 3

def cam3_corrections(tracks):
    ''' Correct manually identified mistakes in cam3 tracks
    '''
    t = find_track_by_id(tracks, 66)
    t['poses'] = t['poses'][150:]

    t = find_track_by_id(tracks, 91)
    tracks.remove(t)

    t = find_track_by_id(tracks, 231)
    t['pid'] = 7

    t = find_track_by_id(tracks, 261)
    t['poses'] = t['poses'][:180]

In [10]:
def store_final_tracks():
    for cam in [2]:
        tracks_path = os.path.join(processed_pose_path, 'tracks', f'cam{cam}.pkl')
        assignments_path = os.path.join(processed_pose_path, 'track_assignments', f'cam{cam}_all.pkl')

        tracks = pickle.load(open(tracks_path,'rb'))
        track_assignments = pickle.load(open(assignments_path,'rb'))

        tracks = assign_and_filter(tracks, track_assignments)

        if cam == 2:
            cam2_corrections(tracks)
        if cam == 3:
            cam3_corrections(tracks)
        
        reset_track_ids(tracks)
        
        dest_path = os.path.join(processed_pose_path, 'tracks', f'cam{cam}_final.pkl')
        pickle.dump(tracks, open(dest_path, "wb"))

In [11]:
store_final_tracks()