In [1]:
import os
os.chdir(os.path.join(os.environ["RNB_PLANNING_DIR"], 'src'))

### Make scene

In [2]:
from pkg.controller.combined_robot import *
from pkg.project_config import *

crob = CombinedRobot(robots_on_scene=[
    RobotConfig(0, RobotType.panda, None,
                INDY_IP)]
              , connection_list=[False])
from pkg.geometry.builder.scene_builder import SceneBuilder
s_builder = SceneBuilder(None)
# s_builder.reset_reference_coord(ref_name="floor")
# xyz_rpy_robots = s_builder.detect_items(level_mask=[DetectionLevel.ROBOT])
xyz_rpy_robots = {"panda0": ((0,0,0), (0,0,0))}
crob.update_robot_pos_dict(xyz_rpy_robots=xyz_rpy_robots)
gscene = s_builder.create_gscene(crob)
from pkg.planning.scene import PlanningScene
pscene = PlanningScene(gscene, combined_robot=crob)

connection command:
panda0: False
Please create a subscriber to the marker
publication OK
published: [0, 0, 0, 0, 0, 0, 0]


## Preapare background

In [3]:
from pkg.geometry.geometry import *
from pkg.planning.constraint.constraint_actor import Gripper2Tool, PlacePlane

floor = gscene.create_safe(GEOTYPE.BOX, "floor", "base_link", (3,3,0.01), (0,0,0), 
                           rpy=(0,0,0), color=(0.8,0.8,0.8,0.5), display=True, fixed=True, collision=True)
wall = gscene.create_safe(GEOTYPE.BOX, "wall", "base_link", (3,3,0.01), (-0.3,0,0), 
                           rpy=(0,np.pi/2,0), color=(0.8,0.8,0.8,0.5), display=True, fixed=True, collision=True)

gtems = s_builder.add_robot_geometries(color=(0,1,0,0.5), display=True, collision=True)

gscene.create_safe(gtype=GEOTYPE.SPHERE, name="grip0", link_name="panda0_hand", 
                 dims=(0.01,)*3, center=(0,0,0.112), rpy=(-np.pi/2,0,0), color=(1,0,0,1), display=True, collision=False, fixed=True)
grip0 = pscene.create_binder(bname="grip0", gname="grip0", _type=Gripper2Tool, point=(0,0,0), rpy=(0,0,0))

Please create a subscriber to the marker


## Door knob

In [4]:
from pkg.utils.code_scraps import *

In [5]:
gscene.show_pose(crob.home_pose)

In [6]:
knob = gscene.create_safe(GEOTYPE.BOX, 'knob', link_name="base_link",
                          center=(0.5,0.5,0.5), rpy=(0,-np.pi/2,0), dims=(0.02,0.02,0.02),
                          fixed=True, collision=True, color=(1,0,0,1))

In [7]:
from enum import Enum
class KnobState(Enum):
    FREE = 0
    HELD = 1
    ON = 2

