In [2]:
import numpy as np
import pandas as pd
# import deeplabcut
# import torch
from pathlib import Path

In [3]:
import importlib

import utils.vis

# Reload BEFORE the star-import. `importlib.reload` re-executes utils/vis.py
# and rebinds the attributes on the module object, but names that were already
# copied into this notebook by an earlier `from utils.vis import *` keep
# pointing at the old objects. Star-importing after the reload rebinds the
# notebook-level names to the freshly loaded definitions.
importlib.reload(utils.vis)
from utils.vis import *

<module 'utils.vis' from 'c:\\Users\\jexia\\OneDrive\\Documents\\projects\\datafawn\\utils\\vis.py'>

In [15]:
# Pose table for vid1, stored as HDF5 (path is relative to the working directory).
h5_file = 'processed_vids/vid1_superanimal_quadruped_hrnet_w32_fasterrcnn_resnet50_fpn_v2_.h5'

# Load the table and sort its column labels in a single chained expression.
# NOTE(review): the sort is presumably there so MultiIndex column slicing works
# on a lexsorted index -- confirm.
pose_data = pd.read_hdf(h5_file).sort_index(axis=1)

In [None]:
def detect_velocity_errors(df, bodyparts, threshold_pixels=50, window_size=5,
                          scorer=None, individual=None, min_confidence=0.1):
    """
    Flag frames where a keypoint moves implausibly far between consecutive frames.

    The per-frame displacement of every bodypart is computed, smoothed with a
    centered rolling median, and compared against ``threshold_pixels``.
    Because of the median, a frame is (roughly) flagged only when the
    displacement exceeds the threshold for the majority of the frames in its
    window; pass ``window_size=1`` to test the raw frame-to-frame displacement.

    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame with pose data. Columns are a MultiIndex whose first two
        levels are scorer and individual and which has a level named
        'coords' containing 'x', 'y' and 'likelihood'.
    bodyparts : list
        List of bodypart names to check
    threshold_pixels : float, default=50
        Maximum expected pixel movement per frame
    window_size : int, default=5
        Window size (in frames) of the centered median filter
    scorer : str, optional
        Scorer name (first scorer in df if not provided)
    individual : str, optional
        Individual name (first individual in df if not provided)
    min_confidence : float, default=0.1
        A frame is only flagged for a bodypart when that bodypart's
        likelihood is strictly greater than this value, so that detections
        the model itself does not trust are not reported as velocity errors.

    Returns:
    --------
    error_df : pd.DataFrame
        Boolean DataFrame indexed like ``df`` with one column per entry of
        ``bodyparts``; True marks a velocity error.
    """
    if scorer is None:
        scorer = df.columns.get_level_values(0).unique()[0]
    if individual is None:
        individual = df.columns.get_level_values(1).unique()[0]

    # Slice the (scorer, individual) sub-table once and reuse it for all
    # three coordinates instead of repeating the cross-section per coordinate.
    subject = df.xs((scorer, individual), level=[0, 1], axis=1)
    x_arr = subject.xs('x', level='coords', axis=1)[bodyparts].to_numpy()
    y_arr = subject.xs('y', level='coords', axis=1)[bodyparts].to_numpy()
    likelihood_arr = subject.xs('likelihood', level='coords', axis=1)[bodyparts].to_numpy()

    # Frame-to-frame displacement, shape (n_frames, n_bodyparts). Prepending
    # the first row keeps one value per frame and makes frame 0's value 0.
    dx = np.diff(x_arr, axis=0, prepend=x_arr[0:1])
    dy = np.diff(y_arr, axis=0, prepend=y_arr[0:1])
    velocity = np.hypot(dx, dy)

    # min_periods=1 lets the truncated windows at the start and end of the
    # recording (and windows containing missing values) use the samples that
    # are available. Without it those frames smooth to NaN, and NaN never
    # compares greater than the threshold, so they could never be flagged.
    velocity_df = pd.DataFrame(velocity, index=df.index, columns=bodyparts)
    velocity_smooth = velocity_df.rolling(
        window=window_size, center=True, min_periods=1
    ).median()

    # An error needs both a large smoothed displacement and a detection that
    # is confident enough to be taken seriously.
    errors = (velocity_smooth.to_numpy() > threshold_pixels) & (likelihood_arr > min_confidence)

    return pd.DataFrame(errors, index=df.index, columns=bodyparts)


