In [None]:
# import sys
# sys.path.append("..")
import numpy as np
import os
import json
from glob import glob
from tqdm import tqdm
import torch
import matplotlib.pyplot as plt, cv2
import utils.workspace as ws
from models.generator.generator import Generator
import time

from RRTStar.rrt_star import RRTStar
from numpy.linalg import norm


In [None]:
# Test images. The sorted file order defines the image index used by the
# plotting helpers below (imgs[img_nb]).
path_img = "../2dDeepSDF/data/test/test_images"
filenames_img = sorted(glob(os.path.join(path_img, "*.jpg")))
imgs = [cv2.imread(f) for f in filenames_img]

# Latent codes of the environments, one .npy file each, squeezed to drop
# singleton axes. Presumably the i-th code belongs to the i-th image; this
# relies on both folders sorting the same way -- TODO confirm.
env_data_folder = "../2dDeepSDF/chomp256/Reconstructions/test/codes"
filenames_env = sorted(glob(os.path.join(env_data_folder, '*.npy')))
envs = [np.load(f).squeeze() for f in filenames_env]

# Obstacles: a JSON object turned into a list ordered by its sorted keys.
obs_path = "../metrics/obstacle_zone_test.json"
# Context manager so the file handle is closed as soon as the JSON is parsed.
with open(obs_path) as obs_file:
    obs_list_all = json.load(obs_file)
obs_list_all = [obs_list_all[k] for k in sorted(obs_list_all.keys())]

print("Found {} images".format(len(filenames_img)))
print("Found {} obs list".format(len(obs_list_all)))
print("Found {} Latent codes".format(len(filenames_env)))

In [None]:
# Loading the model
# Pick the compute device: GPU when CUDA is available, CPU otherwise.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("Using GPU" if use_cuda else "Using CPU")

# Experiment folder and the name of the checkpoint to load.
experiment_directory = "models"
checkpoint = "600"
specs = ws.load_experiment_specifications(experiment_directory)

# Instantiate the generator with the dimensions listed in the experiment specs.
specs_g = specs["generator"]
input_dim, n_points = specs_g["InputDim"], specs_g["NPoints"]
generator = Generator(input_dim, n_points).to(device)

# Load the stored parameters of the chosen checkpoint into the generator.
path_to_model_dir = os.path.join(experiment_directory, "generator", ws.model_params_dir)
print("Loading checkpoint {} model from: {}".format(checkpoint, os.path.abspath(path_to_model_dir)))
generator.load_model_parameters(path_to_model_dir, checkpoint)

print('############# Generator Model: #####################')
print(generator)
print('######################################################')
generator.to(device)
generator.eval()

In [None]:
def generate_plot(start, goal, img_nb, envs=envs, imgs=imgs):
    """Generate one trajectory with the network and draw it over the environment image.

    Args:
        start: [x, y] start point; divided by 63 before being fed to the generator.
        goal: [x, y] goal point; divided by 63 before being fed to the generator.
        img_nb: index selecting both the image (``imgs``) and the latent code (``envs``).
        envs: sequence of latent codes (defaults to the module-level ``envs``).
        imgs: sequence of images (defaults to the module-level ``imgs``).

    Prints the trajectory (scaled back by 63) as a nested list and draws it on
    the current matplotlib figure. Uses the module-level ``generator`` and ``device``.
    """
    start_n = np.array(start) / 63
    goal_n = np.array(goal) / 63
    background = imgs[img_nb]
    latent = np.squeeze(envs[img_nb])

    # Network input: latent code followed by the normalised start and goal.
    net_input = torch.from_numpy(np.concatenate([latent, start_n, goal_n], axis=0))
    net_input = net_input.unsqueeze(0).to(device).float()
    with torch.no_grad():
        middle = generator(net_input).squeeze().cpu().numpy()

    # Prepend the start, append the goal, and go back to un-normalised coordinates.
    waypoints = np.concatenate([start_n, middle, goal_n], axis=0).reshape(-1, 2) * 63
    print(waypoints.tolist())

    # Plot the trajectory on top of the image.
    plt.plot(waypoints[:, 0], waypoints[:, 1], "-o")
    plt.gca().invert_yaxis()
    plt.imshow(background)
    plt.title("Generated trajectory")
    plt.draw()

