# DITTO Minimal Viable Example

This notebook tests an imitation system using other demonstrations as pseudo live views.

In [None]:
%reload_ext autoreload
%autoreload 2

In [None]:
import sys
from pathlib import Path
from typing import List, Dict

import ipywidgets
import numpy as np
import open3d as o3d
from tqdm import tqdm
import matplotlib.pyplot as plt

import casino
from DITTO.data import Hands23Dataset, get_all_runs
from DITTO.config import BASE_RECORDING_PATH, TIME_STEPS
from DITTO.tracking_3D import Step3DMethod


# Activate the interactive stuff
# https://github.com/microsoft/vscode-jupyter/wiki/Using-%25matplotlib-widget-instead-of-%25matplotlib-notebook,tk,etc
import ipympl
%matplotlib widget
# %matplotlib inline

In [None]:
# Notebook settings
PLOT_3D_TRAJECTORIES = True

session_id = "coke_tray"
all_runs = get_all_runs(only_keywords=[str(session_id)])

loaders: List[Hands23Dataset] = []
for ep in tqdm(all_runs, desc="Loading"):
    loaders.append(Hands23Dataset(session_id / ep, lazy_loading=True))

# loaders: List[Hands23Dataset] = [
#    Hands23Dataset(session_id / "000"),
#    Hands23Dataset(session_id / "001")
# Hands23Dataset(session_id / "004") # Demo --> Live pose estimation is not optimal
# Hands23Dataset(session_id / "001") # Demo --> Live pose estimation is not optimal
# ]

print(f"Total runs {len(loaders)}")

## Initialize Object Trajectories

In [None]:
from DITTO.trajectory import Trajectory

num_frames = TIME_STEPS  # number of frames through which we compute flow
trajectories: Dict[int, Trajectory] = {}
for demonstration_index in tqdm(range(len(loaders))):
    trajectories[demonstration_index] = Trajectory.from_hands23(loaders[demonstration_index], n_frames=num_frames)

# We could pre compute trajectories with .trajectory_2D and .trajectory_3D

## Show Recordings

Using an interactive viewer.

In [None]:
from DITTO.vis_helpers import overlay_mask_edge


plt.close("all")
fig, ax = plt.subplots(1, figsize=(8, 6))
fig.suptitle("Demonstration Frames")
ax.set_axis_off()


def update(demo_index, frame_index):
    demo_len = loaders[demo_index].get_len()
    if frame_index >= demo_len:
        print(f"invalid frame index: {frame_index}, demo length: {demo_len}")
        frame_index = demo_len - 1
    image = loaders[demo_index].get_rgb(frame_index)
    mask = loaders[demo_index].get_object_mask(frame_index)[:, :, 0]
    image = overlay_mask_edge(image, mask)

    if loaders[demo_index].has_goal:
        goal_mask = loaders[demo_index].get_goal_mask(0)[:, :, 0]
        image = overlay_mask_edge(image, goal_mask, color=(255, 0, 0))

    image_h = ax.imshow(image)
    # image_h.set_data(image)
    fig.canvas.draw_idle()
    fig.canvas.flush_events()
    # return fig


max_frames_i = max([len(loader) for loader in loaders]) - 1
slider_w = ipywidgets.widgets.IntSlider(
    min=0, max=len(loaders) - 1, step=1, value=0, layout=ipywidgets.Layout(width="70%")
)
slider_i = ipywidgets.widgets.IntSlider(
    min=0, max=max_frames_i, step=1, value=0, layout=ipywidgets.Layout(width="70%")
)
ipywidgets.interact(update, demo_index=slider_w, frame_index=slider_i)

## Show 3D Traj.

Find the full 3D pose, with rotations, by looking at sequential frames of the demonstration video, and fitting transformations.

This uses RANSAC, however the parameters for this still need to be tuned.

The pointcloud poses look quite good, but there is quite a bit of variance in the pose measurements.

In [None]:
demo_index = 0
live_indices = set(range(len(loaders)))
live_indices.remove(demo_index)
# For faster: only use the first one
live_indices = set([live_indices.pop()])

In [None]:
# If you want to use a different step method, adopt below
# from DITTO.tracking_3D import Step3DMethod

# trajectories[demo_index]._step_function_3D = Step3DMethod.flow_ransac
# trajectories[demo_index]._warp_function_3D = Step3DMethod.loftr_ransac
# trajectories[demo_index].reset_cached()

