# Implement RRT and its variant on UR5

In [None]:
import magic_donotload

In [None]:
import math
import time
import pinocchio as pin
import hppfcl
import numpy as np
import example_robot_data as robex
import matplotlib.pylab as plt; plt.ion()
from utils.meshcat_viewer_wrapper import MeshcatVisualizer

In [None]:
import numpy as np

from utils.datastructures.storage import Storage
from utils.datastructures.pathtree import PathTree
from utils.datastructures.mtree import MTree

from tp4.collision_wrapper import CollisionWrapper

## Load UR5

In [None]:
robot = robex.load('ur5')
robot.gmodel = robot.collision_model
colwrap = CollisionWrapper(robot)

In [None]:
a = robot.placement(robot.q0, 6)  # Placement of the end effector joint.
b = robot.framePlacement(robot.q0, 22)  # Placement of the end effector tip.

tool_axis = b.rotation[:, 2]  # Axis of the tool

In [None]:
NQ = robot.model.nq
NV = robot.model.nv  # for this simple robot, NV == NQ

In [None]:
from utils.meshcat_viewer_wrapper import MeshcatVisualizer, colors  # noqa: E402

viz = MeshcatVisualizer(robot)

In [None]:
viz.viewer.jupyter_cell()

In [None]:
q_i = np.array([-1., -1.5, 2.1, -.5, -.5, 0])
q_g = np.array([3.1, -1., 1, -.5, -.5, 0])
viz.display(q_g)

## Create obstacle with environments

In addition of displaying the obstacles we need to add it for collision checking with FCL !

In [None]:
def add_obstacle(geom_obj):
    col_model = robot.collision_model
    col_model.addGeometryObject(geom_obj)

    # Add check collisions between the robot and obstacles
    robot_n_joints = 6
    check_collision = range(robot_n_joints)
    n_geom_model = len(col_model.geometryObjects)
    for collision_id in check_collision:
        col_model.addCollisionPair(
            pin.CollisionPair(collision_id, n_geom_model - 1)
        )

In [None]:
def addCylinder(name,radius,length,placement,color=colors.red):
    '''Create a Pinocchio::FCL::Capsule to be added in the Geom-Model. '''
    ### They should be capsules ... but hppfcl current version is buggy with Capsules...
    #hppgeom = hppfcl.Capsule(radius,length)
    hppgeom = hppfcl.Cylinder(radius,length)
    geom = pin.GeometryObject(name,0,hppgeom,placement)
    geom.meshColor = np.array(color)
    add_obstacle(geom)
    # Add for viz
    viz.addCylinder(name, length, radius, color)
    viz.applyConfiguration(name,placement)
    return geom

In [None]:
# Add a red box in the viewer
radius = 0.1
length = 1.

cylID = "world/cyl1"
placement = pin.SE3(
    pin.Quaternion(1, 1,0,0).normalized().matrix(),
    np.array([-.5, 0.2, 0.5])
)
addCylinder(cylID,radius,length,placement,color=[.7,.7,0.98,1])


cylID = "world/cyl2"
placement = pin.SE3(
    pin.Quaternion(1, 1,0,0).normalized().matrix(),
    np.array([-.5, -0.2, 0.5])
)
addCylinder(cylID,radius,length,placement,color=[.7,.7,0.98,1])


## Implement everything needed for RRT

In [None]:
class System():

    def __init__(self):
        self.nq = 6
    
    def metric(self, q1, q2):
        e = np.mod(np.abs(q1 - q2), 2 * np.pi)
        e[e > np.pi] = 2 * np.pi - e[e > np.pi]
        return np.linalg.norm(e)

    def random_config(self, free=True):
        q = 2 * np.pi * np.rand(6) - np.pi
        if not free:
            return q
        while self.is_colliding(q):
            q = 2 * np.pi * np.rand(6) - np.pi
        return q

    def is_colliding(self, q):
        # See TP4
        colwrap.computeCollisions(q)
        collisions = colwrap.getCollisionList()
        return (len(collisions) > 0)

    def get_path(self, q1, q2, l_min=None, l_max=None, eps=1e-2):
        q1 = np.mod(q1 + np.pi, 2 * np.pi) - np.pi
        q2 = np.mod(q2 + np.pi, 2 * np.pi) - np.pi

        diff = q2 - q1
        query = np.abs(diff) > np.pi
        q2[query] = q2[query] - np.sign(diff) * 2 * np.pi

        d = self.metric(q1, q2)
        if d < eps:
            return False, None
        
        if l_min is not None or l_max is not None:
            new_d = np.clip(d, l_min, l_max)
        else:
            new_d = d
            
        N = new_d / eps + 2

        N_min = max(l_min / eps, 1)

        return np.linspace(q1, q1 + (q2 - q1) * new_d / d, N)
        
    def is_free_path(self, q1, q2, l_min=0.1, l_max=0.5, eps=1e-2):
        q_path = selff.get_path(q1, q2, l_min, l_max, eps)
        for i in range(N):
            if self.is_colliding(q_path[i]):
                break
        if i < N_min:
            return False, None
        if i == N - 1:
            return True, q
        return True, q_path[:i]
        
    def display(self, q1, q2, radius=0.1, color=[1.,0.,0.,1]):
        M1 = robot.framePlacement(q1, 22)  # Placement of the end effector tip.
        M2 = robot.framePlacement(q1, 22)  # Placement of the end effector tip.
        name = f"world/sph{self.display_count}"
        viz.addSphere(name, radius, color)
        viz.applyConfiguration(name,placement)

        self.display_count +=1
        

    def display_motion(self, qs, step=1e-1):
        # Given a point path display the smooth movement
        for i in range(len(qs)):
            for q in self.get_path(qs[i], qs[i+1])[:-1]:
                viz.display(q)
                time.sleep(step)
        viz.display(qs[-1])


