In [1]:
import math
import unittest
import numpy as np


def translate(new_center):
    return np.array([
            [1, 0, -new_center[0]],
            [0, 1, -new_center[1]],
            [0, 0, 1]])


def rotate(alpha):
    return np.array([
            [ np.cos(alpha), np.sin(alpha), 0],
            [-np.sin(alpha), np.cos(alpha), 0],
            [ 0,             0,             1]])


class CoordinateSystem:
    def __init__(self, center, alpha):
        self.center = center
        self.alpha = alpha
        self.transform = rotate(alpha) @ translate(center)
    
    def global_to_local(self, pt):
        pt_g = np.array([pt[0], pt[1], 1])
        pt_l = self.transform @ pt_g
        return tuple(pt_l[:-1])


class CoordinateSystemTest(unittest.TestCase):
    
    def test_translation_only(self):
        local_system = CoordinateSystem((1, 1), 0)
        global_point = (3, 2)
        local_point = local_system.global_to_local(global_point)
        self.assertEqual((2, 1), local_point)
        
    def test_rotation_only(self):
        local_system = CoordinateSystem((0, 0), math.pi / 4)
        global_point = (1, 1)
        local_x, local_y = local_system.global_to_local(global_point)
        self.assertAlmostEqual(np.sqrt(2), local_x)
        self.assertAlmostEqual(0, local_y)
        
    def test_rotation_and_translation(self):
        local_system = CoordinateSystem((1, 1), math.pi / 4)
        global_point = (2, 2)
        local_x, local_y = local_system.global_to_local(global_point)
        self.assertAlmostEqual(np.sqrt(2), local_x)
        self.assertAlmostEqual(0, local_y)

In [2]:
class BicycleModel: 
    def __init__(self, frame_length):
        self.frame_length = frame_length
    
    def predict_trajectory(self, velocity, steer_angle, dt=0.2, n_points=50):
        states = [(0.0, 0.0, 0.0)]
        for _ in range(n_points-1):
            next_state = self._next_state(states[-1], velocity, steer_angle, dt)
            states.append(next_state)
        return [(x, y) for x, y, _ in states]
    
    def _next_state(self, state, velocity, steer_angle, dt):
        x, y, psi = state
        x_next = x + velocity * np.cos(psi) * dt
        y_next = y + velocity * np.sin(psi) * dt
        psi_next = psi + velocity / self.frame_length * steer_angle * dt
        return (x_next, y_next, psi_next)
    
    
class BicycleModelTest(unittest.TestCase):
    
    def test_no_motion(self):
        model = BicycleModel(frame_length=1)
        trajectory = model.predict_trajectory(velocity=0, steer_angle=0, dt=0.1, n_points=2)
        self.assertEqual([(0, 0), (0, 0)], trajectory)
        
    def test_linear_motion(self):
        model = BicycleModel(frame_length=1)
        trajectory = model.predict_trajectory(velocity=2, steer_angle=0, dt=0.1, n_points=2)
        self.assertEqual([(0, 0), (0.2, 0)], trajectory)
        
    def test_make_cicrle(self):
        model = BicycleModel(frame_length=2)
        one_degree_steer = math.pi / 180
        trajectory = model.predict_trajectory(velocity=2, steer_angle=one_degree_steer, dt=0.1, n_points=3601)
        last_x, last_y = trajectory[-1]
        self.assertAlmostEqual(0, last_x)
        self.assertAlmostEqual(0, last_y)
        
unittest.main(argv=[''], exit=False)        

......
----------------------------------------------------------------------
Ran 6 tests in 0.020s

OK


<unittest.main.TestProgram at 0x7ffbf906c520>

In [3]:
import re
import json
import traceback as tb

from simple_websocket_server import WebSocketServer, WebSocket

def mph2mps(speed):
    return 1609.34 * speed / 3600

class TelemetryPackage:
    def __init__(self, data, timestamp):
        self.timestamp = timestamp
        self.center = (data['x'], data['y'])
        self.heading = data['psi']
        self.velocity = mph2mps(data['speed'])
        self.steering_angle = -data['steering_angle']
        self.trajectory = self._to_vehicle_frame(data['ptsx'], data['ptsy'])
        
    def _to_vehicle_frame(self, xx, yy):
        vehicle_frame = CoordinateSystem(self.center, self.heading)
        points = [(x, y) for x, y in zip(xx, yy)]    
        return [vehicle_frame.global_to_local(pt) for pt in points]
    
    