In [None]:
# Plot relative demo trajectory
o3d_objects = []
print(loaders[demo_index].recording_path)

for idx, transform in enumerate(trajectories[demo_index].trajectory_3D):
    abs_point_from_2_5_D = trajectories[demo_index].lifted_2D_trajectory[
        idx, 0, :  # This is the mean of the inital mask tracked using flow
    ]

    print(
        f"Point Error [{idx:03d}]: {np.linalg.norm(transform[:3, 3] - abs_point_from_2_5_D) * 1000:.02f} [mm]"
    )

    o3d_objects.append(
        casino.visualization.get_o3d_coordinate_frame(scale=0.1, transform=transform)
    )
    o3d_objects.append(
        casino.visualization.get_o3d_coordinate_frame(
            scale=0.05, position=abs_point_from_2_5_D
        )
    )

pcd = loaders[demo_index].get_pointcloud_o3d(0)

o3d.visualization.draw_geometries(
    [
        pcd,
        *o3d_objects,
    ]
)

In [None]:
# Compare the 3D trajectory to the lifted one.
trajectories[demo_index].trajectory_3D

trajectory_3d = trajectories[demo_index].trajectory_3D[..., :3, 3]
trajectory_lifted = trajectories[demo_index].lifted_2D_trajectory[:, 0,  :]

l2_error = np.linalg.norm(trajectory_3d - trajectory_lifted, axis=-1).mean()
print(f"Mean Point Error: {l2_error*1000:.02f} [mm] \t <- 3D vs lifted")

In [None]:
pc_start, pc_rgb_start = casino.pointcloud.get_pc(
    trajectories[demo_index].depth_start,
    trajectories[demo_index].intrinsics,
    rgb=trajectories[demo_index].rgb_start,
)

pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc_start))
pcd.colors = o3d.utility.Vector3dVector(pc_rgb_start / 255.0)
o3d.visualization.draw_geometries(
    [
        pcd,
        casino.visualization.get_o3d_coordinate_frame(
            scale=0.1, transform=trajectories[demo_index].hand_pose_3D
        ),
    ]
)

# Warp Demo Traj. to Live View

## Warp 3D with Start

In [None]:
# Plot relative demo trajectory
for live_index in live_indices:
    o3d_objects = []

    # This could come from anywhere but for now we just use another trajecory
    rgb_live = trajectories[live_index].rgb_start
    xyz_live = casino.pointcloud.get_xyz(
        trajectories[live_index].depth_start, trajectories[live_index].intrinsics
    )

    # this takes a pre-computed trajectory, and demo mask of the initial frame, and warps it the live frame.
    warped_3D_trajectory = trajectories[demo_index].warp_3D_onto_live_frame(
        rgb_live, xyz_live, debug_vis=True
    )

    # Assuming we roughly moved the same, we can visualize the original trajectory
    live_3D_trajectory = trajectories[live_index].trajectory_3D

    for warped_transform, live_transform in zip(
        warped_3D_trajectory, live_3D_trajectory
    ):  # relative transformation between frames
        o3d_objects.append(
            casino.visualization.get_o3d_coordinate_frame(
                scale=0.1, transform=warped_transform
            )
        )

        # o3d_objects.append(
        #     casino.visualization.get_o3d_coordinate_frame(
        #         scale=0.05, transform=live_transform
        #     )
        # )

    # Get scene point cloud
    pc_start, pc_rgb_start = casino.pointcloud.get_pc(
        trajectories[live_index].depth_start,
        trajectories[live_index].intrinsics,
        rgb=trajectories[live_index].rgb_start,
    )

    pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc_start))
    pcd.colors = o3d.utility.Vector3dVector(pc_rgb_start / 255.0)
    o3d.visualization.draw_geometries(
        [
            pcd,
            *o3d_objects,
        ]
    )


## Warp 3D with Goal

