In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

def convert_multiperson_data(input_csv, output_dir='joint_data'):

    print(f"Processing data from {input_csv}")

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Read the CSV file
    df = pd.read_csv(input_csv)

    # Get unique persons
    persons = df['Person'].unique()

    print(f"Found {len(persons)} person(s) in the data")

    # Define the 7 key joints we want to track (mapped to MediaPipe landmarks)
    # MediaPipe pose has 33 landmarks: https://developers.google.com/mediapipe/solutions/vision/pose_landmarker
    joint_mapping = {
        'Position1': 'Body_17',  # finger
        'Position2': 'Body_15',  # wrist
        'Position3': 'Body_13',  # elbow
        'Position4': 'Body_11',  # shoulder
        'Position5': 'Body_23',  # hip
        'Position6': 'Body_25',  # knee
        'Position7': 'Body_27'    # ankle
    }

    # Process each person separately
    for person_id in persons:
        person_df = df[df['Person'] == person_id]

        # Get all frames for this person
        frames = sorted(person_df['Frame'].unique())

        # Initialize a new dataframe for clean data
        clean_data = {'Frame': frames}

        # Extract each joint position
        for joint_name, landmark_name in joint_mapping.items():
            joint_data = person_df[person_df['Landmark'] == landmark_name]

            # Initialize with NaN
            clean_data[f'{joint_name}'] = np.nan

            # Map frame numbers to joint positions
            frame_to_position = {}

            for _, row in joint_data.iterrows():
                frame = row['Frame']
                # Store 3D position as a tuple
                frame_to_position[frame] = (row['x'], row['y'], row['z'])

            # Fill in positions for each frame
            positions = []
            for frame in frames:
                if frame in frame_to_position:
                    # Get the 3D position
                    pos = frame_to_position[frame]
                    positions.append(pos)
                else:
                    positions.append((np.nan, np.nan, np.nan))

            # Convert to numpy for easier velocity calculation
            np_positions = np.array(positions)


            # Store the position magnitudes (distance from origin)
            position_magnitudes = np.sqrt(np.sum(np_positions**2, axis=1))

            # Update the dataframe
            clean_data[f'{joint_name}'] = position_magnitudes
            # clean_data[f'Velocity{joint_name[-1]}'] = velocities

        # Create dataframe and fill missing values
        clean_df = pd.DataFrame(clean_data)
        clean_df = clean_df.interpolate(method='linear')

        # Save to CSV
        output_file = os.path.join(output_dir, f'human_person_{person_id}_joints.csv')
        clean_df.to_csv(output_file, index=False)

        print(f"Saved data for person {person_id} to {output_file}")

        # Optional: Visualize the joint positions
        plt.figure(figsize=(10, 6))
        for i in range(1, 8):
            plt.plot(clean_df['Frame'], clean_df[f'Position{i}'], label=f'Joint {i}')
        plt.title(f'Joint Positions - Person {person_id}')
        plt.xlabel('Frame')
        plt.ylabel('Position (magnitude)')
        plt.legend()
        plt.grid(True)
        plt.savefig(os.path.join(output_dir, f'positions_person_{person_id}.png'))
        plt.close()




In [None]:
convert_multiperson_data('MultipeopleData.csv')

# convert_single_person_data('pose_arm_timeseries_Singleperson.csv')



Processing data from MultipeopleData.csv
Found 1 person(s) in the data
Saved data for person 1 to joint_data/human_person_1_joints.csv


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy import signal, interpolate

def convert_multiperson_data(input_csv, output_dir='joint_data', target_fps=20, source_fps=30):

    os.makedirs(output_dir, exist_ok=True)
    df = pd.read_csv(input_csv)
    persons = df['Person'].unique()
    print(f"Found {len(persons)} person(s) in the data")

    # Define the 7 key joints we want to track (mapped to MediaPipe landmarks)
    joint_mapping = {
        'Position1': 'Body_17',  # finger
        'Position2': 'Body_15',  # wrist
        'Position3': 'Body_13',  # elbow
        'Position4': 'Body_11',  # shoulder
        'Position5': 'Body_23',  # hip
        'Position6': 'Body_25',  # knee
        'Position7': 'Body_27'   # ankle
    }

    # Process each person separately
    for person_id in persons:
        person_df = df[df['Person'] == person_id]

        # Get all frames for this person
        frames = sorted(person_df['Frame'].unique())

        # Initialize a new dataframe for clean data
        clean_data = {'Frame': frames}

        # Extract each joint position
        for joint_name, landmark_name in joint_mapping.items():
            joint_data = person_df[person_df['Landmark'] == landmark_name]

            clean_data[f'{joint_name}'] = np.nan
            frame_to_position = {}

            for _, row in joint_data.iterrows():
                frame = row['Frame']
                frame_to_position[frame] = (row['x'], row['y'], row['z'])

            positions = []
            for frame in frames:
                if frame in frame_to_position:
                    pos = frame_to_position[frame]
                    positions.append(pos)
                else:
                    positions.append((np.nan, np.nan, np.nan))

            np_positions = np.array(positions)

            # Store the position magnitudes (distance from origin)
            position_magnitudes = np.sqrt(np.sum(np_positions**2, axis=1))
            clean_data[f'{joint_name}'] = position_magnitudes

        clean_df = pd.DataFrame(clean_data)
        clean_df = clean_df.interpolate(method='linear')

        downsampled_df = downsample_dataframe(clean_df, source_fps, target_fps)

        output_file = os.path.join(output_dir, f'3human_person_{person_id}_joints_{target_fps}hz.csv')
        downsampled_df.to_csv(output_file, index=False)

        print(f"Saved data for person {person_id} to {output_file}")


