In [1]:
import numpy as np
from functools import partial
import ipywidgets as widgets
from IPython.display import display

In [2]:
#pydrake imports
from pydrake.all import RationalForwardKinematics
from pydrake.geometry.optimization import IrisOptions, HPolyhedron, Hyperellipsoid
from pydrake.solvers import MosekSolver, CommonSolverOption, SolverOptions
from pydrake.all import PiecewisePolynomial, InverseKinematics, Sphere, Rgba, RigidTransform, RotationMatrix, IrisInConfigurationSpace
import time
import pydrake
from pydrake.all import (SceneGraphCollisionChecker, 
                         StartMeshcat, 
                         RobotDiagramBuilder,
                         ProcessModelDirectives,
                         LoadModelDirectives,
                         MeshcatVisualizer,
                         DiagramBuilder,
                         AddMultibodyPlantSceneGraph,
                         Box,
                         Parser,
                         MultibodyPlant,
                        SpatialInertia)
from pydrake.all import (SceneGraphCollisionChecker, 
                         StartMeshcat, 
                         RobotDiagramBuilder,
                         ProcessModelDirectives,
                         LoadModelDirectives,
                         MeshcatVisualizer,
                        DiagramBuilder,
                         MultibodyPlant,
                         AddMultibodyPlantSceneGraph,
                         PiecewisePolynomial,
                        Parser)
from pydrake.all import GeometrySet, CollisionFilterDeclaration
from scipy.special import comb
from pydrake.all import Role, MeshcatVisualizerParams

import matplotlib.pyplot as plt
import logging
# from C_Iris_Examples.visualization_utils import visualize_body_at_s
from C_Iris_Examples.sampling_based_motion_planners import StraightLineCollisionChecker, PRMFixedEdges, BiRRT
import C_Iris_Examples.visualization_utils as vis_utils
from pathlib import Path
import pickle
from pydrake.all import ModelInstanceIndex
from pydrake.polynomial import Polynomial as PolynomialCommon

drake_logger = logging.getLogger("drake")
drake_logger.setLevel(logging.DEBUG)


In [3]:
meshcat = StartMeshcat()

DEBUG:drake:FindResource ignoring DRAKE_RESOURCE_ROOT because it is not set.
DEBUG:drake:FindRunfile mechanism = RUNFILES_{MANIFEST_FILE,DIR}
DEBUG:drake:cwd = "/home/peter/git/drake/cvisiris_examples"
DEBUG:drake:FindRunfile found by-manifest '/home/peter/git/drake/.drake-find_resource-sentinel' (good) and by-directory '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/cvisiris_examples/14dof_env_vis.runfiles/drake/.drake-find_resource-sentinel' (good)
DEBUG:drake:FindRunfile found by-manifest '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/geometry/meshcat.js' (good) and by-directory '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/cvisiris_examples/14dof_env_vis.runfiles/drake/geometry/meshcat.js' (good)
DEBUG:drake:FindResource ignoring DRAKE_RESOURCE_ROOT because it is not set.
DEBUG:drake:FindRunfile found 

In [5]:
builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlantSceneGraph(builder, 0.001)
inspector = scene_graph.model_inspector()
parser = Parser(plant)
#parser.package_map().Add("cvisirisexamples", missing directory)
meshcat_params = MeshcatVisualizerParams()
#         meshcat_params.role = Role.kProximity
visualizer = MeshcatVisualizer.AddToBuilder(builder, scene_graph, meshcat,meshcat_params)
directives_file = "14_dof_directives_newshelf.yaml"#FindResourceOrThrow() 
directives = LoadModelDirectives(directives_file)
models = ProcessModelDirectives(directives, plant, parser)

#     collision_filter_manager = scene_graph.collision_filter_manager()
#     inspector = scene_graph.model_inspector()
#     schunk_body = plant.GetBodyByName("body")
#     iiwa_link_7 = plant.GetBodyByName("iiwa_link_7")
#     iiwa_link_6 = plant.GetBodyByName("iiwa_link_6")
#     iiwa_links = [plant.GetBodyByName(f"iiwa_link_{i}") for i in range(8)]
#     bodies = []
#     for b in iiwa_links+[schunk_body]:#[schunk_body, iiwa_link_7, iiwa_link_6]:
#         bodies += plant.GetCollisionGeometriesForBody(b)
#     geom_set = GeometrySet(bodies)
#     decl = CollisionFilterDeclaration().ExcludeWithin(geom_set)
#     collision_filter_manager.Apply(decl)

#     shelves = [plant.GetBodyByName(f"shelf{i}",
#                                   plant.GetModelInstanceByName(f"shelves{i}")) for i in range(1,3)]
#     bodies = []
#     for b in iiwa_links[:5]+shelves:
#         bodies += plant.GetCollisionGeometriesForBody(b)
#     geom_set = GeometrySet(bodies)
#     decl = CollisionFilterDeclaration().ExcludeWithin(geom_set)
#     collision_filter_manager.Apply(decl)

# plant.Finalize()

