# Path Visualization and Optimization for AWS DeepRacer

In [1]:
from os import listdir
from os.path import isfile, join, exists, splitext

import math
import numpy as np

from scipy.interpolate import CubicSpline, BPoly

import pickle

import matplotlib
import matplotlib.pyplot as plt

from ipywidgets import interact
import ipywidgets as widgets

# matplotlib.use('webagg')

%matplotlib notebook

matplotlib.rcParams['savefig.dpi'] = 80
matplotlib.rcParams['figure.dpi'] = 80

## Load waypoints for the track you want to run analysis on

In [3]:
tracks = [splitext(f)[0] for f in listdir("tracks/") if isfile(join("tracks/", f))]

print(tracks)

['Oval_track', 'ChampionshipCup2019_track', 'New_York_Track', 'Mexico_track', 'Tokyo_Training_track', 'reinvent_base', 'H_track', 'Bowtie_track', 'reInvent2019_track', 'China_track', 'Canada_Training', 'reInvent2019_wide', 'AWS_track', 'London_Loop_Train', 'reInvent2019_wide_mirrored', 'Virtual_May19_Train_track', 'Straight_track', 'Vegas_track']


In [18]:
def wrap_360(theta):
    theta = theta % 360;
    theta = (theta + 360) % 360
    
    return theta


def wrap_180(theta):
    theta = wrap_360(theta)
    if theta > 180:
        theta -= 360

    return theta


def convert_slope(m):
    radians = math.atan2(m[1], m[0])
    dir1 = radians
    dir2 = (radians + np.pi) % (2 * np.pi)
    
    return [dir1, dir2]


def get_distance(p1, p2):
    return math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)


def get_angle(l1, l2):
    d1x = l1[1][0] - l1[0][0]
    d1y = l1[1][1] - l1[0][1]
    d2x = l2[1][0] - l2[0][0]
    d2y = l2[1][1] - l2[0][1]
    
    return math.atan2(d1x * d2y - d1y * d2x, d1x * d2x + d1y * d2y)


def get_spline(points, centerpoint):
    x = points[:,0]
    y = points[:,1]

    x_min = min(x)    
    x = x - x_min
    
    theta = get_periodic(list(zip(x, y)), centerpoint)
    theta[0] = 0
    
    return CubicSpline(theta, points, bc_type='periodic')


def get_points(cs, num=250):
    xs = 2 * np.pi * np.linspace(0, 1, num)
    return cs(xs)


def get_periodic(points, centerpoint):
    value = []
    l1 = (centerpoint, (centerpoint[0]+1, centerpoint[1]))
    
    for point in points:
        l2 = (centerpoint, point)
        value.append(get_angle(l1, l2) + np.pi)
        
    return value


def get_nearest_point(waypoints, point):
    min_i = None
    min_distance = math.inf

    for i, waypoint in enumerate(waypoints):
        distance = get_distance(waypoint, point)
        if distance < min_distance:
            min_i = i
            min_distance = distance
    
    return min_i


def get_best_position(waypoints, point):
    i1 = get_nearest_point(waypoints, point)
    p1 = waypoints[i1]
    
    i2 = i1 + 1
    if i2 > len(waypoints)-1:
        i2 = 0

    p2 = waypoints[i2]
    
    m = [p2[0] - p1[0], p2[1] - p1[1]]
    
    return p1, m


def plot_way(ax, cs, style="path"):
    xs = 2 * np.pi * np.linspace(0, 1, 250)
    cs_xs = cs(xs)
    x = cs_xs[:, 0]
    y = cs_xs[:, 1]
    
    if style == "path":
        ax.plot(x, y, label='spline')
    
    if style == "points":
        ax.scatter(x, y, s=20, label='spline')
    
    
def plot_slope(ax, point, slope, color="black"):
    x = point[0]
    y = point[1]
    
    rads = convert_slope(slope)[0]
    cos = np.cos(rads)
    
    slope = slope[1]/slope[0]
    
    b = y - (slope * x)
    xs = np.array([x-cos*.75, x+cos*.75])
    ys = (slope * xs) + b
    
    ax.plot(xs, ys, color=color)
    
    
def plot_best_position(ax, current_position, best_position, slope):
    positions = np.array([current_position, position])
    plot_waypoints(ax, positions, s=80, color="black")
    plot_slope(ax, position, slope)
    ax.plot(positions[:,0], positions[:,1], linestyle='dashed')

    
def plot_waypoints(ax, points, s=20, color=None, alpha=1):
    x = points[:,0]
    y = points[:,1]
    ax.scatter(x, y, s=s, color=color, alpha=alpha)

    
def plot_track_waypoints(ax, points):
    x = points[:,0]
    y = points[:,1]
    ax.plot(x, y, linewidth=5, alpha=.25)
    
    