In [None]:
def downsample_dataframe(df, source_fps, target_fps):
    downsample_factor = source_fps / target_fps

    if downsample_factor == 1:
        return df

    original_frames = df['Frame'].values
    n_frames_new = int(len(original_frames) / downsample_factor)

    position_cols = [col for col in df.columns if col.startswith('Position')]
    downsampled_data = {'Frame': np.arange(n_frames_new)}

    original_time = np.arange(len(df)) / source_fps
    new_time = np.arange(n_frames_new) / target_fps

    for col in position_cols:
        interpolated = interpolate.interp1d(
            original_time,
            df[col].values,
            kind='linear',
            bounds_error=False,
            fill_value='extrapolate'
        )

        downsampled_data[col] = interpolated(new_time)

    downsampled_df = pd.DataFrame(downsampled_data)

    return downsampled_df

In [None]:
convert_multiperson_data(
    input_csv='MultipeopleData3.csv',
    output_dir='joint_data',
    target_fps=20,  # Target frame rate
    source_fps=30   # Original frame rate
)

Processing data from MultipeopleData3.csv
Downsampling from 30Hz to 20Hz
Found 1 person(s) in the data
Saved data for person 1 to joint_data/3human_person_1_joints_20hz.csv
Original frames: 6244, Downsampled frames: 4162
Conversion and downsampling complete.


Correct conversion between XYZ and Radians


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy import signal, interpolate

def convert_multiperson_data(input_csv, output_dir='joint_data', target_fps=20, source_fps=30, normalize=True):
    print(f"Processing data from {input_csv}")

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Read the CSV file
    df = pd.read_csv(input_csv)

    # Get unique persons
    persons = df['Person'].unique()

    print(f"Found {len(persons)} person(s) in the data")

    # Define the 7 key joints we want to track (mapped to MediaPipe landmarks)
    joint_mapping = {
        'Position1': 'Body_17',  # finger
        'Position2': 'Body_15',  # wrist
        'Position3': 'Body_13',  # elbow
        'Position4': 'Body_11',  # shoulder
        'Position5': 'Body_23',  # hip
        'Position6': 'Body_25',  # knee
        'Position7': 'Body_27'   # ankle
    }

    # Process each person separately
    for person_id in persons:
        person_df = df[df['Person'] == person_id]

        # Get all frames for this person
        frames = sorted(person_df['Frame'].unique())

        # Initialize a new dataframe for clean data
        clean_data = {'Frame': frames}

        # Dictionary to store joint positions per frame
        joints_3d = {}
        for joint_name in joint_mapping.keys():
            joints_3d[joint_name] = []

        # Extract each joint 3D position
        for frame in frames:
            frame_joints = {}

            # Initialize with None
            for joint_name in joint_mapping.keys():
                frame_joints[joint_name] = None

            # Find landmarks for this frame
            frame_data = person_df[person_df['Frame'] == frame]

            # Extract joint positions
            for joint_name, landmark_name in joint_mapping.items():
                joint_row = frame_data[frame_data['Landmark'] == landmark_name]

                if not joint_row.empty:
                    # Get 3D position
                    x, y, z = joint_row['x'].values[0], joint_row['y'].values[0], joint_row['z'].values[0]
                    frame_joints[joint_name] = (x, y, z)

            # Add to joints_3d
            for joint_name, pos in frame_joints.items():
                joints_3d[joint_name].append(pos)

        # Calculate joint angles - This is the key improvement
        joint_angles = calculate_joint_angles(joints_3d)

        # Add to clean_data
        for joint_name, angles in joint_angles.items():
            clean_data[joint_name] = angles

        # Create dataframe and fill missing values
        clean_df = pd.DataFrame(clean_data)
        clean_df = clean_df.interpolate(method='linear')

        # Calculate velocities (optional)
        for joint_name in joint_mapping.keys():
            pos = clean_df[joint_name].values
            # Simple velocity calculation (derivative)
            vel = np.zeros_like(pos)
            vel[1:] = np.diff(pos)
            # Scale by fps to get proper units
            vel = vel * source_fps
            clean_df[f'Velocity{joint_name[-1]}'] = vel

        # Normalize joint values to match robot range if requested
        if normalize:
            clean_df = normalize_joint_ranges(clean_df, joint_mapping)

        # Downsample to target fps
        downsampled_df = downsample_dataframe(clean_df, source_fps, target_fps)

        # Save to CSV
        output_file = os.path.join(output_dir, f'human_person_{person_id}_joints_{target_fps}hz.csv')
        downsampled_df.to_csv(output_file, index=False)

        print(f"Saved data for person {person_id} to {output_file}")
        print(f"Original frames: {len(clean_df)}, Downsampled frames: {len(downsampled_df)}")

        # Visualize the joint angles
        visualize_joint_data(downsampled_df, person_id, output_dir)