gripper_col_geoms_by_gripper = []
gripper_visual_geoms_by_gripper = []
shelf_col_geom_ids = []
shelf_vis_geom_ids = []
for model_info in models:
    if "wsg" in model_info.model_name:
        col_geoms = []
        vis_geoms = []

        gripper_body = plant.GetBodyByName("body", model_info.model_instance)
        left_finger_body = plant.GetBodyByName("left_finger", model_info.model_instance)
        right_finger_body = plant.GetBodyByName("right_finger", model_info.model_instance)
        gripper_all_bodies = [gripper_body, left_finger_body, right_finger_body]
        gripper_all_body_frame_ids = [plant.GetBodyFrameIdIfExists(b.index()) for b in gripper_all_bodies]
        for frame_id in gripper_all_body_frame_ids:
            for geometry_id in inspector.GetGeometries(frame_id,
                                                   Role.kIllustration):
                vis_props = inspector.GetProperties(geometry_id, Role.kIllustration)
                # has visualization properties
                if vis_props is not None and vis_props.HasProperty("phong", "diffuse"):
                    vis_geoms.append(geometry_id)
            for geometry_id in inspector.GetGeometries(frame_id,
                                                   Role.kProximity):
                col_props = inspector.GetProperties(geometry_id, Role.kProximity)
                # has visualization properties
                if col_props is not None:
                    col_geoms.append(geometry_id)       

        gripper_col_geoms_by_gripper.append(col_geoms)
        gripper_visual_geoms_by_gripper.append(vis_geoms)
    elif "shelves1" in model_info.model_name:
        shelf_body_indices = plant.GetBodyIndices(model_info.model_instance)
        for body_index in shelf_body_indices:
            frame_id = plant.GetBodyFrameIdIfExists(body_index)
            for geometry_id in inspector.GetGeometries(frame_id,
                                       Role.kIllustration):
                vis_props = inspector.GetProperties(geometry_id, Role.kIllustration)
                # has visualization properties
                if vis_props is not None and vis_props.HasProperty("phong", "diffuse"):
                    shelf_vis_geom_ids.append(geometry_id)

            for geometry_id in inspector.GetGeometries(frame_id,
                                                   Role.kProximity):
                col_props = inspector.GetProperties(geometry_id, Role.kProximity)
                # has visualization properties
                if col_props is not None:
                    shelf_col_geom_ids.append(geometry_id) 

gripper_col_geoms_ids = [item for l in gripper_col_geoms_by_gripper for item in l]

important_collision_pairs = [(p1, p2) for p2 in shelf_col_geom_ids 
                             for p1 in gripper_col_geoms_ids]
important_collision_pairs += [(p1,p2) for p1, p2 in zip(gripper_col_geoms_ids[:-1], gripper_col_geoms_ids[1:])]

#important_collision_pairs.append((p1,p2) for p1, p2 in zip(gripper_col_geoms_ids[:-1], gripper_col_geoms_ids[1:]))

hyperplane_plants = {} # map a geometry pair to the appropriate hyerplane plant
plane_dimensions = (4,4,0.001)
plane_color = (0,1,0,0.25)
dummy_inertia = SpatialInertia.SolidBoxWithDensity(0.01, *plane_dimensions)
dummy_box = Box(*plane_dimensions)

for p1, p2 in important_collision_pairs:
    hyperplane_plant = builder.AddSystem(MultibodyPlant(0.01))
    hyperplane_plant.RegisterAsSourceForSceneGraph(scene_graph)
    builder.Connect(hyperplane_plant.get_geometry_poses_output_port(), scene_graph.get_source_pose_port(
              hyperplane_plant.get_source_id()))
    builder.Connect(
          scene_graph.get_query_output_port(),
          hyperplane_plant.get_geometry_query_input_port())
    hyperplane_model_instance = hyperplane_plant.AddModelInstance("hyperplane")
    body_name = f"hyperplane_{p1, p2}"
    body = hyperplane_plant.AddRigidBody(body_name, hyperplane_model_instance, dummy_inertia)    
    hyperplane_plant.RegisterVisualGeometry(body, RigidTransform(), dummy_box, body_name, plane_color)
    hyperplane_plant.Finalize()
    hyperplane_plants[p1, p2] = hyperplane_plant
    meshcat.SetProperty(f"visualizer/hyperplane/{body_name}", "visible", False)

plant.Finalize()


diagram = builder.Build()
diagram_context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(diagram_context)
diagram.ForcedPublish(diagram_context)
scene_graph_context = scene_graph.GetMyMutableContextFromRoot(
    diagram_context)
import pydrake.multibody.rational as rational_forward_kinematics
from pydrake.all import RationalForwardKinematics
Ratfk = RationalForwardKinematics(plant)

DEBUG:drake:FindResource ignoring DRAKE_RESOURCE_ROOT because it is not set.
DEBUG:drake:FindRunfile found by-manifest '/home/peter/git/drake/.drake-find_resource-sentinel' (good) and by-directory '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/cvisiris_examples/14dof_env_vis.runfiles/drake/.drake-find_resource-sentinel' (good)
DEBUG:drake:FindRunfile found by-manifest '/home/peter/git/drake/package.xml' (good) and by-directory '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/cvisiris_examples/14dof_env_vis.runfiles/drake/package.xml' (good)
DEBUG:drake:FindResource ignoring DRAKE_RESOURCE_ROOT because it is not set.
DEBUG:drake:FindRunfile found by-manifest '/home/peter/git/drake/.drake-find_resource-sentinel' (good) and by-directory '/home/peter/.cache/bazel/_bazel_peter/76bc0000d64a792547034bb2ef203b82/execroot/drake/bazel-out/k8-opt/bin/cvisiris_examples/14dof_env_

In [6]:
hyperplane_plant_contexts = {k: diagram.GetMutableSubsystemContext(v, diagram_context) for k,v in hyperplane_plants.items()}

In [8]:
from pydrake.geometry.optimization_dev import CspaceFreePath
q_star = np.zeros(plant.num_positions())
print(q_star.shape)


(12,)


In [9]:
important_collision_pairs += [(p1,p2) for p1, p2 in zip(gripper_col_geoms_ids[:-1], gripper_col_geoms_ids[1:])]


In [11]:
sliders = []

