In [9]:
from tsrrt.collision import PandaCollision
import pybullet as p
import pybullet_data
import numpy as np
from spatial_math_mini import *
from collections.abc import Sequence
from abc import ABC, abstractmethod
#import kdtree
import datetime

pybullet build time: Oct 11 2021 20:59:39


In [10]:
from tsrrt.data_structure import Node, StateSpace, Tree
from tsrrt.util import *

In [11]:
uid = p.connect(p.GUI)
checker = PandaCollision(p, uid)

In [12]:
ss = StateSpace(Node)

#init
checker.set_home_positions()
(pos, ori), q = checker.get_ee_pose(), checker.get_arm_positions()
start = Node(pos, ori, q)

tree = Tree(start, state_space=ss)

In [15]:
ss = StateSpace(NodeTS, checker)
#_controller = Controller(NodeTS, checker)

#init
checker.set_home_positions()
(pos, ori), q = checker.get_ee_pose(), checker.get_arm_positions()
start = NodeTS(pos, ori, q)
_tree = Tree(start)

In [14]:
class NodeTS():
    trans_weight = 5
    
    def __init__(self, pos, ori, q, m=None):
        super().__init__()
        self.pos = pos
        self.ori = ori
        self.q = q
        self._m = m
        self.idx = None
        
    def __repr__(self):
        return "pos:{}, ori:{}".format(self.pos, self.ori)
    
    def __sub__(self, y):
        """get twist"""
        SE3_1 = SE3(self.ori, self.pos)
        SE3_2 = SE3(y.ori, y.pos)
        screw, angle = (SE3_2.inv() @ SE3_1).to_twistangle()
        return screw*angle
    
    @staticmethod
    def dist(node1, node2):
        twist = node1 - node2
        rot_dist = np.linalg.norm(twist[:3])
        trans_dist = np.linalg.norm(twist[3:])
        return rot_dist + trans_dist * NodeTS.trans_weight
        

class Tree:
    def __init__(self, root):
        #init
        self._parent_of = {0:None} # parent of 0(root) is None
        self._num = 0
        self._data = []
        self.add(root)
    
    def add(self, node, parent=None):
        node.idx = self._num
        self._data.append(node)
        if not parent is None:
            self._parent_of[node.idx] = parent.idx
        self._num += 1
    
    def nearest(self, node):
        distances = [NodeTS.dist(node, n) for n in self._data]
        idx = np.argmin(distances)
        return self._data[idx]
    
    def parent(self, node):
        assert node.idx is not None
        return self._data[self._parent_of[node.idx]]
    
    def backtrack(self, node_last):
        path = [node_last]
        child_idx = node_last.idx
        while True:
            parent_idx = self._parent_of[child_idx]
            if parent_idx is None:
                break
            path.append(self._data[parent_idx])
            child_idx = parent_idx
        return path[::-1]
    
class StateSpace:
    """ 
    state space configuration and random node generator
    """
    pos_ll = [-1, -1, 0]
    pos_ul = [1, 1, 1]
    q_ll = [-2.9671, -1.8326, -2.9671, -3.1416, -2.9671, -0.0873, -2.9671]
    q_ul = [2.9671, 1.8326, 2.9671, 0.0, 2.9671, 3.8223, 2.9671]
    q_m = [0.0, 0.0, 0.0, -1.5708, 0.0, 1.8675, 0.0]
    q_range = [5.9342, 3.6652, 5.9342, 3.1416, 5.9342, 3.9095999999999997, 5.9342]
    eps = 0.01
    trans_max = 0.1
    rot_max = 0.1
    
    def __init__(self, node_type, checker):
        self._node_class = node_type
        self._checker = checker
        
    @staticmethod
    def _get_pos():
        pos = np.empty(3)
        for i in range(3):
            ll, ul = StateSpace.pos_ll[i], StateSpace.pos_ul[i]
            pos[i] = np.random.uniform(low=ll, high=ul)
        return pos
    
    @staticmethod
    def _get_ori():
        return  SO3._uniform_sampling_quaternion()
    
    @staticmethod
    def _get_q():
        q = np.empty(7)
        for i in range(7):
            ll, ul = StateSpace.q_ll[i], StateSpace.q_ul[i]
            q[i] = np.random.uniform(low=ll, high=ul)
        return q
    
    def is_valid(self, q):
        checker.set_joint_positions(q)
        return not checker.is_self_collision()

    def random(self, isfree=True):
        #get sample in free space by rejection sampling
        pos, ori = self._get_pos(), self._get_ori()
        _, q = self._checker.IK(pos, ori)
        
        while True:
            #task-space voronoi bias
            pos, ori = self._get_pos(), self._get_ori()
            _, q = self._checker.IK(pos, ori)
            self._checker.set_joint_positions(q)
            pos, ori = self._checker.get_ee_pose()
            is_collision = self._checker.is_self_collision()
            if not is_collision:
                node_new = self._node_class(pos, ori, q)
                break
        node_new = self._node_class(pos, ori, q)
        return node_new