def calculate_joint_angles(joints_3d):
    """
    Calculate joint angles from 3D joint positions
    This approximates the robot joint angles

    Args:
        joints_3d: Dictionary of joint positions (x,y,z) for each frame

    Returns:
        Dictionary of joint angles for each frame
    """
    joint_angles = {}

    # Initialize joint angles
    for joint_name in joints_3d.keys():
        joint_angles[joint_name] = []

    # Process each frame
    n_frames = len(joints_3d['Position1'])

    for frame in range(n_frames):
        # Get joint positions for this frame
        pos1 = joints_3d['Position1'][frame]  # finger
        pos2 = joints_3d['Position2'][frame]  # wrist
        pos3 = joints_3d['Position3'][frame]  # elbow
        pos4 = joints_3d['Position4'][frame]  # shoulder
        pos5 = joints_3d['Position5'][frame]  # hip
        pos6 = joints_3d['Position6'][frame]  # knee
        pos7 = joints_3d['Position7'][frame]  # ankle

        # Skip if any joint position is missing
        if None in [pos1, pos2, pos3, pos4, pos5, pos6, pos7]:
            for joint_name in joint_angles.keys():
                joint_angles[joint_name].append(np.nan)
            continue

        # Calculate angles
        # These calculations are simplified approximations of joint angles

        # Position1: Finger angle (relative to wrist)
        finger_vec = np.array(pos1) - np.array(pos2)
        wrist_vec = np.array(pos2) - np.array(pos3)
        angle1 = angle_between_vectors(finger_vec, wrist_vec)

        # Position2: Wrist angle (relative to elbow)
        wrist_vec = np.array(pos2) - np.array(pos3)
        elbow_vec = np.array(pos3) - np.array(pos4)
        angle2 = angle_between_vectors(wrist_vec, elbow_vec)

        # Position3: Elbow angle (relative to shoulder)
        elbow_vec = np.array(pos3) - np.array(pos4)
        shoulder_vec = np.array(pos4) - np.array(pos5)
        angle3 = angle_between_vectors(elbow_vec, shoulder_vec)

        # Position4: Shoulder angle (3D orientation)
        # Using angle with vertical axis
        vertical = np.array([0, 1, 0])  # Assuming Y is vertical
        angle4 = angle_between_vectors(shoulder_vec, vertical)

        # Position5: Hip angle
        hip_vec = np.array(pos5) - np.array(pos6)
        angle5 = angle_between_vectors(hip_vec, vertical)

        # Position6: Knee angle
        knee_vec = np.array(pos6) - np.array(pos7)
        angle6 = angle_between_vectors(knee_vec, hip_vec)

        # Position7: Ankle angle (orientation)
        # Approximated as the angle with the ground plane normal
        ground_normal = np.array([0, 0, 1])  # Assuming Z is up from ground
        angle7 = angle_between_vectors(knee_vec, ground_normal)

        # Add to joint angles
        joint_angles['Position1'].append(angle1)
        joint_angles['Position2'].append(angle2)
        joint_angles['Position3'].append(angle3)
        joint_angles['Position4'].append(angle4)
        joint_angles['Position5'].append(angle5)
        joint_angles['Position6'].append(angle6)
        joint_angles['Position7'].append(angle7)

    return joint_angles