In [None]:

    
##
# @class LeverObject
# @remark   state_param: boolean vector of which each element represents if each waypoint is covered or not
#           node_item: number of covered waypoints
class LeverObject(AbstractObject):
    def __init__(self, oname, geometry, 
                 grasp_point, off_point, on_point):
        self.oname = oname
        self.geometry = geometry
        self.grasp_point, self.off_point, self.on_point = grasp_point, off_point, on_point
        self.action_points_dict = {grasp_point.name: grasp_point, 
                                   off_point.name: off_point, 
                                   on_point.name: on_point}
        self.sub_binders_dict = {}
        self.binding = BindingTransform(self, None, None, None)
        self.state_param = KnobState.FREE

    ##
    # @brief make constraints. by default, empty list.
    # @remark whether to apply constarint or not is decided with previous and next bindings
    # @param binding_from previous binding
    # @param binding_to next binding
    def make_constraints(self, binding_from, binding_to, tol=None):
        if binding_from is not None:
            if binding_from.handle_name == self.grap_point.name:
                return "Constraint not implemented yet. Use MoveitPlanner.incremental_constraint_motion=True"
        return []
    
    ##
    # @brief set object binding state and update location
    # @param binding BindingTransform
    # @param state_param (link name, offset transformation in 4x4 matrix)
    def set_state(self, binding, state_param=None):
        assert binding.chain.subject_name == self.oname, "wrong binding given {} <- {}".format(
            self.oname, binding.chain.subject_name)
        self.binding = deepcopy(binding)
        self.state_param = deepcopy(state_param)
        if state_param == KnobState.HELD:
            binding = 
        self.geometry.set_offset_tf(binding.T_lao[:3, 3], binding.T_lao[:3, :3])
        self.geometry.set_link(binding.actor_link)
        self.update_sub_points()

    ##
    # @brief get object-level node component
    @classmethod
    def get_node_component(cls, btf, state_param):
        return state_param.name

    ##
    # @brief get object-level node component
    def get_neighbor_node_component_list(self, node_tem, pscene):
        if node_tem == KnobState.FREE.name:
            return [KnobState.FREE.name, KnobState.HELD.name]
        elif node_tem == KnobState.HELD.name:
            return [KnobState.FREE.name, 
                    KnobState.HELD.name, 
                    KnobState.ON.name]
        elif node_tem == KnobState.ON.name:
            return [KnobState.HELD.name, 
                    KnobState.ON.name]

    ##
    # @brief (prototype) get available bindings from current binding state
    # @param from_binding current binding (subject name, handle name, actor name, actor root geometry name)
    # @param to_node_item desired node item
    # @param actor_dict
    #           dictionary of binder {binder_name: rnb-planning.src.pkg.planning.constraint.constraint_actor.Actor}
    # @param Q_dict dictionary of joint values {joint_name: value}
    # @return list of available bindings [(handle name, actor name, actor root geometry name), ...]
    def get_available_bindings(self, from_binding, to_node_item, actor_dict, Q_dict):
        if to_node_item == KnobState.FREE.name:
            return [from_binding[1:]]
        elif to_node_item == KnobState.HELD.name:
            from_hdl = from_binding.handle_name
            if from_hdl == self.grasp_point.name:
                return [from_binding[1:]]
            elif from_hdl == self.off_point.name:
                ap_list = [self.grasp_point]
            else:
                raise(NotImplementedError("Unexpected transition"))
        elif to_node_item == KnobState.ON.name:
            from_hdl = from_binding.handle_name
            if from_hdl == self.on_point.name:
                return [from_binding[1:]]
            elif from_hdl == self.off_point.name:
                ap_list = [self.grasp_point]
            else:
                raise(NotImplementedError("Unexpected transition"))
        
        
        
        ap_dict = self.action_points_dict
        apk_exclude = self.get_conflicting_points(from_binding[1])
        bd_exclude = from_binding[2]
        apk_list = ap_dict.keys()
        ap_list = [ap_dict[apk] for apk in apk_list if apk not in apk_exclude]
        bd_list = [binder for bname, binder in actor_dict.items()
                   if (binder.geometry.name == to_node_item
                       and binder.check_available(Q_dict)
                       and bname != bd_exclude)]

        available_bindings = []
        for bd in bd_list:
            for ap in ap_list:
                if bd.check_type(ap):
                    available_bindings.append((ap.name, bd.name, bd.geometry.name))
        if not available_bindings:
            print("=================================")
            print("=================================")
            print("=================================")
            print("Not available:{}-{}".format(self.oname,to_node_item))
            print("ap_exclude:{}".format(apk_exclude))
            print("bd_exclude:{}".format(bd_exclude))
            print("=================================")
            print("=================================")
            print("=================================")
        return available_bindings
    ##
    # @brief (prototype) get all object-level node component
    @classmethod
    def get_all_node_components(cls, pscene):
        return [KnobState.FREE.name, KnobState.HELD.name, KnobState.ON.name]
        
    ##
    # @brief get object-level state_param update
    def get_updated_state_param(self, btf, state_param):
        return state_param