In [16]:
class Controller:
    trans_max = 0.1
    rot_max = 0.1
    manipulability_max = 1000
    
    def __init__(self, node_type, checker):
        self._node_class = node_type
        self._checker = checker
        
    @staticmethod
    def crop_within_limits(twist):
        trans_ratio = np.linalg.norm(twist[:3])/Controller.trans_max
        rot_ratio = np.linalg.norm(twist[3:])/Controller.rot_max
        if (trans_ratio < 1) & (rot_ratio < 1):
            return twist
        elif trans_ratio > rot_ratio:
            return twist/trans_ratio
        else:
            return twist/rot_ratio
    
    def clamp_max(self, qs, qmax):
        result = []
        mag = np.linalg.norm(qs)
        if mag < qmax:
            return qs
        else:
            return qmax * qs/mag
    
    def clamp_max_abs(self, qs, qmax):
        result = []
        mag = np.linalg.norm(qs,1)
        if mag < qmax:
            return qs
        else:
            return qmax * qs/mag
        
    def control(self, node1, node2):
        diff = node2 - node1
        jac = self._checker.get_body_jacobian(node1.q)
        #jac_pinv = np.linalg.pinv(jac)
        #null_proj = np.eye(7) - jac_pinv@jac
        #damped least square
        U, s, VT = np.linalg.svd(jac)
        damping = 0.1
        dls = np.zeros((7,6))
        for i in range(6):
            u, v = U[:,i], VT[i,:]
            dls += s[i]/(s[i]**2+damping**2) * np.outer(v, u)
        manipulability = s[0] / s[-1]
        # print(np.linalg.norm(null_proj))
        # print(null_proj @ (node2.q - node1.q))
        q_delta = dls @ diff  # + null_proj @ (StateSpace.q_m - node1.q)
        q_new = self.clamp_max(q_delta, 0.2) + node1.q
        pos, ori = self._checker.FK(q_new)
        is_stop_cond = manipulability > self.manipulability_max
        return False, self._node_class(pos, ori, q_new, manipulability)

In [17]:
import numpy as np

In [8]:
np.linalg.norm([1,2,3,4,-5],np.inf)

5.0

In [5]:
uid = p.connect(p.GUI)
checker = PandaCollision(p, uid)

In [18]:
_statespace = StateSpace(NodeTS, checker)
_controller = Controller(NodeTS, checker)

#init
checker.set_home_positions()
(pos, ori), q = checker.get_ee_pose(), checker.get_arm_positions()
start = NodeTS(pos, ori, q)
_tree = Tree(start)

In [19]:
goal = _statespace.random()
view_pose(goal.pos, goal.ori)
checker.set_joint_positions(goal.q)
checker.is_self_collision()

False

In [294]:
clear()

In [20]:
import time
import copy
def extend(node_rand):
    global _last_node
    node_near = _tree.nearest(node_rand)
    view_pose(node_rand.pos, node_rand.ori)
    parent = node_near
    is_stop, node_new = _controller.control(parent, node_rand)
    dist = NodeTS.dist(node_rand, node_new)
    curr_dist = dist
    while True:
        curr_dist = NodeTS.dist(node_rand, node_new)
        is_not_valid = not _statespace.is_valid(node_new.q)
        is_threshold = (dist - curr_dist) >=  0.5
        is_arrived = curr_dist <=  _statespace.eps
        is_goal = NodeTS.dist(_goal, node_new) <= _statespace.eps
        #print(is_goal, NodeTS.distance(_goal, node_new), is_not_valid, is_stop)
        print("dist:{}, curr_dist:{}".format(dist, curr_dist))
        if is_not_valid | is_stop:
            print("is not valid")
            return #break
        checker.set_joint_positions(node_new.q)#######################################################
        _tree.add(node_new, parent)
        
        if is_goal:
            print(node_new)
            _last_node = copy.deepcopy(node_new)
            print("goal found")
            return #break
        
        if is_arrived | is_threshold:
            print("arrived")
            return 
        

        parent = node_new
        is_stop, node_new = _controller.control(parent, node_rand)
        time.sleep(0.2)

def is_goal():
    return _last_node is not None

global _last_node
_last_node = None
_goal = goal

In [28]:
clear()

In [21]:
if _last_node is None:
    if np.random.random() > 1:
        node_rand = _statespace.random(isfree=False)
    else:
        node_rand = _goal
    #view_pose(node_rand.pos, node_rand.ori)##########################################3
    extend(node_rand)