def angle_between_vectors(v1, v2):
    """
    Calculate the angle between two vectors in 3D space

    Args:
        v1: First vector
        v2: Second vector

    Returns:
        Angle in radians
    """
    # Ensure vectors are numpy arrays
    v1 = np.array(v1)
    v2 = np.array(v2)

    # Normalize vectors
    v1_norm = np.linalg.norm(v1)
    v2_norm = np.linalg.norm(v2)

    # Check for zero vectors
    if v1_norm < 1e-10 or v2_norm < 1e-10:
        return 0.0

    v1 = v1 / v1_norm
    v2 = v2 / v2_norm

    # Calculate angle
    dot_product = np.clip(np.dot(v1, v2), -1.0, 1.0)
    angle = np.arccos(dot_product)

    return angle

def normalize_joint_ranges(df, joint_mapping):
    """
    Normalize joint angles to match robot joint ranges

    Args:
        df: DataFrame with joint angles
        joint_mapping: Dictionary mapping joint names to landmarks

    Returns:
        DataFrame with normalized joint angles
    """
    # Example robot joint ranges (in radians)
    # These should be adjusted based on your specific robot
    robot_ranges = {
        'Position1': (-2.9, 2.9),    # Joint 1
        'Position2': (-1.8, 1.8),    # Joint 2
        'Position3': (-2.9, 2.9),    # Joint 3
        'Position4': (-3.1, 0.0),    # Joint 4
        'Position5': (-2.9, 2.9),    # Joint 5
        'Position6': (-0.0, 3.8),    # Joint 6
        'Position7': (-2.9, 2.9)     # Joint 7
    }

    # Create a copy of the DataFrame
    normalized_df = df.copy()

    # Normalize each joint
    for joint_name in joint_mapping.keys():
        # Get current range
        current_min = np.nanmin(df[joint_name])
        current_max = np.nanmax(df[joint_name])

        # Skip if range is too small
        if np.abs(current_max - current_min) < 1e-6:
            continue

        # Get target range
        target_min, target_max = robot_ranges[joint_name]

        # Normalize
        normalized_df[joint_name] = (df[joint_name] - current_min) / (current_max - current_min) * (target_max - target_min) + target_min

    return normalized_df

def downsample_dataframe(df, source_fps, target_fps):
    """
    Downsample a DataFrame from source_fps to target_fps

    Args:
        df: DataFrame containing motion data
        source_fps: Original frame rate
        target_fps: Target frame rate

    Returns:
        Downsampled DataFrame
    """
    # Calculate the downsample factor
    downsample_factor = source_fps / target_fps

    if downsample_factor == 1:
        # No downsampling needed
        return df

    # Get original frames
    original_frames = df['Frame'].values

    # Calculate the number of frames in the downsampled data
    n_frames_new = int(len(original_frames) / downsample_factor)

    # Find all columns except 'Frame'
    data_cols = [col for col in df.columns if col != 'Frame']

    # Initialize new DataFrame
    downsampled_data = {'Frame': np.arange(n_frames_new)}

    # Get original time points and new time points
    original_time = np.arange(len(df)) / source_fps
    new_time = np.arange(n_frames_new) / target_fps

    # Resample each column
    for col in data_cols:
        # Create interpolation function from original data
        interpolated = interpolate.interp1d(
            original_time,
            df[col].values,
            kind='linear',
            bounds_error=False,
            fill_value='extrapolate'
        )

        # Apply to new time points
        downsampled_data[col] = interpolated(new_time)

    # Create new DataFrame
    downsampled_df = pd.DataFrame(downsampled_data)

    return downsampled_df

