In [1]:
import time
from IPython.core.display import display, HTML
from matplotlib import collections as mc
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib import cm
import numpy as np
import pickle

from pydrake.geometry import StartMeshcat
from pydrake.geometry import MeshcatVisualizer, Rgba, StartMeshcat
from pydrake.geometry.optimization import Point
from pydrake.planning import GcsTrajectoryOptimization

from underactuated import running_as_notebook
from underactuated.uav_environment import (
    CONVEX_GCS_OPTION,
    NONLINEAR_GCS_OPTION,
    UavEnvironment,
)

# from underactuated.uav_environment import *
from custom_uav_env import *
import os

  from IPython.core.display import display, HTML


In [2]:
def reconstruct_trajectory_from_solution_path(edges, rounded_result, vertex_to_subgraph, gcs):
    bezier_curves = []
    bezier_curves_params = []
    for e in edges:
        phi_inv = 1. / rounded_result.GetSolution(e.phi())
        num_control_points = vertex_to_subgraph[e.u()].order() + 1
        # edge_path_points = 
        # print(rounded_result.GetSolution(e.xu()).shape, num_control_points)
        # total += (rounded_result.GetSolution(e.xu())[-1])
        xu_result = rounded_result.GetSolution(e.xu())
        h = phi_inv * xu_result[-1]
        edge_path_points = xu_result[:-1].reshape(num_control_points, gcs.num_positions()).T

        start_time = 0 if len(bezier_curves) == 0 else bezier_curves[-1].end_time()

        if num_control_points > 1 and h < 1e-10:
            # TODO
            pass
        elif not (num_control_points == 1): # TODO: enforce && vertex_to_subgraph_[&e->u()]->h_min_ == 0))
            bc = BezierCurve(start_time, start_time + h, edge_path_points)
            bezier_curves.append(bc)
            bezier_curves_params.append((start_time, start_time + h, edge_path_points))

    return CompositeTrajectory(bezier_curves), bezier_curves_params

In [5]:
tokenized_data_dir = "/mnt/tokenized_data_dir"

env_shape = (6, 6)
x = np.arange(1, env_shape[0]-1)
y = np.arange(1, env_shape[1]-1)
xx, yy = np.meshgrid(x, y)
possible_goals = np.stack([xx, yy]).reshape(2, -1).T * CELL_SIZE

num_time_bins = 512
num_space_bins = 1024
# num_time_bins = 2**16
# num_space_bins = 2**16

time_max, time_min = 0.5, 0
space_max, space_min = 1, -1

collision = 0
total = 0

for file_name in os.listdir(tokenized_data_dir):
    file_path = os.path.join(tokenized_data_dir, file_name)
    data = np.load(file_path, allow_pickle=True)

    prev_location = np.array([0, 0, 2.])
    prev_time = 0

    bezier_curves = []

    random_id = np.random.choice(len(data["trajs"]), 1)[0]

    for bez_params in data["trajs"][random_id].reshape(-1, 19):
        delta_time = (bez_params[0] - num_space_bins) / num_time_bins * (time_max - time_min) + time_min
        start_time = prev_time
        end_time = prev_time + delta_time

        if delta_time < 1e-6:
            continue

        ctrl_pts = [prev_location]
        disc_diffs = bez_params[1:].reshape(-1, 3)
        for disc_diff in disc_diffs:
            diff = disc_diff / num_space_bins * (space_max - space_min) + space_min
            ctrl_pts.append(ctrl_pts[-1] + diff)

        prev_time = end_time
        prev_location = ctrl_pts[-1]

        ctrl_pts = np.array(ctrl_pts).T

        bc = BezierCurve(start_time, end_time, ctrl_pts)
        bezier_curves.append(bc)

    traj = CompositeTrajectory(bezier_curves)

    seed, remainder = file_name.split("_")
    seed = int(seed)
    goal_ind, _ = remainder.split(".")
    goal_ind = int(goal_ind)

    uav_env = UavEnvironment(seed=seed, environment_shape=env_shape, DEFAULT_GOAL=possible_goals[goal_ind])
    regions, edges_between_regions = uav_env.compile()

    for t in np.linspace(traj.start_time(), traj.end_time(), 1000):
        freespace = False
        for region in regions:
            if region.PointInSet(traj.value(t), tol=1e-2):
                freespace = True
                break
        if not freespace:
            # print("Collision detected at time", t)
            # print(traj.value(t).reshape(-1))
            collision += 1
            break
    
    total += 1
    print(f"{collision} / {total} ({collision / total})")

