## Import Libraries and Utility Functions

#### Import Libraries

In [168]:
from camera.zed_ros.zed_ros import ZedRos
import cv2
import numpy as np
import glob
import re
import open3d as o3d
import plotly.graph_objs as go
import plotly.express as px
import os
import copy
import hydra
from omegaconf import DictConfig, OmegaConf
from hydra.core.global_hydra import GlobalHydra


#### Plotting points

In [169]:
def plot_multi_np(plist):
    """
    Plots multiple point clouds in the same plot using plotly
    Assumes points are in meters coordinates.
    
    Args: 
        plist: list of np arrays of shape (N, 3)
    """
    colors = [
        '#1f77b4',  # muted blue
        '#ff7f0e',  # safety orange
        '#2ca02c',  # cooked asparagus green
        '#d62728',  # brick red
        '#9467bd',  # muted purple
    ]
    skip = 1
    go_data = []
    for i in range(len(plist)):
        p_dp = plist[i]
        plot = go.Scatter3d(x=p_dp[::skip,0], y=p_dp[::skip,1], z=p_dp[::skip,2], 
                     mode='markers', marker=dict(size=2, color=colors[i],
                     symbol='circle'))
        go_data.append(plot)
 
    layout = go.Layout(
        scene=dict(
            aspectmode='data',
        ),
        height=1200,
        width=1200,
    )
    fig = go.Figure(data=go_data, layout=layout)
    
    colors = ['red', 'green', 'blue']  # X, Y, Z axis colors
    
    fig = go.Figure()
    
    T = []
    T.append(np.eye(4))

    for tf in T:
        origin = tf[:3, 3]
        axes = tf[:3, :3]

        for i in range(3):
            axis_end = origin + 0.3*axes[:, i]
            fig.add_trace(go.Scatter3d(
                x=[origin[0], axis_end[0]],
                y=[origin[1], axis_end[1]],
                z=[origin[2], axis_end[2]],
                mode='lines',
                line=dict(color=colors[i], width=4),
                name='Axis ' + str(i+1)
            ))

    for plot in go_data:
        fig.add_trace(plot)
    
    # add axis lines and camera view
    fig.update_layout(scene=dict(
        xaxis=dict(title='X'),
        yaxis=dict(title='Y'),
        zaxis=dict(title='Z'),
        camera = dict(
                      eye=dict(x=-1.30, y=0, z=-.25),
                      center=dict(x=0., y=0, z=-0.25),
                      up=dict(x=0, y=0, z=1),
                     )
        ),
        height=800,
        width=1200,
    )    
    
    fig.show()

#### Transform points using a TF matrix

In [170]:
def transform_pcd(pcd, tf=None, tf_filepath=None):
    """
    Transforms a point cloud using a transformation matrix
    
    Args:
        pcd: open3d.geometry.PointCloud
        tf: 4x4 numpy array
        tf_filepath: str, path to a .npy file containing a 4x4 transformation matrix
        
    Returns:
        open3d.geometry.PointCloud
    """
    if tf_filepath is not None:
        tf = np.load(tf_filepath)
        
    if tf is not None:
        pcd.transform(tf)
        
    return pcd

#### Crop Regions

In [171]:
# table boundaries
table_boundaries = {
    'min': np.array([-0.5, -2, -1]),
    'max': np.array([1.0, 2, 5])
}

# board boundaries
board_boundaries = {
    'min': np.array([0.0, -2, 0.15]),
    'max': np.array([0.5, -0.0, 0.3])
}

# gripper boundaries
gripper_boundaries = {
    'min': np.array([0.2, -0.5, 0.3]),
    'max': np.array([0.35, -0.05, .5])
}

ih_camera_focus = {
    'min': np.array([-0.053, -0.028, 0.0]),
    'max': np.array([0.053, 0.025, 0.3])
}

gripper_focus = {
    'min': np.array([-0.2, -0.06, 0.0]),
    'max': np.array([0.2, 0.06, 0.20]),

}