def detect_likelihood_errors(df, bodyparts, min_likelihood=0.5,
                            scorer=None, individual=None):
    """
    Flag frames in which a bodypart's likelihood falls below a floor.

    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame with pose data. Columns are a MultiIndex whose first two
        levels are scorer and individual and which has a level named
        'coords' containing 'likelihood'.
    bodyparts : list
        Bodypart names to check
    min_likelihood : float, default=0.5
        Likelihoods strictly below this value count as errors
    scorer : str, optional
        Scorer name (first scorer in df if not provided)
    individual : str, optional
        Individual name (first individual in df if not provided)

    Returns:
    --------
    error_df : pd.DataFrame
        Boolean DataFrame indexed like ``df`` with one column per entry of
        ``bodyparts``; True marks a likelihood below ``min_likelihood``.
    """
    column_index = df.columns
    if scorer is None:
        scorer = column_index.get_level_values(0)[0]
    if individual is None:
        individual = column_index.get_level_values(1)[0]

    # Narrow down in two steps: first to one animal, then to its likelihoods.
    subject = df.xs((scorer, individual), level=[0, 1], axis=1)
    confidence = subject.xs('likelihood', level='coords', axis=1)[bodyparts]

    # Element-wise comparison on the raw array, shape (n_frames, n_bodyparts).
    below_floor = np.less(confidence.to_numpy(), min_likelihood)

    return pd.DataFrame(below_floor, index=df.index, columns=bodyparts)


def detect_distance_errors(df, bodyparts, reference_bodypart='back_base',
                           max_distance=300, scorer=None, individual=None,
                           min_confidence=0.1):
    """
    Flag frames where a keypoint lies too far from a reference keypoint.

    For every frame the Euclidean distance between each bodypart and
    ``reference_bodypart`` is computed and compared against ``max_distance``.
    A frame is only flagged when both the bodypart and the reference point
    are detected with a likelihood above ``min_confidence``: a distance
    measured against an untrusted reference says nothing about the bodypart.

    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame with pose data. Columns are a MultiIndex whose first two
        levels are scorer and individual and which has a level named
        'coords' containing 'x', 'y' and 'likelihood'.
    bodyparts : list
        List of bodypart names to check
    reference_bodypart : str, default='back_base'
        Bodypart to use as reference
    max_distance : float, default=300
        Maximum expected distance in pixels
    scorer : str, optional
        Scorer name (first scorer in df if not provided)
    individual : str, optional
        Individual name (first individual in df if not provided)
    min_confidence : float, default=0.1
        Likelihood that both the bodypart and the reference point must
        strictly exceed for a frame to be flagged.

    Returns:
    --------
    error_df : pd.DataFrame
        Boolean DataFrame indexed like ``df`` with one column per entry of
        ``bodyparts``; True marks a distance error.
    """
    if scorer is None:
        scorer = df.columns.get_level_values(0).unique()[0]
    if individual is None:
        individual = df.columns.get_level_values(1).unique()[0]

    # Slice the (scorer, individual) sub-table once, then split it per
    # coordinate; each of these has one column per bodypart.
    subject = df.xs((scorer, individual), level=[0, 1], axis=1)
    x_all = subject.xs('x', level='coords', axis=1)
    y_all = subject.xs('y', level='coords', axis=1)
    likelihood_all = subject.xs('likelihood', level='coords', axis=1)

    # Reference point as column vectors of shape (n_frames, 1) so that they
    # broadcast against the (n_frames, n_bodyparts) arrays below.
    ref_x = x_all[reference_bodypart].to_numpy()[:, np.newaxis]
    ref_y = y_all[reference_bodypart].to_numpy()[:, np.newaxis]
    ref_trusted = likelihood_all[reference_bodypart].to_numpy()[:, np.newaxis] > min_confidence

    x_arr = x_all[bodyparts].to_numpy()
    y_arr = y_all[bodyparts].to_numpy()
    likelihood_arr = likelihood_all[bodyparts].to_numpy()

    distance = np.hypot(x_arr - ref_x, y_arr - ref_y)

    # Too far away, and both ends of the measurement are confident detections.
    errors = (distance > max_distance) & (likelihood_arr > min_confidence) & ref_trusted

    return pd.DataFrame(errors, index=df.index, columns=bodyparts)