1 / 1 (1.0)
2 / 2 (1.0)
2 / 3 (0.6666666666666666)
3 / 4 (0.75)
3 / 5 (0.6)
4 / 6 (0.6666666666666666)
4 / 7 (0.5714285714285714)
5 / 8 (0.625)
5 / 9 (0.5555555555555556)
5 / 10 (0.5)
5 / 11 (0.45454545454545453)
6 / 12 (0.5)
7 / 13 (0.5384615384615384)
8 / 14 (0.5714285714285714)
8 / 15 (0.5333333333333333)
8 / 16 (0.5)
8 / 17 (0.47058823529411764)
8 / 18 (0.4444444444444444)
8 / 19 (0.42105263157894735)
8 / 20 (0.4)
8 / 21 (0.38095238095238093)
8 / 22 (0.36363636363636365)
8 / 23 (0.34782608695652173)
8 / 24 (0.3333333333333333)
9 / 25 (0.36)
9 / 26 (0.34615384615384615)
9 / 27 (0.3333333333333333)
10 / 28 (0.35714285714285715)
11 / 29 (0.3793103448275862)
12 / 30 (0.4)
13 / 31 (0.41935483870967744)
13 / 32 (0.40625)
13 / 33 (0.3939393939393939)
14 / 34 (0.4117647058823529)
14 / 35 (0.4)
15 / 36 (0.4166666666666667)
16 / 37 (0.43243243243243246)
17 / 38 (0.4473684210526316)
18 / 39 (0.46153846153846156)
18 / 40 (0.45)
18 / 41 (0.43902439024390244)
19 / 42 (0.4523809523809524)
19 / 43

In [None]:
region.PointInSet()

In [None]:
traj_list = [traj]
times = np.array([traj.end_time() - traj.start_time() for traj in traj_list])
my_cmap = cm.viridis
my_norm = mcolors.Normalize(vmin=times.min(), vmax=times.max())

In [None]:
fig, ax = plt.subplots(figsize=(13, 10))
info_map = {
    Building.make_external_wall: (5, "ext wall"),
    Building.make_external_window_left: (3, "ext window L"),
    Building.make_external_window_right: (3, "ext window R"),
    Building.make_external_windows: (3, "ext window"),
    Building.make_external_door: (1, "ext door"),
    Building.make_internal_horizontal_wall_left: (3, "int window L"),
    Building.make_internal_horizontal_wall_right: (3, "int window R"),
    Building.make_internal_vertical_wall: (3, "int window U"),
    Building.make_internal_door: (1, "int door"),

}

for idx, wall_type in enumerate(uav_env.walls.keys()):
    if wall_type == Building.make_internal_no_wall:
        continue

    linewidth, label = info_map[wall_type]

    xs, ys, direcs = [], [], []
    wall_segments = []
    for outdoor_wall in uav_env.walls[wall_type]:
        x, y, direc = outdoor_wall
        xs.append(x)
        ys.append(y)
        direcs.append(direc)
        
        if direc in [Direction.LEFT, Direction.RIGHT]:
            wall_start, wall_end = (x, y + CELL_SIZE / 2), (x, y - CELL_SIZE / 2)
        else:
            wall_start, wall_end = (x + CELL_SIZE / 2, y), (x - CELL_SIZE / 2, y)
        wall_segments.append((wall_start, wall_end))

    color_name = list(mcolors.TABLEAU_COLORS)[idx]
    color = mcolors.TABLEAU_COLORS[color_name]
    wall_collection = mc.LineCollection(wall_segments, linewidths=linewidth, colors=color)
    ax.add_collection(wall_collection)
    ax.scatter(xs, ys, label=label, c=color)
    ax.set_aspect('equal', adjustable='box')

for traj_idx, traj in enumerate(traj_list):
    xs = []
    ys = []
    for t in np.linspace(0, traj.end_time(), 100):
        x, y = traj.value(t).reshape(-1)[:-1]
        xs.append(x)
        ys.append(y)

    xs = np.array(xs)
    ys = np.array(ys)

    time = times[traj_idx]
    ax.plot(xs, ys, color=my_cmap(my_norm(time)))

