In [6]:
import matplotlib.pyplot as plt
import numpy as np
import math
import time

from statistics import mean
from scipy.interpolate import interp1d
from scipy import spatial

In [8]:
class RacetrackCommandNodeSeries:
    '''
    Member variables
    racetrack: the racetrack object contextualizing the command node series
    command_nodes: list of 2-tuples containing [waypoint_idx, distance_from_innerbound_waypoint]. List is always sorted by waypoint_idx
        waypoint_idx: corresponds to inner track bound waypoint
        distance_from_innerbound_waypoint: distance from the waypointIdx normal to the curve of the inner track edge
    command_node_coordinates: the absolute position of command nodes
    route_waypoints: derived route waypoints
    '''
    
    def __init__(self, racetrack, init_random_nodes=True, init_cp_frequency=20, verbosity=1):
        if not isinstance(racetrack, Racetrack):
            raise Error('Hey wiseguy, the constructor argument needs a [racetrack] Racetrack object')
        self.racetrack = racetrack
        self.verbosity = verbosity
        self.init_cp_frequency = init_cp_frequency
        self.command_nodes = []
        self.command_node_coordinates = []
        self.route_waypoints = []
        if init_random_nodes: 
            self.generateRandomCommandNodes()
        self.calculateAbsoluteCommandNodeCoordinates()
        
    def generateRandomCommandNodes(self):
        '''
        init_cp_frequency: frequency of command nodes along the racetrack calculated using a random normal distribution
        '''
        #Reset coordinates and waypoints
        self.command_nodes = []
        self.command_node_coordinates = []
        self.route_waypoints = []
        next_idx = 0
        while next_idx < len(self.racetrack.waypoints_inner):
            dist_from_inner_waypoint = np.random.uniform(0, self.racetrack.trackwidth/2) + np.random.uniform(0, self.racetrack.trackwidth/2)
            self.command_nodes.append([next_idx, dist_from_inner_waypoint])
            next_idx += 2 + int(np.random.uniform(0, self.init_cp_frequency/2) + np.random.uniform(0, self.init_cp_frequency/2))
        #First and last command nodes are always the centerline
        self.command_nodes[0][1] = (self.racetrack.trackwidth/2)
        self.command_nodes.append(self.command_nodes[0])
        self.validateAndFixObstructions()
        
    def generateRandomCommandNodeAtIndex(self, next_idx):
        dist_from_inner_waypoint = np.random.uniform(0, self.racetrack.trackwidth/2) + np.random.uniform(0, self.racetrack.trackwidth/2)
        cn_insert_index = 0
        for idx, cn in enumerate(self.command_nodes):
            if cn[0] < next_idx : cn_insert_index +=1
        self.command_nodes.insert((cn_insert_index-1), [next_idx, dist_from_inner_waypoint])
    
    def generateRandomCommandNodeNearWaypoint(self, next_waypoint, inner_min_idx_increment=0):
        [inner_dist, inner_min_idx] = self.racetrack.waypoints_inner_kdtree.query(next_waypoint)
        if self.verbosity >= 2: print('adding a new command node at inner index:', inner_min_idx+inner_min_idx_increment)
        self.generateRandomCommandNodeAtIndex(inner_min_idx+inner_min_idx_increment)
        
    def validateAndFixObstructions(self):
        is_path_obstructed = True
        cn_retries = 0
        last_cn_fix = -1
        while is_path_obstructed:
            is_path_obstructed = False
            #1) Generate waypoint path
            self.calculateAbsoluteCommandNodeCoordinates()
            self.calculateRouteWaypoints()
            #2) Calculate path obstruction indices
            obstructed_indices_waypoints = self.findObstructedRouteWaypoints(distance_threshold=0)
            #3) A: Reposition the closest command node. B: Insert new nodes at first path obstructed index between each command node.
            
            if len(obstructed_indices_waypoints) > 0: 
                is_path_obstructed = True
                closest_command_node_to_obstruction = self.findNearestCommandNodeToWaypoint(obstructed_indices_waypoints[0][1])
                if self.verbosity >= 2: print('Fixing obstruction at command node:', closest_command_node_to_obstruction)
                
                #Catch a bad scenario where the system is trying to generate a "fix" at an index earlier than a previous fix
                if last_cn_fix >= closest_command_node_to_obstruction[0] and self.verbosity >= 2: cn_retries+=1
                else: cn_retries = 0
                
                if cn_retries > 3 and self.verbosity >= 2:
                    print('Unable to resolve route obstruction. This usually happens when there is (1) a problem with the track data or (2) The CP frequency is too low')
                    self.plotBigTrackAndRoute(coordinate_labels=True)
                    plt.scatter(obstructed_indices_waypoints[0][1][0], obstructed_indices_waypoints[0][1][1], c='#FF00FF')
                    plt.show()
                    print('Terminating...')
                    raise SystemExit(0)
                
                #Sometimes the algorithm gets stuck trying to fix/update an obstruction by repeatedly modifying the same CP or even worse, modifying an earlier one
                #When this happens, try incrementing the nearest inner waypoint index used to derive the "fix" CP
                if last_cn_fix >= closest_command_node_to_obstruction[0]:
                    self.generateRandomCommandNodeNearWaypoint(obstructed_indices_waypoints[0][1], inner_min_idx_increment=3)
                else:
                    self.generateRandomCommandNodeNearWaypoint(obstructed_indices_waypoints[0][1])
                
                closest_command_node_to_obstruction[1] = np.random.uniform(0, self.racetrack.trackwidth/2) + np.random.uniform(0, self.racetrack.trackwidth/2)
                last_cn_fix = closest_command_node_to_obstruction[0]
                
    def findNearestCommandNodeToWaypoint(self, waypoint):
        cnc_kd = spatial.KDTree(self.command_node_coordinates)
        [command_node_absolute_position, closest_command_node_idx] = cnc_kd.query(waypoint)
        return self.command_nodes[closest_command_node_idx]
        
    def findObstructedRouteWaypoints(self, route=None, distance_threshold=0, step=1):
        '''
        route: the waypoint array to be examined. If none, then self.route_waypoints are used
        distance_threshold: if the vehicle is closer than this value to either track boundary, it is regarded as out of bounds
        step: how many waypoints to skip between examinations. 1=examine every waypoint, 2=examine every other, etc.
        Returns route indices of obstructed waypoints, and waypoint reference [route_idx, waypoint_coords]
        '''
        if route == None:
            route = self.route_waypoints
        obstructed_indices = []
        
        #Check each waypoint's distance from track edges
        for idx, waypoint in enumerate(route):
            inner_dist, outer_dist, inner_min_idx, outer_min_idx = self.racetrack.distanceToTrackEdges(waypoint)
            if inner_dist < distance_threshold or outer_dist < distance_threshold:
                obstructed_indices.append([idx, waypoint])
        return obstructed_indices
    
    def calculateAbsoluteCommandNodeCoordinates(self):
        self.command_node_coordinates = []
        for command_node in self.command_nodes:
            waypoint_idx = command_node[0]
            distance = command_node[1]
            inner_waypoint = self.racetrack.waypoints_inner[waypoint_idx]
            angle = self.racetrack.waypoints_inner_angles[waypoint_idx] + 90
            next_position = []
            next_position.append(distance*math.cos(math.radians(angle)) + inner_waypoint[0])
            next_position.append(distance*math.sin(math.radians(angle)) + inner_waypoint[1])
            self.command_node_coordinates.append(next_position)
        
    def calculateRouteWaypoints(self):
        dist_between_waypoints = self.racetrack.waypoints_inner_distances[0]
        centerline_total_distance = np.sum(self.racetrack.waypoints_center_distances)
        # Linear length along the interpolation:
        cn_coords = np.asarray(self.command_node_coordinates)
        s = np.cumsum(np.sqrt(np.sum(np.diff(cn_coords, axis=0)**2, axis=1)))
        s = np.insert(s, 0, 0)/s[-1]
        interpolator = interp1d(s, cn_coords, kind='quadratic', axis=0)
        alpha = np.linspace(0, 1, int(centerline_total_distance/dist_between_waypoints))
        self.route_waypoints = interpolator(alpha)
    
    def plotTrackAndRoute(self):
        '''
        Plots route, absolute angles, steering angles side-by-side
        '''
        #Calculate the absolue angles, steering angles for plotting
        [absolute_angles_arr, steering_angles_arr, distances_arr] = self.racetrack.calculateWaypointDistancesAndAngles(self.route_waypoints, denormalize_angles=True)
        
        plt.figure(figsize=(25,6))
        splt0 = plt.subplot(1, 3, 1)
        plt.grid()
        #Plot racetrack
        self.racetrack.plotTrackSeries()
        #Plot waypoints
        self.calculateRouteWaypoints()
        [waypoints_x, waypoints_y] = np.split(np.asarray(self.route_waypoints), 2, axis=1)
        splt0.scatter(waypoints_x, waypoints_y, c='#00CC33')
        splt0.plot(waypoints_x, waypoints_y, c='#00CC33')
        #Plot command points
        [command_points_x, command_points_y] = np.split(np.asarray(self.command_node_coordinates), 2, axis=1)
        splt0.scatter(command_points_x, command_points_y, c='#CC0000', s=25)
        #Plot absolute angles
        splt1 = plt.subplot(1, 3, 2)
        plt.grid()
        plt.title(label='Absolute steering angle at waypoint')
        plt.xlabel("Waypoint index")
        plt.ylabel("Degrees from origin")
        splt1.plot(absolute_angles_arr)
        #Plot steering angles
        splt2 = plt.subplot(1, 3, 3)
        plt.grid()
        plt.title(label='Incremental steering angle at waypoint')
        plt.xlabel("Waypoint index")
        plt.ylabel("Degrees (negative = left turn)")
        splt2.plot(steering_angles_arr)
        plt.show()
        
    def plotBigTrackAndRoute(self, coordinate_labels=False):
        '''
        coordinate_labels: if True, then print coordinates next to each route point
        '''
        plt.figure(figsize=(20,15))
        #Plot racetrack
        self.racetrack.plotTrackSeries()
        #Plot waypoints
        self.calculateRouteWaypoints()
        [waypoints_x, waypoints_y] = np.split(np.asarray(self.route_waypoints), 2, axis=1)
        plt.scatter(waypoints_x, waypoints_y, c='#00CC33')
        plt.plot(waypoints_x, waypoints_y, c='#00CC33')
        #Plot command points
        [command_points_x, command_points_y] = np.split(np.asarray(self.command_node_coordinates), 2, axis=1)
        plt.scatter(command_points_x, command_points_y, c='#CC0000', s=25)
        #Plot labels
        if coordinate_labels:
            for idx, point in enumerate(self.route_waypoints):
                plt.annotate(str(idx), (point[0]+.02, point[1]+.02))
        
    #Calculate a fitness of the waypoint series. Fitness is equal to theoretical best track time assuming instant acceleration
    def routeFitness(self):
        total_cost = 0
        absolute_angles_arr, steering_angles_arr, distances_arr = self.racetrack.calculateWaypointDistancesAndAngles(self.route_waypoints)
        for idx, angle_delta in enumerate(steering_angles_arr):
            velocity = self.racetrack.maxVelocityAtSteeringAngle(angle_delta)
            cost = (distances_arr[idx]/velocity)
            total_cost += cost
        return total_cost