def visualize_joint_data(df, person_id, output_dir):
    """
    Visualize the joint data

    Args:
        df: DataFrame with joint data
        person_id: ID of the person
        output_dir: Output directory for saving plots
    """
    # Find position columns
    position_cols = [col for col in df.columns if col.startswith('Position')]

    # Plot joint angles
    plt.figure(figsize=(15, 10))

    for i, col in enumerate(position_cols):
        plt.subplot(4, 2, i+1)
        plt.plot(df['Frame'], df[col])
        plt.title(f'{col} (radians)')
        plt.xlabel('Frame')
        plt.ylabel('Angle (rad)')
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f'joint_angles_person_{person_id}.png'))
    plt.close()

    # Plot joint angles in degrees for easier interpretation
    plt.figure(figsize=(15, 10))

    for i, col in enumerate(position_cols):
        plt.subplot(4, 2, i+1)
        plt.plot(df['Frame'], np.rad2deg(df[col]))
        plt.title(f'{col} (degrees)')
        plt.xlabel('Frame')
        plt.ylabel('Angle (deg)')
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f'joint_angles_degrees_person_{person_id}.png'))
    plt.close()

    # If velocity columns exist, plot them too
    velocity_cols = [col for col in df.columns if col.startswith('Velocity')]

    if velocity_cols:
        plt.figure(figsize=(15, 10))

        for i, col in enumerate(velocity_cols):
            plt.subplot(4, 2, i+1)
            plt.plot(df['Frame'], df[col])
            plt.title(f'{col} (rad/s)')
            plt.xlabel('Frame')
            plt.ylabel('Angular Velocity (rad/s)')
            plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f'joint_velocities_person_{person_id}.png'))
        plt.close()

    # Create a joint comparison with robot data
    try:
        # Check if robot data is available
        robot_data = pd.read_csv('1_2.csv')

        # Plot comparison for each joint
        plt.figure(figsize=(15, 15))

        for i, col in enumerate(position_cols):
            plt.subplot(4, 2, i+1)

            # Plot human data
            plt.plot(np.arange(len(df)) / len(df), df[col], label='Human (Extracted)')

            # Plot robot data (normalized to same length for comparison)
            robot_col = col  # Same column name
            if robot_col in robot_data.columns:
                # Resample robot data to match human data length
                robot_x = np.linspace(0, 1, len(robot_data))
                human_x = np.linspace(0, 1, len(df))

                # Create interpolation function
                robot_interp = interpolate.interp1d(
                    robot_x,
                    robot_data[robot_col].values,
                    kind='linear',
                    bounds_error=False,
                    fill_value='extrapolate'
                )

                # Resample to human data points
                robot_resampled = robot_interp(human_x)

                plt.plot(human_x, robot_resampled, '--', label='Robot (Original)')

            plt.title(f'{col} Comparison')
            plt.xlabel('Normalized Time')
            plt.ylabel('Joint Angle (rad)')
            plt.legend()
            plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f'joint_comparison_person_{person_id}.png'))
        plt.close()

    except Exception as e:
        print(f"Could not create comparison with robot data: {e}")

def load_robot_data(path):
    """
    Load robot joint data and visualize

    Args:
        path: Path to the robot joint data CSV
    """
    # Load robot data
    robot_df = pd.read_csv(path)

    # Find position columns
    position_cols = [col for col in robot_df.columns if col.startswith('Position')]

    # Plot joint angles
    plt.figure(figsize=(15, 10))

    for i, col in enumerate(position_cols):
        plt.subplot(4, 2, i+1)
        plt.plot(robot_df[col])
        plt.title(f'{col} (Robot)')
        plt.xlabel('Frame')
        plt.ylabel('Joint Angle (rad)')
        plt.grid(True)

    plt.tight_layout()
    plt.savefig('robot_joint_angles.png')
    plt.close()

    print(f"Robot data visualization saved to robot_joint_angles.png")

    return robot_df

# Example usage
if __name__ == "__main__":
    # First, visualize the robot data to understand its format
    robot_df = load_robot_data('1_2.csv')

    # Then process the multiperson data
    convert_multiperson_data(
        input_csv='MultipeopleData2.csv',
        output_dir='joint_data',
        target_fps=20,
        source_fps=30,
        normalize=True
    )

Robot data visualization saved to robot_joint_angles.png
Processing data from MultipeopleData2.csv
Found 1 person(s) in the data
Saved data for person 1 to joint_data/human_person_1_joints_20hz.csv
Original frames: 2728, Downsampled frames: 1818


Simplified approach

In [None]:
import pandas as pd
import numpy as np
import os
from scipy import interpolate