In [None]:
# Plot relative demo trajectory
for live_index in live_indices:
    o3d_objects = []

    # This could come from anywhere but for now we just use another trajecory
    rgb_live = trajectories[live_index].rgb_start
    xyz_live = casino.pointcloud.get_xyz(
        trajectories[live_index].depth_start, trajectories[live_index].intrinsics
    )

    # this takes a pre-computed trajectory, and demo mask of the initial frame, and warps it the live frame.
    warped_3D_trajectory = trajectories[demo_index].warp_3D_onto_live_frame(
        rgb_live, xyz_live, use_goal_mask=True, debug_vis=True
    )

    # Assuming we roughly moved the same, we can visualize the original trajectory
    live_3D_trajectory = trajectories[live_index].trajectory_3D

    for warped_transform, live_transform in zip(
        warped_3D_trajectory, live_3D_trajectory
    ):  # relative transformation between frames
        o3d_objects.append(
            casino.visualization.get_o3d_coordinate_frame(
                scale=0.1, transform=warped_transform
            )
        )

        # o3d_objects.append(
        #     casino.visualization.get_o3d_coordinate_frame(
        #         scale=0.05, transform=live_transform
        #     )
        # )

    # Get scene point cloud
    pc_start, pc_rgb_start = casino.pointcloud.get_pc(
        trajectories[live_index].depth_start,
        trajectories[live_index].intrinsics,
        rgb=trajectories[live_index].rgb_start,
    )

    pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc_start))
    pcd.colors = o3d.utility.Vector3dVector(pc_rgb_start / 255.0)
    o3d.visualization.draw_geometries(
        [
            pcd,
            *o3d_objects,
        ]
    )


# Trajectory Mixing

Mixing can only be done, if there is a define goal position.

## Trajectory Mixing in 3D

In [None]:
from DITTO.mixing import mix_3D

PLOT_3D_TRAJECTORIES = True

    
# Plot relative demo trajectory
for live_index in live_indices:
    # This could come from anywhere but for now we just use another trajecory
    rgb_live = trajectories[live_index].rgb_start
    xyz_live = casino.pointcloud.get_xyz(
        trajectories[live_index].depth_start, trajectories[live_index].intrinsics
    )

    # this takes a pre-computed trajectory, and demo mask of the initial frame, and warps it the live frame.
    object_warped_3D_trajectory = trajectories[demo_index].warp_3D_onto_live_frame(
        rgb_live, xyz_live, use_goal_mask=False
    )
    goal_warped_3D_trajectory = trajectories[demo_index].warp_3D_onto_live_frame(
        rgb_live, xyz_live, use_goal_mask=True
    )

    warped_3D_trajectory = mix_3D(
        object_warped_3D_trajectory, goal_warped_3D_trajectory
    )

    live_3D_trajectory = trajectories[live_index].trajectory_3D

    # visualize
    if PLOT_3D_TRAJECTORIES:
        o3d_objects = []
        
        for warped_transform, live_transform in zip(
            warped_3D_trajectory, live_3D_trajectory
        ):  # relative transformation between frames
            o3d_objects.append(
                casino.visualization.get_o3d_coordinate_frame(
                    scale=0.1, transform=warped_transform
                )
            )

            # o3d_objects.append(
            #     casino.visualization.get_o3d_coordinate_frame(
            #         scale=0.05, transform=live_transform
            #     )
            # )

        # Get scene point cloud
        pc_start, pc_rgb_start = casino.pointcloud.get_pc(
            trajectories[live_index].depth_start,
            trajectories[live_index].intrinsics,
            rgb=trajectories[live_index].rgb_start,
        )

        pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc_start))
        pcd.colors = o3d.utility.Vector3dVector(pc_rgb_start / 255.0)
        o3d.visualization.draw_geometries(
            [
                pcd,
                *o3d_objects,
            ]
        )


# Warp the Hand Position

In [None]:
# Plot relative demo trajectory
for live_index in live_indices:
    o3d_objects = []

    # This could come from anywhere but for now we just use another trajecory
    rgb_live = trajectories[live_index].rgb_start
    xyz_live = casino.pointcloud.get_xyz(
        trajectories[live_index].depth_start, trajectories[live_index].intrinsics
    )

    # This warpes the hand
    warped_3D_hand_pose = trajectories[demo_index].warp_3D_hand_onto_live_frame(
        rgb_live, xyz_live
    )

    # Get scene point cloud
    pc_start, pc_rgb_start = casino.pointcloud.get_pc(
        trajectories[live_index].depth_start,
        trajectories[live_index].intrinsics,
        rgb=trajectories[live_index].rgb_start,
    )

    pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc_start))
    pcd.colors = o3d.utility.Vector3dVector(pc_rgb_start / 255.0)
    o3d.visualization.draw_geometries(
        [
            pcd,
            casino.visualization.get_o3d_coordinate_frame(
                scale=0.1, transform=warped_3D_hand_pose
            ),
        ]
    )
