-
Notifications
You must be signed in to change notification settings - Fork 1
/
local_planner.py
215 lines (146 loc) · 7.71 KB
/
local_planner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#
# local planner
#mk
#v0.08
#
import numpy as np
import numpy.linalg as nla
import csv
class Path_generator(object):
def __init__(self, waypoints_file, resolution = None):
# load waypoints from file into array
self.waypoints_file = waypoints_file #WAYPOINTS_FILENAME
self.waypoints = None
with open(self.waypoints_file) as waypoints_file_handle:
wp_list = list(csv.reader(waypoints_file_handle,
delimiter=',',
quoting=csv.QUOTE_NONNUMERIC))
self.waypoints = np.array(wp_list)
#print("waypoints LEN = " + str(len(self.waypoints)))
self.new_waypoints = [] # holds new batch of higher res wps
self.wp_interp = [] # interpolated values
# (rows = waypoints, columns = [x, y, v])
self.wp_interp_hash = [] # hash table which indexes waypoints
# to the index of the waypoint in wp_interp
self.interp_counter = 0 # counter current interpolated point index
self.closest_index = 0
self.closest_distance = 0
self.new_index = 0
self.new_distance = 0
self.wp_distances = []
self.interp_resolution = 0.01 # distance between interpolated points
self.interp_lookahead_distance = 20 #20 # incomming stream lookahead in meters
self._calc_interpolations() #one time only w/ resolution
def set_lookahead_distance(self, lookahead = None):
self.interp_lookahead_distance = lookahead
def _calc_interpolations(self):
waypoints = self.waypoints
INTERP_DISTANCE_RES = self.interp_resolution
wp_distances = []
for i in range(1, waypoints.shape[0]):
wp_distances.append(
np.sqrt((waypoints[i, 0] - waypoints[i-1, 0])**2 +
(waypoints[i, 1] - waypoints[i-1, 1])**2))
wp_distances.append(0) # last distance is 0 because it is the distance
# from the last waypoint to the last waypoint
# Linearly interpolate between waypoints and store in a list
# along with their hash table
wp_interp = [] # interpolated values
# (rows = waypoints, columns = [x, y, v])
wp_interp_hash = [] # hash table which indexes waypoints
# to the index of the waypoint in wp_interp
interp_counter = 0 # counter for current interpolated point index
for i in range(waypoints.shape[0] - 1):
# add original waypoint to interpolated waypoints list
# and append it to the hash table
wp_interp.append(list(waypoints[i]))
wp_interp_hash.append(interp_counter)
interp_counter+=1
# interpolate to the next waypoint & calc the number of
# points to interpolate based on the specified resolution and
# incrementally add interpolated points until the next waypoint
# is about to be reached
num_pts_to_interp = int(np.floor(wp_distances[i] /\
float(INTERP_DISTANCE_RES)) - 1)
wp_vector = waypoints[i+1] - waypoints[i]
wp_uvector = wp_vector / np.linalg.norm(wp_vector)
for j in range(num_pts_to_interp):
next_wp_vector = INTERP_DISTANCE_RES * float(j+1) * wp_uvector
wp_interp.append(list(waypoints[i] + next_wp_vector))
interp_counter+=1
# add last waypoint at the end
wp_interp.append(list(waypoints[-1]))
wp_interp_hash.append(interp_counter)
interp_counter+=1
self.wp_distances=wp_distances
self.wp_interp= wp_interp
self.wp_interp_hash = wp_interp_hash
def _calc_new_waypoints(self,cx,cy):
# form a reduction subset of all waypoints to send to the controller
# that are within some lookahead distance in front of and behind the vehicle
# find closest waypoint index to car
# increment the index from the previous one until the new distance calculations
# start increasing & decrement the index
# the final index needs to be the closest point assuming that
# the vehicle will always break out of instability points when there
# are two indices with the minimum distance
current_x = cx
current_y = cy
#print("current_x, current_y ",current_x, current_y)
#print("self.closest_index ", self.closest_index)
self.closest_distance = nla.norm(np.array([
self.waypoints[self.closest_index, 0] - current_x,
self.waypoints[self.closest_index, 1] - current_y]))
#print("self.closest_distance",self.closest_distance)
self.new_distance = self.closest_distance
self.new_index = self.closest_index
while self.new_distance <= self.closest_distance:
self.closest_distance = self.new_distance
self.closest_index = self.new_index
self.new_index += 1
if self.new_index >= self.waypoints.shape[0]: #end of path
break
self.new_distance = nla.norm(np.array([
self.waypoints[self.new_index, 0] - current_x,
self.waypoints[self.new_index, 1] - current_y]))
self.new_distance = self.closest_distance
self.new_index = self.closest_index
while self.new_distance <= self.closest_distance:
self.closest_distance = self.new_distance
self.closest_index = self.new_index
self.new_index -= 1
if self.new_index < 0: # beginning of path
break
self.new_distance = nla.norm(np.array([
self.waypoints[self.new_index, 0] - current_x,
self.waypoints[self.new_index, 1] - current_y]))
# use closest index found to return the path that is one
# waypoint behind and some lookahead waypoints in front
# specified by a fixed lookahead distance
waypoint_subset_first_index = self.closest_index - 1
if waypoint_subset_first_index < 0:
waypoint_subset_first_index = 0
waypoint_subset_last_index = self.closest_index
total_distance_ahead = 0
while total_distance_ahead < self.interp_lookahead_distance:
total_distance_ahead += self.wp_distances[waypoint_subset_last_index]
waypoint_subset_last_index += 1
if waypoint_subset_last_index >= self.waypoints.shape[0]:
waypoint_subset_last_index = self.waypoints.shape[0] - 1
break
self.new_waypoints = \
self.wp_interp[self.wp_interp_hash[waypoint_subset_first_index]:\
self.wp_interp_hash[waypoint_subset_last_index] + 1]
def get_wp_distances(self):
return self.wp_distances
def get_wp_interp(self):
return self.wp_interp
def get_all_waypoints(self):
# waypoints already loaded on init
return self.waypoints
def get_new_waypoints(self,cx,cy):
# need to feed in hash for the moment
self._calc_new_waypoints(cx,cy)
return self.new_waypoints
def get_wp_interp_hash(self):
return self.wp_interp_hash