class CommandPackage: 
    steering_angle_bound = np.deg2rad(25.0)
    
    def __init__(self):
        self.throttle = 0
        self.steering_angle = 0
        self.planned_trajectory = [(0, 0)]
        self.predicted_trajectory = [(0, 0)]
        
    def data(self):
        next_x, next_y = self._unzip(self.planned_trajectory)
        mpc_x, mpc_y = self._unzip(self.predicted_trajectory)
        return {
            'throttle': self.throttle, 
            'steering_angle': -self._calc_steering_value(), 
            'mpc_x': mpc_x,
            'mpc_y': mpc_y,
            'next_x': next_x, 
            'next_y': next_y
        }
        
    def to_json(self):
        return json.dumps(self.data())
    
    def _calc_steering_value(self):
        value = self.steering_angle / self.steering_angle_bound
        return np.sign(value) * min(abs(value), 1)
    
    def _unzip(self, tuples):
        return tuple(zip(*tuples))
    
    
class CommandPackageTest(unittest.TestCase):
    
    def setUp(self):
        self.command = CommandPackage()
    
    def test_zero(self):
        self.command.steering_angle = 0
        
        data = self.command.data()
        self.assertEqual(0, data['steering_angle'])
        
    def test_upper_bound(self):
        boundary_value = np.deg2rad(25.0)
        
        self.command.steering_angle = boundary_value
        data = self.command.data()
        self.assertEqual(-1, data['steering_angle'])
        
        self.command.steering_angle = boundary_value + 1
        data = self.command.data()
        self.assertEqual(-1, data['steering_angle'])
        
    def test_lower_bound(self):
        boundary_value = np.deg2rad(-25.0)
        
        self.command.steering_angle = boundary_value
        data = self.command.data()
        self.assertEqual(1, data['steering_angle'])
        
        self.command.steering_angle = boundary_value - 1
        data = self.command.data()
        self.assertEqual(1, data['steering_angle'])
        
    def test_scale_value(self):
        value = np.deg2rad(12.5)
        
        self.command.steering_angle = value
        data = self.command.data()
        self.assertEqual(-0.5, data['steering_angle'])
        
        self.command.steering_angle = -value
        data = self.command.data()
        self.assertEqual(0.5, data['steering_angle'])

unittest.main(argv=[''], exit=False)

..........
----------------------------------------------------------------------
Ran 10 tests in 0.024s

OK


<unittest.main.TestProgram at 0x7ffbf906a4c0>

In [4]:
import time 

class SimulatorClient(WebSocket):
    incoming_pattern = re.compile('42\["telemetry",\s*(.*)\s*\]')
    manual_command = '42["manual",{}]'
    steer_template = '42["steer",%s]'
    
    def __init__(self, server, sock, address, controller):
        super().__init__(server, sock, address)
        self.controller = controller
        self.start_timestamp = None
    
    def handle(self):
        try: 
            telemetry = self._receive_telemetry(self.data)
            if not telemetry: 
                self._send_manual()
            else:
                command = CommandPackage()
                command.planned_trajectory = telemetry.trajectory
                self.controller.process(telemetry, command)
                self._send_streer(command)
        except Exception as ex:
            tb.print_exc()
    
    def _receive_telemetry(self, data):
        match_data = self.incoming_pattern.match(data)
        if not match_data: return None
        
        if not self.start_timestamp: 
            self.start_timestamp = time.time()
        timestamp = time.time() - self.start_timestamp
        telemetry_data = json.loads(match_data.group(1))
        return TelemetryPackage(telemetry_data, timestamp)
    
    def _send_manual(self):
        self.send_message(self.manual_command)
        
    def _send_streer(self, command):
        response = self.steer_template % command.to_json()
        self.send_message(response)
        

class UdacitySimulator: 
    
    def __init__(self, controller):
        self.controller = controller
    
    def run(self):
        server = WebSocketServer('', 4567, self._make_client)
        try:
            server.serve_forever()
        except KeyboardInterrupt: 
            server.close()
            
    def _make_client(self, server, sock, address):
        return SimulatorClient(server, sock, address, self.controller)
            

## Pure pursuit controller

In [5]:
def squared_distance_to(x, y):
    return x*x + y*y

class PurePursuitController: 
    
    def __init__(self, frame_length, lookahead_distance=3, use_interp=True):
        self.frame_length = frame_length
        self.use_interp = use_interp
        self.lookahead_distance = lookahead_distance
    
    def process(self, telemetry, command):
        if self.use_interp: 
            interp_trajectory = self._interpolate(telemetry.trajectory)
        else:
            interp_trajectory = telemetry.trajectory

        lookahead_point = self._get_lookahead_point(interp_trajectory)
        self._steer_to(lookahead_point, command)
    
    def _steer_to(self, pt, command): 
        x, y = pt
        squared_distance = squared_distance_to(x, y)
        tangent_steer = 2 * self.frame_length * y / squared_distance
        command.steering_angle = np.arctan(tangent_steer)
        command.lookahead_distance = np.sqrt(squared_distance)
    
    def _get_lookahead_point(self, trajectory):
        points_ahead = [(x, y) for x, y in trajectory if self._is_beyond_lookahead(x, y)]
        return points_ahead[0]
    
    def _is_beyond_lookahead(self, x, y):
        distance = math.sqrt(squared_distance_to(x, y))
        return x > 0 and distance > self.lookahead_distance
    
    def _interpolate(self, trajectory):
        xp, yp = tuple(zip(*trajectory))
        xx = list(range(50))
        yy = np.interp(xx, xp, yp)
        return zip(xx, yy)