rrt_end_q = np.array([
    -0.96706,  0.3056 , -0.26706, -1.3944 , -1.66706, -0.6944 ,
       -1.76706, -1.0944 ,  1.33294,  0.9056 ,  0.73294, -1.29
])
q = rrt_end_q.copy()

for i in range(plant.num_positions()):
    q_low = plant.GetPositionLowerLimits()[i]
    q_high = plant.GetPositionUpperLimits()[i]
    sliders.append(widgets.FloatSlider(min=q_low, max=q_high, value=q[i], description=f"q{i}"))
#np.zeros(plant.num_positions())
ik = InverseKinematics(plant, plant_context)
collision_constraint = ik.AddMinimumDistanceConstraint(0.001, 0.001)
def eval_cons(q, c, tol):
    return 1-1*float(c.evaluator().CheckSatisfied(q, tol))
diagram_col_context = diagram.CreateDefaultContext()
plant_col_context = diagram.GetMutableSubsystemContext(plant, diagram_col_context)
scene_graph_col_context = diagram.GetMutableSubsystemContext(
        scene_graph, diagram_col_context
    )
query_port = scene_graph.get_query_output_port()


def check_collision_q_by_query(q):
#     if np.all(q >= plant.GetPositionLowerLimits()) and np.all(
#         q <= plant.GetPositionUpperLimits()
#     ):
#         plant.SetPositions(plant_context, q)
#         query_object = query_port.Eval(scene_graph_context)
# #         if query_object.HasCollisions():
# #         print(query_object.ComputePointPairPenetration())
#         return 1 if query_object.HasCollisions() else 0
#     else:
#         print("exiting position limits")
#         print(np.logical_and(q >= plant.GetPositionLowerLimits(), 
#         q <= plant.GetPositionUpperLimits()))
#         print(plant.GetPositionLowerLimits())
#         print(q)

#         print(plant.GetPositionUpperLimits())
#         print()
#         return 1
    plant.SetPositions(plant_context, q)
    query_object = query_port.Eval(scene_graph_context)
#         if query_object.HasCollisions():
#         print(query_object.ComputePointPairPenetration())
    return 1 if query_object.HasCollisions() else 0
    

def check_collision_s_by_query(s):
    s = np.array(s)
    q = Ratfk.ComputeQValue(s, q_star)
    return check_collision_q_by_query(q)


    
col_func_handle = check_collision_q_by_query#partial(eval_cons, c=collision_constraint, tol=0.01)
# col_func_handle = partial(eval_cons, c=collision_constraint, tol=0.01)

col_shunk_col =  Rgba(0.8, 0.0, 0, 0.5)    
col_shunk_free =  Rgba(0.0, 0.8, 0.5, 0.5)   

def handle_slider_change(change, idx):
    q[idx] = change['new']
    showres(q)

def showres_s(s_vis):
    showres(Ratfk.ComputeQValue(s_vis, np.zeros(7)))

def showres(qvis):
    plant.SetPositions(plant_context, qvis)
    col = col_func_handle(qvis)
    shunk = plant.get_body(pydrake.multibody.tree.BodyIndex(9))
    tf_tot = shunk.EvalPoseInWorld(plant_context)
    tf = tf_tot.translation()
    
   # tf_tot= plant.EvalBodyPoseInWorld(plant_context, plant.get_body(pydrake.multibody.tree.BodyIndex(7)))
    #tf = tf_tot.translation() - tf_tot.GetAsMatrix4()[:3,:3][:,1] *0.15
    if col:
        meshcat.SetObject(f"/drake/visualizer/shunk",
                                   Sphere(0.05),
                                   col_shunk_col)
    else:
        meshcat.SetObject(f"/drake/visualizer/shunk",
                                   Sphere(0.05),
                                   col_shunk_free)
    meshcat.SetTransform(f"/drake/visualizer/shunk",
                                   RigidTransform(RotationMatrix(),
                                                  tf))
    
    diagram.ForcedPublish(diagram_context)

scaler = 1 #np.array([0.8, 1., 0.8, 1, 0.8, 1, 0.8]) #do you even geometry bro?
q_min = np.array(q_low)*scaler
q_max = np.array(q_high)*scaler
q_diff = q_max-q_min

# def sample_cfree_QPoint(MAXIT=1000):
# 	it = 0
# 	while it<MAXIT:
# 		rand = np.random.rand(5)
# 		q_s = q_min + rand*q_diff
# 		col = False
# 		for _ in range(10):
# 			r  = 0.05*(np.random.rand(5)-0.5)
# 			col |= (col_func_handle(q_s+r) > 0)
# 		if not col:
# 			return q_s #Ratfk.ComputeQValue(q_s, q_star)
# 		it+=1
# 	raise ValueError("no col free point found")

idx = 0
for slider in sliders:
    slider.observe(partial(handle_slider_change, idx = idx), names='value')
    idx+=1

display(sliders[0])
for slider in sliders:
    display(slider)

In [12]:
print(repr(q))
print(check_collision_q_by_query(q))

array([-0.96706,  0.3056 , -0.26706, -1.3944 , -1.66706, -0.6944 ,
       -1.76706, -1.0944 ,  1.33294,  0.9056 ,  0.73294, -1.29   ])
0


In [13]:
rrt_end_s = Ratfk.ComputeSValue(rrt_end_q, q_star)
rrt_start_s = Ratfk.ComputeSValue(np.zeros_like(q_star), q_star)


In [14]:
np.random.seed(0)
collision_checker = StraightLineCollisionChecker(check_collision_s_by_query, 200)
good_rrt = BiRRT(tuple(rrt_end_s), tuple(rrt_start_s),
           Ratfk.ComputeSValue(plant.GetPositionLowerLimits(),q_star),
           Ratfk.ComputeSValue(plant.GetPositionUpperLimits(),q_star),
           collision_checker, max_dist = 0.5)