# TODO: edit this to fit use case. Right now no specific purpose
def detect_pose_errors(df, bodyparts=None,
                       velocity_threshold=50,
                       min_likelihood=0.5,
                       max_distance=300,
                       reference_bodypart='back_base',  # TODO: change this to a mapping from bodypart to reference point
                       require_multiple_indicators=True,
                       scorer=None,
                       individual=None):
    """
    Run the velocity, likelihood and distance checks and merge their results.

    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame with pose data (MultiIndex columns with a level named
        'bodyparts')
    bodyparts : list, optional
        Bodypart names to check (None = every name in the 'bodyparts' level)
    velocity_threshold : float, default=50
        Max pixels per frame, forwarded to the velocity check
    min_likelihood : float, default=0.5
        Minimum acceptable likelihood, forwarded to the likelihood check
    max_distance : float, default=300
        Max distance from the reference point, forwarded to the distance check
    reference_bodypart : str, default='back_base'
        Reference bodypart for the distance check
    require_multiple_indicators : bool, default=True
        If True, a frame is an error only when at least two of the three
        error types fired in it. The types are counted per frame after
        aggregating over bodyparts, so the two types may come from
        different bodyparts. If False, any single type is enough.
    scorer : str, optional
        Scorer name, forwarded to the individual checks
    individual : str, optional
        Individual name, forwarded to the individual checks

    Returns:
    --------
    error_mask : pd.Series
        Boolean Series indicating problematic frames
    error_details : pd.DataFrame
        Boolean columns, in this order:
        - '<bodypart>_velocity', '<bodypart>_likelihood', '<bodypart>_distance'
          for every bodypart
        - 'velocity_error', 'likelihood_error', 'distance_error': True when
          any bodypart has that error type in the frame
        - 'any_error': same values as ``error_mask``
    """
    if bodyparts is None:
        bodyparts = df.columns.get_level_values('bodyparts').unique().tolist()

    # Every check returns a boolean frame with one column per bodypart.
    target = dict(scorer=scorer, individual=individual)
    velocity_flags = detect_velocity_errors(df, bodyparts, velocity_threshold, **target)
    likelihood_flags = detect_likelihood_errors(df, bodyparts, min_likelihood, **target)
    distance_flags = detect_distance_errors(
        df, bodyparts, reference_bodypart, max_distance, **target
    )

    error_details = pd.DataFrame(index=df.index)

    # Per-bodypart columns, grouped by bodypart.
    for part in bodyparts:
        error_details[f'{part}_velocity'] = velocity_flags[part]
        error_details[f'{part}_likelihood'] = likelihood_flags[part]
        error_details[f'{part}_distance'] = distance_flags[part]

    # Per-type columns: did any bodypart trigger this check in the frame?
    error_details['velocity_error'] = velocity_flags.any(axis='columns')
    error_details['likelihood_error'] = likelihood_flags.any(axis='columns')
    error_details['distance_error'] = distance_flags.any(axis='columns')

    per_type = error_details[['velocity_error', 'likelihood_error', 'distance_error']]
    if not require_multiple_indicators:
        # A single error type is enough.
        error_mask = per_type.any(axis=1)
    else:
        # Count how many of the three error types fired; need two or more.
        error_mask = per_type.sum(axis=1) >= 2

    error_details['any_error'] = error_mask

    return error_mask, error_details