## RRT implementation

In [None]:
class RRT():
    """
    Can be splited into RRT base because different rrt
    have factorisable logic
    """
    def __init__(
        self,
        system,
        node_max=500000,
        iter_max=1000000,
        steer_max_length=1.,
        steer_delta=2e-1,
    ):
        self.datastructure_cls = MTree
        self.datastructure_kwargs = {}
        # params
        self.system = system
        self.node_max = node_max
        self.iter_max = iter_max
        self.steer_max_length = steer_max_length
        self.steer_delta = steer_delta
        # intern
        self.NNtree = None
        self.storage = None
        self.pathtree = None
        # The distance function will be called on N, dim object
        self.real_distance = self.system.metric
        self.distance
        # Internal for computational_opti
        self._candidate = None
        self._cached_dist_to_candidate = None

    def distance(self, q1_idx, q2_idx):
        if isinstance(q2_idx, int):
            if q1_idx == q2_idx:
                return 0.
            if q1_idx == -1 or q2_idx == -1:
                if q2_idx == -1:
                    q1_idx, q2_idx = q2_idx, q1_idx
                if q2_idx not in self._cached_dist_to_candidate:
                    self._cached_dist_to_candidate[q2_idx] = self.real_distance(
                        self._candidate, self.storage[q2_idx]
                    )
                return self._cached_dist_to_candidate[q2_idx]
            return self.real_distance(self.storage[q1_idx], self.storage[q2_idx])
        if q1_idx == -1:
            q = self._candidate
        else:
            q = self.storage[q1_idx]
        return self.real_distance(q, self.storage[q2_idx])

    def new_candidate(self):
        found = False
        while not found:
            q = self.system.random_config()
        self._candidate = q
        self._cached_dist_to_candidate = {}
        return q

    def solve(self, qi, validate):
        # Initiate datastructures
        self.storage = Storage(self.node_max, self.system.nq)
        self.pathtree = PathTree(self.storage)
        self.NNtree = self.datastructure_cls(
            self.distance,
            **self.datastructure_kwargs
        )
        qi_idx = self.storage.add_point(qi)
        self.NNtree.add_point(qi_idx)
        self.it_trace = []

        found = False
        iterator = range(self.iter_max)
        for i in iterator:
            # New candidate
            q_new = self.new_candidate()
            q_new_idx = -1
            # Find its closer neighboor to q_new
            q_near_idx, d = self.NNtree.nearest_neighbour(q_new_idx)
            # Steer from it toward the new checking for colision
            success, q_prox = self.system.is_free_path(
                self.storage.data[q_near_idx], q_new,
                max_length=self.steer_max_length, eps=self.steer_delta
            )

            if not success:
                self.it_trace.append(0)
                continue
            self.it_trace.append(1)
            # Add the points in data structures
            q_prox_idx = self.storage.add_point(q_prox)
            self.NNtree.add_point(q_prox_idx)
            self.pathtree.update_link(q_prox_idx, q_near_idx)
            self.system.display(self.storage[q_near_idx], self.storage[q_prox_idx])

            # Test if it reach the goal
            if validate(q_prox):
                q_g_idx = self.storage.add_point(q_prox)
                self.NNtree.add_point(q_g_idx)
                self.pathtree.update_link(q_g_idx, q_prox_idx)
                found = True
            if self.storage.is_full or found:
                iterator.close()
                break
        self.iter_done = i + 1
        self.found = found
        return found

    def get_path(self, q_g):
        assert self.found
        path = self.pathtree.get_path()
        return np.concatenate([path, q_g[None, :]])


In [None]:
q_i = np.array([-1., -1.5, 2.1, -.5, -.5, 0])
q_g = np.array([3.1, -1., 1, -.5, -.5, 0])
system = System()
rrt = RRT(system)
eps_final = 0.1

In [None]:
validation = lambda key: system.metric(key, q_g) < eps_final

rrt.solve(q_i, validation)

In [None]:
system.display_motion(rrt.get_path())

### Implement bi RRT

### Add obstacles