good_rrt.build_tree(int(100), exit_on_path = False)

100%|██████████████████████████████████| 100/100 [00:02<00:00, 34.14it/s]


True

In [15]:
np.random.seed(1)
collision_checker = StraightLineCollisionChecker(check_collision_s_by_query, 10)
bad_rrt = BiRRT(tuple(rrt_end_s), tuple(rrt_start_s),
           Ratfk.ComputeSValue(plant.GetPositionLowerLimits(),q_star),
           Ratfk.ComputeSValue(plant.GetPositionUpperLimits(),q_star),
           collision_checker, max_dist = 1)
bad_rrt.build_tree(int(1e2), exit_on_path = False)

100%|█████████████████████████████████| 100/100 [00:00<00:00, 369.59it/s]


True

In [16]:
diagram_vis_context = diagram.CreateDefaultContext()
plant_vis_context = diagram.GetMutableSubsystemContext(
    plant, diagram_vis_context
)
vis_bundle = vis_utils.VisualizationBundle(
        diagram,
        diagram_context,
        plant,
        plant_vis_context,
        Ratfk,
        meshcat,
        q_star,
    )
for i, body in enumerate([plant.GetBodyByName(f"iiwa_link_{7}", ModelInstanceIndex(3)), 
             plant.GetBodyByName(f"iiwa_link_{7}", ModelInstanceIndex(2))]):
#     good_rrt.draw_tree(vis_bundle,
#                  body,
#                  prefix=f"bi_rrt_{body.name()}{i}/")
    good_rrt.draw_start_target_path(vis_bundle,
                 body,
                 prefix=f"bi_rrt_{body.name()}{i}/")

In [52]:
import networkx as nx
def make_line_polys(tree, max_num_edges=-1):
    polys = np.empty(
        shape=(
            plant.num_positions(),
            len(tree.edges()) if max_num_edges < 0 else max_num_edges,
        ),
        dtype=object,
    )
    for i, (s0, s1) in enumerate(tree.edges()):
        for j in range(plant.num_positions()):
            if max_num_edges > 0 and not i < max_num_edges:
                break
            polys[j, i] = PolynomialCommon(np.array([s0[j], s1[j] - s0[j]]))

    return polys

def make_line_polys_shortest_path(rrt):
    path = nx.dijkstra_path(rrt.connected_tree, rrt.start_pos, rrt.end_pos)
    pairs = list(zip(path[:-1],path[1:]))
    polys = np.empty(
        shape=(
            plant.num_positions(),
            len(pairs),
        ),
        dtype=object,
    )
    for i, (s0, s1) in enumerate(pairs):
        for j in range(plant.num_positions()):
            polys[j, i] = PolynomialCommon(np.array([s0[j], s1[j] - s0[j]]))

    return polys
        
    
path_safe = make_line_polys_shortest_path(good_rrt)
for idx in range(1):#path_safe.shape[1]):
    cur_traj = PiecewisePolynomial([path_safe[:,idx]], [0,1])
    cur_traj.ReverseTime()
    cur_traj.shiftRight(1)
    for t in np.linspace(0,1,20000):
        s = cur_traj.value(t)
        q = Ratfk.ComputeQValue(s, q_star)
        plant.SetPositions(plant_context, q)
        diagram.ForcedPublish(diagram_context)
        #time.sleep(0.05)
        if check_collision_q_by_query(q):
            print("collision detected")
            time.sleep(0.01)
            break

In [20]:
plane_order = 1
max_degree = 1
t0 = time.time()
cspace_free_path = CspaceFreePath(
    plant,
    scene_graph,
    q_star,
    maximum_path_degree=max_degree,
    plane_order=plane_order,
)
t1 = time.time()
print(f"Time to build collision checker {t1-t0}")

DEBUG:drake:Generating collision pairs
DEBUG:drake:Collision pairs generated in 0.00119904 seconds
DEBUG:drake:Generating separating planes
DEBUG:drake:Separating planes generated in 0.004256168 seconds
DEBUG:drake:Generating TC-space rationals
DEBUG:drake:TC-space rationals generated in 46.420404572 seconds
DEBUG:drake:Pre-allocating PSD
DEBUG:drake:PSD allocated in 0.147485531 seconds
DEBUG:drake:Expanding rationals and pre-allocating programs
DEBUG:drake:Path rationals and programs allocated in 657.762665355 seconds


Time to build collision checker 705.3631045818329


In [42]:
path_to_data_save_folder = Path(
    "/home/amice/Documents/coding_projects/drake/C_Iris_Examples/final_experiment_data_redo"
)
############## CERTIFICATION ##################
cert_options = CspaceFreePath.FindSeparationCertificateGivenPathOptions()
cert_options.terminate_segment_certification_at_failure = False

cert_options.num_threads = -1
cert_options.verbose = False
cert_options.solver_id = MosekSolver.id()
cert_options.solver_options = SolverOptions()
cert_options.terminate_path_certification_at_failure = False

In [43]:
len(good_rrt.connected_tree.edges)

120