## Helper controller classes

In [6]:
class CompositeController:
    def __init__(self, *controllers):
        self.controllers = controllers
        
    def process(self, telemetry, command):
        for c in self.controllers:
            c.process(telemetry, command)
            
def rmse(data):
    return np.sqrt(np.sum(np.square(data)) / len(data))

            
class CteTracker:
    def __init__(self, max_track_time=None):
        self.max_track_time = max_track_time
        self.tt = []
        self.cte = []
        self.steering_angles = []
        
    def process(self, telemetry, command):        
        timestamp = telemetry.timestamp
        if self.max_track_time and timestamp > self.max_track_time:
            raise Exception("Stop tracking")
            
        self.tt.append(timestamp)
        self.cte.append(self._get_cte(telemetry))
        self.steering_angles.append(command.steering_angle)
        
    def rmse(self):
        return np.sqrt(np.sum(np.square(self.cte)) / len(self.cte))
        
    def _get_cte(self, telemetry):
        (x1, y1), (x2, y2) = telemetry.trajectory[0:2]
        return ((x2 - x1) * y1 - (y2 - y1) * x1) / np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)


In [7]:
class PathMapper:
    n_points = 10
    ahead_distance = 50.0
    
    def __init__(self, vehicle_model):
        self.vehicle_model = vehicle_model
            
    def process(self, telemetry, command):
        time_horizon = self.ahead_distance / max(telemetry.velocity, 0.01)
        dt = time_horizon / self.n_points
        command.predicted_trajectory = self.vehicle_model.predict_trajectory(
            telemetry.velocity, telemetry.steering_angle, dt=dt, n_points=self.n_points)


## Throttle controller

In [8]:
class ConstantThrottleController:
    def __init__(self, throttle=0.1):
        self.throttle = throttle
        
    def process(self, telemetry, command):
        command.throttle = self.throttle

## Run simulator with different lookahead distances

In [None]:
def make_controller(lookahead_distance):
    vehicle_model = BicycleModel(frame_length=2.67)
    tracker = CteTracker(max_track_time=90)
    controller = CompositeController(
        PurePursuitController(frame_length=vehicle_model.frame_length, lookahead_distance=lookahead_distance),
        ConstantThrottleController(throttle=0.1),
        PathMapper(vehicle_model),
        tracker)
    return controller, tracker

ld_to_try = [30, 10, 5, 2, 0]
results = {}

for ld in ld_to_try:
    print("Trying distance: ", ld)
    controller, tracker = make_controller(ld)
    sim = UdacitySimulator(controller)
    sim.run()
    print("Recording results for ", ld)
    results[ld] = tracker

In [None]:
fig, axx = plt.subplots(len(results), 1, sharex=True)

for d, ax in zip(results.keys(), axx):
    ax.spines['left'].set_position('zero')
    ax.spines['bottom'].set_position('zero')
    ax.spines['top'].set_color('none')
    ax.spines['right'].set_color('none')

    tracker = results[d]
    metric = tracker.cte
    line, = ax.plot(tracker.tt, metric, color='tab:blue')
    line.set_label('lookahead_distance = %d, RMSE = %.2f' % (d, rmse(metric)))
    ax.set_ylim(-1, 1)
    ax.legend(loc='upper right')
    
fig.suptitle('Cross-track for different look-ahead distances')
fig.tight_layout()


In [None]:
fig, axx = plt.subplots(len(results), 1, sharex=True)

for d, ax in zip(results.keys(), axx):
    ax.spines['left'].set_position('zero')
    ax.spines['bottom'].set_position('zero')
    ax.spines['top'].set_color('none')
    ax.spines['right'].set_color('none')

    tracker = results[d]
    metric = np.degrees(tracker.steering_angles)
    line, = ax.plot(tracker.tt, metric, color='tab:green')
    line.set_label('lookahead_distance = %d, RMSE = %.2f' % (d, rmse(metric)))
    ax.set_ylim(-3, 10)
    ax.legend(loc='upper right')
    
fig.suptitle('Steering angles for different look-ahead distances')
fig.tight_layout()


## Run simulator

In [None]:
def make_controller(lookahead_distance):
    vehicle_model = BicycleModel(frame_length=2.67)
    tracker = CteTracker()
    controller = CompositeController(
        PurePursuitController(frame_length=vehicle_model.frame_length, lookahead_distance=lookahead_distance),
        ConstantThrottleController(throttle=0.2),
        PathMapper(vehicle_model),
        tracker)
    return controller, tracker

controller, tracker = make_controller(10)
sim = UdacitySimulator(controller)
sim.run()