In [None]:
%matplotlib qt
# Example query: start and goal are [x, y] points; generate_plot divides them by 63.
start, goal = [20,40], [50, 10]
# Index of the image / latent code to use.
img_nb = 1
generate_plot(start, goal, img_nb)

# Success Rate / Collision checking

In [None]:
# Obstacle zones of the test environments, as a list ordered by the sorted JSON keys.
obs_path = "../metrics/obstacle_zone_test.json"
# Context manager so the file handle is closed once the JSON is parsed.
with open(obs_path) as obs_file:
    obs_zone_all = json.load(obs_file)
obs_zone_all = [obs_zone_all[k] for k in sorted(obs_zone_all.keys())]

In [None]:
# Start/goal entries of the test environments, as a list ordered by the sorted JSON keys.
sg_path = "../metrics/start_goal_test.json"
# Context manager so the file handle is closed once the JSON is parsed.
with open(sg_path) as sg_file:
    sg_all = json.load(sg_file)
sg_all = [sg_all[k] for k in sorted(sg_all.keys())]

In [None]:
def generate_paths(sg_list, env):
    """Generate a set of trajectories for only one environment.

    Args:
        sg_list: start/goal pairs; anything ``np.array`` can reshape to
            ``(n_pairs, 4)``, each row being ``[start_x, start_y, goal_x, goal_y]``.
            Values are divided by 63 before being fed to the generator.
        env: 1-D latent code of the environment, repeated for every pair.

    Returns:
        ``np.ndarray`` of shape ``(n_pairs, n_waypoints, 2)`` scaled back by 63.
        Each trajectory begins with its start point and ends with its goal point.
        An empty ``sg_list`` yields an empty ``(0, 0, 2)`` array.

    Uses the module-level ``generator`` and ``device``.
    """
    sg_list = np.array(sg_list).reshape(-1, 4) / 63
    n_pairs = sg_list.shape[0]
    if n_pairs == 0:
        # Nothing to generate; avoid pushing an empty batch through the network.
        return np.empty((0, 0, 2))

    start, goal = sg_list[:, :2], sg_list[:, 2:]
    env_new = np.broadcast_to(env, (n_pairs, env.shape[0]))
    inputs = np.concatenate((env_new, sg_list), axis=1)
    inputs = torch.from_numpy(inputs).to(device).float()
    with torch.no_grad():
        # Flatten per sample with an explicit batch size instead of squeeze():
        # squeeze() also removes the batch axis when n_pairs == 1, which made the
        # axis=1 concatenation below fail for a single start/goal pair.
        trajectory_intermediate = generator(inputs).cpu().numpy().reshape(n_pairs, -1)
    # The number of waypoints is inferred rather than fixed to 64.
    trajectories = np.concatenate(
        [start, trajectory_intermediate, goal], axis=1).reshape(n_pairs, -1, 2) * 63
    return trajectories

In [None]:
# Smoke test: generate the trajectories for the first environment.
sg = sg_all[0]
env = envs[0]
generate_paths(sg, env)

In [None]:
def check_collision(traj, obs_list, threshold=0.5, max_collisions=1):
    """Check whether a trajectory collides with the obstacles of an environment.

    Args:
        traj: array of shape ``(n_points, 2)`` (or wider; only the first two
            columns, x and y, are used).
        obs_list: iterable of ``[x, y]`` obstacle points; may be empty.
        threshold: a (point, obstacle) pair counts as a hit when its *squared*
            Euclidean distance is strictly below this value. The default 0.5
            therefore corresponds to a radius of sqrt(0.5).
        max_collisions: number of hits that is still tolerated. With the default
            of 1 a single hit is NOT reported as a collision.
            NOTE(review): confirm that tolerating one hit is intended; pass
            ``max_collisions=0`` for a strict check.

    Returns:
        0 if the trajectory collides (more than ``max_collisions`` hits, the hit
        count is also printed), 1 if it avoids the obstacles.
    """
    points = np.asarray(traj, dtype=float)[:, :2]
    obstacles = np.asarray(obs_list, dtype=float).reshape(-1, 2)

    # Squared distance of every obstacle to every trajectory point,
    # shape (n_obstacles, n_points), computed by broadcasting.
    delta = obstacles[:, None, :] - points[None, :, :]
    sq_dist = np.sum(delta * delta, axis=-1)
    n_collision = int(np.count_nonzero(sq_dist < threshold))

    if n_collision > max_collisions:
        print("Number of collision: {}".format(n_collision))
        return 0  # collision
    return 1  # avoid

