In [1]:
import robotic as ry
import numpy as np
import time

In [21]:
config = ry.Config()
config.addFile("puzzles/p6-wall.g")

<robotic._robotic.Frame at 0x7f9dd4662070>

In [3]:
DEBUG_MODE = False

In [4]:
EGO_NAME = "ego"
OBJ_NAME = "obj"
SUB_GOAL_NAME = "sub-goal1"
GOAL_NAME = "goal"
CAMERA_NAME = "camera_top"
FLOOR_NAME = "floor"

GAP = config.getFrame(EGO_NAME).getSize()[1] * 2

In [22]:
config.view()
f = config.getFrame(CAMERA_NAME)
config.view_setCamera(f)

In [6]:
q_start = config.getFrame(EGO_NAME).getPosition()[:2]
q_obj = config.getFrame(OBJ_NAME).getPosition()[:2]
q_goal = config.getFrame(GOAL_NAME).getPosition()[:2]

In [7]:
def is_in_line_of_sight(position : np.array, target_frame : ry.Frame, config : ry.Config) -> bool:
    frame_position = target_frame.getPosition()
    frame_size = target_frame.getSize()
    frame_hypotenuse = np.sqrt(frame_size[0]**2 + frame_size[1]**2)

    sight_ray = frame_position[:2]  - position
    alpha = np.arctan2(sight_ray[1], sight_ray[0])
    theta = abs(alpha)

    offset = frame_hypotenuse / 2 # can be made more precise

    box_length = np.sqrt(sight_ray[0]**2 + sight_ray[1]**2) - 2 * offset
    box_width = frame_hypotenuse * max(
        np.cos(np.pi/4 - theta),
        np.sin(theta - np.pi/4)
    )
    box_height = .2

    box_x = position[0] + sight_ray[0]/2
    box_y = position[1] + sight_ray[1]/2
    box_z = 0.3
    box_yaw = alpha

    copy_config = ry.Config()
    copy_config.addConfigurationCopy(config)
    copy_config.addFrame("line_of_sight") \
        .setShape(ry.ST.box, [box_length, box_width, box_height]) \
        .setContact(1) \
        .setColor([1, 1, 1]) \
        .setPose(f"t({box_x} {box_y} {box_z}) E(0 0 {box_yaw})")
    copy_config.addFrame("point_A") \
        .setShape(ry.ST.marker, [.2]) \
        .setPosition(frame_position)
    copy_config.addFrame("point_B") \
        .setShape(ry.ST.marker, [.2]) \
        .setPosition(list(position) + [0.3])
    
    if DEBUG_MODE:
        copy_config.view_setCamera(copy_config.getFrame('camera_top'))
        copy_config.view(pause=True, message=f"box_width: {box_width:.2f}, alpha: {alpha/np.pi*180}, line: {sight_ray}")
        copy_config.view_close()

    is_in_sight = copy_config.getCollisionFree()
    del copy_config
    return is_in_sight

In [8]:
def cartesian_to_polar(coordinates):
    x = coordinates[0]
    y = coordinates[1]
    r = np.hypot(x, y)
    theta = np.arctan2(y, x)
    return r, theta

In [9]:
def polar_to_cartesian(coordinates):
    r = coordinates[0]
    theta = coordinates[1]
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    return x, y

In [10]:
def solve_touch_with_komo(komo_config):
    komo = ry.KOMO(komo_config, phases=1, slicesPerPhase=1, kOrder=0, enableCollisions=True)
    komo.addObjective([], ry.FS.accumulatedCollisions, [], ry.OT.eq, scale=[1e2])
    komo.addObjective([], ry.FS.negDistance, [EGO_NAME, OBJ_NAME], ry.OT.eq, [1e1])
    ret = ry.NLP_Solver(komo.nlp(), verbose=0).solve()
    if ret:
        path = komo.getPath()
        return path
    else:
        return None

