In [None]:
%load_ext dotenv
%dotenv

from ros_edf.ros_interface import EdfRosInterface
from ros_edf.pc_utils import pcd_from_numpy, draw_geometry, reconstruct_surface
from edf.data import PointCloud, SE3, TargetPoseDemo, DemoSequence, save_demos
from edf.pc_utils import check_pcd_collision, optimize_pcd_collision

import torch
import numpy as np
import yaml
import plotly as pl
import plotly.express as ple
import open3d as o3d

# Initialize Robot Interface

In [None]:
env_interface = EdfRosInterface(reference_frame = "scene", planner_id="AnytimePathShortening")
env_interface.set_planning_time(seconds=5.)
env_interface.reset()

### Initialize dataset list

In [None]:
dataset = []

# Pick

### Observe

In [None]:
grasp_raw = env_interface.observe_eef(obs_type = 'pointcloud', update = True)
scene_raw = env_interface.observe_scene(obs_type = 'pointcloud', update = True)

### Sample Pick Pose

In [None]:
pick_poses = SE3([0.0, 0.0, 1.0, 0.0, -0.05, 0.0, 0.275])
# pre_pick_poses = SE3.multiply(pick_poses, SE3([1., 0., 0., 0., 0., 0., -0.1]))
# post_pick_poses = SE3.multiply(SE3([1., 0., 0., 0., 0., 0., 0.2]), pick_poses)
_, pre_pick_poses = optimize_pcd_collision(x=scene_raw, y=grasp_raw, 
                                           cutoff_r = 0.03, dt=0.01, eps=1., iters=50,
                                           rel_pose=pick_poses)
post_pick_poses = pre_pick_poses

### Execute Pick

In [None]:
for idx in [0]:
    pick_pose, pre_pick_pose, post_pick_pose = pick_poses[idx], pre_pick_poses[idx], post_pick_poses[idx]
    
    colcheck_r = 0.003 # Should be similar to voxel filter size
    col_check = check_pcd_collision(x=scene_raw, y=grasp_raw.transformed(pick_pose)[0], r = colcheck_r)
    print(f"Pick Pose_{idx} collision-free: {not col_check}")
    if not col_check:
        break
    
if not col_check:
    print("Found collision-free pick pose!")
else:
    raise NotImplementedError("No collision-free pick pose found!")

In [None]:
# DEBUG
# draw_geometry([scene_raw] + grasp_raw.transformed(post_pick_pose))

In [None]:
pick_result = env_interface.pick(pre_pick_pose, pick_pose, post_pick_pose)
print(f"Pick result: {pick_result}")
if pick_result == "SUCCESS":
    pick_demo = TargetPoseDemo(target_poses=pick_poses, scene_pc=scene_raw, grasp_pc=grasp_raw)
    env_interface.detach()
    env_interface.attach_placeholder() # To avoid collsion with the grasped object
else:
    raise NotImplementedError("Pick failed")

# Observe for Place

In [None]:
# Observe EEF
result = env_interface.move_to_named_target("init")
print(f"Move to End-Effector observation pose: {result}")
if result == 'SUCCESS':
    env_interface.detach()
    grasp_raw = env_interface.observe_eef(obs_type = 'pointcloud', update = True)
    env_interface.attach(obj = grasp_raw)
else:
    raise NotImplementedError(f"Failed to move to End-Effector observation pose.")

# Observe Scene
result = env_interface.move_to_named_target("observe")
print(f"Move to Scene observation pose: {result}")
if result == 'SUCCESS':
    scene_raw = env_interface.observe_scene(obs_type = 'pointcloud', update = True)
else:
    raise NotImplementedError(f"Failed to move to Scene observation pose.")


result = env_interface.move_to_named_target("init")
print(f"Come back to default pose: {result}")
if result == 'SUCCESS':
    pass
else:
    raise NotImplementedError(f"Failed to come back to default pose.")

# Place

### Sample Place Poses

In [None]:
place_poses = SE3([0.5000, -0.5000, -0.5000, -0.5000, 0.13, -0.20, 0.32])
_, pre_place_poses = optimize_pcd_collision(x=scene_raw, y=grasp_raw, 
                                            cutoff_r = 0.03, dt=0.01, eps=1., iters=5,
                                            rel_pose=place_poses)
post_place_poses = place_poses * pick_pose.inv() * pre_pick_pose

### Execute Place

In [None]:
for idx in [0]:
    place_pose, pre_place_pose, post_place_pose = place_poses[idx], pre_place_poses[idx], post_place_poses[idx]
    
    colcheck_r = 0.0015 # Should be similar to voxel filter size
    col_check = check_pcd_collision(x=scene_raw, y=grasp_raw.transformed(place_pose)[0], r = colcheck_r)
    print(f"Place Pose_{idx} collision-free: {not col_check}")
    if not col_check:
        break
    
if not col_check:
    print("Found collision-free place pose!")
else:
    raise NotImplementedError("No collision-free place pose found!")

In [None]:
# DEBUG
# draw_geometry([scene_raw] + grasp_raw.transformed(place_pose))

In [None]:
place_result = env_interface.place(pre_place_pose, place_pose, post_place_pose)
print(f"Place result: {place_result}")
if place_result == "SUCCESS":
    place_demo = TargetPoseDemo(target_poses=place_poses, scene_pc=scene_raw, grasp_pc=grasp_raw)
    env_interface.detach()
    env_interface.release()
else:
    raise NotImplementedError("Place failed")

### Save

In [None]:
demo_seq = DemoSequence(demo_seq = [pick_demo, place_demo])
dataset.append(demo_seq)
save_demos(demos=dataset, dir="demo/test_demo")