In [1]:
import numpy as np
import pandas as pd
import json
import h5py
import subprocess
import os

In [2]:
# Set an environment variable
os.environ["DATA_DIR"] = "../../botmatrix/demo/public/"

from crowndata_evaluation.services.kinematics.urdf import (
    get_robot_from_urdf,
    forward_kinematics,
    forward_kinematics,
    find_joint,
)
from crowndata_evaluation.services.kinematics.resources import (
    get_urdf_map,
    get_urdf_dict,
    get_urdf_file_path,
    h5_to_dict,
)
from crowndata_evaluation.services.utils import (
    fetch_joint_json,
)

urdf = "droid"
urdf = "rm_65_gazebo_dual_gripper"

# Load the URDF model
urdf_file_path = get_urdf_file_path(urdf=urdf)
robot = get_robot_from_urdf(urdf_file_path)

movable_joints = get_urdf_dict(urdf=urdf).get("movable_joints", [])

joint_names = get_urdf_dict(urdf=urdf).get("joint_names", [])

In [3]:
joint_names

['joint8', 'joint8_2']

In [4]:
source_path = "../../rm_65_gazebo_dual_gripper/rm_001/episode_4.hdf5"
target_dir = "../../rm_65_gazebo_dual_gripper/rm_001"

source_path = "../../rm_65_gazebo_dual_gripper/rm65_abc_20241031_010921/rm65_abc_20241031_010921.h5"
target_dir = "../../rm_65_gazebo_dual_gripper/rm65_abc_20241031_010921"

In [5]:
meta_file_path = "../../rm_65_gazebo_dual_gripper/rm_001/meta.json"
meta_file_path = "../../rm_65_gazebo_dual_gripper/rm65_abc_20241031_010921/meta.json"


with open(meta_file_path, "r") as file:
    meta = json.load(file)

dt = meta["DT"]
robot_embodiment = meta["device"]["type"]
cameras = meta["device"]["camera_names"]

In [7]:
#### process trajectory ####

trajectory_data = {}
with h5py.File(source_path, "r") as file:
    for key in file.keys():
        if isinstance(file[key], h5py.Dataset):
            trajectory_data[key] = file[key][:]
        elif isinstance(file[key], h5py.Group):
            trajectory_data[key] = h5_to_dict(file[key])

subprocess.call(["mkdir", "-p", f"{target_dir}"])
subprocess.call(["mkdir", "-p", f"{target_dir}/trajectories"])
# joint position
joint_positions = trajectory_data["action"]
# convert to radian
joint_positions[:, :6] = np.radians(joint_positions[:, :6])
joint_positions[:, 7:13] = np.radians(joint_positions[:, :6])
# convert to meter
joint_positions[:, 6] = joint_positions[:, 6] / 1000.0 * 60
joint_positions[:, 13] = joint_positions[:, 13] / 1000.0 * 60

columns = [f"joint{i}" for i in range(1, len(trajectory_data["action"][0]) + 1)]
if len(trajectory_data["action"][0]) == len(movable_joints):
    columns = movable_joints

df = pd.DataFrame(
    joint_positions,
    columns=columns,
)
df.to_json(
    f"{target_dir}/trajectories/joint_positions.json",
    orient="split",
    index=False,
    double_precision=3,
)

joint_records = df.to_dict(orient="records")
trajectories = {}
for joint_name in joint_names:
    link_name = find_joint(robot, joint_name).child
    trajectories[joint_name] = forward_kinematics(
        robot=robot, joint_records=joint_records, link_name=link_name
    )
    df = pd.DataFrame(
        trajectories[joint_name], columns=["x", "y", "z", "roll", "pitch", "yaw"]
    )
    df.to_json(
        f"{target_dir}/trajectories/{joint_name}__trajectory.json",
        orient="split",
        index=False,
        double_precision=3,
    )