def plot_track(ax, track_waypoints):
    plot_track_waypoints(ax, track_waypoints[:,0:2])
    plot_track_waypoints(ax, track_waypoints[:,2:4])
    plot_track_waypoints(ax, track_waypoints[:,4:6])

In [25]:
import numpy as np
from scipy import interpolate

def cubic_spline_samples(points, tangents, resolution):
    '''
    Compute and sample the cubic splines for a set of input points with
    optional information about the tangent (direction AND magnitude). The 
    splines are parametrized along the traverse line (piecewise linear), with
    the resolution being the step size of the parametrization parameter.
    The resulting samples have NOT an equidistant spacing.

    Arguments:      points: a list of n-dimensional points
                    tangents: a list of tangents
                    resolution: parametrization step size
    Returns:        samples

    Notes: Lists points and tangents must have equal length. In case a tangent
           is not specified for a point, just pass None. For example:
                    points = [[0,0], [1,1], [2,0]]
                    tangents = [[1,1], None, [1,-1]]

    '''
    resolution = float(resolution)
    points = np.asarray(points)
    num_points, dim = points.shape

    # parametrization parameter s.
    dp = np.diff(points, axis=0)                 # difference between points
    dp = np.linalg.norm(dp, axis=1)              # distance between points
    d = np.cumsum(dp)                            # cumsum along the segments
    d = np.hstack([[0], d])                      # add distance from first point
    l = d[-1]                                    # length of point sequence
    num_samples = int(l/resolution)              # number of samples
    s, r = np.linspace(0, l, num_samples, retstep=True) # sample parameter and step

    # Bring points and (optional) tangent information into correct format.
    assert(len(points) == len(tangents))
    
    data = np.empty([num_points, dim], dtype=object)
    
    for i,p in enumerate(points):
        t = tangents[i]
        # either tangent is None or has the same
        # number of dimensions as the point p.
        assert(t is None or len(t) == dim)
        fuse = list(zip(p, t) if t is not None else zip(p,))
        data[i,:] = fuse

    # compute splines per dimension separately.
    samples = np.zeros([num_samples, dim])
    
    for i in range(dim):
        poly = BPoly.from_derivatives(d, data[:, i])
        samples[:, i] = poly(s)

    return samples

In [27]:
points = []
tangents = []
resolution = 0.2

points.append([0., 0.]); tangents.append([1, 1])
points.append([3., 4.]); tangents.append([1, 0])
points.append([5., 2.]); tangents.append([0, -1])
points.append([3., 0.]); tangents.append([-1, -1])

points = np.asarray(points)
tangents = np.asarray(tangents)

# interpolate with different tangent lengths, but equal direction.
scale = 1.
tangents1 = np.dot(tangents, scale*np.eye(2))
samples1 = cubic_spline(points, tangents1, resolution)

scale = 2.
tangents2 = np.dot(tangents, scale*np.eye(2))
samples2 = cubic_spline(points, tangents2, resolution)

scale = 0.6
tangents3 = np.dot(tangents, scale*np.eye(2))
samples3 = cubic_spline(points, tangents3, resolution)

plt.scatter(samples1[:,0], samples1[:,1], marker='o', label='samples1')
plt.scatter(samples2[:,0], samples2[:,1], marker='o', label='samples2')
plt.scatter(samples3[:,0], samples3[:,1], marker='o', label='samples3')
plt.scatter(points[:,0], points[:,1], s=100, c='k', label='input')

plt.axis('equal')
plt.title('Interpolation')

plt.legend()
plt.show()

In [30]:
def get_track_waypoints(track):
    return np.load("./tracks/%s.npy" % track)


# def get_track_controlpoints(track):
#     file = "./controlpoints/%s.pkl" % track
    
#     if exists(file):
#         value = pickle.load(open(file, "rb"))        
#         return (np.array(value['points']), value['centerpoint'])
    
#     return None, None


# def track_click(event):
#     print('%s click: button=%d, x=%d, y=%d, xdata=%f, ydata=%f' %
#           ('double' if event.dblclick else 'single', event.button,
#            event.x, event.y, event.xdata, event.ydata))


# cid = None    

# def track_selected(x):
track = "reinvent_base"

track_waypoints = get_track_waypoints(track) ### re:invent track
track_waypoints.shape

# Choose control points to draw the spline for the chosen optimal path
# controlpoints, tangents = get_track_controlpoints(track)

points = []
tangents = []
resolution = 0.2

points.append([0., 0.]); tangents.append([1, 1])
points.append([3., 4.]); tangents.append([1, 0])
points.append([5., 2.]); tangents.append([0, -1])
points.append([3., 0.]); tangents.append([-1, -1])

if not points is None:
#     way = get_spline(controlpoints, centerpoint=centerpoint)
    waypoints = cubic_spline_samples(points, tangents, .1)