In [7]:

def add_lever(pscene, knob,lname="lever", lever_ang=np.pi/4, knob_offset=(0.09), dims=(0.02, 0.2, 0.02),
              link_name="base_link", color=(0.8, 0.8, 0.8, 1), clearance = 1e-3):
    gscene = pscene.gscene
    Q0 = [0]*gscene.joint_num
    Tbk = knob.get_tf(Q0)
    Tbkp = np.matmul(Tbk, SE3(np.identity(3), (0, 0, knob.dims[2]/2+clearance)))
    Tbl = np.matmul(Tbkp, SE3(np.identity(3), (0, knob_offset, dims[2]/2)))
    lever = gscene.create_safe(GEOTYPE.BOX, lname, link_name=link_name,
                                    center=Tbl[:3,3], rpy=Rot2rpy(Tbl[:3,:3]), dims=dims,
                                    fixed=False, collision=True, color=color)
    lgrasp = Grasp2Point("gp", lever, (0, 0, 0), (np.pi/2, 0, -np.pi/2))
    lattach = PlaceFrame("cp", lever, (0, -knob_offset, -dims[2]/2), (0,0,0))

    lever_g = pscene.create_subject(oname=lname+"_grip", gname=lever.name, _type=CustomObject,
                                     action_points_dict={lgrasp.name: lgrasp, lattach.name: lattach})
    kpoint0 = pscene.create_binder(bname=knob.name + "_0", gname=knob.name, _type=AttachFrame,
                                   point=(0, 0, knob.dims[2]/2+clearance))

    lever_plug = pscene.create_binder(bname=knob.name+"_plug", gname=lname, _type=KnobFramer,
                                   point=(0, -knob_offset, -dims[2]/2))
    rp1 = KnobFrame("r1", knob, (0, 0, knob.dims[2]/2+clearance), (0,0,0))
    rp2 = KnobFrame("r2", knob, (0, 0, knob.dims[2]/2+clearance), rpy=(0, 0, lever_ang))

    knob_s = pscene.create_subject(oname=knob.name, gname=knob.name, _type=KnobTask,
                                   action_points_dict={rp1.name: rp1, rp2.name: rp2},
                                   lever = lever_plug)
    return knob_s


In [8]:
add_lever(pscene, knob)

<pkg.planning.constraint.constraint_subject.KnobTask at 0x7fb0d87ec650>

### prepare planner

In [9]:
from pkg.planning.motion.moveit.moveit_planner import MoveitPlanner
mplan = MoveitPlanner(pscene)
mplan.update_gscene()

from pkg.planning.task.rrt import TaskRRT
tplan = TaskRRT(pscene)
tplan.prepare()

from pkg.planning.pipeline import PlanningPipeline
ppline = PlanningPipeline(pscene)
ppline.set_motion_planner(mplan)
ppline.set_task_planner(tplan)

from pkg.ui.ui_broker import *
# start UI
ui_broker = UIBroker.instance()
ui_broker.initialize(ppline, s_builder)
ui_broker.start_server()

ui_broker.set_tables()

Dash is running on http://0.0.0.0:8050/

 * Serving Flask app "pkg.ui.dash_launcher" (lazy loading)
 * Environment: production


In [10]:
from pkg.planning.filtering.grasp_filter import GraspChecker
from pkg.planning.filtering.reach_filter import ReachChecker
mplan.motion_filters = [GraspChecker(pscene), ReachChecker(pscene)]
mplan.incremental_constraint_motion = True

   Use a production WSGI server instead.
 * Debug mode: off


### Plan

In [11]:
gscene.show_pose(crob.home_pose)
initial_state = pscene.initialize_state(crob.home_pose)
print(pscene.subject_name_list)
print(initial_state.node)

['knob', 'lever_grip']
(0, 'knob')


In [13]:
goal_nodes = [(2,'grip0')]
ppline.search(initial_state, goal_nodes, max_solution_count=1,
              verbose=True, display=False, dt_vis=0.01, 
              timeout=0.5, timeout_loop=30, 
              multiprocess=True, add_homing=False)

