### Rapidly-exploring Random Tree Star (`RRT*`) using `UR5e`

In [1]:
import mujoco,sys
import numpy as np
import matplotlib.pyplot as plt
from functools import partial
sys.path.append('../package/helper/')
sys.path.append('../package/mujoco_usage/')
sys.path.append('../package/planning/')
from mujoco_parser import *
from transformation import *
from slider import *
from utility import *
from rrt import *
np.set_printoptions(precision=2,suppress=True,linewidth=100)
plt.rc('xtick',labelsize=6); plt.rc('ytick',labelsize=6)
%config InlineBackend.figure_format = 'retina'
%matplotlib inline
print ("MuJoCo:[%s]"%(mujoco.__version__))

MuJoCo:[3.1.1]


#### Parse `UR5e`

In [2]:
xml_path = '../asset/tabletop_manipulation/scene_ur5e_cylinders.xml'
env = MuJoCoParserClass(name='Tabletop',rel_xml_path=xml_path,verbose=True)
print ("Done.")

name:[Tabletop] dt:[0.002] HZ:[500]
n_qpos:[68] n_qvel:[60] n_qacc:[60] n_ctrl:[7]

n_body:[29]
 [0/29] [world] mass:[0.00]kg
 [1/29] [front_object_table] mass:[1.00]kg
 [2/29] [side_object_table] mass:[1.00]kg
 [3/29] [ur_base] mass:[4.00]kg
 [4/29] [ur_shoulder_link] mass:[3.70]kg
 [5/29] [ur_upper_arm_link] mass:[8.39]kg
 [6/29] [ur_forearm_link] mass:[2.27]kg
 [7/29] [ur_wrist_1_link] mass:[1.22]kg
 [8/29] [ur_wrist_2_link] mass:[1.22]kg
 [9/29] [ur_wrist_3_link] mass:[0.19]kg
 [10/29] [ur_tcp_link] mass:[0.00]kg
 [11/29] [ur_camera_mount] mass:[0.09]kg
 [12/29] [ur_d435i] mass:[0.07]kg
 [13/29] [ur_rg2_gripper_base_link] mass:[0.20]kg
 [14/29] [ur_camera_center] mass:[0.00]kg
 [15/29] [ur_rg2_gripper_finger1_finger_link] mass:[0.01]kg
 [16/29] [ur_rg2_gripper_finger1_inner_knuckle_link] mass:[0.01]kg
 [17/29] [ur_rg2_gripper_finger1_finger_tip_link] mass:[0.01]kg
 [18/29] [ur_rg2_gripper_finger2_finger_link] mass:[0.01]kg
 [19/29] [ur_rg2_gripper_finger2_inner_knuckle_link] mass:[

#### Select which object to pick

In [3]:
# Reset
np.random.seed(seed=0)
env.reset()

# Initialize UR5e
joint_names = ['shoulder_pan_joint','shoulder_lift_joint','elbow_joint',
               'wrist_1_joint','wrist_2_joint','wrist_3_joint']
q0 = np.deg2rad([-35.63,-78.2,153.42,-75.22,54.37,-0.])
p0 = env.get_p_body(body_name='ur_base')+np.array([0.5,0.0,0.05])
R0 = rpy_deg2r([-180,0,90])
env.forward(q=q0,joint_names=joint_names) # update UR5e pose

# Set cylinder object poses
obj_names = env.get_body_names(prefix='obj_')
n_obj = len(obj_names)
obj_xyzs = sample_xyzs(
    n_sample  = n_obj,
    x_range   = [+0.5,+1.0],
    y_range   = [-0.45,+0.45],
    z_range   = [0.8,0.81],
    min_dist  = 0.2,
    xy_margin = 0.0,
)
for obj_idx in range(n_obj):
    env.set_p_base_body(body_name=obj_names[obj_idx],p=obj_xyzs[obj_idx,:])
    env.set_R_base_body(body_name=obj_names[obj_idx],R=np.eye(3,3))
env.set_geom_color(body_names_to_color=obj_names,rgba_list=get_colors(n_obj))
env.forward() # update object poses

# Loop
env.init_viewer(title='Double click which object to pick',
                transparent=False,distance=3.0)
while env.is_viewer_alive():
    # Update
    env.step(
        ctrl        = np.append(q0,2.0),
        joint_names = joint_names+['gripper_finger2_joint'],
    )
    xyz_click,flag_click = env.get_xyz_left_double_click()
    if flag_click: # if double clicked, get the closest body
        body_name_clicked,p_body_clicked = env.get_body_name_closest(
            xyz_click,body_names=obj_names)
    # Render
    if env.loop_every(tick_every=10):
        env.plot_time()
        if xyz_click is not None: 
            env.plot_sphere(p=xyz_click,r=0.01,rgba=(1,0,0,0.5))
            env.plot_body_T(body_name=body_name_clicked,axis_len=0.25)
        # env.plot_contact_info(r_arrow=0.005,h_arrow=0.1,plot_sphere=False)
        env.render()
# Save state
if xyz_click is not None:
    print ("[%s] has been selected to pick."%(body_name_clicked))
    print (" object base position is %s."%(p_body_clicked))
    state = env.get_state() # get env state
else:
    print ("No object has been selected. Auto-selecting first object.")
    body_name_clicked = obj_names[0]
    p_body_clicked = obj_xyzs[0,:]
    print ("[%s] has been selected to pick."%(body_name_clicked))
    print (" object base position is %s."%(p_body_clicked))
    state = env.get_state() # get env state
print ("Done.")

No object has been selected. Auto-selecting first object.
[obj_cylinder_01] has been selected to pick.
 object base position is [0.77 0.19 0.81].
Done.


#### Solve IK to compute the grasp pose and interpolate

In [15]:
# Restore state and solve IKs
env.set_state(**state,step=True)
q_grasp,_,_ = solve_ik(
    env                = env,
    joint_names_for_ik = joint_names,
    body_name_trgt     = 'ur_tcp_link',
    q_init             = q0,
    p_trgt             = p_body_clicked + np.array([0,0,0.1]),
    R_trgt             = R0,
)
# Interpolate and smooth joint positions
times,traj_interp,traj_smt,times_anchor = interpolate_and_smooth_nd(
    anchors   = np.vstack([q0,q_grasp]),
    HZ        = env.HZ,
    vel_limit = np.deg2rad(30),
)
L = len(times)

# Open the gripper
qpos = env.get_qpos_joints(joint_names=joint_names)
for tick in range(100):
    env.step( # dynamic update
        ctrl        = np.append(qpos,2.0), # 0.0: close, 2.0: full open
        joint_names = joint_names+['gripper_finger2_joint'])

# Set collision configurations 
robot_body_names = env.get_body_names(prefix='ur_')
obj_body_names   = env.get_body_names(prefix='obj_')
env_body_names   = ['front_object_table','side_object_table']
# (optional) exclude 'body_name_clicked' from 'obj_body_names'
obj_body_names.remove(body_name_clicked)

# Loop
env.init_viewer(
    title='Checking collision while moving to the grasping pose',
    transparent=False,distance=3.0)
tick = 0
while env.is_viewer_alive():
    # Update
    time = times[tick]
    qpos = traj_smt[tick,:]
    env.forward(q=qpos,joint_names=joint_names)

    # Check collsision 
    is_feasible = is_qpos_feasible(
        env,qpos,joint_names,
        robot_body_names,obj_body_names,env_body_names)
    
    # Render
    if (tick%5)==0 or tick==(L-1):
        env.plot_text(p=np.array([0,0,1]),
                      label='[%d/%d] time:[%.2f]sec'%(tick,L,time))
        env.plot_body_T(body_name='ur_tcp_link',axis_len=0.1)
        env.plot_body_T(body_name=body_name_clicked,axis_len=0.1)
        if not is_feasible:
            env.plot_sphere(
                p=env.get_p_body(body_name='ur_tcp_link'),r=0.1,rgba=(1,0,0,0.5))
        env.render()
    # Proceed
    if tick < (L-1): tick = tick + 1
    # if tick == (L-1): tick = 0
    
print ("Done.")

Done.


#### Get collision-free path from `q0` to `q_grasp` using `RRT*`

In [16]:
is_point_feasible = partial(
    is_qpos_feasible,
    env              = env,
    joint_names      = joint_names,
    robot_body_names = robot_body_names,
    obj_body_names   = obj_body_names,
    env_body_names   = env_body_names,
) # function of 'qpos'
is_point_to_point_connectable = partial(
    is_qpos_connectable,
    env              = env,
    joint_names      = joint_names,
    robot_body_names = robot_body_names,
    obj_body_names   = obj_body_names,
    env_body_names   = env_body_names,
    deg_th           = 5.0,
) # function of 'qpos1' and 'qpos2'
print ("Ready.")

Ready.


In [22]:
point_min = -np.pi*np.ones(len(joint_names))
point_max = +np.pi*np.ones(len(joint_names))
rrt = RapidlyExploringRandomTreesStarClass(
    name      ='RRT-Star-UR',
    point_min = point_min,
    point_max = point_max,
    goal_select_rate = 0.05,         # 0.01 -> 0.05 (목표 선택 확률 증가)
    steer_len_max    = np.deg2rad(10), # 2도 -> 10도 (더 큰 스테핑)
    search_radius    = np.deg2rad(30), # 15도 -> 30도 (더 넓은 탐색)
    norm_ord         = 2, # 2,np.inf,
    n_node_max       = 20000,        # 10000 -> 20000 (더 많은 노드)
    TERMINATE_WHEN_GOAL_REACHED = False, SPEED_UP = True,
)

point_root,point_goal = q0,q_grasp
rrt.init_rrt_star(point_root=point_root,point_goal=point_goal,seed=0)
while True:
    # Randomly sample a point
    while True:
        if np.random.rand() <= rrt.goal_select_rate: 
            point_sample = rrt.point_goal
        else:
            point_sample = rrt.sample_point() # random sampling
        if is_point_feasible(qpos=point_sample): break

    # Get the nearest node ('node_nearest') to 'point_sample' from the tree
    node_nearest = rrt.get_node_nearest(point_sample)
    point_nearest = rrt.get_node_point(node_nearest)

    # Steering towards 'point_sample' to get 'point_new'
    point_new,cost_new = rrt.steer(node_nearest,point_sample)
    if point_new is None: continue # if the steering point is feasible

    if is_point_feasible(qpos=point_new) and \
        is_point_to_point_connectable(qpos1=point_nearest,qpos2=point_new):

        # Assign 'node_min' and initialize 'cost_min' with 'cost_new'
        node_min = node_nearest.copy()
        cost_min = cost_new

        # Select a set of nodes near 'point_new' => 'nodes_near'
        nodes_near = rrt.get_nodes_near(point_new)
    
        # For all 'node_near' find 'node_min'
        for node_near in nodes_near:
            point_near,cost_near = rrt.get_node_point_and_cost(node_near)
            if is_point_to_point_connectable(qpos1=point_near,qpos2=point_new):
                cost_prime = cost_near + rrt.get_dist(point_near,point_new)
                if cost_prime < cost_min:
                    cost_min = cost_near + rrt.get_dist(point_near,point_new)
                    node_min = node_near
        
        # Add 'node_new' and connect it with 'node_min'
        node_new = rrt.add_node(point=point_new,cost=cost_min,node_parent=node_min)

        # New node information for rewiring
        point_new,cost_new = rrt.get_node_point_and_cost(node_new)

        # Rewire
        for node_near in nodes_near:
            if node_near == 0: continue
            if rrt.get_node_parent(node_near) == node_new: continue
            point_near,cost_near = rrt.get_node_point_and_cost(node_near)
            cost_check = cost_new+rrt.get_dist(point_near,point_new)
            if (cost_check < cost_near) and \
                is_point_to_point_connectable(qpos1=point_near,qpos2=point_new):
                rrt.replace_node_parent(node=node_near,node_parent_new=node_new)

        # Re-update cost of all nodes
        if rrt.SPEED_UP: node_source = node_min
        else: node_source = 0
        rrt.update_nodes_cost(node_source=node_source,VERBOSE=False)

    # Print
    n_node = rrt.get_n_node()
    if (n_node % 1000 == 0) or (n_node == (rrt.n_node_max)):
        cost_goal = rrt.get_cost_goal() # cost to goal
        print ("n_node:[%d/%d], cost_goal:[%.5f]"%
               (n_node,rrt.n_node_max,cost_goal))
    
    # Terminate condition (if applicable)
    if n_node >= rrt.n_node_max: break # max node
    if (rrt.get_dist_to_goal() < 1e-6) and rrt.TERMINATE_WHEN_GOAL_REACHED: break
    
print ("Done.")

n_node:[1000/20000], cost_goal:[inf]
n_node:[2000/20000], cost_goal:[inf]
n_node:[3000/20000], cost_goal:[inf]
n_node:[4000/20000], cost_goal:[inf]
n_node:[5000/20000], cost_goal:[inf]
n_node:[6000/20000], cost_goal:[inf]
n_node:[7000/20000], cost_goal:[inf]
n_node:[8000/20000], cost_goal:[inf]
n_node:[9000/20000], cost_goal:[inf]
n_node:[10000/20000], cost_goal:[inf]
n_node:[11000/20000], cost_goal:[inf]
n_node:[12000/20000], cost_goal:[inf]
n_node:[13000/20000], cost_goal:[inf]
n_node:[14000/20000], cost_goal:[inf]
n_node:[15000/20000], cost_goal:[inf]
n_node:[16000/20000], cost_goal:[inf]
n_node:[17000/20000], cost_goal:[inf]
n_node:[18000/20000], cost_goal:[inf]
n_node:[19000/20000], cost_goal:[inf]
n_node:[20000/20000], cost_goal:[inf]
Done.


#### Animate RRT* planning results

In [23]:
# Get joint indices
# from 'start' to the point closest to the 'goal'
node_check = rrt.get_node_nearest(rrt.point_goal)
node_list = [node_check]
while node_check:
    node_parent = rrt.get_node_parent(node_check)
    node_list.append(node_parent)
    node_check = node_parent
node_list.reverse()
print ("node_list:%s"%(node_list))

# Get joint trajectories
q_anchors = np.zeros((len(node_list),len(joint_names)))
for idx,node in enumerate(node_list):
    qpos = rrt.get_node_point(node)
    q_anchors[idx,:] = qpos

times_interp,q_interp,_,_ = interpolate_and_smooth_nd(
    anchors   = q_anchors,
    HZ        = env.HZ,
    acc_limit = np.deg2rad(10),
)
L = len(times_interp)
print ("len(node_list):[%d], L:[%d]"%(len(node_list),L))

# Animate
env.reset()
env.set_state(**state,step=True)
env.init_viewer()
tick = 0
while env.is_viewer_alive():
    # Update
    env.forward(q=q_interp[tick,:],joint_names=joint_names)
    # Render
    if tick%20 == 0 or tick == (L-1):
        env.plot_text(p=np.array([0,0,1]),label='tick:[%d/%d]'%(tick,L))
        env.render()
    # Increase tick
    if tick < (L-1): tick = tick + 1

print ("Done.")

node_list:[0, 2, 9, 16, 43, 126, 167]
len(node_list):[7], L:[4421]
Done.