In [24]:
# def rrt_experiment(rrt, save_name):
#     edges = make_line_polys(rrt.connected_tree)
#     print(edges.shape)
#     t0 = time.time()
#     (
#         statistics,
#         cert_result,
#     ) = cspace_free_path.FindSeparationCertificateGivenPath(
#         edges, set(), cert_options
#     )
#     t1 = time.time()
#     file_name = path_to_data_save_folder / (save_name)
#     with open(
#         file_name,
#         "wb",
#     ) as f:
#         pickle.dump(statistics, f)
#     print(f"num pairs to certify {len(statistics[0].total_time_to_certify_pair)}")
#     print(f"Certification of safe PRM for {plant.num_positions()} links in {t1 - t0}s")
#     print(
#         f"Frac edges safe = {sum([1 if s.certified_safe() else 0 for s in statistics])}/{len(statistics)}"
#     )
#     print(
#         f"Unsafe inds = {[idx for idx, s in enumerate(statistics) if not s.certified_safe()]}"
#     )
#     return statistics, cert_result
# rrt_experiment(good_rrt, f"bimanual_example_edges_GOOD_RRT.pkl")
# rrt_experiment(bad_rrt, f"bimanual_example_edges_BAD_RRT.pkl")

In [44]:
make_line_polys(good_rrt.connected_tree).shape

(12, 120)

In [26]:
# file_name = path_to_data_save_folder / (f"bimanual_example_edges_GOOD_RRT.pkl")
# with open(file_name, "rb") as f:
#     statistics = pickle.load(f)
# statistics

In [45]:
path_safe = make_line_polys_shortest_path(good_rrt)
t0 = time.time()
(
    statistics_path_safe,
    cert_result_path_safe,
) = cspace_free_path.FindSeparationCertificateGivenPath(
    path_safe, set(), cert_options
)
t1 = time.time()
print(t1-t0)

35.15886878967285


In [46]:
sum([s.total_time_to_solve_progs() for s in statistics_path_safe])/20

1041.8965741046122

In [47]:
total_times = [s.total_time_to_certify()/20 for s in statistics_path_safe]
print(f"Average time = {np.mean(total_times)/1000}")
print(f"Total Time = {np.sum(total_times)/1000/60}")

Average time = 11.711584202416665
Total Time = 0.5855792101208334


In [48]:
for s in statistics_path_safe:
    print(s.certified_safe())

False
False
True


In [75]:
# safe = [s.c() for s in statistics]
# print(f"Num Safe = {sum(safe)}")
fails =  []
for k in statistics_path_safe[1].solution_statuses.keys():
    if statistics_path_safe[1].solution_statuses[k] == 5:
        fails.append(k)
# print(fails)
# len(cspace_free_path.separating_planes())
planeid_to_geompair = {v:k for k,v in cspace_free_path.map_geometries_to_separating_planes().items()}
unsafe_pairs = [planeid_to_geompair[id] for id in fails]
for a in unsafe_pairs:
    print('----------')
    for g in a:
        print(plant.GetBodyFromFrameId(inspector.GetFrameId(g)).name())

In [34]:
# print(s.solution_statuses)

In [56]:
len(inspector.GetCollisionCandidates())

246

## START PLANE PLOTTING

In [114]:
geoms = []
# for model_info in [plant.GetModelInstanceByName("shelves1"), plant.GetModelInstanceByName("shelves2")]:

names = ["top", "bottom", "right", "left", "shelf1", "shelf2"]
for n in names:
# shelf_geom = plant.GetBodyByName(, model_info)

    for s in ["shelves1"]:
        path = f"/drake/visualizer/{s}/{n}"
        
        meshcat.SetProperty(path, "color", np.array([0.5,0.95,0.6,1]))  
        
for n in ["left","right"]:
# shelf_geom = plant.GetBodyByName(, model_info)

    for s in ["shelves1"]:
        path = f"/drake/visualizer/{s}/{n}"
        
        meshcat.SetProperty(path, "color", np.array([0.5,0.95,0.6,0.5]))  
#     #meshcat.SetProperty(path+body, "color", np.array([0.8,.2,0,0.3]))
#     meshcat.SetProperty(f"visualizer/hyperplane/{body}", "visible", True)
#     body_obs = plant.GetBodyFromFrameId(inspector.GetFrameId(pair[0]))
#     if plant.GetModelInstanceByName('shelves2') ==body_obs.model_instance():
#         path = f"/drake/visualizer/shelves2/{body_obs.name()}"
#         meshcat.SetProperty(path, "color",np.array([color[0],color[1],color[2],1.0]))
#     else:
#         path = f"/drake/visualizer/shelves1/{body_obs.name()}"
#         meshcat.SetProperty(path, "color",np.array([color[0],color[1],color[2],1.0]))
#     body_obs = plant.GetBodyFromFrameId(inspector.GetFrameId(pair[1]))
#     path = f"/drake/visualizer/wsg/{body_obs.name()}"
#     meshcat.SetProperty(path, "color",np.array([1,0,0,1.0]))
for i  in [1,2]:
    path = f"/drake/visualizer/wsg{i}/body"
    meshcat.SetProperty(path, "color",np.array([0.8,0.,.1,1.0]))
    path = f"/drake/visualizer/wsg{i}/left_finger"
    meshcat.SetProperty(path, "color",np.array([0.8,0.,.1,1.0]))
    path = f"/drake/visualizer/wsg{i}/right_finger"
    meshcat.SetProperty(path, "color",np.array([0.8,0.,.1,1.0]))


In [77]:
t_breaks = np.arange(0,path_safe.shape[1]+1)
traj_safe = PiecewisePolynomial(path_safe.T, t_breaks)


In [78]:
def animate_traj(traj, num_samples = int(1e3), do_break = False):
    for t in np.linspace(traj.start_time(), traj.end_time(),num_samples):
        s = traj.value(t)
        showres_s(s)
        time.sleep(0.01)
        if col_func_handle(Ratfk.ComputeQValue(s, np.zeros(7))):
            print('col')
            if do_break:
                break
animate_traj(traj_safe)

