In [2]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import math
import random
import matplotlib.path as path
from datetime import datetime
from tqdm import tqdm 

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import random_split, DataLoader
from torchvision import transforms

In [3]:
def plot_grid(obstacles, start = None, goal = None, dim = (100,100), tree=None, path = None):
  """Draw the workspace: obstacle outlines plus optional tree, path, start and goal.

  Args:
    obstacles: iterable of polygons, each an indexable sequence of (x, y) vertices.
    start: optional (x, y) point drawn in orange.
    goal: optional (x, y) point drawn in green.
    dim: (width, height) used as the axis limits.
    tree: optional mapping child -> parent (the root maps to None); every
      child/parent edge is drawn in blue.
    path: optional sequence of (x, y) points drawn as yellow crosses.
      NOTE: this parameter name shadows the `matplotlib.path` module alias
      inside this function; the module is not needed here.

  The optional arguments are tested with `is not None` rather than `!= None`
  so that NumPy arrays (whose `!=` is element-wise) can be passed safely.
  """
  plt.grid()
  plt.xlim(0,dim[0])
  plt.ylim(0,dim[1])

  for obstacle in obstacles:
    # Repeat the first vertex at the end so the outline is drawn closed.
    outline = list(obstacle) + [obstacle[0]]
    obstacle_x = np.array([vertex[0] for vertex in outline])
    obstacle_y = np.array([vertex[1] for vertex in outline])
    plt.plot(obstacle_x, obstacle_y, color = "red")

  l = [Patch(facecolor='red', label='Obstacle')]

  if tree is not None:
    for child, parent in tree.items():
      if parent is None:
        # The root has no incoming edge to draw.
        continue
      plt.plot(child[0],child[1],'.',color="blue")
      plt.plot([child[0],parent[0]],[child[1],parent[1]],'.',color="blue",linestyle='-')

  if path is not None:
    for waypoint in path:
      plt.plot(waypoint[0],waypoint[1],'x',color="yellow")

  if start is not None:
    plt.plot(start[0],start[1],'.',color="orange")
    l.append(Patch(facecolor='orange', label='Start'))

  if goal is not None:
    plt.plot(goal[0],goal[1],'.',color="green")
    l.append(Patch(facecolor='green', label='Goal'))

  plt.legend(handles=l)

  plt.show()

In [4]:
def dist(q1,q2):
  """Return the Euclidean distance between two indexable 2-D points."""
  dx = q1[0] - q2[0]
  dy = q1[1] - q2[1]
  return math.sqrt(dx*dx + dy*dy)

In [5]:
def heur(q):
  """Heuristic cost-to-go: half the straight-line distance from q to the
  module-level `goal`; 0 when q is None."""
  return 0 if q == None else dist(q,goal)/2

In [6]:
def detect_collision(q):#true if collision
  """Return True when point q is contained in any polygon of the module-level
  `obstacles`, otherwise False."""
  return any(path.Path(polygon).contains_point(q) for polygon in obstacles)

In [7]:
def clear_path(q1,q2):#true if collision
  """Walk from q1 towards q2 in steps of the module-level `ei` and report a hit.

  Despite the name, the return value is True when a sampled point on the
  segment lies inside one of the module-level `obstacles`, and False when no
  sampled point does. Sampling stops once the remaining distance to q2 drops
  below `ei`, so q1 itself and the last stretch shorter than `ei` are not tested.

  Args:
    q1: (x, y) start of the segment.
    q2: (x, y) end of the segment.
  """
  # The obstacle polygons do not change while stepping, so build them once
  # instead of once per interpolation step.
  polygons = [path.Path(obstacle) for obstacle in obstacles]
  while dist(q1,q2) >= ei:
    gap = dist(q1,q2)
    # Advance q1 by a vector of length ei pointing at q2.
    q1 = (q1[0] + ei*(q2[0]-q1[0])/gap, q1[1] + ei*(q2[1]-q1[1])/gap)
    if any(polygon.contains_point(q1) for polygon in polygons):
      return True
  return False

In [8]:
def clear_path_real(q1,q2):#true if collision
  """Return True when the straight segment q1-q2 intersects any polygon in the
  module-level `obstacles` (tested with `Path.intersects_path`), else False."""
  segment = path.Path([q1,q2])
  return any(path.Path(polygon).intersects_path(segment) for polygon in obstacles)

In [9]:
def rand_samp():
  """Draw uniform points in [0, dim[0]] x [0, dim[1]] until one is not in
  collision, and return it as an (x, y) tuple."""
  while True:
    candidate = (random.uniform(0,dim[0]),random.uniform(0,dim[1]))
    if not detect_collision(candidate):
      return candidate

