
# Hot3D Data Provider Tutorial

To use sequences from the HOT3D dataset, you will need to use the Hot3dDataProvider object.

This notebook explains how to use the various "DataProvider" objects. It is organized as follows:
- Section 0: DataProvider initialization
- Section 1: Device calibration and Image data
- Section 2: Pose data
  - Section 2.a: Device/Headset pose data
  - Section 2.b: Hand pose data
    - Section 2.b.a: Hand pose data and MESH hands
  - Section 2.c: Object pose data
- Section 3: Object bounding boxes (amodal bounding boxes)

The Hot3dDataProvider API is organized as follows:
```
|- device_data_provider        -> provides device calibration and image data
|- device_pose_data_provider   -> provides device pose data
|- mano_hand_data_provider     -> provides hand pose data (MANO representation)
|- umetrack_hand_data_provider -> provides hand pose data (UmeTrack representation)
|- object_pose_data_provider   -> provides object pose data
|- object_library              -> provides information about the HOT3D 3D objects/assets
|- hand_box2d_data_provider    -> provides hands bbox information
|- object_box2d_data_provider  -> provides objects bbox information
```

## Notes
- All Device/Headset, Hand, and Object pose data are expressed in world coordinates (meters)

In this tutorial you will learn that:
- Device data, such as the image data stream, is indexed with a stream_id
- The headset uses camera rig coordinates relative to the DEVICE pose (world_camera_stream_id = world_device @ device_camera_stream_id)
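
The composition rule above can be sketched with plain 4x4 homogeneous matrices. This is a minimal, library-free illustration and not the SE3 class used by HOT3D: `make_pose`, `compose`, `translation_of` and all numeric values are hypothetical.

```python
import math


def make_pose(yaw_rad, translation):
    """Build a 4x4 rigid transform: rotation about Z by yaw_rad plus a translation."""
    c, s = math.cos(yaw_rad), math.sin(yaw_rad)
    tx, ty, tz = translation
    return [
        [c, -s, 0.0, tx],
        [s, c, 0.0, ty],
        [0.0, 0.0, 1.0, tz],
        [0.0, 0.0, 0.0, 1.0],
    ]


def compose(a, b):
    """Matrix product a @ b for 4x4 nested lists."""
    return [
        [sum(a[i][k] * b[k][j] for k in range(4)) for j in range(4)]
        for i in range(4)
    ]


def translation_of(pose):
    """Return the translation part (last column) of a 4x4 transform."""
    return [pose[0][3], pose[1][3], pose[2][3]]


# Hypothetical values: device rotated 90 degrees about Z in the world,
# camera mounted 10 cm along the device X axis.
T_world_device = make_pose(math.pi / 2, (1.0, 2.0, 0.5))
T_device_camera = make_pose(0.0, (0.1, 0.0, 0.0))

# world_camera = world_device @ device_camera
T_world_camera = compose(T_world_device, T_device_camera)
camera_position = translation_of(T_world_camera)
print([round(v, 6) for v in camera_position])
```

Because the device is rotated in the world, the camera offset along the device X axis ends up along the world Y axis, which is why the order of the product matters.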

In [3]:
#
# Section 0: DataProvider initialization
#
# Take home message:
# - Device data, such as the image data stream, is indexed with a stream_id
# - Intrinsic and extrinsic calibration (relative to the device coordinates) is available for each CAMERA/stream_id
#
# Data Requirements:
# - a sequence
# - the object library
# Optional:
# - To use the Mano hand you need to have the LEFT/RIGHT *.pkl hand models (available)

import os
from dataset_api import Hot3dDataProvider
from data_loaders.loader_object_library import load_object_library
from data_loaders.mano_layer import MANOHandModel

# home = os.path.expanduser("~")
# print("home", home)
# hot3d_dataset_path = home + "/Downloads/hot3d_dataset"
hot3d_dataset_path = "/home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset"
sequence_path = os.path.join(hot3d_dataset_path, "P0003_c701bd11")
object_library_path = os.path.join(hot3d_dataset_path, "assets")
# mano_hand_model_path = os.path.join(home, "Downloads")
mano_hand_model_path = "/home/keyi/Documents/VsCodeP/research/retarget/bimanual/objasm/manopth/mano/models"

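if not os.path.exists(sequence_path) or not os.path.exists(object_library_path):
    print("Invalid input sequence or library path.")
    print("Please update the paths to VALID values for your system.")
    # A bare `raise` outside an except block fails with an unrelated RuntimeError
    raise FileNotFoundError(f"Missing '{sequence_path}' or '{object_library_path}'")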
#
# Init the object library
#
object_library = load_object_library(object_library_folderpath=object_library_path)

#
# Init the HANDs model
# If None, the UmeTrack HANDs model will be used
#
mano_hand_model = None
if mano_hand_model_path is not None:
    mano_hand_model = MANOHandModel(mano_hand_model_path)

#
# Initialize hot3d data provider
#
hot3d_data_provider = Hot3dDataProvider(
    sequence_folder=sequence_path,
    object_library=object_library,
    mano_hand_model=mano_hand_model,
)
print(f"data_provider statistics: {hot3d_data_provider.get_data_statistics()}")



MPS Data Paths
MPS SLAM Data Paths
--closedLoopTrajectory: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/closed_loop_trajectory.csv
--openLoopTrajectory: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/open_loop_trajectory.csv
--semidensePoints: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/semidense_points.csv.gz
--semidenseObservations: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/semidense_observations.csv.gz
--onlineCalibration: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/online_calibration.jsonl
--summary: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/mps/slam/summary.json
MPS Eyegaze Data Paths
--generalEyegaze: /home/keyi/Documents/VsCodeP/research/retarget/bimanual/ho

[MultiRecordFileReader][DEBUG]: Opened file '/home/keyi/Documents/VsCodeP/research/retarget/bimanual/hot3d/hot3d/dataset/P0003_c701bd11/recording.vrs' and assigned to reader #0
[VrsDataProvider][INFO]: streamId 214-1/camera-rgb activated
[VrsDataProvider][INFO]: Timecode stream found: 285-2
[VrsDataProvider][INFO]: Fail to activate streamId 286-1
[VrsDataProvider][INFO]: streamId 1201-1/camera-slam-left activated
[VrsDataProvider][INFO]: streamId 1201-2/camera-slam-right activated
[VrsDataProvider][INFO]: streamId 1202-1/imu-right activated
[VrsDataProvider][INFO]: streamId 1202-2/imu-left activated

In [1]:
#
# Utility functions
# Used for interactive display in the following sections
#
import sys
print(sys.executable)
import rerun as rr
import numpy as np

from projectaria_tools.core.sophus import SE3
from projectaria_tools.utils.rerun_helpers import ToTransform3D


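def log_image(
    image: np.ndarray,
    label: str,
    static: bool = False
) -> None:
    # Log an image (numpy array) under the given rerun entity label
    rr.log(label, rr.Image(image), static=static)


def log_pose(
    pose: SE3,
    label: str,
    static: bool = False
) -> None:
    # Log an SE3 pose as a rerun 3D transform under the given entity label
    rr.log(label, ToTransform3D(pose, False), static=static)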

/home/keyi/anaconda3/envs/hot3d/bin/python


ModuleNotFoundError: No module named 'rerun'

In [2]:
# Section 1: Device calibration and Image data

from tqdm import tqdm

#
# Retrieve some statistics about the "IMAGE" VRS recording
#

# Getting the device data provider (alias)
device_data_provider = hot3d_data_provider.device_data_provider

# Retrieve the list of image streams supported by this sequence
# It will return the RGB and SLAM Left/Right image streams
image_stream_ids = device_data_provider.get_image_stream_ids()
# Retrieve a list of timestamps for the sequence (in nanoseconds)
timestamps = device_data_provider.get_sequence_timestamps()

print(f"Sequence: {os.path.basename(os.path.normpath(sequence_path))}")
print(f"Device type is {hot3d_data_provider.get_device_type()}")
print(f"Image stream ids: {image_stream_ids}")
print(f"Number of timestamps for this sequence: {len(timestamps)}")
print(
    f"Duration of the sequence: {(timestamps[-1] - timestamps[0]) / 1e9} (seconds)"
)  # Timestamps are in nanoseconds


# Init a rerun context to visualize the sequence file images
rr.init("Device images")
rec = rr.memory_recording()

# How to iterate over timestamps using a slice to show one timestamp every 200
timestamps_slice = slice(None, None, 200)
# Loop over the timestamps of the sequence and visualize corresponding data
for timestamp_ns in tqdm(timestamps[timestamps_slice]):

    for stream_id in image_stream_ids:
        # Retrieve the image stream label as string
        image_stream_label = device_data_provider.get_image_stream_label(stream_id)
        # Retrieve the image data for a given timestamp
        image_data = device_data_provider.get_image(timestamp_ns, stream_id)
        # Visualize the image data (it's a numpy array)
        log_image(label=f"img/{image_stream_label}", image=image_data)


#
# Retrieve Camera calibration (intrinsics and extrinsics) for a given stream_id
#
for stream_id in image_stream_ids:
    # Retrieve the camera calibration (intrinsics and extrinsics) for a given stream_id
    [extrinsics, intrinsics] = device_data_provider.get_camera_calibration(stream_id)
    print(intrinsics)
    # We will show in next section how to visualize the position of the camera in the world frame

# Showing the rerun window
rr.notebook_show()

ModuleNotFoundError: No module named 'tqdm'
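
The cell above subsamples the timestamp list with a `slice` and converts a nanosecond span to seconds. Here is a small standalone sketch of both operations. The `timestamps_ns` values are hypothetical (a synthetic stream at roughly 30 Hz) and are not read from a HOT3D sequence.

```python
# Hypothetical timestamps in nanoseconds, one sample every ~33.3 ms
timestamps_ns = [1_000_000_000 + i * 33_333_333 for i in range(1000)]

# slice(None, None, 200) is the object form of [::200]: keep one timestamp every 200
timestamps_slice = slice(None, None, 200)
subsampled = timestamps_ns[timestamps_slice]

# Timestamps are in nanoseconds, so divide by 1e9 to get seconds
duration_s = (timestamps_ns[-1] - timestamps_ns[0]) / 1e9

print(f"Kept {len(subsampled)} of {len(timestamps_ns)} timestamps")
print(f"Duration: {duration_s:.3f} s")
```

Storing the slice in a variable makes it easy to change the stride in one place when a denser or sparser preview is needed.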

# A gentle introduction to the "GT Data" Provider API

Take home message:
- All "GT data providers" use a similar API to query data at a given timestamp and/or StreamID.
- If the requested timestamp does not exist, the closest one can be retrieved along with its delta time (dt).

All the following "GT data providers" are accessible from Hot3dDataProvider and use a similar API.
```
|- device_pose_data_provider   -> device/headset pose data
|- mano_hand_data_provider     -> hand pose data (MANO hand model)
|- umetrack_hand_data_provider -> hand pose data (UmeTrack hand model)
|- object_pose_data_provider   -> object pose data
|- hand_box2d_data_provider    -> hand information such as amodal BBox and visibility ratio
|- object_box2d_data_provider  -> object information such as amodal BBox and visibility ratio
```

We briefly introduce the retrieval concept here, and then showcase how to use each data_provider.
GT data providers enable retrieving information at a given TIMESTAMP
- If the timestamp is not exact, the closest one will be returned,
- The Delta Time (dt) between the found sample and the query timestamp is returned,
  meaning that you know whether you have a perfect match to the GT time sample or retrieved a close sample.
  
Note: Some GT data providers are STREAM_ID specific and enable retrieving information for a given image stream.
```
data_with_dt = device_pose_provider.get_pose_at_timestamp(
   timestamp_ns: int,                           -> Timestamp
   stream_id: StreamID,                         -> If used, specifies for which VRS image stream you query the data
   time_query_options: TimeQueryOptions,        -> Retrieval configuration, e.g. TimeQueryOptions.CLOSEST
   time_domain: TimeDomain,                     -> TimeDomain (always use TimeDomain.TIME_CODE)
   acceptable_time_delta: Optional[int] = None, -> Threshold to reject a delta time (dt) that would be too large (using 0 or None is recommended)
)
```

Here is how most of the interface will be used in the following sections:
```
data_with_dt = X_provider.get_X_at_timestamp(
    timestamp_ns=timestamp_ns,
    time_query_options=TimeQueryOptions.CLOSEST,
    time_domain=TimeDomain.TIME_CODE)
```
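
To make the "closest sample plus dt" idea concrete, here is a minimal sketch of such a lookup over a sorted list of timestamps. It is not the HOT3D implementation: `query_closest` is a hypothetical helper, the sign convention of `dt` (found minus query) is an assumption, and in this sketch only `None` disables the threshold.

```python
from bisect import bisect_left
from typing import Optional, Sequence, Tuple


def query_closest(
    sorted_timestamps_ns: Sequence[int],
    query_ns: int,
    acceptable_time_delta: Optional[int] = None,
) -> Optional[Tuple[int, int]]:
    """Return (closest_timestamp, dt) or None if nothing acceptable is found."""
    if not sorted_timestamps_ns:
        return None
    # Index of the first timestamp >= query_ns
    idx = bisect_left(sorted_timestamps_ns, query_ns)
    # The closest sample is either just before or at/after the query
    candidates = sorted_timestamps_ns[max(idx - 1, 0): idx + 1]
    closest = min(candidates, key=lambda t: abs(t - query_ns))
    dt = closest - query_ns  # assumed convention: found sample minus query
    if acceptable_time_delta is not None and abs(dt) > acceptable_time_delta:
        return None
    return closest, dt


gt_timestamps = [0, 100, 200]  # hypothetical GT sample times (ns)
exact = query_closest(gt_timestamps, 100)
near = query_closest(gt_timestamps, 120)
rejected = query_closest(gt_timestamps, 260, acceptable_time_delta=50)
print(exact, near, rejected)
```

A dt of 0 signals an exact match to a GT sample, while a non-zero dt tells you how far the returned sample is from the time you asked for.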

In [None]:
#
# Section 2: Pose data (Section 2.a: Device/Headset pose data)
#
# Take home message:
# - the device_pose_provider enables you to retrieve the Headset pose as (T_world_device)
# - moving from the device to a given camera can be done by using calibration data and combining SE3 poses
#   - such as T_world_camera = T_world_device @ T_device_camera
#

from projectaria_tools.core.sensor_data import TimeDomain, TimeQueryOptions

# Alias over the HEADSET/Device pose data provider
device_pose_provider = hot3d_data_provider.device_pose_data_provider

# Init a rerun context to visualize the device trajectory
rr.init("Device/Headset trajectory")
rec = rr.memory_recording()

pose_translations = []
# Retrieve the position of the device in the world frame at a given timestamp
for timestamp_ns in tqdm(timestamps):

    rr.set_time_nanos("synchronization_time", int(timestamp_ns))
    rr.set_time_sequence("timestamp", timestamp_ns)

    headset_pose3d_with_dt = None
    if device_pose_provider is None:
        continue
    headset_pose3d_with_dt = device_pose_provider.get_pose_at_timestamp(
        timestamp_ns=timestamp_ns,
        time_query_options=TimeQueryOptions.CLOSEST,
        time_domain=TimeDomain.TIME_CODE,
    )

    if headset_pose3d_with_dt is None:
        continue

    headset_pose3d = headset_pose3d_with_dt.pose3d
    T_world_device = headset_pose3d.T_world_device
    
    log_pose(pose=T_world_device, label="world/device")
    pose_translations.append(T_world_device.translation()[0])
    # This is the pose of the device, to move to a given camera, you need to apply the device_camera transformation
    #for stream_id in image_stream_ids:
       # # Retrieve the camera calibration (intrinsics and extrinsics) for a given stream_id
       # [T_device_camera, intrinsics] = device_data_provider.get_camera_calibration(stream_id)
       # # The pose of the given camera at this timestamp is (world_camera = world_device @ device_camera):
       # T_world_camera = headset_pose3d.T_world_device @ T_device_camera
       # camera_stream_label = device_data_provider.get_image_stream_label(stream_id)
       # print(f"Image stream label: {camera_stream_label} -> world_camera translation: {T_world_camera.translation()[0]}")

rr.log("world/device_trajectory", rr.LineStrips3D([pose_translations]), static=True)

# Showing the rerun window
rr.notebook_show()

In [None]:
#
# Section 2.b: Hand pose data
#
# Take home message:
# - Hands are labelled as LEFT or RIGHT hands
# - A hand pose represents the WRIST pose, to which a MESH or LANDMARKS can be attached (see next section)
#

# Alias over the HAND pose data provider
hand_data_provider = hot3d_data_provider.mano_hand_data_provider if hot3d_data_provider.mano_hand_data_provider is not None else hot3d_data_provider.umetrack_hand_data_provider

# Init a rerun context to visualize the hand pose data trajectory
rr.init("Hand pose trajectory (wrist)")
rec = rr.memory_recording()

# Accumulate HAND poses translations as list, to show a LINE strip HAND trajectory
left_hand_pose_translations = []
right_hand_pose_translations = []

# Retrieve the hand (wrist) poses in the world frame at each timestamp
for timestamp_ns in tqdm(timestamps):

    rr.set_time_nanos("synchronization_time", int(timestamp_ns))
    rr.set_time_sequence("timestamp", timestamp_ns)

    hand_poses_with_dt = None
    if hand_data_provider is None:
        continue
    
    hand_poses_with_dt = hand_data_provider.get_pose_at_timestamp(
        timestamp_ns=timestamp_ns,
        time_query_options=TimeQueryOptions.CLOSEST,
        time_domain=TimeDomain.TIME_CODE,
    )

    if hand_poses_with_dt is None:
        continue
        
    hand_pose_collection = hand_poses_with_dt.pose3d_collection

    for hand_pose_data in hand_pose_collection.poses.values():
        # Retrieve the handedness of the hand (i.e Left or Right)
        handedness_label = hand_pose_data.handedness_label()

        T_world_wrist = hand_pose_data.wrist_pose
        log_pose(pose=T_world_wrist, label=f"world/hand/{handedness_label}")

        # Accumulate HAND poses translations as list, to show a LINE strip HAND trajectory
        if hand_pose_data.is_left_hand():
            left_hand_pose_translations.append(T_world_wrist.translation()[0])
        elif hand_pose_data.is_right_hand():
            right_hand_pose_translations.append(T_world_wrist.translation()[0])

rr.log("world/left_hand", rr.LineStrips3D([left_hand_pose_translations]), static=True)
rr.log("world/right_hand", rr.LineStrips3D([right_hand_pose_translations]), static=True)

# Showing the rerun window
rr.notebook_show()

In [None]:
#
# Section 2.b.a: Hand pose data and MESH hands
#
# Take home message:
# - Hands are labelled as LEFT or RIGHT hands
# - Hands can be retrieved as:
#   - Landmarks (displayed as lines)
#   - Vertices
#   - Mesh (using vertices, face indices and normals)
#

from data_loaders.hand_common import LANDMARK_CONNECTIVITY


# Alias over the HAND pose data provider
hand_data_provider = hot3d_data_provider.mano_hand_data_provider if hot3d_data_provider.mano_hand_data_provider is not None else hot3d_data_provider.umetrack_hand_data_provider

# Init a rerun context
rr.init("Hand pose LANDMARK/MESH")
rec = rr.memory_recording()

left_hand_pose_translations = []
right_hand_pose_translations = []

# Limit to the first 300 timestamps
for timestamp_ns in tqdm(timestamps[:300]):

    rr.set_time_nanos("synchronization_time", int(timestamp_ns))
    rr.set_time_sequence("timestamp", timestamp_ns)

    hand_poses_with_dt = None
    if hand_data_provider is None:
        continue
        
    hand_poses_with_dt = hand_data_provider.get_pose_at_timestamp(
        timestamp_ns=timestamp_ns,
        time_query_options=TimeQueryOptions.CLOSEST,
        time_domain=TimeDomain.TIME_CODE,
    )

    if hand_poses_with_dt is None:
        continue
    
    hand_pose_collection = hand_poses_with_dt.pose3d_collection

    for hand_pose_data in hand_pose_collection.poses.values():
        # Retrieve the handedness of the hand (i.e Left or Right)
        handedness_label = hand_pose_data.handedness_label()

        # Skeleton/Joints landmark representation (for LEFT hand)
        if hand_pose_data.is_left_hand():
            hand_landmarks = hand_data_provider.get_hand_landmarks(
                hand_pose_data
            )
            # convert landmarks to connected lines for display
            # (i.e retrieve points along the HAND LANDMARK_CONNECTIVITY as a list)
            points = [connections
                      for connectivity in LANDMARK_CONNECTIVITY
                      for connections in [[hand_landmarks[it].numpy().tolist() for it in connectivity]]]
            rr.log(
                f"world/{handedness_label}/joints",
                rr.LineStrips3D(points, radii=0.002),
            )

        #
        # Plot RIGHT hand as a Triangular Mesh representation
        #
        if hand_pose_data.is_right_hand():
            hand_mesh_vertices = hand_data_provider.get_hand_mesh_vertices(hand_pose_data)
            hand_triangles, hand_vertex_normals = hand_data_provider.get_hand_mesh_faces_and_normals(hand_pose_data)
            
            rr.log(
                f"world/{handedness_label}/mesh_faces",
                rr.Mesh3D(
                    vertex_positions=hand_mesh_vertices,
                    vertex_normals=hand_vertex_normals,
                    triangle_indices=hand_triangles,  # TODO: we could avoid sending this list if we want to save memory
                ),
            )

# Showing the rerun window
rr.notebook_show()
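
The landmark display above turns a connectivity table into line strips. The sketch below shows that conversion on its own, assuming the connectivity is a list of index chains (each chain being one polyline). `CONNECTIVITY`, `landmarks` and `landmarks_to_line_strips` are hypothetical stand-ins, not the real LANDMARK_CONNECTIVITY or hand data.

```python
# Hypothetical connectivity: each inner list is a chain of landmark indices
CONNECTIVITY = [[0, 1, 2], [0, 3, 4]]

# Hypothetical 3D landmarks (x, y, z), indexed by landmark id
landmarks = [
    [0.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 2.0, 0.0],
    [1.0, 0.0, 0.0],
    [2.0, 0.0, 0.0],
]


def landmarks_to_line_strips(points, connectivity):
    """Return one polyline (list of 3D points) per connectivity chain."""
    return [[list(points[index]) for index in chain] for chain in connectivity]


line_strips = landmarks_to_line_strips(landmarks, CONNECTIVITY)
for strip in line_strips:
    print(strip)
```

Each resulting strip is a list of 3D points that a line-strip primitive can draw directly, one strip per chain.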

In [None]:
#
# Section 2.c: Object pose data
#
# Take home message:
# - Each object is associated with a Unique Identifier (uid)
# - The object library enables you to retrieve the 3D asset linked to this UID (a glb file)
#

from data_loaders.loader_object_library import ObjectLibrary

# Alias over the Object pose data provider
object_pose_data_provider = hot3d_data_provider.object_pose_data_provider

# Keep track of which 3D assets have been loaded/unloaded,
# so that we load them for Rerun only when required
object_cache_status = {}

# Init a rerun context
rr.init("Object pose")
rec = rr.memory_recording()

# Limit to a subset of the timestamps
for timestamp_ns in tqdm(timestamps[100:300]):

    rr.set_time_nanos("synchronization_time", int(timestamp_ns))
    rr.set_time_sequence("timestamp", timestamp_ns)

    object_poses_with_dt = (
        object_pose_data_provider.get_pose_at_timestamp(
            timestamp_ns=timestamp_ns,
            time_query_options=TimeQueryOptions.CLOSEST,
            time_domain=TimeDomain.TIME_CODE,
        )
    )
    if object_poses_with_dt is None:
        continue

    objects_pose3d_collection = object_poses_with_dt.pose3d_collection

    # Keep a mapping to know what object has been seen, and which one has not
    object_uids = object_pose_data_provider.object_uids_with_poses
    logging_status = {x: False for x in object_uids}

    for (
        object_uid,
        object_pose3d,
    ) in objects_pose3d_collection.poses.items():

        object_name = object_library.object_id_to_name_dict[object_uid]
        object_name = object_name + "_" + str(object_uid)
        object_cad_asset_filepath = ObjectLibrary.get_cad_asset_path(
            object_library_folderpath=object_library.asset_folder_name,
            object_id=object_uid,
        )

        log_pose(pose=object_pose3d.T_world_object, label=f"world/objects/{object_name}")
        
        # Mark the object as seen (lets us know which objects have been logged or not)
        # i.e. an object that is not logged has not been seen and will have its rerun entity cleared
        logging_status[object_uid] = True

        # Link the corresponding 3D object to the pose
        if object_uid not in object_cache_status.keys():
            object_cache_status[object_uid] = True
            rr.log(
                f"world/objects/{object_name}",
                rr.Asset3D(
                    path=object_cad_asset_filepath,
                ),
            )

    # Rerun specifics (if an entity is disappearing, its last status is still shown)
    # To compensate for that, if some objects are not visible, we clear the entity
    for object_uid, displayed in logging_status.items():
        if not displayed:
            object_name = object_library.object_id_to_name_dict[object_uid]
            object_name = object_name + "_" + str(object_uid)
            rr.log(
                f"world/objects/{object_name}",
                rr.Clear.recursive(),
            )
            if object_uid in object_cache_status.keys():
                del object_cache_status[object_uid]  # We will log the mesh again

# Showing the rerun window
rr.notebook_show()
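
The bookkeeping in the cell above (load an asset the first time an object is seen, clear it when it is not visible, reload it if it comes back) can be sketched independently of rerun. `update_scene` and the uids below are hypothetical and only illustrate the logic.

```python
def update_scene(all_uids, visible_uids, loaded_assets):
    """Return (uids whose asset must be loaded, uids whose entity must be cleared).

    loaded_assets is a set updated in place across frames.
    """
    # Assets are sent only for visible objects that are not already loaded
    to_load = [uid for uid in visible_uids if uid not in loaded_assets]
    # Objects without a pose at this frame are cleared
    to_clear = [uid for uid in all_uids if uid not in visible_uids]
    loaded_assets.update(to_load)
    # Cleared objects are forgotten so their asset is sent again later
    loaded_assets.difference_update(to_clear)
    return to_load, to_clear


all_uids = ["a", "b", "c"]  # hypothetical object uids
loaded = set()
frame1 = update_scene(all_uids, ["a", "b"], loaded)
frame2 = update_scene(all_uids, ["b"], loaded)
frame3 = update_scene(all_uids, ["a", "b"], loaded)
print(frame1, frame2, frame3)
```

Object "a" is loaded at the first frame, cleared at the second, and loaded again at the third, which mirrors the cache deletion performed in the loop above.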

In [None]:
#
# Section 3: Object/Hand bounding boxes
#
# Take home message
# - Bounding box data is queried by TIMESTAMP and STREAM_ID and contains amodal bbox and visibility ratio
# - Unique Identifiers are used to label objects (uid) -> they can be mapped to literal name by using the object_library
#

from projectaria_tools.core.stream_id import StreamId
from data_loaders.ObjectBox2dDataProvider import (  # @manual
    ObjectBox2dCollectionWithDt,
    ObjectBox2dProvider,
)
import matplotlib.pyplot as plt # Used to display consistent colored Bounding Boxes contours

# Alias over the Object box2d data provider and Device data provider (to get image data)
object_box2d_data_provider = hot3d_data_provider.object_box2d_data_provider
device_data_provider = hot3d_data_provider.device_data_provider

# Retrieve a distinct color mapping for object bounding boxes
# by using a colormap (i.e. associate an object_uid to a specific color)
object_uids = []  # list of available object_uid used to map them to [0, 1, 2, ...] indices
object_box2d_colors = None
if object_box2d_data_provider is not None:
    # Read the uids only after checking that the provider exists
    object_uids = list(object_box2d_data_provider.object_uids)
    color_map = plt.get_cmap("viridis")
    object_box2d_colors = color_map(np.linspace(0, 1, len(object_uids)))
else:
    print("This section expects valid bounding box data")


# Init a rerun context
rr.init("Object bounding boxes and visibility ratio")
rec = rr.memory_recording()

left_hand_pose_translations = []
right_hand_pose_translations = []

# Use SLAM-LEFT image (exists for both Aria and Quest files)
stream_id = StreamId("1201-1")
if stream_id not in object_box2d_data_provider.stream_ids:
    print(f"The object_box2d_data_provider does not have data for this StreamId: {stream_id}")


# Limit to a subset of the timestamps
for timestamp_ns in tqdm(timestamps[100:200]):

    rr.set_time_nanos("synchronization_time", int(timestamp_ns))
    rr.set_time_sequence("timestamp", timestamp_ns)

    # Retrieve data for this timestamp and specific stream_id
    box2d_collection_with_dt = (
        object_box2d_data_provider.get_box2d_at_timestamp(
            stream_id=stream_id,
            timestamp_ns=timestamp_ns,
            time_query_options=TimeQueryOptions.CLOSEST,
            time_domain=TimeDomain.TIME_CODE,
        )
    )
    # Skip this timestamp if no data, or no box2d collection, is available
    if (
        box2d_collection_with_dt is None
        or box2d_collection_with_dt.box2d_collection is None
    ):
        continue
    
    # We have valid data, returned as a collection
    # i.e for each object_uid, we retrieve its BBOX and visibility
    object_uids_at_query_timestamp = (
        box2d_collection_with_dt.box2d_collection.object_uid_list
    )

    for object_uid in object_uids_at_query_timestamp:
        object_name = object_library.object_id_to_name_dict[object_uid]
        axis_aligned_box2d = box2d_collection_with_dt.box2d_collection.box2ds[object_uid]
        bbox = axis_aligned_box2d.box2d
        visibility_ratio = axis_aligned_box2d.visibility_ratio
        if bbox is None:
            continue

        rr.log(
            f"{stream_id}_raw/bbox/{object_name}",
            rr.Boxes2D(
                mins=[bbox.left, bbox.top],
                sizes=[bbox.width, bbox.height],
                colors=object_box2d_colors[object_uids.index(object_uid)],
            ),
        )
        rr.log(f"visibility_ratio/{object_name}", rr.Scalar(visibility_ratio))
        
    # Log the corresponding image (once per timestamp, outside the per-object loop)
    image_stream_label = device_data_provider.get_image_stream_label(stream_id)
    # Retrieve the image data for a given timestamp
    image_data = device_data_provider.get_image(timestamp_ns, stream_id)
    # Visualize the image data (it's a numpy array)
    log_image(label=f"{stream_id}_raw", image=image_data)

# Showing the rerun window
rr.notebook_show()
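
The cell above picks a color per object by spreading the uids evenly over a colormap. A minimal sketch of that mapping, without matplotlib, is shown below: each uid gets a position in [0, 1] that a colormap could then turn into a color. `linspace01`, `assign_colormap_positions` and the uids are hypothetical.

```python
def linspace01(count):
    """Evenly spaced values from 0 to 1 inclusive (like np.linspace(0, 1, count))."""
    if count == 1:
        return [0.0]
    return [i / (count - 1) for i in range(count)]


def assign_colormap_positions(uids):
    """Map each uid to a fixed position in [0, 1] along a colormap."""
    uids = list(uids)
    return dict(zip(uids, linspace01(len(uids))))


object_uids_example = ["4", "9", "12"]  # hypothetical object uids
positions = assign_colormap_positions(object_uids_example)
print(positions)
```

Using a dictionary keyed by uid keeps the color of an object stable across frames and avoids a linear `list.index` search for every box.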