In [11]:
ry.params_add({
    "rrt/stepsize": 0.1
})
def solve_rrt(cur_config : ry.Config, q_begin : np.array, q_object : np.array, q_target : np.array):
    ITERATIONS = 24
    delta_theta = 2 * np.pi / ITERATIONS
    delta_theta += np.random.uniform(0, 2 * np.pi)
    for i in range(ITERATIONS):
        transpose_polar1 = np.array([GAP, i * delta_theta])
        transpose_cartesian1 = np.array(polar_to_cartesian(transpose_polar1))
        q_home1 = q_object + transpose_cartesian1
        cur_config.setJointState(q_home1)
        q_sub = solve_touch_with_komo(cur_config)
        # keep all komo results
        if q_sub is not None:
            cur_config.setJointState(q_begin)
            copy_config = ry.Config()
            copy_config.addConfigurationCopy(cur_config)
            copy_config.setJointState(q_sub)
            if copy_config.getCollisionsTotalPenetration() > 0:
                continue
            
            rrt = ry.PathFinder()
            rrt.setProblem(cur_config, [q_begin], [q_sub])
            solution1 = rrt.solve()
            del rrt
            if not solution1.feasible:
                continue
            
            cur_config.setJointState(q_sub)
            cur_config.attach(EGO_NAME, OBJ_NAME)
            q_real_goal = q_target + q_sub - q_object

            rrt = ry.PathFinder()
            rrt.setProblem(cur_config, [q_sub], [q_real_goal])
            solution2 = rrt.solve()
            del rrt
            cur_config.attach(FLOOR_NAME, OBJ_NAME)
            cur_config.setJointState(q_begin)
            if solution1.feasible and solution2.feasible:
                return solution1, solution2

    cur_config.setJointState(q_begin)            
    return None, None

In [12]:
def get_proximity_score(position : np.array, goal_frame : ry.Frame) -> int:
    distance = np.linalg.norm(position - goal_frame.getPosition()[:2])
    if distance < 0.2:
        return 5
    elif distance < 0.4:
        return 2
    else:
        return 0

In [None]:
def get_goal_score(
        subgoal_position : np.array,
        agent_frame : ry.Frame,
        goal_frame : ry.Frame,
        config : ry.Config) -> int:
    return 10 * int(is_in_line_of_sight(subgoal_position, goal_frame, config)) \
        + 5 * int(is_in_line_of_sight(subgoal_position, agent_frame, config)) \
        + get_proximity_score(subgoal_position, goal_frame)

In [None]:
def select_node(configs : list) -> ry.Config:
    best_score = 0
    config_index = 0
    best_config = configs[config_index]
    for i in range(len(configs)):
        sample_config = configs[i]
        score = get_goal_score(sample_config.getFrame(SUB_GOAL_NAME).getPosition()[:2], 
                                  sample_config.getFrame(EGO_NAME),
                                  sample_config.getFrame(GOAL_NAME),
                                  sample_config)
        if score > best_score:
            best_score = score
            best_config = sample_config
            config_index = i
    return best_config, config_index

In [15]:
def reachable(reach_config : ry.Config, object_frame : ry.Frame) -> bool:
    copy_config = ry.Config()
    copy_config.addConfigurationCopy(reach_config)
    copy_config.getFrame(OBJ_NAME).setContact(0)
    rrt = ry.PathFinder()
    rrt.setProblem(copy_config, [copy_config.getJointState()], [object_frame.getPosition()[:2]])
    ret = rrt.solve()
    del rrt
    del copy_config
    return ret.feasible