boundaries = {
    'table': table_boundaries,
    'board': board_boundaries,
    'gripper': gripper_boundaries,
    'ih_camera_focus': ih_camera_focus,
    'gripper_focus': gripper_focus
}

In [172]:
def crop_points(points, boundary: str):
    """
    Crops a point cloud based on a boundary
    Assmes points are in meters coordinates.
    
    Args:
        points:     np array of shape (N, 3)
        boundary:   str, key in boundaries dict. 
                    Predefined boundaries are 'table', 'board', 'gripper'
                    For custom boundaries, add to `boundaries` dict
        
    Returns:
        np array of shape (M, 3)
    """
    if boundary in boundaries:
        world_boundaries = boundaries[boundary]
    else:
        print('Bounds not found')
        return

    #deep copy points
    tmp_points = np.copy(points)
    
    tmp_points[np.isinf(tmp_points)] = np.nan
    tmp_points[np.where(tmp_points[:, 0] < world_boundaries['min'][0])] = np.nan
    tmp_points[np.where(tmp_points[:, 1] < world_boundaries['min'][1])] = np.nan
    tmp_points[np.where(tmp_points[:, 2] < world_boundaries['min'][2])] = np.nan
    tmp_points[np.where(tmp_points[:, 0] > world_boundaries['max'][0])] = np.nan
    tmp_points[np.where(tmp_points[:, 1] > world_boundaries['max'][1])] = np.nan
    tmp_points[np.where(tmp_points[:, 2] > world_boundaries['max'][2])] = np.nan
    tmp_points = tmp_points[np.where(~np.isnan(tmp_points).any(axis=1))]
    
    return tmp_points

## Visualize Pointcloud(s)

In [183]:
file_path = "/home/mfi/repos/rtc_vision_toolbox/data/calibration_data/rs/T_eef2camera.npy"
T_test = np.load(file_path)
print(np.round(T_test, 3),'\n')
T_test[0, 3] = T_test[0, 3] - 0.015
T_test[2,3] = T_test[2,3] + 0.031
print(np.round(T_test, 3))
np.save("/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/08-02-wp/calib_data/T_eef2cam3_new.npy", T_test)

[[-0.017 -1.    -0.026 -0.055]
 [ 1.    -0.017 -0.003 -0.012]
 [ 0.003 -0.026  1.     0.043]
 [ 0.     0.     0.     1.   ]] 

[[-0.017 -1.    -0.026 -0.07 ]
 [ 1.    -0.017 -0.003 -0.012]
 [ 0.003 -0.026  1.     0.074]
 [ 0.     0.     0.     1.   ]]


In [174]:
def crop_point_cloud(points, crop_box) -> o3d.geometry.PointCloud:

    mask = np.logical_and.reduce(
        (points[:, 0] >= crop_box['min'][0],
            points[:, 0] <= crop_box['max'][0],
            points[:, 1] >= crop_box['min'][1],
            points[:, 1] <= crop_box['max'][1],
            points[:, 2] >= crop_box['min'][2],
            points[:, 2] <= crop_box['max'][2])
    )
    points = points[mask]

    return points