In [10]:
def rand_sample_range(q):
  """Return a uniformly random collision-free point within distance `V` of q.

  The accepted set is the same as before: points inside the
  [0, dim[0]] x [0, dim[1]] workspace, at most `V` away from q, and not inside
  an obstacle. Candidates are drawn from the bounding box of that disk
  (clipped to the workspace) rather than from the whole workspace, which keeps
  the result uniform over the accepted set while rejecting far fewer draws.
  The cheap distance test runs before the polygon test.

  Args:
    q: (x, y) centre of the sampling disk.
  """
  x_lo, x_hi = max(0, q[0]-V), min(dim[0], q[0]+V)
  y_lo, y_hi = max(0, q[1]-V), min(dim[1], q[1]+V)
  while True:
    r = (random.uniform(x_lo,x_hi),random.uniform(y_lo,y_hi))
    if dist(r,q) <= V and not detect_collision(r):
      return r

In [11]:
def DIR_select(DIR):
  """Pick a tree state to expand using the dominance-region map.

  A random free point is drawn; every state whose region radius (the value in
  DIR) strictly exceeds its distance to that point is a candidate. When no
  region covers the point, the state nearest to it is the only candidate.
  One candidate is returned, chosen uniformly at random.

  Args:
    DIR: mapping state -> region radius.
  """
  q_rand = rand_samp()
  in_DIR = [s for s, d in DIR.items() if dist(q_rand,s) < d]

  if not in_DIR:
    states = list(DIR.keys())
    nearest = states[0]
    for s in states:
      # Strict comparison: the earliest state wins ties.
      if dist(s,q_rand) < dist(nearest,q_rand):
        nearest = s
    in_DIR.append(nearest)

  pick = random.randint(0,len(in_DIR)-1)
  return in_DIR[pick]

In [12]:
def find_f(q,tree):
  """Return heur(q) plus the summed edge lengths from q back to the tree root.

  Args:
    q: a node that is a key of `tree`.
    tree: mapping child -> parent; the root maps to None.
  """
  h = heur(q)
  g = 0
  node = q
  parent = tree[node]
  while parent != None:
    g += dist(node,parent)
    node = parent
    parent = tree[node]
  return h+g

In [13]:
def path_cost(q,tree):
  """Return the summed edge lengths along the parent chain from q to the root.

  Args:
    q: a node that is a key of `tree`.
    tree: mapping child -> parent; the root maps to None.
  """
  total = 0
  child = q
  parent = tree[child]
  while parent != None:
    total += dist(child,parent)
    child, parent = parent, tree[parent]
  return total

In [14]:
def Blossom(x_sel,expanded):
  """Return candidate targets sampled around x_sel.

  A state already present in `expanded` yields a single sample; otherwise
  `bn` samples are drawn. Each sample comes from rand_sample_range(x_sel).
  """
  how_many = 1 if x_sel in expanded else bn
  return [rand_sample_range(x_sel) for _ in range(how_many)]

In [15]:
def propogate(x_sel, u):
  """Step from x_sel towards u in increments of `delta` until blocked or arrived.

  Returns None when the very first step lands in collision, u when a
  collision-free step comes within `delta` of u, and otherwise the last
  collision-free point reached before a step collided.
  """
  def _step_towards_u(p):
    # Move p by a vector of length delta pointing at u.
    gap = dist(p, u)
    return (p[0] + delta*(u[0]-p[0])/gap, p[1] + delta*(u[1]-p[1])/gap)

  x_next = _step_towards_u(x_sel)
  if detect_collision(x_next):
    return None
  while not detect_collision(x_next):
    if dist(x_next, u) < delta:
      return u
    x_sel = x_next
    x_next = _step_towards_u(x_sel)

  return x_sel

In [16]:
def BnB(x_new,tree, pi_sol):
  """Branch-and-bound test: return True when x_new should be discarded.

  Nothing is pruned while no solution exists (pi_sol is None). Otherwise
  x_new is pruned when it is in collision, when clear_path reports a hit on
  the edge to its parent, or when its f-value exceeds the cost of the
  current solution node pi_sol.
  """
  if pi_sol == None:
    return False
  return (
    detect_collision(x_new)
    or clear_path(x_new,tree[x_new])
    or find_f(x_new,tree) > path_cost(pi_sol,tree)
  )

In [17]:
def update_dominance_regions(DIR, tree, x_sel, x_new):
  """Shrink the regions dominated by x_new and register x_new's own region.

  Every state already in DIR that lies within dist(x_sel, x_new) of x_new and
  has a larger f-value than x_new gets its radius replaced by its distance to
  x_new. x_new's radius becomes the largest such distance, or `V` when no
  state was dominated.

  Args:
    DIR: mapping state -> region radius; modified in place.
    tree: mapping child -> parent, must already contain x_new.
    x_sel: the state x_new was expanded from.
    x_new: the newly added state.

  Returns:
    The same DIR mapping, updated.
  """
  d = dist(x_sel,x_new)
  # f(x_new) is constant for this call; evaluate the root walk once.
  f_new = find_f(x_new,tree)
  m = -1
  # Snapshot the keys: values are rewritten while scanning.
  for state in list(DIR.keys()):
    gap = dist(state,x_new)
    if gap <= d and find_f(state,tree) > f_new:
      DIR[state] = gap
      if gap > m:
        m = gap
  DIR[x_new] = V if m == -1 else m
  return DIR