ax.legend()
fig.colorbar(cm.ScalarMappable(norm=my_norm, cmap=my_cmap), orientation="vertical", label="Float value")

In [None]:
meshcat = StartMeshcat()
trajectories = traj_list
fly_in_sequence = False
quadrotor_seperation = None

view_regions = False
track_uav = False

builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.0)
parser = Parser(plant)
ConfigureParser(parser)
parser.AddModelsFromString(uav_env.sdf_as_string, "sdf")
plant.Finalize()

ApplyVisualizationConfig(
        config=VisualizationConfig(publish_inertia=False, publish_contacts=False),
        plant=plant,
        scene_graph=scene_graph,
        builder=builder,
        meshcat=meshcat,
    )

trajectory_colors = (
    cmap_ibm(np.linspace(0, 1, len(trajectories))) if trajectories else []
)

previous_trajectory_end_time = 0.0
total_animation_duration = 0.0
for i, traj in enumerate(trajectories):
    if fly_in_sequence:
        traj = shift_composite_trajectory(traj, previous_trajectory_end_time)
        previous_trajectory_end_time = (
            traj.end_time() + uav_env.DELAY_BETWEEN_TRAJECTORIES
        )
    total_animation_duration = max(total_animation_duration, traj.end_time())

    # Visualize the quadrotor.
    if quadrotor_seperation:
        offsets = np.linspace(
            traj.start_time(),
            traj.end_time(),
            int((traj.end_time() - traj.start_time()) / quadrotor_seperation),
        )
        for j, offset in enumerate(offsets):
            flat_traj_source = builder.AddSystem(
                FlatQuadrotorTrajectorySource(
                    shift_composite_trajectory(traj, -offset)
                )
            )
            QuadrotorGeometry.add_to_builder(
                builder,
                flat_traj_source.get_output_port(0),
                scene_graph,
                f"quadrotor_{i}/{j}",
            )

    else:
        flat_traj_source = builder.AddSystem(
            FlatQuadrotorTrajectorySource(traj)
        )
        QuadrotorGeometry.add_to_builder(
            builder,
            flat_traj_source.get_output_port(0),
            scene_graph,
            f"quadrotor_{i}",
        )

    # Visualize the trajectory.
    cartesian_traj_source = builder.AddSystem(TrajectorySource(traj))
    tracer = builder.AddNamedSystem(
        f"quadrotor_trace_{i}",
        TraceVisualizer(
            meshcat,
            line_width=10, # uav_env.LINE_WIDTH,
            rgba=Rgba(*trajectory_colors[i]),
        ),
    )
    meshcat.SetLine(
        f"/drake/traces/persistent_quadrotor_trace_{i}",
        np.hstack(
            [
                traj.value(t)
                for t in np.linspace(0, traj.end_time(), uav_env.NUM_SAMPLES)
            ]
        ),
        line_width=10, # uav_env.LINE_WIDTH,
        rgba=Rgba(*trajectory_colors[i]),
    )

    meshcat.SetProperty(
        f"/drake/traces/persistent_quadrotor_trace_{i}",
        "visible",
        False,
    )

    builder.Connect(
        cartesian_traj_source.get_output_port(),
        tracer.get_input_port(0),
    )


meshcat_cpp = MeshcatVisualizer.AddToBuilder(builder, scene_graph, meshcat)

animator = meshcat_cpp.StartRecording()
# if not track_uav:
#     animator = None
# traj_system = builder.AddSystem(FlatnessInverter(trajectory, animator))
# quad = QuadrotorGeometry.AddToBuilder(builder, traj_system.get_output_port(0), scene_graph)

diagram = builder.Build()

# Set up a simulator to run this diagram
simulator = Simulator(diagram)
simulator.set_target_realtime_rate(0.25)

meshcat.Delete()

if view_regions:
    for ii in range(len(regions)):
        v = VPolytope(regions[ii])
        meshcat.SetTriangleMesh("iris/region_" + str(ii), v.vertices(),
                                ConvexHull(v.vertices().T).simplices.T, Rgba(0.698, 0.67, 1, 0.4))
        
# Simulate
end_time = traj.end_time()
simulator.AdvanceTo(end_time+0.05)
meshcat_cpp.PublishRecording()

with open ("/mnt/trajectory.html", "w") as f:
    f.write(meshcat.StaticHtml())