In [184]:
def prepare_dataset(cfg) -> None:

    project_dir = "/home/mfi/repos/rtc_vision_toolbox/"
    data_dir = os.path.join(project_dir, cfg.data_dir)
    num_demos = 1
    # num_demos = cfg.training.num_demos
    train_demos = np.ceil(
        num_demos * (1-cfg.training.test_ratio)).astype(int)
    test_demos = num_demos - train_demos
    action_class = cfg.training.action.class_idx
    anchor_class = cfg.training.anchor.class_idx
    
    teach_pcd_dir = os.path.join(data_dir, "teach_data/pcd_data")
    teach_pose_dir = os.path.join(data_dir, "teach_data/pose_data")
    train_save_dir = os.path.join(data_dir, "learn_data/train")
    test_save_dir = os.path.join(data_dir, "learn_data/test")

    # anchor point clouds with global coord.
    anchor_points_list = []
    action_points_list = []
    
    # for each demo:
    for demo in range(num_demos):
          
        anchor_pcd_file = os.path.join(teach_pcd_dir, f"demo{demo}_{cfg.training.anchor.view}0_{cfg.training.anchor.camera}_pointcloud.ply")
        anchor_pcd = o3d.io.read_point_cloud(anchor_pcd_file)
        anchor_pcd, _ = anchor_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)  
        anchor_pcd = anchor_pcd.uniform_down_sample(2)
        
        #scale point cloud
        anchor_points = np.asarray(anchor_pcd.points) / 1000
        anchor_pcd.points = o3d.utility.Vector3dVector(anchor_points)
        
        anchor_view_pose_file = os.path.join(teach_pose_dir, f"demo{demo}_{cfg.training.anchor.view}0_pose.npy")
        T_eef2cam_file = cfg.devices.cameras[cfg.training.anchor.camera].setup.T_eef2cam
        
        # anchor view point
        T_base2eef = np.load(anchor_view_pose_file)
        T_eef2cam = np.load(os.path.join(project_dir, T_eef2cam_file))
        T_base2cam = np.dot(T_base2eef, T_eef2cam)
        
        # target pose
        T_base2targeteef = np.load(os.path.join(teach_pose_dir, f"demo{demo}_placement_pose.npy"))
        T_ee2target = [
            [1, 0, 0, 0],
            [0, 1, 0, 0],
            [0, 0, 1, 0.212],
            [0, 0, 0, 1]
        ]        
        T_base2target = np.dot(T_base2targeteef, T_ee2target)

        # transform anchor pcd in target pose frame for easy cropping
        anchor_tf = np.dot(np.linalg.inv(T_base2target), T_base2cam)
        anchor_pcd = transform_pcd(anchor_pcd, tf=anchor_tf)

        #crop_points
        anchor_bounds = cfg.training.anchor.object_bounds
        anchor_points = np.asarray(anchor_pcd.points)
        object_bounds = {
            'min': np.array(anchor_bounds.min)/1000,
            'max': np.array(anchor_bounds.max)/1000
        }
        anchor_points = crop_point_cloud(anchor_points, object_bounds)
        anchor_points_list.append(anchor_points)
        
        for var in range(cfg.training.action.view_variations.count):
            # action point clouds @ target
            action_pcd_file = os.path.join(teach_pcd_dir, f"demo{demo}_{cfg.training.action.view}{var}_{cfg.training.action.camera}_pointcloud.ply")
            action_pcd = o3d.io.read_point_cloud(action_pcd_file)
            action_pcd, _ = action_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
            action_pcd = action_pcd.uniform_down_sample(2)

            #scale point cloud
            action_points = np.asarray(action_pcd.points) / 1000
            action_pcd.points = o3d.utility.Vector3dVector(action_points)
            
            action_view_pose_file = os.path.join(teach_pose_dir, f"demo{demo}_{cfg.training.action.view}{var}_pose.npy")
            T_base2action = np.load(action_view_pose_file)
            T_base2cam_file = cfg.devices.cameras[cfg.training.action.camera].setup.T_base2cam
            T_base2cam = np.load(os.path.join(project_dir, T_base2cam_file))
            action_tf = np.linalg.inv(T_ee2target) @ (np.linalg.inv(T_base2action) @ T_base2cam)
            action_pcd = transform_pcd(action_pcd, tf=action_tf)
            
            action_points = np.asarray(action_pcd.points)
            action_points = action_points[action_points[:, 2] > 0]     
            action_points_list.append(action_points)
        
        # save data to train or test dir
    
    return anchor_points_list, action_points_list

In [185]:
GlobalHydra.instance().clear()
hydra.initialize(config_path="../data/demonstrations/08-02-wp/", version_base="1.3")
config: DictConfig = hydra.compose('place_object.yaml')

anchor_points_list, action_points_list = prepare_dataset(config)


In [186]:
print(len(anchor_points_list), len(action_points_list))

# plot_multi_np([anchor_points_list[0]])

1 5


In [None]:
plot_multi_np([action_points_list[4], anchor_points_list[0]])