In [18]:
def find_path(pi_sol, tree):
  """Return the list of nodes from the tree root to pi_sol (inclusive).

  Returns None when pi_sol is None.

  Args:
    pi_sol: the end node, a key of `tree`, or None.
    tree: mapping child -> parent; the root maps to None.
  """
  if pi_sol == None:
    return None
  chain = [pi_sol]
  parent = tree[pi_sol]
  while parent != None:
    chain.append(parent)
    parent = tree[parent]
  chain.reverse()
  return chain

In [29]:
 def DIRT():
  """Run the planner on the module-level problem (`start`, `goal`, obstacles).

  Performs `N` iterations. Each iteration selects a state, draws candidate
  targets with Blossom, and for each candidate (cheapest estimated f first)
  propagates towards it, applies the BnB pruning test and, if kept, adds the
  new state to the tree and updates the dominance regions. A state within
  `delta` of `goal` counts as a solution; a cheaper later one replaces it.

  Returns:
    (first_path, first_iter, final_path, final_iter, tree) where the paths
    are root-to-node lists (None when no solution was found), the iteration
    numbers are 1-based (0 when no solution was found), and tree maps
    child -> parent.
  """
  first_path = None
  first_iter = 0
  final_iter = 0
  x_new = None
  tree = {start:None}
  DIR = {start:V}
  expanded = {}
  e_cand = []
  pi_sol = None
  for i in range(N):
    # Keep expanding the last accepted state while it made heuristic progress.
    if x_new is not None and heur(x_new) < heur(tree[x_new]):
      x_sel = x_new
    else:
      x_sel = DIR_select(DIR)#x_sel is x_near
    if len(e_cand) == 0:#create a list of x_rand
      e_cand = Blossom(x_sel, expanded)
      expanded[x_sel] = 1
    while len(e_cand) != 0:#loop through potential x_rand
      # Every candidate would hang off x_sel, so its f-value is
      # heur(c) + |c - x_sel| + cost(x_sel). Scoring it this way never writes
      # to `tree`; the previous `tree_temp = tree` alias inserted the first
      # candidate into the real tree without any validation.
      base_cost = path_cost(x_sel, tree)
      u = min(e_cand, key=lambda c: heur(c) + dist(c, x_sel) + base_cost)
      e_cand.remove(u)
      x_new = propogate(x_sel,u)#find x_new from x_near and x_rand greedily
      accepted = False
      if x_new is not None:
        # BnB needs x_new's parent link, but the real tree must stay
        # untouched until the state is accepted.
        temp_tree = tree.copy()
        temp_tree[x_new] = x_sel
        accepted = not BnB(x_new,temp_tree,pi_sol)
      if accepted:
        tree[x_new] = x_sel
        DIR = update_dominance_regions(DIR,tree, x_sel, x_new)
        if dist(x_new,goal) <= delta: #let the goal region be a circle around goal with radius delta
          if pi_sol is None:
            pi_sol = x_new
            first_path = find_path(pi_sol, tree)
            first_iter = i+1
            final_iter = i+1
          elif path_cost(x_new,tree) < path_cost(pi_sol,tree):
            pi_sol = x_new
            final_iter = i+1
      else:
        x_new = None
  final_path = None
  if pi_sol is not None:
    final_path = find_path(pi_sol,tree)
  return first_path, first_iter, final_path, final_iter, tree 

In [21]:
# Planner configuration: module-level constants read as globals by the functions in this notebook.
N = 6000 # number of iterations of the main loop in DIRT()
V = 4 #vision radius: max distance used by rand_sample_range and the default region radius in DIR
bn = 3 # number of candidates Blossom() draws for a state that is not yet in `expanded`
ei = 0.5 #edge increment: step length clear_path uses when walking a segment
delta = 1 # step length in propogate(); also the goal-region radius tested in DIRT()
U_Size = 5 #N in the paper; not referenced by any code visible in this section

In [20]:
def detect_collision_grid(q,c_obs):#true if collision
  """Return True when point q is contained in any polygon of `c_obs`, else False.

  Args:
    q: (x, y) point to test.
    c_obs: iterable of polygons, each a sequence of (x, y) vertices.
  """
  return any(path.Path(polygon).contains_point(q) for polygon in c_obs)