col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col
col


In [95]:
from pydrake.all import Evaluate
from pydrake.all import AngleAxis,RigidTransform
from scipy.linalg import null_space
query = scene_graph.get_query_output_port().Eval(scene_graph.GetMyContextFromRoot(diagram_context))
inspector = query.inspector()
geompair_to_planes = CspaceFreePath.map_geometries_to_separating_planes(cspace_free_path)

# cert result is indexed by the prm edge. 
# first_edge_cert_result = cert_result[0]

desired_result_tuple = important_collision_pairs[0]
def get_associated_plane_result_tuple(p1, p2, cert_result_for_edge):
    key = (p1, p2) if (p1,p2) in cert_result_for_edge.keys() else (p2,p1)
    return cert_result_for_edge[key]
# sep_cert_result = get_associated_plane_result_tuple(*desired_result_tuple,
#                                                     first_edge_cert_result)


def compute_plane_transform(a, b, p1, p2):
    try:
        #determine normal
        assert len(a)==3
        p1 = p1.squeeze()
        p2 = p2.squeeze()
        mu = -b / (a.T @ (p2 - p1))
        offset = mu * (p2 - p1)
        axis = (a / np.linalg.norm(a))[:]
        n = (a/np.linalg.norm(a)).squeeze()
        a = a.squeeze()
        target_vector = np.array([0, 0, 1])
        cos_theta = np.dot(target_vector, n)
        rotation_axis = np.cross(target_vector, n)
        rotation_axis = rotation_axis/np.linalg.norm(rotation_axis) 
        theta = np.arccos(cos_theta)
        rotation = AngleAxis(theta, rotation_axis).rotation()
        #print(axis.shape)
        P = null_space(axis.T)
        rotation = np.hstack([P, axis])
        R = RotationMatrix(rotation)
        return RigidTransform(R, offset)
    except:
        return RigidTransform()
# def compute_plane_transform(a, b, p1, p2):
#     #determine normal
#     assert len(a)==3
#     p1 = p1.squeeze()
#     p2 = p2.squeeze()
#     mu = -b / (a.T @ (p2 - p1))
#     offset = mu * (p2 - p1)
#     axis = (a / np.linalg.norm(a))[:]
#     n = (a/np.linalg.norm(a)).squeeze()
#     a = a.squeeze()
#     target_vector = np.array([0, 0, 1])
#     cos_theta = np.dot(target_vector, n)
#     rotation_axis = np.cross(target_vector, n)
#     rotation_axis = rotation_axis/np.linalg.norm(rotation_axis) 
#     theta = np.arccos(cos_theta)
#     rotation = AngleAxis(theta, rotation_axis).rotation()
#     #print(axis.shape)
#     #P = null_space(axis.T)
#     #rotation = np.hstack([P, axis])
#     R = RotationMatrix(rotation)
#     return RigidTransform(R, offset)
def compute_plane_transform(a, b, p1, p2):
    #determine normal
    assert len(a)==3
    p1 = p1.squeeze()
    p2 = p2.squeeze()
    t = (-b -a.T@p2)/ (a.T @ (p1 - p2))
    np.clip(t, 0,1)
    offset = t*p1 + (1-t)*p2
    #offset = mu * (p2 - p1)
#         offset = 0.5*(p2 - p1)
    axis = (a / np.linalg.norm(a))[:]
    n = (a/np.linalg.norm(a)).squeeze()
    a = a.squeeze()
    target_vector = np.array([0, 0, 1])
    cos_theta = np.dot(target_vector, n)
    rotation_axis = np.cross(target_vector, n)
    rotation_axis = rotation_axis/np.linalg.norm(rotation_axis) 
    theta = np.arccos(cos_theta)
    rotation = AngleAxis(theta, rotation_axis).rotation()
    #print(axis.shape)
    #P = null_space(axis.T)
    #rotation = np.hstack([P, axis])
    R = RotationMatrix(rotation)
    return RigidTransform(R, offset)
#     translation = (-b-a@pt2)/(a@pt1-a@pt2) *pt1
#     X = RigidTransform(RotationMatrix(rotation),translation)
    #return X

def get_hyperplane_transform_from_geom_pair(pair, mu_val, edge_result):
    cert = get_associated_plane_result_tuple(*pair,
                                            edge_result)
    a = Evaluate(cert.a, {mu: mu_val for mu in cert.a[0].indeterminates()})
    b = Evaluate(np.array([cert.b]), 
                 {mu: mu_val for mu in cert.b.indeterminates()})[0].item()
    
    plane_idx = geompair_to_planes[pair]
    body_index = cspace_free_path.separating_planes()[plane_idx].expressed_body
    expressed_body = plant.get_body(body_index)
    X_WE = plant.EvalBodyPoseInWorld(
                plant_context, expressed_body)
    X_EW = X_WE.inverse()
    f_ids = [inspector.GetFrameId(p) for p in pair]
    bodies = [plant.GetBodyFromFrameId(f) for f in f_ids]
    X_WG = [plant.EvalBodyPoseInWorld(plant_context,b) for b in bodies]
    p1 = (X_EW @ X_WG[0]).translation()
    p2 = (X_EW @ X_WG[1]).translation()
    X_E_plane = compute_plane_transform(a,b,p1, p2)
    
    return X_WE @ X_E_plane
    
def set_hyperplane_transform_by_col_pair(p1, p2, X):
    key = (p1, p2) if (p1,p2) in hyperplane_plants.keys() else (p2, p1)
    plant = hyperplane_plants[key]
    plant_context = hyperplane_plant_contexts[key]
    p = X.translation()
    quat = X.rotation().ToQuaternion().wxyz()
    plant.SetPositions(plant_context, np.append(quat, p))