Use 18/36 agents
from_state.node: (0, 'knob')
new_node: (1, 'knob')
to_state.node: (1, 'knob')
from_state.node: (0, 'knob')
new_node: (0, 'grip0')
try: 0 - (0, 'knob')->(1, 'knob')
from_state.node: (0, 'knob')
to_state.node: (0, 'grip0')
from_state.node: (0, 'knob')
new_node: (0, 'grip0')
new_node: (1, 'knob')
try: 0 - (0, 'knob')->(0, 'grip0')
to_state.node: (1, 'knob')
to_state.node: (0, 'grip0')
from_state.node: (0, 'knob')
try transition motion
new_node: (1, 'knob')
try: 0 - (0, 'knob')->(1, 'knob')
try: 0 - (0, 'knob')->(0, 'grip0')
result: 0 - (0, 'knob')->(1, 'knob') = fail
from_state.node: (0, 'knob')
try transition motion
to_state.node: (1, 'knob')
new_node: (1, 'knob')
try: 0 - (0, 'knob')->(1, 'knob')
to_state.node: (1, 'knob')
result: 0 - (0, 'knob')->(1, 'knob') = fail
result: 0 - (0, 'knob')->(1, 'knob') = fail
from_state.node: (0, 'knob')
new_node: (0, 'grip0')
from_state.node: (0, 'knob')
try: 0 - (0, 'knob')->(1, 'knob')
to_state.node: (0, 'grip0')
new_node: (1, 'knob'

result: 0 - (0, 'knob')->(1, 'knob') = fail
from_state.node: (1, 'grip0')
from_state.node: (1, 'grip0')
from_state.node: (0, 'knob')
bd_exclude:knob_plug
new_node: (2, 'grip0')
new_node: (0, 'grip0')
to_state.node: (0, 'grip0')
result: 3 - (1, 'grip0')->(1, 'knob') = success
new_node: (2, 'grip0')
to_state.node: (2, 'grip0')
to_state.node: (2, 'grip0')
branching: 4->13 (0.66/30.0 s, steps/err: 1(15.4209136963 ms)/0)
result: 5 - (0, 'grip0')->(0, 'knob') = success
from_state.node: (1, 'knob')
try: 7 - (1, 'grip0')->(2, 'grip0')
try: 0 - (0, 'knob')->(0, 'grip0')
try: 11 - (1, 'grip0')->(2, 'grip0')
try transition motion
branching: 3->14 (0.66/30.0 s, steps/err: 1(22.5110054016 ms)/0)
branching: 5->15 (0.67/30.0 s, steps/err: 1(45.3341007233 ms)/0)
from_state.node: (1, 'knob')
new_node: (0, 'knob')
new_node: (0, 'knob')
from_state.node: (1, 'grip0')
new_node: (1, 'knob')
from_state.node: (1, 'knob')
to_state.node: (1, 'knob')
try: 7 - (1, 'grip0')->(1, 'knob')
Not available:knob-0
new_no

transition motion tried: True
from_state.node: (1, 'grip0')
new_node: (2, 'grip0')
to_state.node: (2, 'grip0')
try: 24 - (1, 'grip0')->(2, 'grip0')
result: 12 - (0, 'knob')->(0, 'grip0') = success
branching: 12->25 (1.07/30.0 s, steps/err: 14(53.692817688 ms)/0.00196095058787)
from_state.node: (0, 'grip0')
new_node: (1, 'grip0')
end
constrained motion tried: True
to_state.node: (1, 'grip0')
try: 25 - (0, 'grip0')->(1, 'grip0')
result: 25 - (0, 'grip0')->(1, 'grip0') = success
end
branching: 25->27 (1.09/30.0 s, steps/err: 1(6.15692138672 ms)/0)
Goal reached
result: 7 - (1, 'grip0')->(2, 'grip0') = success
constrained motion tried: True
from_state.node: (1, 'grip0')
new_node: (2, 'grip0')
Goal reached
branching: 7->26 (1.12/30.0 s, steps/err: 80(604.787111282 ms)/7.05274074582e-06)
to_state.node: (2, 'grip0')
result: 3 - (1, 'grip0')->(2, 'grip0') = success
try: 27 - (1, 'grip0')->(2, 'grip0')
end
constrained motion tried: True
branching: 3->28 (1.15/30.0 s, steps/err: 80(743.672847748 

end
constrained motion tried: True
Goal reached
result: 7 - (1, 'grip0')->(2, 'grip0') = success
branching: 7->38 (1.53/30.0 s, steps/err: 80(940.270900726 ms)/7.05274074582e-06)
end
constrained motion tried: True
Goal reached
result: 20 - (1, 'grip0')->(2, 'grip0') = success
branching: 20->39 (1.6/30.0 s, steps/err: 80(678.823947906 ms)/5.80490813342e-06)
end
constrained motion tried: True
Goal reached
result: 17 - (1, 'grip0')->(2, 'grip0') = success
branching: 17->40 (1.62/30.0 s, steps/err: 80(765.94209671 ms)/6.73537246505e-06)
end
constrained motion tried: True
Goal reached
result: 2 - (1, 'grip0')->(2, 'grip0') = success
branching: 2->41 (1.63/30.0 s, steps/err: 80(731.513977051 ms)/4.93118495729e-06)
end
constrained motion tried: True
Goal reached
result: 21 - (1, 'grip0')->(2, 'grip0') = success
branching: 21->42 (1.71/30.0 s, steps/err: 80(758.095979691 ms)/1.1815018035e-05)
end
constrained motion tried: True
Goal reached
result: 27 - (1, 'grip0')->(2, 'grip0') = success
bran

In [14]:
snode_schedule = tplan.get_best_schedule(at_home=False)
ppline.play_schedule(snode_schedule)

(0, 'knob')->(0, 'grip0')
(0, 'grip0')->(1, 'grip0')
(1, 'grip0')->(2, 'grip0')


### push back

In [15]:
term_state = snode_schedule[-1].state
gscene.show_pose(term_state.Q)
print(pscene.subject_name_list)
print(term_state.node)

['knob', 'lever_grip']
(2, 'grip0')


In [25]:
goal_nodes = [(1,'knob')]
ppline.search(term_state, goal_nodes, max_solution_count=1,
              verbose=True, display=False, dt_vis=0.01, 
              timeout=0.5, timeout_loop=30, 
              multiprocess=False)

from_state.node: (2, 'grip0')
new_node: (1, 'grip0')
to_state.node: (1, 'grip0')
try: 0 - (2, 'grip0')->(1, 'grip0')
end
constrained motion tried: True
result: 0 - (2, 'grip0')->(1, 'grip0') = success
branching: 0->1 (0.45/30.0 s, steps/err: 80(450.143098831 ms)/3.89746326387e-05)
from_state.node: (1, 'grip0')
new_node: (1, 'knob')
to_state.node: (1, 'knob')
try: 1 - (1, 'grip0')->(1, 'knob')
Goal reached
result: 1 - (1, 'grip0')->(1, 'knob') = success
branching: 1->2 (0.46/30.0 s, steps/err: 1(3.70216369629 ms)/0)
++ adding return motion to acquired answer ++
Goal reached
from_state.node: (1, 'grip0')
new_node: (1, 'knob')
to_state.node: (1, 'knob')
try: 1 - (1, 'grip0')->(1, 'knob')
Goal reached
result: 1 - (1, 'grip0')->(1, 'knob') = success
branching: 1->4 (0.77/30.0 s, steps/err: 1(12.7818584442 ms)/0)
++ adding return motion to acquired answer ++
Goal reached


In [27]:
snode_schedule = tplan.get_best_schedule(home_pose=crob.home_pose)
ppline.play_schedule(snode_schedule)

(2, 'grip0')->(1, 'grip0')
(1, 'grip0')->(1, 'knob')
(1, 'knob')->(1, 'knob')
