In [3]:
import pickle
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import torch


In [4]:
# Root directory of the cheetah dataset. Kept in one constant so the notebook can be
# pointed at another copy of the data by editing a single line.
DATA_ROOT = Path("/gws/nopw/j04/iecdt/cheetah")

# One recording directory, used as the worked example in the cells below.
example_path = DATA_ROOT / "2017_08_29" / "bottom" / "phantom" / "flick2"

In [5]:
@dataclass
class Sample:
    # TODO: include the path to the data so we know where the frame came from in the same way that frame_idx was included
    """
    One ground-truth frame of a Sequence together with the 2D detections of
    every camera for that frame.

    Attributes:
        detections_2d: np.ndarray, shape (C=6, J, 3) where C is the number of
            camera slots, J is the number of bodyparts, and the last dim is
            (u, v, likelihood).
        camera_projections: per-camera projection data. Not populated yet and
            its concrete type is still to be decided, hence ``Any``.
        ground_truth_3d: np.ndarray holding the 3D ground-truth joint
            positions of this frame, one (x, y, z) row per joint.
        frame_idx: frame number this sample belongs to. Sequence.generate_samples
            stores ``start_frame + i`` here, where ``i`` is the index into the
            ground-truth array.
    """
    # these are the crucial parts as specified in data processing instructions
    detections_2d: np.ndarray
    # was annotated with the builtin function `any`; `Any` is the real "unconstrained" type
    camera_projections: Any  # TODO: establish what type this should be (list of projections?)
    ground_truth_3d: np.ndarray

    # useful information
    frame_idx: int  # frame number from which this sample comes
    # TODO (not implemented yet): also record the source files, e.g.
    #     detections_2d_file: Path # file path to filtered h5 2d data array file
    #     ground_truth_3d_file: Path # file path to 3d ground truth


In [6]:
class Sequence:
    """
    One recorded sequence on disk: per-camera 2D detections plus FTE 3D ground truth.

    Files read below ``path`` (as built in ``__init__``):
        filtered_2D/cam<N>*.h5  -- one HDF5 file of 2D detections per camera
        fte_pw/fte.pickle       -- pickled dict holding the 3D ground truth

    After ``generate_samples()`` has run, ``samples`` holds one ``Sample`` per
    ground-truth frame.
    """

    # Number of camera slots in every detections tensor. Some sequences only
    # have 4 cameras, but the tensor is kept at 6; absent cameras stay all-zero.
    NUM_CAMERAS = 6

    # Joint count used only when no camera file could be loaded at all.
    DEFAULT_NUM_JOINTS = 25

    def __init__(self, path: Path):
        # Path(...) lets callers pass a plain string as well as a Path.
        self.path = Path(path)
        self.detections_2d_dir = self.path / "filtered_2D"
        self.ground_truth_3d_file = self.path / "fte_pw" / "fte.pickle"
        self.samples: List[Sample] = []

    def load_3d_ground_truth(self):
        """
        Load the FTE 3D joint positions of this sequence.

        Returns:
            (positions, start_frame):
              positions   -- float32 array built from ``fte["positions"]``; the
                             pickle stores a nested list that converts to
                             (F, J, 3): F frames, J joints, 3D coordinates.
              start_frame -- value stored under ``"start_frame"`` in the pickle.

        Raises:
            FileNotFoundError: if the ground-truth pickle does not exist.
        """
        if not self.ground_truth_3d_file.exists():
            raise FileNotFoundError(f"3D ground-truth file not found: {self.ground_truth_3d_file}")

        # NOTE: pickle.load can execute arbitrary code -- only open trusted files.
        with open(self.ground_truth_3d_file, "rb") as f:
            fte = pickle.load(f)

        # pickle file is a dictionary with keys
        # dict_keys(['positions', 'x', 'dx', 'ddx', 'start_frame'])
        positions = np.array(fte["positions"], dtype=np.float32)
        start_frame = fte["start_frame"]

        return positions, start_frame

    def load_2d_detections(self):
        """
        Load the filtered 2D detections of every camera file in ``filtered_2D``.

        Assumption: every file name begins with "camN", N being the 1-based
        camera number.

        Returns:
            Dict mapping the 0-based camera index (N - 1) to a float32 array of
            shape (F, J, 3) whose last dimension is (x, y, likelihood). Cameras
            without a file are simply absent from the dict.
        """
        cam_data: Dict[int, np.ndarray] = {}

        # sorted() makes the load order (and the log output) deterministic
        for file in sorted(self.detections_2d_dir.glob("cam*.h5")):

            # extract camera number from file name
            match = re.search(r'cam(\d+)', file.name)
            if not match:
                print(f"Warning: Could not parse camera ID from {file.name}")
                continue

            cam_id = int(match.group(1))
            cam_idx = cam_id - 1  # convert to 0-based index

            # An index outside the tensor's camera slots could never be used by
            # generate_samples, so say so and skip reading the file.
            if not 0 <= cam_idx < self.NUM_CAMERAS:
                print(f"Warning: camera ID {cam_id} from {file.name} is outside 1..{self.NUM_CAMERAS}, skipping")
                continue

            print(f"Loading filtered detections from {file} (camera ID {cam_id})")

            df = pd.read_hdf(file, key="df_with_missing")

            # drop the top level, leaving two levels (bodyparts, coords)
            df.columns = df.columns.droplevel(0)

            # TODO: drop the paw joints here so we have the same number of joints
            # as the ground truth, and ensure the joint ordering is the same.

            # Slice the whole table once per coordinate. Each slice is an (F, J)
            # table with the bodyparts in the same column order, so stacking
            # the three on a new last axis gives (F, J, 3).
            per_coord = [
                df.xs(coord, axis=1, level="coords").to_numpy(dtype=np.float32)
                for coord in ("x", "y", "likelihood")
            ]
            cam_data[cam_idx] = np.stack(per_coord, axis=2)

        return cam_data

    def generate_samples(self):
        """
        Rebuild ``self.samples`` with one Sample per ground-truth frame.

        The ground truth defines the timeline: for ground-truth index ``i`` the
        frame number is ``start_frame + i``. For every camera the matching
        detections row is copied into a (C, J, 3) tensor, where C is
        ``NUM_CAMERAS`` and J is the joint count of the loaded detections.

        A camera slot is left all-zero (likelihood 0) when the camera has no
        file or has no row for that frame. Rows before the start of a camera's
        data are treated as missing rather than wrapping round to its last rows.

        Returns:
            None. The result is stored in ``self.samples``.
        """
        # ensure we start with a clean slate
        self.samples = []

        gt_positions, start_frame = self.load_3d_ground_truth()
        cam_arrays = self.load_2d_detections()

        num_frames = gt_positions.shape[0]
        num_cams = self.NUM_CAMERAS

        # Take the joint count from the detections themselves instead of
        # hard-coding it.
        # TODO: decide which joints to drop so this matches the ground truth.
        if cam_arrays:
            num_joints = next(iter(cam_arrays.values())).shape[1]
        else:
            num_joints = self.DEFAULT_NUM_JOINTS

        for i in range(num_frames):
            global_frame = start_frame + i

            # NOTE(review): the -1 assumes the FTE frame numbers are 1-based
            # while the detections rows are 0-based -- still to be confirmed.
            row = global_frame - 1

            detections = np.zeros((num_cams, num_joints, 3), dtype=np.float32)

            for cam_idx, cam_arr in cam_arrays.items():
                if not 0 <= cam_idx < num_cams:
                    continue
                # Skip rows this camera does not have; the slot stays zero.
                if 0 <= row < cam_arr.shape[0]:
                    detections[cam_idx, :, :] = cam_arr[row]

            self.samples.append(
                Sample(
                    detections_2d=detections,
                    camera_projections=None,  # TODO: implement this
                    ground_truth_3d=gt_positions[i],
                    frame_idx=global_frame,
                )
            )

        return