In [None]:
# Smoke-test the velocity check on the four paws, then show only the frames
# in which the front-left paw is flagged.
bodyparts = ['front_left_paw', 'front_right_paw', 'back_left_paw', 'back_right_paw']

vel_errors = detect_velocity_errors(
    pose_data,
    bodyparts,
    threshold_pixels=50,
    window_size=5,
    scorer=None,
    individual=None,
)

vel_errors.loc[vel_errors['front_left_paw']]

Unnamed: 0,front_left_paw,front_right_paw,back_left_paw,back_right_paw
1063,True,True,True,False
1064,True,True,True,True
1066,True,True,True,True
1070,True,True,True,True
1081,True,False,True,True
1082,True,True,True,True
1172,True,False,False,False
1173,True,False,True,False
1174,True,False,True,False
1175,True,False,True,False


In [None]:
# Smoke-test the likelihood check. With a floor of 0.0 only negative
# likelihoods are flagged.
# NOTE(review): negative values are presumably a "no detection" sentinel -- confirm.
lh_errors = detect_likelihood_errors(
    pose_data,
    bodyparts,
    min_likelihood=0.0,
)
lh_errors.loc[lh_errors['front_left_paw']]


Unnamed: 0,front_left_paw,front_right_paw,back_left_paw,back_right_paw
1048,True,True,True,True
1049,True,True,True,True
1061,True,True,True,True
1062,True,True,True,True
1080,True,True,True,True
...,...,...,...,...
1221,True,True,True,True
1222,True,True,True,True
1223,True,True,True,True
1224,True,True,True,True


In [27]:
# Smoke-test the distance check against 'back_base' with a 200 px limit and
# show only the frames in which the front-left paw is flagged.
d_errors = detect_distance_errors(
    pose_data,
    bodyparts,
    reference_bodypart='back_base',
    max_distance=200,
)
d_errors.loc[d_errors['front_left_paw']]


Unnamed: 0,front_left_paw,front_right_paw,back_left_paw,back_right_paw
1082,True,False,True,False
1173,True,False,False,False


In [17]:
# --- Example 1: per-bodypart velocity errors --------------------------------
bodyparts = ['front_left_paw', 'front_right_paw', 'back_left_paw', 'back_right_paw']
velocity_errors = detect_velocity_errors(pose_data, bodyparts, threshold_pixels=50)

# One boolean column per bodypart (front_left_paw, front_right_paw, ...).
print(velocity_errors.head())

# Which bodyparts are flagged in the row labelled 100?
print(velocity_errors.loc[100])

# --- Example 2: all three checks combined -----------------------------------
error_mask, error_details = detect_pose_errors(
    pose_data,
    bodyparts=bodyparts,
    velocity_threshold=50,
    min_likelihood=0.5,
    max_distance=300,
    require_multiple_indicators=True,
)

# Per-bodypart flags for a single paw.
print(error_details[['front_left_paw_velocity', 'front_left_paw_likelihood']].head())

# Number of flagged frames per error type.
print(error_details[['velocity_error', 'likelihood_error', 'distance_error']].sum())

# --- Example 3: frames where one bodypart fails either check ----------------
front_left_errors = error_details.loc[
    error_details['front_left_paw_velocity']
    | error_details['front_left_paw_likelihood']
]

   front_left_paw  front_right_paw  back_left_paw  back_right_paw
0           False            False          False           False
1           False            False          False           False
2           False            False          False           False
3           False            False          False           False
4           False            False          False           False
front_left_paw     False
front_right_paw    False
back_left_paw      False
back_right_paw     False
Name: 100, dtype: bool
   front_left_paw_velocity  front_left_paw_likelihood
0                    False                      False
1                    False                      False
2                    False                      False
3                    False                      False
4                    False                      False
velocity_error       14
likelihood_error    278
distance_error        3
dtype: int64