In [96]:
print(gripper_col_geoms_ids)
print(important_collision_pairs)


[<GeometryId value=118>, <GeometryId value=132>]
[(<GeometryId value=118>, <GeometryId value=146>), (<GeometryId value=132>, <GeometryId value=146>), (<GeometryId value=118>, <GeometryId value=152>), (<GeometryId value=132>, <GeometryId value=152>), (<GeometryId value=118>, <GeometryId value=158>), (<GeometryId value=132>, <GeometryId value=158>), (<GeometryId value=118>, <GeometryId value=164>), (<GeometryId value=132>, <GeometryId value=164>), (<GeometryId value=118>, <GeometryId value=170>), (<GeometryId value=132>, <GeometryId value=170>), (<GeometryId value=118>, <GeometryId value=176>), (<GeometryId value=132>, <GeometryId value=176>), (<GeometryId value=118>, <GeometryId value=132>), (<GeometryId value=118>, <GeometryId value=132>)]


In [97]:
get_hyperplane_transform_from_geom_pair(important_collision_pairs[0],
                                       1, cert_result_path_safe[-1])

RigidTransform(
  R=RotationMatrix([
    [-0.3622586368064893, -0.3357789123206901, -0.8694947970516206],
    [0.05768060700151817, 0.9229926612508288, -0.38047009718642755],
    [0.9302911520782301, -0.18798156643098698, -0.31499413176615193],
  ]),
  p=[-0.545422987726133, 0.09745360931221171, 0.25034806017646266],
)

In [98]:
important_collision_id_to_body_name = dict()

for elt in important_collision_pairs:
    for p in elt:
        frame_id = inspector.GetFrameId(p)
        body = plant.GetBodyFromFrameId(frame_id)
        important_collision_id_to_body_name[p] = body.name()
print(important_collision_id_to_body_name)
show_res_s_with_planes(traj_safe.value(traj_safe.end_time()),1, 
                       cert_result_path_safe[-1])

{<GeometryId value=118>: 'body', <GeometryId value=146>: 'bottom', <GeometryId value=132>: 'body', <GeometryId value=152>: 'top', <GeometryId value=158>: 'right', <GeometryId value=164>: 'left', <GeometryId value=170>: 'shelf1', <GeometryId value=176>: 'shelf2'}


In [99]:
pairs_to_show =important_collision_pairs[-1:]
#important_collision_pairs
def set_colors_col_pair(pair, color):
    path = "/drake/visualizer/hyperplane/"
    body = f"hyperplane_{pair[0], pair[1]}"
    meshcat.SetProperty(path+body, "color", np.array([color[0],color[1],color[2],0.2]))  
    #meshcat.SetProperty(path+body, "color", np.array([0.8,.2,0,0.3]))
    meshcat.SetProperty(f"visualizer/hyperplane/{body}", "visible", True)
for pair in pairs_to_show:
    set_colors_col_pair(pair, np.array([1.0,0,0,0.1]))

In [100]:
important_collision_pairs[-1]

(<GeometryId value=118>, <GeometryId value=132>)

In [101]:
important_collision_pairs

[(<GeometryId value=118>, <GeometryId value=146>),
 (<GeometryId value=132>, <GeometryId value=146>),
 (<GeometryId value=118>, <GeometryId value=152>),
 (<GeometryId value=132>, <GeometryId value=152>),
 (<GeometryId value=118>, <GeometryId value=158>),
 (<GeometryId value=132>, <GeometryId value=158>),
 (<GeometryId value=118>, <GeometryId value=164>),
 (<GeometryId value=132>, <GeometryId value=164>),
 (<GeometryId value=118>, <GeometryId value=170>),
 (<GeometryId value=132>, <GeometryId value=170>),
 (<GeometryId value=118>, <GeometryId value=176>),
 (<GeometryId value=132>, <GeometryId value=176>),
 (<GeometryId value=118>, <GeometryId value=132>),
 (<GeometryId value=118>, <GeometryId value=132>)]

In [119]:
from pydrake.all import PiecewisePolynomial
from tqdm import tqdm
traj = PiecewisePolynomial(path_safe.T,
                           np.arange(path_safe.shape[1]+1))


cert_result = cert_result_path_safe

# for pair in important_collision_pairs:
#     if pair not in pairs_to_show:
#         path = "/drake/visualizer/hyperplane/"
#         body = f"hyperplane_{pair[0], pair[1]}"
#         meshcat.Delete(path+body)  
# #pair = pairs_to_show
# for pair in pairs_to_show:
#     path = "/drake/visualizer/hyperplane/"
#     body = f"hyperplane_{pair[0], pair[1]}"
#     meshcat.SetProperty(path+body, "color", np.array([0.8,.2,0,0.3]))  

#     name_obs = plant.GetBodyFromFrameId(inspector.GetFrameId(pair[1])).name()
#     path = "/drake/visualizer/obstacle/"
#     meshcat.SetProperty(path+name_obs, "color",np.array([0.8,.2,0,0.9]))  
#     name_link = plant.GetBodyFromFrameId(inspector.GetFrameId(pair[0])).name()
#     path = "/drake/visualizer/pendulum/"
#     meshcat.SetProperty(path+name_link, "color",np.array([0.8,.2,0,0.9]))  
    
num_edges_to_navigate = path_safe.shape[1]
animation = visualizer.StartRecording()