In [None]:
def eval_success_rate(start_goal_list_all, obs_list_all, envs):
    """Evaluate collision avoidance of the generated paths in every environment.

    Args:
        start_goal_list_all: per-environment start/goal lists, indexed like
            ``obs_list_all``.
        obs_list_all: per-environment obstacle lists; its length sets the number
            of environments evaluated.
        envs: per-environment latent codes, indexed like ``obs_list_all``.

    Returns:
        dict mapping the environment index to a list holding one
        ``check_collision`` result per generated path (1 = avoided, 0 = collision).

    Side effect: the dict is written to ``success_rate_test.json`` after every
    environment, so partial results are on disk if the run is interrupted.
    ``json`` stores the integer keys as strings.
    """
    path_to_success_rate = "success_rate_test.json"
    success_rate = {}
    # loop over all environments
    for k in tqdm(range(len(obs_list_all))):
        obs_list = obs_list_all[k]
        # Read from the argument; the global sg_all was used here before, which
        # silently ignored whatever the caller passed in.
        sg_list = start_goal_list_all[k]
        env = envs[k]
        paths = generate_paths(sg_list, env)
        # store the avoidance flag for each path
        success_rate[k] = [check_collision(path, obs_list) for path in paths]
        # Context manager so the file is flushed and closed on every iteration.
        with open(path_to_success_rate, "w") as out_file:
            json.dump(success_rate, out_file)
    return success_rate

In [None]:
# Evaluate all test environments; eval_success_rate also writes success_rate_test.json as it goes.
new_sr = eval_success_rate(sg_all, obs_zone_all, envs)

In [None]:
# Name under which the following cells read the results.
sr = new_sr

In [None]:
# path_to_success_rate = "success_rate.json"
# json.dump(sr, open(path_to_success_rate, "w"))

## Visualize success rate

In [None]:
# Mean of the per-path flags (1 = avoided, 0 = collision) for each environment,
# rounded to 5 decimals and listed in ascending key order.
success_rate_list = [
    np.round(np.mean(np.array(sr[env_key])), 5)
    for env_key in sorted(sr.keys())
]

In [None]:
# Two parallel columns, each starting with its header entry.
success_rates = ["SuccessRate"] + success_rate_list
# One index per evaluated environment, derived from the results rather than a
# hard-coded 500, so both columns always have the same length.
environments = ["Environment"] + list(range(len(success_rate_list)))

### Path cost 

In [None]:
def eval_path_cost(paths, output_path_cost):
    """Compute the Euclidean length of every trajectory and save them as a 1-d npy array.

    Args:
        paths: array-like of shape ``(n_paths, n_points, dim)``, e.g. the output
            of ``generate_paths``.
        output_path_cost: destination handed to ``np.save``.

    Returns:
        ``np.ndarray`` of shape ``(n_paths,)``: for each path, the sum of the
        distances between consecutive waypoints (the same array that is saved).

    Raises:
        ValueError: if ``paths`` is not 3-dimensional.
    """
    paths = np.asarray(paths, dtype=float)
    if paths.ndim != 3:
        raise ValueError(
            "paths must have shape (n_paths, n_points, dim), got {}".format(paths.shape))

    # np.apply_along_axis(path_cost, 1, paths) was used here before; it hands the
    # callback 1-D slices (one coordinate at a time), which produced an
    # (n_paths, dim) per-coordinate variation instead of one length per path.
    segments = np.diff(paths, axis=1)  # (n_paths, n_points - 1, dim)
    path_cost_all = norm(segments, axis=-1).sum(axis=-1)
    np.save(output_path_cost, path_cost_all)
    return path_cost_all