# print(waypoints.tolist())

# Plot the results
fig, ax = plt.subplots(figsize=(10, 8))
ax.axis('equal')

cid = fig.canvas.mpl_connect('button_press_event', track_click)

# if not controlpoints is None:
#     plot_way(ax, way)

plot_track(ax, track_waypoints)

if not controlpoints is None:
    plot_waypoints(ax, waypoints, alpha=.25, color="skyblue")
    plot_waypoints(ax, controlpoints, s=50, color="black")

plt.grid(color='black', linestyle='-', linewidth=.5)
plt.show()

    
# interact(track_selected, x=tracks)

<IPython.core.display.Javascript object>

## Visualize Nearest Point

In [11]:
current_position = [6.5, 2]

position, slope = get_best_position(waypoints, current_position)

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)

plot_best_position(ax, current_position, position, slope)

plt.grid(color='black', linestyle='-', linewidth=.5)

plt.show()

TypeError: 'NoneType' object is not iterable

## Visualize Headings

In [14]:
positions = way(2 * np.pi * np.linspace(0, 1, 100))

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)

for position in positions:
    position, slope = get_best_position(waypoints, position)
    plot_slope(ax, position, slope, color=None)
    plot_waypoints(ax, np.array([position]), color="skyblue")

plt.grid(color='black', linestyle='-', linewidth=.5)

plt.show()

TypeError: 'NoneType' object is not callable

## Visualize Reward Function

In [15]:
def get_test_params(position, heading):
    return {
        # known good values
        'all_wheels_on_track': True,
        'track_width': .4,
        'is_reversed': False,
        'distance_from_center': .045,

        'heading': heading,
        'x': position[0],
        'y': position[1]
    }

def reward_function(params):
    # Read input parameters
    print('Params:', params)
    
    is_reversed = params['is_reversed']
    all_wheels_on_track = params['all_wheels_on_track']
    track_width = params['track_width']
    distance_from_center = params['distance_from_center']
    heading = params['heading']
    x = params['x']
    y = params['y']

    heading = wrap_360(heading)
    
    print("Heading:", heading)
    
    position = [x, y]
    
    # Give a very low reward by default
    reward = 1e-3

    best_position, slope = get_best_position(waypoints, [x, y])
    best_headings = [wrap_360(math.degrees(direction)) for direction in convert_slope(slope)]

    print("Best Position:", best_position)
    print("Best Headings:", best_headings)
    
    best_position_diff = get_distance(position, best_position)
    best_heading_diff = min([abs(heading - best_heading) for best_heading in best_headings])
    
    print("Difference from Best Position:", best_position_diff)
    print("Difference from Best Heading:", best_heading_diff)

    # Give a high reward if no wheels go off the track and
    # the agent is somewhere in between the track borders
    if not is_reversed and all_wheels_on_track and (0.5*track_width - distance_from_center) >= 0.05:
        reward = 1.0

    position_reward = (1 + (1 - math.tanh(best_position_diff))) ** 2
    heading_reward = (1 + (1 - best_heading_diff / 360)) ** 2
    reward *= (position_reward * heading_reward) ** 2
    
    print("Position Reward:", position_reward)
    print("Heading Reward:", heading_reward)    
    print("Reward:", reward)

    # Always return a float value
    return float(reward)


####################################### position 1 #######################################

current_position = [1.55, 1]
position, slope = get_best_position(waypoints, current_position)

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)
plot_best_position(ax, current_position, position, slope)

plt.grid(color='black', linestyle='-', linewidth=.5)
plt.show()

reward_function(get_test_params(current_position, wrap_180(convert_slope(slope)[1])))


####################################### position 2 #######################################

current_position = [6.5, 2]
position, slope = get_best_position(waypoints, current_position)

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)
plot_best_position(ax, current_position, position, slope)

plt.grid(color='black', linestyle='-', linewidth=.5)
plt.show()

reward_function(get_test_params(current_position, wrap_180(convert_slope(slope)[1])))


####################################### position 3 #######################################

current_position = [4.5, 3]
position, slope = get_best_position(waypoints, current_position)

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)
plot_best_position(ax, current_position, position, slope)

plt.grid(color='black', linestyle='-', linewidth=.5)
plt.show()

reward_function(get_test_params(current_position, 30))


####################################### position 4 #######################################

current_position = [2, 4.5]
position, slope = get_best_position(waypoints, current_position)

fig, ax = plt.subplots(figsize=(20, 10))
ax.axis('equal')

plot_way(ax, way)
plot_track(ax, track_waypoints)
plot_best_position(ax, current_position, position, slope)

plt.grid(color='black', linestyle='-', linewidth=.5)
plt.show()

reward_function(get_test_params(current_position, -140))

TypeError: 'NoneType' object is not iterable