def show_res_s_with_planes(s, path_time, cert_result):
    q = Ratfk.ComputeQValue(s, q_star)
    plant.SetPositions(plant_context, q)
    if check_collision_q_by_query(q):
        print(f"COL at idx {idx}")
        has_col = True
    for pair in pairs_to_show:
        X = get_hyperplane_transform_from_geom_pair(pair, path_time, cert_result)
        set_hyperplane_transform_by_col_pair(*pair, X)
    diagram_context.SetTime(time.time())
    diagram.ForcedPublish(diagram_context)
diagram_context.SetTime(0)
diagram.ForcedPublish(diagram_context)


for idx in tqdm(range(1)):
    has_col = False
    for t in np.linspace(0.9,0,400):
        cur_time = idx + t
        s = traj.value(cur_time)
        show_res_s_with_planes(s, t, cert_result[idx])
        time.sleep(0.025)
    for _ in range(100):
        cur_time = idx + 0
        s = traj.value(cur_time)
        show_res_s_with_planes(s, 0, cert_result[idx])
        time.sleep(0.025)
visualizer.StopRecording()
visualizer.PublishRecording()

100%|██████████████████████████████████████| 1/1 [00:13<00:00, 13.45s/it]


In [63]:
traj.end_time()



2.0

In [67]:
traj.get_segment_times()

[0.0, 1.0, 2.0]

## END PLANE PLOTTING

In [None]:
edges_safe = [np.all([val == 0 or val == 1 for val in s.solution_statuses.values()]) for s in statistics] 
print(len(edges_safe))
print(f"Num Safe = {sum(edges_safe)}")

In [None]:

path_to_data_save_folder = Path(
    "/home/amice/Documents/coding_projects/drake/C_Iris_Examples/final_experiment_data"
)
l = 100
######### BUILD PRM ##########
Ratfk = RationalForwardKinematics(plant)
diagram_col_context = diagram.CreateDefaultContext()
plant_col_context = diagram.GetMutableSubsystemContext(
    plant, diagram_col_context
)
scene_graph_col_context = diagram.GetMutableSubsystemContext(
    scene_graph, diagram_col_context
)
query_port = scene_graph.get_query_output_port()

def check_collision_q_by_query(q):
    if np.all(q >= plant.GetPositionLowerLimits()) and np.all(
        q <= plant.GetPositionUpperLimits()
    ):
        plant.SetPositions(plant_col_context, q)
        query_object = query_port.Eval(scene_graph_col_context)
        return 1 if query_object.HasCollisions() else 0
    else:
        return 1

def check_collision_s_by_query(s):
    s = np.array(s)
    q = Ratfk.ComputeQValue(s, q_star)
    return check_collision_q_by_query(q)

def sample_col_free_point():
    q = np.random.uniform(
        plant.GetPositionLowerLimits(),
        plant.GetPositionUpperLimits(),
    )
    s = Ratfk.ComputeSValue(q, q_star)
    return s

collision_checker = StraightLineCollisionChecker(check_collision_s_by_query, 100)
prm_save_name = (
    path_to_data_save_folder / f"bimanual_example_{l}_edges_PRM.pkl"
)
print(str(prm_save_name))
if True:#not prm_save_name.exists():
    prm = PRMFixedEdges(
        sample_col_free_point, l, collision_checker, dist_thresh=3
    )
    with open(prm_save_name, "wb") as f:
        pickle.dump(prm, f)
else:
    with open(prm_save_name, "rb") as f:
        prm = pickle.load(f)

    diagram_vis_context = diagram.CreateDefaultContext()
    plant_vis_context = diagram.GetMutableSubsystemContext(
        plant, diagram_vis_context
    )
    vis_bundle = vis_utils.VisualizationBundle(
        diagram,
        diagram_context,
        plant,
        plant_vis_context,
        Ratfk,
        meshcat,
        q_star,
    )
    end_effector = plant.GetBodyByName(f"iiwa_link_{6}", ModelInstanceIndex(3))

prm.draw_tree(vis_bundle, end_effector)
path_safe = make_line_polys(prm)
############## CERTIFICATION ##################
cert_options = CspaceFreePath.FindSeparationCertificateGivenPathOptions()
cert_options.terminate_segment_certification_at_failure = False

cert_options.num_threads = -1
cert_options.verbose = False
cert_options.solver_id = MosekSolver.id()
cert_options.solver_options = SolverOptions()
cert_options.terminate_path_certification_at_failure = False

In [None]:
 t0 = time.time()
(
    statistics,
    cert_result,
) = cspace_free_path.FindSeparationCertificateGivenPath(
    path_safe, set(), cert_options
)
t1 = time.time()
file_name = path_to_data_save_folder / (
    f"bimanual_example_{l}_edges_STATISTIC.pkl"
)
with open(
    file_name,
    "wb",
) as f:
    pickle.dump(statistics, f)
print(f"num pairs to certify {len(statistics[0].total_time_to_certify_pair)}")
print(f"Certification of safe PRM for {plant.num_positions()} links in {t1 - t0}s")
print(
    f"Frac edges safe = {sum([1 if s.certified_safe() else 0 for s in statistics])}/{len(statistics)}"
)
print(
    f"Unsafe inds = {[idx for idx, s in enumerate(statistics) if not s.certified_safe()]}"
)

In [None]:
unsafe_idx = [idx for idx, s in enumerate(statistics) if not s.certified_safe()]
for idx in unsafe_idx:
    cur_traj = PiecewisePolynomial([path_safe[:,idx]], [0,1])
    for t in np.linspace(0,1,1000):
        s = cur_traj.value(t)
        q = Ratfk.ComputeQValue(s, q_star)
        plant.SetPositions(plant_context, q)
        diagram.ForcedPublish(diagram_context)
#         time.sleep(0.01)
        if check_collision_q_by_query(q):
            print("collision detected")
            break