dist:4.240099992336301, curr_dist:4.240099992336301
dist:4.240099992336301, curr_dist:4.16814844961332
dist:4.240099992336301, curr_dist:4.156199895141877
dist:4.240099992336301, curr_dist:4.18560018188562
dist:4.240099992336301, curr_dist:4.239298984921557
dist:4.240099992336301, curr_dist:4.303391172349409
dist:4.240099992336301, curr_dist:4.366730305718512
dist:4.240099992336301, curr_dist:4.4198720165339465
dist:4.240099992336301, curr_dist:4.454112324346108
dist:4.240099992336301, curr_dist:4.460796007265726
dist:4.240099992336301, curr_dist:4.430667066740422
dist:4.240099992336301, curr_dist:4.35303308059442
dist:4.240099992336301, curr_dist:4.214954466580131
dist:4.240099992336301, curr_dist:4.002332414872711
dist:4.240099992336301, curr_dist:3.7095878003559863
arrived


In [14]:
path = _tree.backtrack(_last_node)

In [232]:
i = 0

In [293]:
i += 1
checker.set_joint_positions(path[i].q)

### Debug

In [96]:
node1 = start
node2 = goal

In [103]:
np.savetxt("start_pos",node1.pos)
np.savetxt("start_ori",node1.ori)
np.savetxt("goal_pos",node2.pos)
np.savetxt("goal_ori",node2.ori)

In [105]:
diff = node2 - node1
diff

array([-1.14723403,  1.52757014, -0.40919282,  0.03224844,  0.47263063,
       -0.11216937])

In [109]:
jac = checker.get_body_jacobian(node1.q)
U, s, VT = np.linalg.svd(jac)
damping = 0.1
dls = np.zeros((7,6))
for i in range(6):
    u, v = U[:,i], VT[i,:]
    dls += s[i]/(s[i]**2+damping**2) * np.outer(v, u)

In [115]:
dls

array([[ 2.54906155e-01, -6.32427510e-12,  4.38964523e-01,
         2.60651818e-12, -8.37252738e-01, -1.18177271e-11],
       [ 1.17135052e-12,  6.45322684e-01,  1.91936648e-11,
         2.59545111e+00,  2.60173273e-12, -2.52085801e-01],
       [ 2.54906155e-01, -3.57307311e-12,  4.38964523e-01,
        -2.87588352e-11, -8.37252738e-01,  7.00959608e-12],
       [ 1.21274957e-11, -5.84605995e-01,  1.05173734e-11,
         2.80648416e+00,  2.15725641e-11, -2.41129034e+00],
       [ 9.85968831e-01,  2.75901790e-13, -1.24953998e-02,
        -5.92856290e-13,  1.35657902e-02,  1.05444025e-12],
       [-1.96383128e-11,  2.16527710e+00,  3.51532322e-13,
        -1.93261605e-01, -2.62216159e-12,  2.08377492e+00],
       [ 4.97316911e-01, -7.94114660e-12,  1.83680333e+00,
        -1.72818965e-12, -1.63346367e+00, -6.24302337e-12]])

In [116]:
diff

array([-1.14723403,  1.52757014, -0.40919282,  0.03224844,  0.47263063,
       -0.11216937])

In [111]:
damping

0.1

In [117]:
dls@diff

array([-0.86776944,  1.09775121, -0.86776944, -0.53204901, -1.11961236,
        3.06764453, -2.09417059])

In [113]:
q_delta = dls @ diff
q_delta

array([-0.86776944,  1.09775121, -0.86776944, -0.53204901, -1.11961236,
        3.06764453, -2.09417059])

In [None]:
diff = node2 - node1
jac = self._checker.get_body_jacobian(node1.q)
#jac_pinv = np.linalg.pinv(jac)
#null_proj = np.eye(7) - jac_pinv@jac
#damped least square
U, s, VT = np.linalg.svd(jac)
damping = 0.1
dls = np.zeros((7,6))
for i in range(6):
    u, v = U[:,i], VT[i,:]
    dls += s[i]/(s[i]**2+damping**2) * np.outer(v, u)
manipulability = s[0] / s[-1]
# print(np.linalg.norm(null_proj))
# print(null_proj @ (node2.q - node1.q))
q_delta = dls @ diff  # + null_proj @ (StateSpace.q_m - node1.q)
q_new = self.clamp_max(q_delta, 0.2) + node1.q
pos, ori = self._checker.FK(q_new)
is_stop_cond = manipulability > self.manipulability_max
return False, self._node_class(pos, ori, q_new, manipulability)

In [50]:
is_stop, node = _controller.control(start, goal)
checker.set_joint_positions(node.q)

In [95]:
if not is_stop:
    success, node = _controller.control(node, goal)
    print(NodeTS.dist(goal, node))
    print(np.linalg.norm(goal.q - node.q))
checker.set_joint_positions(node.q)

2.969510630345833e-09
1.3821927579230495


In [48]:
ss = StateSpace(NodeTS, checker)

In [82]:
start = ss.random()
tree = Tree(start)

In [126]:
for i in range(1000):
    tree.add(ss.random(), parent=start)

In [142]:
node_rand = ss.random()
nn = tree.nearest(node_rand)
checker.set_joint_positions(nn.q)


In [143]:
checker.set_joint_positions(node_rand.q)

In [None]:
start = tree.add()