In [22]:
def clear_path_grid(q1,q2,c_obs):#true if collision
  """Walk from q1 towards q2 in steps of 0.1 and report whether a step hits `c_obs`.

  Despite the name, the return value is True when a sampled point lies inside
  one of the polygons in `c_obs`, and False otherwise. Sampling stops once the
  remaining distance to q2 is below the step length, so q1 itself and the
  final sub-step stretch are not tested.

  Args:
    q1: (x, y) start of the segment.
    q2: (x, y) end of the segment.
    c_obs: iterable of polygons, each a sequence of (x, y) vertices.
  """
  ei = 0.1
  # The polygons are fixed for the whole walk; build them once rather than
  # once per interpolation step.
  polygons = [path.Path(obstacle) for obstacle in c_obs]
  while dist(q1,q2) >= ei:
    gap = dist(q1,q2)
    # Advance q1 by a vector of length ei pointing at q2.
    q1 = (q1[0] + ei*(q2[0]-q1[0])/gap, q1[1] + ei*(q2[1]-q1[1])/gap)
    if any(polygon.contains_point(q1) for polygon in polygons):
      return True
  return False

In [23]:
def generate_grid(dim = (100,100)):
  """Generate a random problem: 1-3 triangular obstacles plus a start and a goal.

  Vertices are integer grid cells in [0, dim[0]-1] x [0, dim[1]-1]. A vertex
  is redrawn while it lies inside an earlier obstacle, or while the sampled
  segments from it to the previous and to the first vertex of the same
  obstacle hit an earlier obstacle. Start and goal are redrawn while they lie
  inside an obstacle.

  Returns:
    (c_obs, start, goal): c_obs is a list of obstacles, each a list of
    [x, y] vertices; start and goal are (x, y) tuples.
  """
  def _random_cell():
    # x is drawn before y, matching the order of the random stream.
    return [random.randint(0,dim[0]-1),random.randint(0,dim[1]-1)]

  c_obs = []
  for _ in range(random.randint(1,3)):
    num_sides = 3#random.randint(3,6)
    first_obs = _random_cell()
    while detect_collision_grid(first_obs,c_obs):
      first_obs = _random_cell()
    temp_obs = [first_obs]
    prev_obs = first_obs
    for _ in range(1,num_sides):
      obs_cand = _random_cell()
      while (detect_collision_grid(obs_cand,c_obs)
             or clear_path_grid(prev_obs,obs_cand,c_obs)
             or clear_path_grid(first_obs,obs_cand,c_obs)):
        obs_cand = _random_cell()
      temp_obs.append(obs_cand)
      prev_obs = obs_cand
    c_obs.append(temp_obs)

  start = tuple(_random_cell())
  while detect_collision_grid(start,c_obs):
    start = tuple(_random_cell())

  goal = tuple(_random_cell())
  while detect_collision_grid(goal,c_obs):
    goal = tuple(_random_cell())

  return c_obs, start, goal

In [31]:
# Benchmark driver: run DIRT on `num_trials` random grids and report averages.
# `obstacles`, `start`, `goal` and `dim` are rebound at module level on every
# trial because the planner functions read them as globals.
num_trials = 50
dim = (100,100)
success_rate = 0
cost_first = cost_final = 0
iter_first = iter_final = 0
for trial in tqdm(range(num_trials)):
  obstacles, start, goal = generate_grid()
  obstacles = np.array(obstacles,dtype=object)
  # DIRT returns: first_path, first_iter, final_path, final_iter, tree
  path_first_i, iter_first_i, path_final_i, iter_final_i, tree_i = DIRT()
  if path_first_i != None:
    success_rate += 1
    # A path's cost is the tree cost of its last node.
    cost_first += path_cost(path_first_i[-1],tree_i)
    iter_first += iter_first_i

  if path_final_i != None:
    cost_final += path_cost(path_final_i[-1],tree_i)
    iter_final += iter_final_i

if success_rate == 0:
  # No trial succeeded: report -1 instead of dividing by zero.
  cost_first = iter_first = cost_final = iter_final = -1
else:
  # Averages are taken over successful trials only.
  cost_first /= success_rate
  iter_first /= success_rate
  cost_final /= success_rate
  iter_final /= success_rate
success_rate /= num_trials

print("\n".join(label + str(value) for label, value in (
  ("\nSuccess Rate: ", success_rate),
  ("Average First Iterations:  ", iter_first),
  ("Average First Cost: ", cost_first),
  ("Average Final Iterations:  ", iter_final),
  ("Average Final Cost: ", cost_final),
)))

100%|██████████| 50/50 [1:32:11<00:00, 110.62s/it]


Success Rate: 1.0
Average First Iterations:  1389.78
Average First Cost: 119.47870772856518
Average Final Iterations:  3512.98
Average Final Cost: 107.49599957764828