def convert_multiperson_data(input_csv, output_dir='joint_data', target_fps=20, source_fps=30, normalize=True):
    print(f"Processing data from {input_csv}")

    os.makedirs(output_dir, exist_ok=True)

    df = pd.read_csv(input_csv)

    persons = df['Person'].unique()

    print(f"Found {len(persons)} person(s) in the data")

    # Define the 7 key joints we want to track (mapped to MediaPipe landmarks)
    joint_mapping = {
        'Position1': 'Body_17',  # finger
        'Position2': 'Body_15',  # wrist
        'Position3': 'Body_13',  # elbow
        'Position4': 'Body_11',  # shoulder
        'Position5': 'Body_23',  # hip
        'Position6': 'Body_25',  # knee
        'Position7': 'Body_27'   # ankle
    }

    # Process each person separately
    for person_id in persons:
        person_df = df[df['Person'] == person_id]

        # Get all frames for this person
        frames = sorted(person_df['Frame'].unique())

        clean_data = {'Frame': frames}

        # Dictionary to store joint positions per frame
        joints_3d = {}
        for joint_name in joint_mapping.keys():
            joints_3d[joint_name] = []

        # Extract each joint 3D position
        for frame in frames:
            frame_joints = {}

            for joint_name in joint_mapping.keys():
                frame_joints[joint_name] = None

            frame_data = person_df[person_df['Frame'] == frame]

            # Extract joint positions
            for joint_name, landmark_name in joint_mapping.items():
                joint_row = frame_data[frame_data['Landmark'] == landmark_name]

                if not joint_row.empty:
                    x, y, z = joint_row['x'].values[0], joint_row['y'].values[0], joint_row['z'].values[0]
                    frame_joints[joint_name] = (x, y, z)

            for joint_name, pos in frame_joints.items():
                joints_3d[joint_name].append(pos)

        # Calculate joint angles
        joint_angles = calculate_joint_angles(joints_3d)

        for joint_name, angles in joint_angles.items():
            clean_data[joint_name] = angles

        clean_df = pd.DataFrame(clean_data)
        clean_df = clean_df.interpolate(method='linear')


        # Normalize joint values to match robot range
        if normalize:
            clean_df = normalize_joint_ranges(clean_df, joint_mapping)

        # Downsample
        downsampled_df = downsample_dataframe(clean_df, source_fps, target_fps)

        output_file = os.path.join(output_dir, f'human_person_{person_id}_joints_{target_fps}hz.csv')
        downsampled_df.to_csv(output_file, index=False)

        print(f"Saved data for person {person_id} to {output_file}")
        print(f"Original frames: {len(clean_df)}, Downsampled frames: {len(downsampled_df)}")

def calculate_joint_angles(joints_3d):
    joint_angles = {}

    for joint_name in joints_3d.keys():
        joint_angles[joint_name] = []

    n_frames = len(joints_3d['Position1'])

    for frame in range(n_frames):
        pos1 = joints_3d['Position1'][frame]  # finger
        pos2 = joints_3d['Position2'][frame]  # wrist
        pos3 = joints_3d['Position3'][frame]  # elbow
        pos4 = joints_3d['Position4'][frame]  # shoulder
        pos5 = joints_3d['Position5'][frame]  # hip
        pos6 = joints_3d['Position6'][frame]  # knee
        pos7 = joints_3d['Position7'][frame]  # ankle

        if None in [pos1, pos2, pos3, pos4, pos5, pos6, pos7]:
            for joint_name in joint_angles.keys():
                joint_angles[joint_name].append(np.nan)
            continue

        # Calculate angles
        # Position1: Finger angle (relative to wrist)
        finger_vec = np.array(pos1) - np.array(pos2)
        wrist_vec = np.array(pos2) - np.array(pos3)
        angle1 = angle_between_vectors(finger_vec, wrist_vec)

        # Position2: Wrist angle (relative to elbow)
        wrist_vec = np.array(pos2) - np.array(pos3)
        elbow_vec = np.array(pos3) - np.array(pos4)
        angle2 = angle_between_vectors(wrist_vec, elbow_vec)

        # Position3: Elbow angle (relative to shoulder)
        elbow_vec = np.array(pos3) - np.array(pos4)
        shoulder_vec = np.array(pos4) - np.array(pos5)
        angle3 = angle_between_vectors(elbow_vec, shoulder_vec)

        # Position4: Shoulder angle (3D orientation)
        vertical = np.array([0, 1, 0])  # Assuming Y is vertical
        angle4 = angle_between_vectors(shoulder_vec, vertical)

        # Position5: Hip angle
        hip_vec = np.array(pos5) - np.array(pos6)
        angle5 = angle_between_vectors(hip_vec, vertical)

        # Position6: Knee angle
        knee_vec = np.array(pos6) - np.array(pos7)
        angle6 = angle_between_vectors(knee_vec, hip_vec)

        # Position7: Ankle angle (orientation)
        ground_normal = np.array([0, 0, 1])  # Assuming Z is up from ground
        angle7 = angle_between_vectors(knee_vec, ground_normal)

        # Add to joint angles
        joint_angles['Position1'].append(angle1)
        joint_angles['Position2'].append(angle2)
        joint_angles['Position3'].append(angle3)
        joint_angles['Position4'].append(angle4)
        joint_angles['Position5'].append(angle5)
        joint_angles['Position6'].append(angle6)
        joint_angles['Position7'].append(angle7)

    return joint_angles

def angle_between_vectors(v1, v2):
    v1 = np.array(v1)
    v2 = np.array(v2)

    v1_norm = np.linalg.norm(v1)
    v2_norm = np.linalg.norm(v2)

    if v1_norm < 1e-10 or v2_norm < 1e-10:
        return 0.0

    v1 = v1 / v1_norm
    v2 = v2 / v2_norm

    dot_product = np.clip(np.dot(v1, v2), -1.0, 1.0)
    angle = np.arccos(dot_product)

    return angle