In [None]:
def compare_rrtstar(start, goal, img_nb, envs=envs, imgs=imgs, obs_list_all=obs_list_all, rrt=True):
    """Plot the network trajectory for one query and optionally compare it with RRT*.

    Args:
        start: [x, y] start point; divided by 63 for the generator.
        goal: [x, y] goal point; divided by 63 for the generator.
        img_nb: index selecting the image, latent code and obstacle list.
        envs: sequence of latent codes (defaults to the module-level ``envs``).
        imgs: sequence of images (defaults to the module-level ``imgs``).
        obs_list_all: per-environment obstacle lists (defaults to the module-level one).
        rrt: when True, also run RRT* between the same start and goal, print both
            path costs and timings, and draw the RRT* path.

    The function prints its results and shows a figure; it returns nothing.
    Uses the module-level ``generator`` and ``device``.
    """
    def path_cost(path):
        """Sum of the Euclidean distances between consecutive waypoints."""
        if len(path) < 2:
            return 0
        waypoints = np.asarray(path, dtype=float)
        return norm(np.diff(waypoints, axis=0), axis=1).sum()

    start = np.array(start) / 63
    goal = np.array(goal) / 63
    img = imgs[img_nb]
    # Scale to [0, 1], round to a binary image and invert it for display.
    img = 1 - np.round(img / 255)
    print("Image {} | start {} - goal {} ".format(img_nb,start*63, goal*63))
    env = np.squeeze(envs[img_nb])
    obs_list = obs_list_all[img_nb]

    inputs = np.concatenate([env,start, goal], axis=0)

    inputs = torch.from_numpy(inputs).unsqueeze(0).to(device).float()
    with torch.no_grad():
        # The timed span includes the copy back to host memory (.cpu()).
        start_time_net = time.time()
        trajectory_intermediate = generator(inputs).squeeze().cpu().numpy()
        end_time_net = time.time()

    trajectory = np.concatenate([start, trajectory_intermediate, goal], axis=0).reshape(-1,2) * 63
    path = None
    if rrt:
        # rrtstar
        rrt_star = RRTStar(start=start*63,
                    goal=goal*63,
                    rand_area=[0, 63],
                    obstacle_list=obs_list,
                    obstacle_radius=0.6)
        print("RRT finding path")
        start_time_rrt = time.time()
        path = rrt_star.planning(animation=True)
        end_time_rrt = time.time()

        path_cost_net = path_cost(trajectory)
        print("Network path cost", path_cost_net)
        if path is None:
            # Guard: computing a cost on a missing path used to raise a TypeError.
            print("RRT* did not return a path")
        else:
            path_cost_rrt = path_cost(path)
            print("RRT* path cost", path_cost_rrt)

    # Plotting the trajectory
    x = trajectory[:,0]
    y = trajectory[:,1]

    # 800x800 pixel figure with a single borderless axes filling it.
    my_dpi = 200
    fig = plt.figure(figsize=(800/my_dpi, 800/my_dpi), dpi=my_dpi)

    ax = plt.Axes(fig, [0., 0., 1., 1.])
    ax.set_axis_off()
    fig.add_axes(ax)

    # Red dots on the two end points, then the network trajectory itself.
    ax.plot([x[0], x[-1]], [y[0], y[-1]], "or")
    ax.plot(x, y, "-", linewidth=3)
    if path is not None:
        # comp time
        print("Found rrt path in {}".format(end_time_rrt-start_time_rrt))
        print("Found net path in {}".format(end_time_net-start_time_net))
        x_r = [c[0] for c in path]
        y_r = [c[1] for c in path]
        ax.plot(x_r, y_r, "-")
    ax.imshow(img)
    plt.show()

In [None]:
%matplotlib qt
# Example query; with rrt=False only the network trajectory is computed and drawn.
rrt = False
start, goal = [50,30], [20, 20]
# Index of the image / latent code / obstacle list to use.
img_nb = 61
compare_rrtstar(start, goal, img_nb, rrt=rrt)