-
-
Notifications
You must be signed in to change notification settings - Fork 6.7k
/
Copy pathbatch_informed_rrtstar.py
635 lines (535 loc) · 23.4 KB
/
batch_informed_rrtstar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
"""
Batch Informed Trees based path planning:
Uses a heuristic to efficiently search increasingly dense
RGGs while reusing previous information. Provides faster
convergence than RRT*, Informed RRT* and other sampling based
methods.
Uses lazy connecting by combining sampling based methods and A*
like incremental graph search algorithms.
author: Karan Chawla(@karanchawla)
Atsushi Sakai(@Atsushi_twi)
Reference: https://arxiv.org/abs/1405.5848
"""
import math
import random
import matplotlib.pyplot as plt
import numpy as np
# Module-level switch read by main(): it is forwarded to BITStar.plan() as the
# ``animation`` argument and also gates the final path plot.
show_animation = True
class RTree:
    """Explicit tree grown while sampling the state space.

    The workspace between ``lowerLimit`` and ``upperLimit`` is discretised
    into a regular grid with cell size ``resolution``. Every vertex is
    identified by one integer id derived from its grid coordinate (the first
    axis varies fastest).

    Attributes:
        vertices: dict mapping a vertex id to the list of adjacent ids.
        edges: list of ``(v, x)`` id pairs in insertion order.
        num_cells: number of grid cells along each axis (integers).
    """

    def __init__(self, start=None, lowerLimit=None, upperLimit=None,
                 resolution=1.0):
        """Create the grid description and register ``start`` as a vertex.

        Args:
            start: real-world coordinate of the root; defaults to [0, 0].
            lowerLimit: per-axis lower bound; defaults to [0, 0].
            upperLimit: per-axis upper bound; defaults to [10, 10].
            resolution: edge length of one grid cell.
        """
        if upperLimit is None:
            upperLimit = [10, 10]
        if lowerLimit is None:
            lowerLimit = [0, 0]
        if start is None:
            start = [0, 0]
        self.vertices = dict()
        self.edges = []
        self.start = start
        self.lowerLimit = lowerLimit
        self.upperLimit = upperLimit
        self.dimension = len(lowerLimit)
        self.resolution = resolution
        # Number of grid cells per axis. Stored as plain ints so that node
        # ids computed from them are ints as well (np.ceil returns a float).
        self.num_cells = [
            int(np.ceil((upperLimit[idx] - lowerLimit[idx]) / resolution))
            for idx in range(self.dimension)
        ]
        vertex_id = self.real_world_to_node_id(start)
        self.vertices[vertex_id] = []

    @staticmethod
    def get_root_id():
        """Return the id reported for the root of the tree (always 0).

        NOTE(review): __init__ stores the start vertex under
        ``real_world_to_node_id(start)``, which is 0 only when ``start``
        maps to the grid origin -- confirm before relying on this value.
        """
        return 0

    def add_vertex(self, vertex):
        """Register ``vertex`` (real-world coordinate) and return its id.

        Adding a vertex that already exists keeps its adjacency list.
        """
        vertex_id = self.real_world_to_node_id(vertex)
        # setdefault: do not wipe the neighbours of an existing vertex
        self.vertices.setdefault(vertex_id, [])
        return vertex_id

    def add_edge(self, v, x):
        """Create an undirected edge between vertex ids ``v`` and ``x``.

        Adding the same ``(v, x)`` pair again is a no-op.

        Raises:
            KeyError: if either id has not been added as a vertex.
        """
        if (v, x) in self.edges:
            return
        self.edges.append((v, x))
        # the tree is undirected: record the link in both directions
        self.vertices[v].append(x)
        self.vertices[x].append(v)

    def real_coords_to_grid_coord(self, real_coord):
        """Convert a real-world coordinate to integer grid coordinates.

        The result equals the real-world coordinate (shifted by the lower
        limit) when the resolution is 1. Values are rounded to the nearest
        cell and are NOT clamped to the grid limits.
        """
        return [
            int(np.around(
                (real_coord[i] - self.lowerLimit[i]) / self.resolution))
            for i in range(self.dimension)
        ]

    def _axis_strides(self, count):
        """Return id strides for the first ``count`` axes (axis 0 -> 1)."""
        strides = []
        stride = 1
        for axis in range(count):
            strides.append(stride)
            stride = stride * self.num_cells[axis]
        return strides

    def grid_coordinate_to_node_id(self, coord):
        """Map a grid coordinate to its node id."""
        strides = self._axis_strides(len(coord))
        node_id = 0
        for value, stride in zip(coord, strides):
            node_id = node_id + value * stride
        return node_id

    def real_world_to_node_id(self, real_coord):
        """Map a real-world coordinate to a node id via the grid."""
        return self.grid_coordinate_to_node_id(
            self.real_coords_to_grid_coord(real_coord))

    def grid_coord_to_real_world_coord(self, coord):
        """Map a grid coordinate back to a real-world configuration."""
        return [
            self.lowerLimit[i] + self.resolution * coord[i]
            for i in range(len(coord))
        ]

    def node_id_to_grid_coord(self, node_id):
        """Map a node id back to its (integer) grid coordinate."""
        dims = len(self.lowerLimit)
        strides = self._axis_strides(dims)
        coord = [0] * dims
        # peel off the slowest-varying axis first
        for i in range(dims - 1, -1, -1):
            coord[i] = int(node_id // strides[i])
            node_id = node_id - coord[i] * strides[i]
        return coord

    def node_id_to_real_world_coord(self, nid):
        """Map a node id to the real-world coordinate of its grid cell."""
        return self.grid_coord_to_real_world_coord(
            self.node_id_to_grid_coord(nid))
class BITStar:
    """Batch Informed Trees (BIT*) planner for a 2-D workspace.

    Obstacles are circles given as ``(x, y, radius)`` tuples. All vertices
    and samples are addressed by the node ids of an internal ``RTree`` grid.
    """

    def __init__(self, start, goal,
                 obstacleList, randArea, eta=2.0,
                 maxIter=80):
        """Store the problem definition and create the empty search state.

        Args:
            start: [x, y] start position.
            goal: [x, y] goal position.
            obstacleList: iterable of ``(x, y, radius)`` circles.
            randArea: ``[min, max]`` sampling bounds, used for both axes.
            eta: tunable parameter (stored, not read inside this class).
            maxIter: number of iterations of the main planning loop.
        """
        self.start = start
        self.goal = goal
        self.min_rand = randArea[0]
        self.max_rand = randArea[1]
        self.max_iIter = maxIter
        self.obstacleList = obstacleList
        self.startId = None
        self.goalId = None
        self.vertex_queue = []
        self.edge_queue = []
        self.samples = dict()
        self.g_scores = dict()
        self.f_scores = dict()
        self.nodes = dict()  # child id -> parent id, filled by update_graph
        self.r = float('inf')
        self.eta = eta  # tunable parameter
        self.unit_ball_measure = 1
        self.old_vertices = []
        # initialize tree
        lowerLimit = [randArea[0], randArea[0]]
        upperLimit = [randArea[1], randArea[1]]
        self.tree = RTree(start=start, lowerLimit=lowerLimit,
                          upperLimit=upperLimit, resolution=0.01)

    def setup_planning(self):
        """Initialise ids, scores, the sampling frame and the first batch.

        Returns:
            Tuple ``(eTheta, cMin, xCenter, C, cBest)``: heading of the
            start-goal axis, the value used as the ellipse focal distance,
            the ellipse centre (3x1), the rotation matrix (3x3) and the
            current best goal cost (initially infinite).
        """
        self.startId = self.tree.real_world_to_node_id(self.start)
        self.goalId = self.tree.real_world_to_node_id(self.goal)
        # add goal to the samples
        self.samples[self.goalId] = self.goal
        self.g_scores[self.goalId] = float('inf')
        self.f_scores[self.goalId] = 0
        # add the start id to the tree
        self.tree.add_vertex(self.start)
        self.g_scores[self.startId] = 0
        self.f_scores[self.startId] = self.compute_heuristic_cost(
            self.startId, self.goalId)
        # max length we expect to find in our 'informed' sample space,
        # starts as infinite
        cBest = self.g_scores[self.goalId]
        # Computing the sampling space
        # NOTE(review): the start-goal distance is divided by 1.5 here;
        # confirm this scaling is intentional.
        cMin = math.hypot(self.start[0] - self.goal[0],
                          self.start[1] - self.goal[1]) / 1.5
        xCenter = np.array([[(self.start[0] + self.goal[0]) / 2.0],
                            [(self.start[1] + self.goal[1]) / 2.0], [0]])
        a1 = np.array([[(self.goal[0] - self.start[0]) / cMin],
                       [(self.goal[1] - self.start[1]) / cMin], [0]])
        eTheta = math.atan2(a1[1, 0], a1[0, 0])
        # first column of identity matrix transposed
        id1_t = np.array([1.0, 0.0, 0.0]).reshape(1, 3)
        M = np.dot(a1, id1_t)
        U, S, Vh = np.linalg.svd(M, True, True)
        C = np.dot(np.dot(U, np.diag(
            [1.0, 1.0, np.linalg.det(U) * np.linalg.det(np.transpose(Vh))])),
            Vh)
        self.samples.update(self.informed_sample(
            200, cBest, cMin, xCenter, C))
        return eTheta, cMin, xCenter, C, cBest

    def setup_sample(self, iterations, foundGoal, cMin, xCenter, C, cBest):
        """Start a new batch when both queues are empty.

        On every batch after the first a new set of samples is drawn (the
        old samples are discarded once the goal has been found), and all
        tree vertices are queued for expansion.

        Returns:
            The (possibly refreshed) best goal cost ``cBest``.
        """
        if self.vertex_queue or self.edge_queue:
            return cBest

        print("Batch: ", iterations)
        # Using informed rrt star way of computing the samples
        self.r = 2.0
        if iterations != 0:
            if foundGoal:
                # a better way to do this would be to make number of
                # samples a function of cMin
                m = 200
                self.samples = dict()
                self.samples[self.goalId] = self.goal
            else:
                m = 100
            cBest = self.g_scores[self.goalId]
            self.samples.update(self.informed_sample(
                m, cBest, cMin, xCenter, C))
        # make the old vertices the new vertices (skip ids already recorded
        # so the list does not grow with duplicates on every batch)
        known = set(self.old_vertices)
        self.old_vertices += [v for v in self.tree.vertices
                              if v not in known]
        # the vertex queue is empty here, so every tree vertex is queued once
        self.vertex_queue.extend(self.tree.vertices)
        return cBest

    def plan(self, animation=True):
        """Run the planner and return the path from start to goal.

        Args:
            animation: when True, draw the graph after every accepted edge.

        Returns:
            List of [x, y] points from start to goal, or ``[]`` when no
            path could be reconstructed.
        """
        eTheta, cMin, xCenter, C, cBest = self.setup_planning()
        iterations = 0
        foundGoal = False
        # run until done
        while iterations < self.max_iIter:
            cBest = self.setup_sample(iterations,
                                      foundGoal, cMin, xCenter, C, cBest)
            # expand the best vertices until an edge is better than the
            # vertex; this is done because the vertex cost represents the
            # lower bound on the edge cost. Stop when no vertex is left:
            # otherwise inf <= inf would try to expand an empty queue.
            while self.vertex_queue and \
                    self.best_vertex_queue_value() <= \
                    self.best_edge_queue_value():
                self.expand_vertex(self.best_in_vertex_queue())

            if not self.edge_queue:
                # nothing could be queued in this batch: end it so that the
                # next iteration draws fresh samples
                print("Nothing good")
                self.vertex_queue = []
                iterations += 1
                continue

            # add the best edge to the tree
            bestEdge = self.best_in_edge_queue()
            self.edge_queue.remove(bestEdge)
            goal_cost = self.g_scores[self.goalId]
            # Check if this can improve the current solution
            estimatedCostOfVertex = self.g_scores[bestEdge[0]] \
                + self.compute_distance_cost(bestEdge[0], bestEdge[1]) \
                + self.compute_heuristic_cost(bestEdge[1], self.goalId)
            estimatedCostOfEdge = \
                self.compute_distance_cost(self.startId, bestEdge[0]) \
                + self.compute_heuristic_cost(bestEdge[0], bestEdge[1]) \
                + self.compute_heuristic_cost(bestEdge[1], self.goalId)
            actualCostOfEdge = self.g_scores[bestEdge[0]] \
                + self.compute_distance_cost(bestEdge[0], bestEdge[1])
            f1 = estimatedCostOfVertex < goal_cost
            f2 = estimatedCostOfEdge < goal_cost
            f3 = actualCostOfEdge < goal_cost
            if f1 and f2 and f3:
                # connect this edge
                firstCoord = self.tree.node_id_to_real_world_coord(
                    bestEdge[0])
                secondCoord = self.tree.node_id_to_real_world_coord(
                    bestEdge[1])
                path = self.connect(firstCoord, secondCoord)
                lastEdge = self.tree.real_world_to_node_id(secondCoord)
                if path is None or len(path) == 0:
                    continue
                # the edge ends at the last collision-free point
                nextCoord = path[-1, :]
                nextCoordPathId = self.tree.real_world_to_node_id(
                    nextCoord)
                bestEdge = (bestEdge[0], nextCoordPathId)
                if bestEdge[1] in self.tree.vertices:
                    continue
                # the sample becomes a tree vertex (the key may be absent
                # when the edge was cut short by an obstacle)
                self.samples.pop(bestEdge[1], None)
                eid = self.tree.add_vertex(nextCoord)
                self.vertex_queue.append(eid)
                if eid == self.goalId or bestEdge[0] == self.goalId or \
                        bestEdge[1] == self.goalId:
                    print("Goal found")
                    foundGoal = True
                self.tree.add_edge(bestEdge[0], bestEdge[1])
                g_score = self.compute_distance_cost(
                    bestEdge[0], bestEdge[1])
                self.g_scores[bestEdge[1]] = \
                    g_score + self.g_scores[bestEdge[0]]
                self.f_scores[bestEdge[1]] = \
                    g_score + self.compute_heuristic_cost(
                        bestEdge[1], self.goalId)
                self.update_graph()
                # visualize new edge
                if animation:
                    self.draw_graph(xCenter=xCenter, cBest=cBest,
                                    cMin=cMin, eTheta=eTheta,
                                    samples=self.samples.values(),
                                    start=firstCoord, end=secondCoord)
                self.remove_queue(lastEdge, bestEdge)
            else:
                print("Nothing good")
                self.edge_queue = []
                self.vertex_queue = []
            iterations += 1
        print("Finding the path")
        return self.find_final_path()

    def find_final_path(self):
        """Walk the parent links from goal to start and return the path.

        Returns:
            List of points ordered start -> goal, or ``[]`` (after printing
            a message) when a parent link is missing.
        """
        plan = [self.goal]
        currId = self.goalId
        while currId != self.startId:
            plan.append(self.tree.node_id_to_real_world_coord(currId))
            try:
                currId = self.nodes[currId]
            except KeyError:
                print("cannot find Path")
                return []
        plan.append(self.start)
        plan = plan[::-1]  # reverse the plan
        return plan

    def remove_queue(self, lastEdge, bestEdge):
        """Drop the queued edge ``(lastEdge, bestEdge[1])`` if it is stale.

        NOTE(review): ``dist_cost`` is the distance from ``bestEdge[1]`` to
        itself and is therefore always 0; the pruning test probably meant to
        use the edge's source vertex -- confirm against the BIT* paper.
        """
        target = bestEdge[1]
        stale_edge = (lastEdge, target)
        # iterate over a snapshot: the queue is modified inside the loop
        for edge in list(self.edge_queue):
            if edge[1] != target:
                continue
            dist_cost = self.compute_distance_cost(edge[1], target)
            if self.g_scores[edge[1]] + dist_cost >= \
                    self.g_scores[self.goalId]:
                if stale_edge in self.edge_queue:
                    self.edge_queue.remove(stale_edge)

    def connect(self, start, end):
        """Try to extend in a straight line from ``start`` to ``end``.

        Returns:
            An (n, 2) array of collision-free points along the segment
            (truncated just before the first collision), or None when the
            very first point already collides.
        """
        steps = int(self.compute_distance_cost(
            self.tree.real_world_to_node_id(start),
            self.tree.real_world_to_node_id(end)) * 10)
        x = np.linspace(start[0], end[0], num=steps)
        y = np.linspace(start[1], end[1], num=steps)
        for i in range(len(x)):
            if self._collision_check(x[i], y[i]):
                if i == 0:
                    return None
                # if collision, send path until collision
                return np.vstack((x[0:i], y[0:i])).transpose()
        return np.vstack((x, y)).transpose()

    def _collision_check(self, x, y):
        """Return True when (x, y) lies inside or on any obstacle circle."""
        return any(
            (ox - x) ** 2 + (oy - y) ** 2 <= size ** 2
            for (ox, oy, size) in self.obstacleList)

    def compute_heuristic_cost(self, start_id, goal_id):
        """Return the Euclidean (L2) distance between two node ids."""
        start = np.array(self.tree.node_id_to_real_world_coord(start_id))
        goal = np.array(self.tree.node_id_to_real_world_coord(goal_id))
        return np.linalg.norm(start - goal, 2)

    def compute_distance_cost(self, vid, xid):
        """Return the Euclidean (L2) distance between two node ids."""
        start = np.array(self.tree.node_id_to_real_world_coord(vid))
        stop = np.array(self.tree.node_id_to_real_world_coord(xid))
        return np.linalg.norm(stop - start, 2)

    def informed_sample(self, m, cMax, cMin, xCenter, C):
        """Draw ``m + 1`` samples and return them keyed by node id.

        While ``cMax`` is infinite the samples are uniform over the random
        area; afterwards they are drawn from the ellipse defined by
        ``cMax``, ``cMin``, ``xCenter`` and the rotation ``C``. Samples that
        map to the same node id overwrite each other.
        """
        samples = dict()
        print("g_Score goal id: ", self.g_scores[self.goalId])
        informed = cMax < float('inf')
        if informed:
            # the ellipse transform does not change between samples
            minor = math.sqrt(cMax ** 2 - cMin ** 2) / 2.0
            transform = np.dot(C, np.diag([cMax / 2.0, minor, minor]))
        for _ in range(m + 1):
            if informed:
                xBall = self.sample_unit_ball()
                rnd = np.dot(transform, xBall) + xCenter
                rnd = [rnd[(0, 0)], rnd[(1, 0)]]
            else:
                rnd = self.sample_free_space()
            random_id = self.tree.real_world_to_node_id(rnd)
            samples[random_id] = rnd
        return samples

    @staticmethod
    def sample_unit_ball():
        """Return a random point of the unit disc as a 3x1 column vector."""
        a = random.random()
        b = random.random()
        if b < a:
            a, b = b, a
        if b == 0.0:
            # both draws were exactly 0: the sample is the centre itself
            return np.array([[0.0], [0.0], [0]])
        angle = 2 * math.pi * a / b
        sample = (b * math.cos(angle), b * math.sin(angle))
        return np.array([[sample[0]], [sample[1]], [0]])

    def sample_free_space(self):
        """Return a uniform random [x, y] inside the random area."""
        rnd = [random.uniform(self.min_rand, self.max_rand),
               random.uniform(self.min_rand, self.max_rand)]
        return rnd

    def best_vertex_queue_value(self):
        """Return the lowest ``g(v) + h(v)`` in the vertex queue.

        Returns infinity when the queue is empty.
        """
        if not self.vertex_queue:
            return float('inf')
        return min(
            self.g_scores[v] + self.compute_heuristic_cost(v, self.goalId)
            for v in self.vertex_queue)

    def best_edge_queue_value(self):
        """Return the lowest ``g(v) + c(v, x) + h(x)`` in the edge queue.

        This is the value of the edge that ``best_in_edge_queue`` selects.
        Returns infinity when the queue is empty.
        """
        if not self.edge_queue:
            return float('inf')
        return min(
            self.g_scores[e[0]] + self.compute_distance_cost(e[0], e[1])
            + self.compute_heuristic_cost(e[1], self.goalId)
            for e in self.edge_queue)

    def best_in_vertex_queue(self):
        """Return the queued vertex id with the lowest ``g(v) + h(v)``.

        Raises:
            ValueError: if the vertex queue is empty.
        """
        return min(
            self.vertex_queue,
            key=lambda v: self.g_scores[v]
            + self.compute_heuristic_cost(v, self.goalId))

    def best_in_edge_queue(self):
        """Return the queued edge with the lowest ``g(v) + c(v, x) + h(x)``.

        Raises:
            ValueError: if the edge queue is empty.
        """
        best = min(
            self.edge_queue,
            key=lambda e: self.g_scores[e[0]]
            + self.compute_distance_cost(e[0], e[1])
            + self.compute_heuristic_cost(e[1], self.goalId))
        return best[0], best[1]

    def expand_vertex(self, vid):
        """Remove ``vid`` from the vertex queue and queue its useful edges.

        Every sample within radius ``self.r`` whose optimistic cost through
        ``vid`` beats the current goal cost yields an edge ``(vid, sid)``.
        """
        self.vertex_queue.remove(vid)
        # get the coordinates for given vid
        currCoord = np.array(self.tree.node_id_to_real_world_coord(vid))
        # collect the samples that lie within the connection radius
        neighbors = []
        for sid, s_coord in self.samples.items():
            s_coord = np.array(s_coord)
            if np.linalg.norm(s_coord - currCoord, 2) <= self.r \
                    and sid != vid:
                neighbors.append((sid, s_coord))
        # add an edge to the edge queue if the path might improve the
        # solution
        cost_to_vid = self.compute_distance_cost(self.startId, vid)
        for sid, _ in neighbors:
            h_cost = self.compute_heuristic_cost(sid, self.goalId)
            estimated_f_score = cost_to_vid + h_cost \
                + self.compute_distance_cost(vid, sid)
            if estimated_f_score < self.g_scores[self.goalId]:
                self.edge_queue.append((vid, sid))
        # add the vertex to the edge queue
        self.add_vertex_to_edge_queue(vid, currCoord)

    def add_vertex_to_edge_queue(self, vid, currCoord):
        """Queue edges from a new vertex ``vid`` to nearby tree vertices.

        NOTE(review): each neighbour is recorded as ``(vid, v_coord)``, so
        the candidate target ``sid`` is always ``vid`` itself and the cost
        test below cannot succeed; storing ``v`` looks like the intent.
        Kept as-is because plan() skips edges whose target is already in
        the tree -- confirm the intended rewiring behaviour before changing.
        """
        if vid in self.old_vertices:
            return
        neighbors = []
        for v in self.tree.vertices:
            if v != vid and (v, vid) not in self.edge_queue and \
                    (vid, v) not in self.edge_queue:
                v_coord = self.tree.node_id_to_real_world_coord(v)
                if np.linalg.norm(currCoord - v_coord, 2) <= self.r:
                    neighbors.append((vid, v_coord))
        for neighbor in neighbors:
            sid = neighbor[0]
            estimated_f_score = self.compute_distance_cost(
                self.startId, vid) + self.compute_distance_cost(
                vid, sid) + self.compute_heuristic_cost(sid, self.goalId)
            if estimated_f_score < self.g_scores[self.goalId] and (
                    self.g_scores[vid] +
                    self.compute_distance_cost(vid, sid)) < \
                    self.g_scores[sid]:
                self.edge_queue.append((vid, sid))

    def update_graph(self):
        """Re-propagate g/f scores and parent links from the start vertex.

        Performs a best-first search over the tree adjacency, stopping as
        soon as the goal is taken from the open set.
        """
        closedSet = set()
        openSet = [self.startId]
        while openSet:
            # get the element with lowest f_score
            currId = min(openSet, key=lambda x: self.f_scores[x])
            # remove element from open set
            openSet.remove(currId)
            # Check if we're at the goal
            if currId == self.goalId:
                break
            closedSet.add(currId)
            # visit every successor that has not been closed yet
            for successor in self.tree.vertices[currId]:
                if successor in closedSet:
                    continue
                # calculate tentative g score
                g_score = self.g_scores[currId] + \
                    self.compute_distance_cost(currId, successor)
                if successor not in openSet:
                    # add the successor to open set
                    openSet.append(successor)
                elif g_score >= self.g_scores[successor]:
                    continue
                # update g and f scores
                self.g_scores[successor] = g_score
                self.f_scores[successor] = \
                    g_score + self.compute_heuristic_cost(
                        successor, self.goalId)
                # store the parent and child
                self.nodes[successor] = currId

    def draw_graph(self, xCenter=None, cBest=None, cMin=None, eTheta=None,
                   samples=None, start=None, end=None):
        """Redraw samples, informed ellipse, newest edge and obstacles.

        Args:
            xCenter, cBest, cMin, eTheta: ellipse parameters; the ellipse is
                drawn only when ``cBest`` is not infinite.
            samples: iterable of [x, y] points (None entries are skipped).
            start, end: end points of the edge to highlight.
        """
        plt.clf()
        # for stopping simulation with the esc key.
        plt.gcf().canvas.mpl_connect(
            'key_release_event',
            lambda event: [exit(0) if event.key == 'escape' else None])
        if samples is not None:
            for rnd in samples:
                if rnd is not None:
                    plt.plot(rnd[0], rnd[1], "^k")
        if cBest != float('inf'):
            self.plot_ellipse(xCenter, cBest, cMin, eTheta)
        if start is not None and end is not None:
            # x values first, then y values
            plt.plot([start[0], end[0]], [start[1], end[1]], "-g")
        for (ox, oy, size) in self.obstacleList:
            plt.plot(ox, oy, "ok", ms=30 * size)
        plt.plot(self.start[0], self.start[1], "xr")
        plt.plot(self.goal[0], self.goal[1], "xr")
        plt.axis([-2, 15, -2, 15])
        plt.grid(True)
        plt.pause(0.01)

    @staticmethod
    def plot_ellipse(xCenter, cBest, cMin, eTheta):  # pragma: no cover
        """Plot the informed-sampling ellipse and its centre."""
        a = math.sqrt(cBest ** 2 - cMin ** 2) / 2.0
        b = cBest / 2.0
        angle = math.pi / 2.0 - eTheta
        cx = xCenter[0]
        cy = xCenter[1]
        t = np.arange(0, 2 * math.pi + 0.1, 0.1)
        x = [a * math.cos(it) for it in t]
        y = [b * math.sin(it) for it in t]
        R = np.array([[math.cos(angle), math.sin(angle)],
                      [-math.sin(angle), math.cos(angle)]])
        fx = R @ np.array([x, y])
        px = np.array(fx[0, :] + cx).flatten()
        py = np.array(fx[1, :] + cy).flatten()
        plt.plot(cx, cy, "xc")
        plt.plot(px, py, "--c")
def main(maxIter=80):
    """Run the BIT* demo on a fixed obstacle field.

    Plans from (-1, 0) to (3, 8); when ``show_animation`` is enabled the
    resulting path is drawn in red on top of the last planner frame.
    """
    print("Starting Batch Informed Trees Star planning")

    # circular obstacles as (x, y, radius)
    circles = [
        (5, 5, 0.5),
        (9, 6, 1),
        (7, 5, 1),
        (1, 5, 1),
        (3, 6, 1),
        (7, 9, 1)
    ]
    planner = BITStar(start=[-1, 0], goal=[3, 8], obstacleList=circles,
                      randArea=[-2, 15], maxIter=maxIter)
    route = planner.plan(animation=show_animation)
    print("Done")

    if not show_animation:
        return

    xs = [px for (px, py) in route]
    ys = [py for (px, py) in route]
    plt.plot(xs, ys, '-r')
    plt.grid(True)
    plt.pause(0.05)
    plt.show()


if __name__ == '__main__':
    main()