def normalize_joint_ranges(df, joint_mapping):
    # Robot joint ranges (in radians)
    robot_ranges = {
        'Position1': (-2.9, 2.9),    # Joint 1
        'Position2': (-1.8, 1.8),    # Joint 2
        'Position3': (-2.9, 2.9),    # Joint 3
        'Position4': (-3.1, 0.0),    # Joint 4
        'Position5': (-2.9, 2.9),    # Joint 5
        'Position6': (-0.0, 3.8),    # Joint 6
        'Position7': (-2.9, 2.9)     # Joint 7
    }

    normalized_df = df.copy()

    # Normalize each joint
    for joint_name in joint_mapping.keys():
        current_min = np.nanmin(df[joint_name])
        current_max = np.nanmax(df[joint_name])

        if np.abs(current_max - current_min) < 1e-6:
            continue

        target_min, target_max = robot_ranges[joint_name]
        normalized_df[joint_name] = (df[joint_name] - current_min) / (current_max - current_min) * (target_max - target_min) + target_min

    return normalized_df

def downsample_dataframe(df, source_fps, target_fps):
    downsample_factor = source_fps / target_fps

    if downsample_factor == 1:
        return df

    original_frames = df['Frame'].values
    n_frames_new = int(len(original_frames) / downsample_factor)
    data_cols = [col for col in df.columns if col != 'Frame']
    downsampled_data = {'Frame': np.arange(n_frames_new)}

    original_time = np.arange(len(df)) / source_fps
    new_time = np.arange(n_frames_new) / target_fps

    for col in data_cols:
        interpolated = interpolate.interp1d(
            original_time,
            df[col].values,
            kind='linear',
            bounds_error=False,
            fill_value='extrapolate'
        )

        downsampled_data[col] = interpolated(new_time)

    downsampled_df = pd.DataFrame(downsampled_data)

    return downsampled_df


convert_multiperson_data(
    input_csv='MultipeopleData2.csv',
    output_dir='joint_data',
    target_fps=20,
    source_fps=30,
    normalize=True

Low pass filter to reduce noise and make data smoother

In [None]:
import pandas as pd
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
import os

def smooth_joint_data(input_csv, output_csv, method='butterworth', cutoff=2.0, order=4, window_size=15):

    print(f"Loading data from {input_csv}")
    df = pd.read_csv(input_csv)

    # Identify joint position columns
    position_cols = [col for col in df.columns if col.startswith('Position')]

    # Create a copy for smoothed data
    smoothed_df = df.copy()

    # Apply the selected smoothing method
    if method == 'butterworth':
        print(f"Applying Butterworth filter (cutoff={cutoff}Hz, order={order})")
        # Assume 20Hz sampling rate (adjust if different)
        fs = 20.0
        # Normalize cutoff frequency
        nyquist = 0.5 * fs
        normal_cutoff = cutoff / nyquist

        # Design filter
        b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)

        # Apply to each joint position
        for col in position_cols:
            # Apply filtering
            smoothed_df[col] = signal.filtfilt(b, a, df[col].values)

    elif method == 'savgol':
        print(f"Applying Savitzky-Golay filter (window={window_size}, order={order})")
        # Make sure window_size is odd
        if window_size % 2 == 0:
            window_size += 1

        # Apply to each joint position
        for col in position_cols:
            smoothed_df[col] = signal.savgol_filter(df[col].values, window_size, order)

    elif method == 'moving_avg':
        print(f"Applying Moving Average filter (window={window_size})")
        # Define moving average filter
        window = np.ones(window_size) / window_size

        # Apply to each joint position
        for col in position_cols:
            smoothed_df[col] = np.convolve(df[col].values, window, mode='same')

            # Fix edges (first and last window_size/2 points)
            half_window = window_size // 2
            for i in range(half_window):
                # Start (use forward values)
                if i < len(df):
                    forward_window = min(window_size, len(df) - i)
                    smoothed_df.loc[i, col] = df[col].values[i:i+forward_window].mean()

                # End (use backward values)
                end_idx = len(df) - 1 - i
                if end_idx >= 0:
                    backward_window = min(window_size, len(df) - (len(df) - 1 - i))
                    smoothed_df.loc[end_idx, col] = df[col].values[end_idx-backward_window+1:end_idx+1].mean()

    else:
        raise ValueError(f"Unknown smoothing method: {method}")

    # Recalculate velocities if they exist
    velocity_cols = [col for col in df.columns if col.startswith('Velocity')]
    if velocity_cols:
        print("Recalculating velocities from smoothed positions")

        for i, pos_col in enumerate(position_cols):
            vel_col = f"Velocity{pos_col[-1]}"
            if vel_col in velocity_cols:
                # Compute velocity using central differences
                pos_values = smoothed_df[pos_col].values
                vel_values = np.zeros_like(pos_values)

                # Interior points (central difference)
                vel_values[1:-1] = (pos_values[2:] - pos_values[:-2]) / 2.0

                # Endpoints (forward and backward differences)
                vel_values[0] = pos_values[1] - pos_values[0]
                vel_values[-1] = pos_values[-1] - pos_values[-2]

                # Scale by sampling rate (assuming 20Hz)
                vel_values *= 20.0

                # Update velocity column
                smoothed_df[vel_col] = vel_values

    # Save smoothed data
    print(f"Saving smoothed data to {output_csv}")
    smoothed_df.to_csv(output_csv, index=False)

    return smoothed_df