In [7]:
# Build the Sequence for the example recording and fill test.samples.
# generate_samples() returns nothing, so this cell shows no value; the only output
# is the "Loading filtered detections ..." line printed for each camera file read.
test = Sequence(example_path)
test.generate_samples()

Loading filtered detections from /gws/nopw/j04/iecdt/cheetah/2017_08_29/bottom/phantom/flick2/filtered_2D/cam3DLC_resnet152_CheetahOct14shuffle1_500000.h5 (camera ID 3)
Loading filtered detections from /gws/nopw/j04/iecdt/cheetah/2017_08_29/bottom/phantom/flick2/filtered_2D/cam4DLC_resnet152_CheetahOct14shuffle1_500000.h5 (camera ID 4)
Loading filtered detections from /gws/nopw/j04/iecdt/cheetah/2017_08_29/bottom/phantom/flick2/filtered_2D/cam5DLC_resnet152_CheetahOct14shuffle1_500000.h5 (camera ID 5)
Loading filtered detections from /gws/nopw/j04/iecdt/cheetah/2017_08_29/bottom/phantom/flick2/filtered_2D/cam6DLC_resnet152_CheetahOct14shuffle1_500000.h5 (camera ID 6)


In [8]:
# example, look at first sample i.e. the first frame in the sequence
sample0 = test.samples[0]

# Only the last expression of a cell is displayed, so the other two arrays are
# checked with assertions instead of being left as bare expressions that do nothing.
# NOTE(review): the joint counts of the two arrays may differ (ground truth vs.
# detections) -- only the rank and the trailing dimension are checked here.
assert sample0.ground_truth_3d.ndim == 2 and sample0.ground_truth_3d.shape[1] == 3  # (J, 3) 3D ground truth per joint
assert sample0.detections_2d.ndim == 3 and sample0.detections_2d.shape[2] == 3  # (C, J, 3) detections per camera

sample0.detections_2d[2]  # look at 2d joint coordinates for camera 3 (cam index = 2)


array([[4.8733630e+02, 4.9720169e+02, 3.4308180e-01],
       [          nan,           nan, 2.7727544e-01],
       [4.9632397e+02, 4.9722488e+02, 7.9857337e-01],
       [5.0662256e+02, 5.1447656e+02, 8.7918854e-01],
       [5.0698251e+02, 5.2237421e+02, 7.0071018e-01],
       [5.0733719e+02, 5.2306757e+02, 5.5751270e-01],
       [4.8740195e+02, 4.9704016e+02, 7.4514353e-01],
       [4.8705948e+02, 5.0575839e+02, 8.0174422e-01],
       [4.8734814e+02, 5.1400653e+02, 8.3344680e-01],
       [4.8823483e+02, 5.2266437e+02, 7.7867436e-01],
       [4.9772604e+02, 5.2270178e+02, 6.8685657e-01],
       [4.6813193e+02, 5.1390295e+02, 9.2848915e-01],
       [4.4818103e+02, 5.2196851e+02, 9.9610347e-01],
       [          nan,           nan, 1.0640347e-01],
       [          nan,           nan, 1.2626436e-01],
       [          nan,           nan, 2.6733887e-01],
       [          nan,           nan, 2.6833761e-01],
       [          nan,           nan, 1.4509436e-01],
       [          nan,      