In [None]:
POINT_COUNT = 300
THRESHOLD = 2
SUBSET_SIZE = 25
def propose_subgoals(config : ry.Config, object_frame : ry.Frame) -> list:
    generated_points = np.random.uniform(low=-2, high=2, size=(POINT_COUNT, 2)) # random point sampling

    filtered_points = []
    for point in generated_points:
        config.getFrame(SUB_GOAL_NAME).setPosition([point[0], point[1], 0.2])
        score = get_goal_score(point, config.getFrame(EGO_NAME), config.getFrame(GOAL_NAME), config)
        if score >= THRESHOLD:
            filtered_points.append(point)

    filtered_points.append(object_frame.getPosition()[:2])
    points_array = np.array(filtered_points)
    subset_size = min(len(filtered_points), SUBSET_SIZE)
    point_subset = points_array[np.random.choice(points_array.shape[0], size=subset_size, replace=False)]

    return point_subset

In [17]:
def animate_solution(anim_config, solution):
    for state in solution.x:
        anim_config.setJointState(state)
        anim_config.view()
        time.sleep(0.025)

In [None]:
class SolutionNode:
    def __init__(self, goal_config : ry.Config, parent : 'SolutionNode' = None):
        self.goal_config = goal_config
        self.parent = parent
        self.children = []
        self.score = get_goal_score(goal_config.getFrame(OBJ_NAME).getPosition()[:2],
                                    goal_config.getFrame(EGO_NAME),
                                    goal_config.getFrame(GOAL_NAME),
                                    goal_config)

    def add_child(self, new_goal : ry.Config):
        new_node = SolutionNode(new_goal, parent=self)
        self.children.append(new_node)


class SolutionTree:
    def __init__(self, root : SolutionNode):
        self.root = root


In [None]:
# forward subproblem search algorithm
objects = [OBJ_NAME]
def forward_subproblem_search(start_config : ry.Config, goal_frame : ry.Frame):
    L = [start_config]
    while len(L) > 0:
        x, index = select_node(L)
        L.pop(index)
        solution1, solution2 = solve_rrt(x,
                                         x.getJointState(),
                                         x.getFrame(OBJ_NAME).getPosition()[:2],
                                         goal_frame.getPosition()[:2])
        if solution1 is not None: # if found
            return x, solution1, solution2
        
        for object in objects: # there is just one object for now
            object_frame = x.getFrame(object)
            if not reachable(x, object_frame):
                continue
            subgoals = propose_subgoals(x, object_frame)
            for subgoal in subgoals:
                new_config = ry.Config()
                new_config.addConfigurationCopy(x)
                new_config.getFrame(SUB_GOAL_NAME).setPosition(subgoal.tolist() + [0.2])
                path1, path2 = solve_rrt(new_config, new_config.getJointState(), object_frame.getPosition()[:2], subgoal)
                # if path found and not rej(L, x^f) then
                if path1 is not None:
                    new_config.setJointState(path1.x[-1])
                    new_config.attach(EGO_NAME, OBJ_NAME)
                    new_config.setJointState(path2.x[-1])
                    new_config.attach(FLOOR_NAME, OBJ_NAME)
                    if DEBUG_MODE:
                        new_config.view_setCamera(new_config.getFrame(CAMERA_NAME))
                        new_config.view(pause=True)
                        new_config.view_close()
                    L.append(new_config) # add last elememnt of path to L
    return None, None, None

In [19]:
# test random points
subgoals = propose_subgoals(config, config.getFrame(OBJ_NAME))
count = 0
for point in subgoals:
    count += 1
    config.addFrame(f"sg_{count}") \
        .setShape(ry.ST.sphere, [.05]) \
        .setPosition(list(point) + [0.2]) \
        .setColor([1, 0, 1])
config.view()

0

In [None]:
solution_config, path1, path2 = forward_subproblem_search(config, config.getFrame(GOAL_NAME))
if solution_config is not None:
    solution_config.view_setCamera(solution_config.getFrame(CAMERA_NAME))
    solution_config.view()
    animate_solution(solution_config, path1)
    solution_config.attach(EGO_NAME, OBJ_NAME)
    animate_solution(solution_config, path2)
    solution_config.attach(FLOOR_NAME, OBJ_NAME)
    solution_config.view()

In [None]:
config.view_close()
del config