Skip to content

Commit

Permalink
Dynamic Lane Profile (DLP) - 0.8.12 - eFini refactor (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunnyhaibin committed Jan 1, 2022
1 parent 1bb8654 commit dcf9a9a
Showing 1 changed file with 23 additions and 34 deletions.
57 changes: 23 additions & 34 deletions selfdrive/controls/lib/lateral_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,47 +185,19 @@ def update(self, sm):
self.LP.lll_prob *= self.lane_change_ll_prob
self.LP.rll_prob *= self.lane_change_ll_prob
self.d_path_w_lines_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz)
if self.use_lanelines:
if self.get_dynamic_lane_profile(): # this is where we call the method below
# laneline logic - True
d_path_xyz = self.d_path_w_lines_xyz
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
self.dynamic_lane_profile_status = False
elif self.dynamic_lane_profile == 0:
d_path_xyz = self.d_path_w_lines_xyz
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
self.dynamic_lane_profile_status = False
elif self.dynamic_lane_profile == 1:
d_path_xyz = self.path_xyz
path_cost = np.clip(abs(self.path_xyz[0, 1] / self.path_xyz_stds[0, 1]), 0.5, 1.5) * MPC_COST_LAT.PATH
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0])
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost)
self.dynamic_lane_profile_status = True
elif self.dynamic_lane_profile == 2 and ((self.LP.lll_prob + self.LP.rll_prob)/2 < 0.3) and self.lane_change_state == LaneChangeState.off:
d_path_xyz = self.path_xyz
path_cost = np.clip(abs(self.path_xyz[0, 1] / self.path_xyz_stds[0, 1]), 0.5, 1.5) * MPC_COST_LAT.PATH
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0])
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost)
self.dynamic_lane_profile_status = True
self.dynamic_lane_profile_status_buffer = True
elif self.dynamic_lane_profile == 2 and ((self.LP.lll_prob + self.LP.rll_prob)/2 > 0.5) and \
self.dynamic_lane_profile_status_buffer and self.lane_change_state == LaneChangeState.off:
d_path_xyz = self.d_path_w_lines_xyz
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
self.dynamic_lane_profile_status = False
self.dynamic_lane_profile_status_buffer = False
elif self.dynamic_lane_profile == 2 and self.dynamic_lane_profile_status_buffer == True and self.lane_change_state == LaneChangeState.off:
self.dynamic_lane_profile_status = False # store auto mode value
else:
# laneless logic - False
d_path_xyz = self.path_xyz
path_cost = np.clip(abs(self.path_xyz[0, 1] / self.path_xyz_stds[0, 1]), 0.5, 1.5) * MPC_COST_LAT.PATH
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0])
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost)
self.dynamic_lane_profile_status = True
else:
d_path_xyz = self.d_path_w_lines_xyz
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
self.dynamic_lane_profile_status = False
self.dynamic_lane_profile_status_buffer = False
self.dynamic_lane_profile_status = True # store auto mode value

y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1])
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
Expand Down Expand Up @@ -257,6 +229,23 @@ def update(self, sm):
else:
self.solution_invalid_cnt = 0

def get_dynamic_lane_profile(self):
if self.dynamic_lane_profile == 1: # laneless
return False
if self.dynamic_lane_profile == 0: # laneline
return True
elif self.dynamic_lane_profile == 2: # auto
# only while lane change is off
if self.lane_change_state == LaneChangeState.off:
# laneline probability too low, we switch to laneless mode
if (self.LP.lll_prob + self.LP.rll_prob)/2 < 0.3:
self.dynamic_lane_profile_status_buffer = True
if (self.LP.lll_prob + self.LP.rll_prob)/2 > 0.5:
self.dynamic_lane_profile_status_buffer = False
if self.dynamic_lane_profile_status_buffer: # in buffer mode, always laneless
return True
return False

def publish(self, sm, pm):
plan_solution_valid = self.solution_invalid_cnt < 2
plan_send = messaging.new_message('lateralPlan')
Expand Down

0 comments on commit dcf9a9a

Please sign in to comment.