def visualize_smoothing_comparison(original_df, smoothed_df, output_dir, prefix=''):

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Identify joint position columns
    position_cols = [col for col in original_df.columns if col.startswith('Position')]

    # Plot comparison
    plt.figure(figsize=(15, 10))

    for i, col in enumerate(position_cols):
        plt.subplot(4, 2, i+1)
        plt.plot(original_df['Frame'], original_df[col], 'r-', alpha=0.5, label='Original')
        plt.plot(smoothed_df['Frame'], smoothed_df[col], 'b-', label='Smoothed')
        plt.title(f'{col} Comparison')
        plt.xlabel('Frame')
        plt.ylabel('Joint Angle (rad)')
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f'{prefix}smoothing_comparison.png'))
    plt.close()

    # Plot velocities if they exist
    velocity_cols = [col for col in original_df.columns if col.startswith('Velocity')]
    if velocity_cols:
        plt.figure(figsize=(15, 10))

        for i, col in enumerate(velocity_cols):
            plt.subplot(4, 2, i+1)
            plt.plot(original_df['Frame'], original_df[col], 'r-', alpha=0.5, label='Original')
            plt.plot(smoothed_df['Frame'], smoothed_df[col], 'b-', label='Smoothed')
            plt.title(f'{col} Comparison')
            plt.xlabel('Frame')
            plt.ylabel('Angular Velocity (rad/s)')
            plt.legend()
            plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f'{prefix}velocity_comparison.png'))
        plt.close()

def batch_process_joint_files(input_dir, output_dir, method='butterworth',
                           cutoff=2.0, order=4, window_size=15, visualize=False):

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Find all CSV files in input directory
    csv_files = [f for f in os.listdir(input_dir) if f.endswith('.csv')]

    for file in csv_files:
        input_path = os.path.join(input_dir, file)
        output_path = os.path.join(output_dir, f"smoothed_{file}")

        print(f"Processing {file}...")

        # Load original data
        original_df = pd.read_csv(input_path)

        # Apply smoothing
        smoothed_df = smooth_joint_data(
            input_path,
            output_path,
            method=method,
            cutoff=cutoff,
            order=order,
            window_size=window_size
        )

        # Visualize if requested
        if visualize:
            visualize_smoothing_comparison(
                original_df,
                smoothed_df,
                output_dir,
                prefix=os.path.splitext(file)[0] + '_'
            )

    print(f"Processed {len(csv_files)} files")

# Example usage
if __name__ == "__main__":
    # Option 1: Process a single file
    smooth_joint_data(
        input_csv='joint_data/3human_person_1_joints_20hz.csv',
        output_csv='joint_data/3smoothed_human_person_1_joints_20hz.csv',
        method='butterworth',
        cutoff=2.0,  # 2Hz cutoff - adjust based on your movement data
        order=4      # 4th order filter
    )

    # Option 2: Batch process all files in a directory
    # batch_process_joint_files(
    #     input_dir='joint_data',
    #     output_dir='smoothed_joint_data',
    #     method='butterworth',  # Try 'butterworth', 'savgol', or 'moving_avg'
    #     cutoff=2.0,            # Cutoff frequency in Hz (lower = smoother)
    #     order=4,               # Filter order (higher = sharper cutoff)
    #     window_size=15,        # Window size for savgol or moving average
    #     visualize=True         # Create comparison visualizations
    # )

Loading data from joint_data/3human_person_1_joints_20hz.csv
Applying Butterworth filter (cutoff=2.0Hz, order=4)
Saving smoothed data to joint_data/3smoothed_human_person_1_joints_20hz.csv
