In [21]:
from math import floor
import random

In [22]:
# At t = 0, this is what the first 10 "meters" of road looks like. Traffic moves to the right so that 
# EXAMPLE_ROAD_SNAPSHOT[i][j] gives you whatever is in lane i at meter j
SLICE_ROAD_SNAPSHOT = [
    ["   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ",], # lane 0, no cars
    ["   ", "   ", "   ", "   ", "   ", "002", "   ", "001", "   ", "   ",], # lane 1, cars 001 and 002
    ["   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ",], # lane 2
    ["***", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ", "   ",], # lane 3, ego
]

# This code populates a full length road (empty of vehicles)
ROAD_LENGTH = 150
NUM_LANES = 4
EMPTY_ROAD_SNAPSHOT = [
    ["   " for i in range(ROAD_LENGTH)] for j in range(NUM_LANES)
]

In [23]:
# This data predicts that vehicle 001 (moving at 10 m/s) will be overtaken
# by vehicle 002 
EXAMPLE_PREDICTIONS = {
    "reference_time" : 12,
    "horizon" : 10,
    "predictions" : [
        {
            "001" : [
                (1, 7),
                (1, 17),
                (1, 27),
                (1, 37),
                (1, 47),
                (1, 57),
                (1, 67),
                (1, 77),
                (1, 87),
                (1, 97),
                (1, 107),
            ]
        },
        {
            "002" : [
                (1, 5),
                (0, 16),
                (0, 27),
                (0, 38),
                (0, 49),
                (1, 60),
                (1, 71),
                (1, 82),
                (1, 93),
                (1, 104),
                (1, 115),
            ]
        }
    ]
}

# This tells us that our vehicle is in lane 3 at position 0 with velocity 5 m/s
EXAMPLE_LOCALIZATION = {
    "lane" : 3,
    "s"    : 0,
    "v"    : 5,
}

# Before running the trial the behavior module will get some configuration data.
# This data says the vehicle must never exceed a speed of 20 m/s, can't CHANGE 
# it's speed by more than 2 m/s per time step, and is trying to get to lane 3
# at meter marker 150
EXAMPLE_VEHICLE_CONFIG = {
    "max_v" : 20,
    "max_a" : 2, 
    "goal" : (3, 150)
}

EXAMPLE_PLANNER_OUTPUT = {
    "target_lane" : 0,
    "vehicle_to_follow" : 7,
    "target_speed" : None,
    "seconds_to_reach" : None,
}

In [187]:
class Road(object):
    speed_limit = 30
    max_a = 2
    
    def __init__(self, num_lanes, length):
        self.num_lanes = num_lanes
        self.length = length
        self.road = [[None for i in range(length)] for j in range(num_lanes)]
        self.t = 0
        self.vehicles = {}
        self.ego_key = "***"
        self.next_id = 1
        self.ego_added = False
        self.planner = None
    
    # PUBLIC METHODS
    def get_predictions(self):
        horizon = 10
        predictions = {}
        for _id, vehicle in self.vehicles.items():
            lane, s = vehicle['loc']
            speed = vehicle['v']
            points = [(lane, s+speed*i) for i in range(horizon+1)]
            predictions[_id] = points
        return {"timestamp" : self.t, 'horizon' : 10, 'predictions': predictions}
    
    def get_locations(self):
        if not ego_added: return None
        v = self.vehicles[-1]
        lane, s = v['loc']
        speed = v['v']
        return {
            "lane" : lane,
            "s" : s,
            "v" : speed
        }
    
    def get_global_data(self):
        return {
            "max_v" : self.speed_limit,
            "max_a" : self.max_a,
            "goal"  : (self.num_lanes-1, self.length-1)
        }
    
    def set_planner(self, planner):
        self.planner = planner
        
    # PRIVATE METHODS
    def add_car(self, lane, s, v, ego=False):
        assert 0 <= lane < self.num_lanes
        assert 0 <= s < self.length
        if ego:
            self.ego_added = True
            _id = -1
        else:
            _id = self.next_id 
            self.next_id += 1
        vehicle = {
            "loc" : (lane, s),
            "v"   : v,
            "id"  : _id
        }
        self.vehicles[_id] = vehicle
        self.road[lane][s] = _id
        
    def naive_update(self):
        empty_road = [[None for i in range(self.length)] for j in range(self.num_lanes)]
        for _id, veh in self.vehicles.items():
            v = veh['v']
            lane, s = veh['loc']
            s += v
            if s >= self.length:
                del self.vehicles[_id]
                continue
            empty_road[lane][s] =  _id
            self.vehicles[_id]['loc'] = (lane, s)
            self.road = empty_road
            self.t += 1
    
    def _obstructions_in_lane(self, lane_num, in_seconds=0):
        lane = self.road[lane_num]
        obstructions = []
        ids = []
        for i, cell in enumerate(lane):
            if cell and cell != -1:
                obstructions.append(i)
                ids.append(cell)
        return obstructions, ids
    
    def update_ego_with_plan(self, plan):
        EXAMPLE_PLANNER_OUTPUT = {
            "target_lane" : 0,
            "vehicle_to_follow" : 7,
            "target_speed" : None,
            "seconds_to_reach" : None,
        }
        print "updating ego with plan"
        ego = self.vehicles[-1]
        cur_lane, cur_s = ego['loc']
        cur_v = ego['v']
        NEXT_LANE = cur_lane
        NEXT_V = cur_v
        
        # handle lane change... check for blocked
        target_lane = plan['target_lane']
        if cur_lane != target_lane:
            "target lane {} different from current lane {}".format(target_lane, cur_lane)
            NEXT_LANE = cur_lane
            blocked = self.is_lane_blocked_for_motion(target_lane, cur_s, cur_s + self.speed_limit, 1)
            print "lane is blocked? {}".format(blocked)
            if not blocked:
                NEXT_LANE = target_lane
                print "set NEXT_LANE to {}".format(NEXT_LANE)
        
        # figure out how to adjust speed
        speed_for_lane = self.best_speed_for_lane(NEXT_LANE, cur_s)
        print "speed_for_lane for lane {} is {}".format(NEXT_LANE, speed_for_lane)
        if speed_for_lane > cur_v:
            PREFERRED_SPEED_ADJUST = min(speed_for_lane-cur_v, self.max_a)
        elif speed_for_lane < cur_v:
            PREFERRED_SPEED_ADJUST = max(speed_for_lane-cur_v, -self.max_a)
        else:
            PREFERRED_SPEED_ADJUST = 0
        
        # but maybe there's something to consider because of vehicle following
        S_OF_INTEREST = cur_s
        target_vehicle_id = plan['vehicle_to_follow']
        if target_vehicle_id != None:
            targ_veh = self.vehicles[target_vehicle_id]
            targ_s = targ_veh['loc'][1]
            targ_speed = targ_veh['v']
            dist = targ_s - 2 - cur_s
            closing = targ_speed - cur_v
            if dist == 0 : 
                TIME = 0
            elif closing == 0: 
                if targ_s > cur_s:
                    PREFERRED_SPEED_ADJUST = self.max_a
                else:
                    PREFERRED_SPEED_ADJUST = - self.max_a
            else:
                TIME = int(-float(dist) / closing)
                if TIME < 0:
                    if targ_s > cur_s:
                        PREFERRED_SPEED_ADJUST = self.max_a
                    else:
                        PREFERRED_SPEED_ADJUST = - self.max_a
                else:
                    PREFERRED_SPEED_ADJUST = min(self.max_a, TIME)
        print "PREFERRED SPEED ADJUST IS {}".format(PREFERRED_SPEED_ADJUST)
        self.move_vehicle(-1, NEXT_LANE, cur_s)
        self.vehicles[-1]['v'] += PREFERRED_SPEED_ADJUST
        self.vehicles[-1]['v'] = min(self.vehicles[-1]['v'], self.speed_limit)
        
#             S_OF_INTEREST = max(0, targ_s-1)
            
            
#         target_speed = self.best_speed_for_lane(NEXT_LANE, S_OF_INTEREST)
        
#         if target_vehicle_id != None:
#             targ_veh = self.vehicles[target_vehicle_id]
#             targ_veh_v = targ_veh['v']
            
        
#             delta = target_lane - cur_lane
#             lane = self.road[target_lane]
#             start_check = min(0, cur_s-50)
#             s_max = cur_s + self.max_a + 1
#             s_min = cur_s - self.max_a - 1
#             collides = False          
#             NEXT_LANE = cur_lane
#             for cell in lane[start_check : s_max]:
#                 if cell is None:
#                     continue
#                 vehicle = self.vehicles[cell]
#                 speed = vehicle['v']
#                 v_cur_s = vehicle['loc'][1]
#                 next_s = v_cur_s + speed
#                 if v_cur_s < cur_s and next_s >= s_min:
#                     collides = True
#                     break
#                 if v_cus_s >= cur_s and next_s <= s_max:
#                     collides = True
#                     break
#             if not collides:
#                 NEXT_LANE = target_lane

            
        
        
#         handle speed change
#         if cur_lane == target_lane:
#             lane = self.road[cur_lane]
#             start_check = min(0, cur_s-50)
#             for cell in lane[0:cur_s+50]:
                
#                 if cell is None:
#                     continue
#                 vehicle = self.vehicles[cell]
#                 other_speed = vehicle['v']
    
    def move_vehicle(self, v_id, lane_num, s):
        cur = self.vehicles[v_id]
        cur_lane, cur_s = cur['loc']
        self.road[cur_lane][cur_s] = None
        self.road[lane_num][s] = v_id
        self.vehicles[v_id]['loc'] = (lane_num, s)
    
    def __repr__(self):
        print_width = 36
        min_center = print_width / 2 - 1
        max_center = (self.length - 1) - min_center
        center = min_center
        if self.ego_added:
            s = self.vehicles[-1]['loc'][1]
            if s < min_center: s = min_center
            if s > max_center: s = max_center
            center = s
        width = print_width / 2
        L = center - width
        L = max(L, 0)
        R = center + width
        R = min(R, self.length)
        to_print = "___" * print_width + "\n"
        for i in range(self.num_lanes):
            lane = self.road[i]
            for cell in lane[L:R]:
                st = "   "
                if cell == -1:
                    st = self.ego_key
                elif type(cell) == type(3):
                    st = "%03d" % cell
                to_print += st
            to_print += "\n"
        to_print += "___" * print_width + "\n"
        return to_print
            
    def incremental_show(self):
        while True:
            print self
            self.naive_update()
            yield  
            
    def is_lane_blocked_for_motion(self, lane_num, from_s, to_s, t_f):
        lane = self.road[lane_num]
        predictions = self.get_predictions()
        S_BUFFER = 1
        def s(t): return int(from_s + float(to_s - from_s) / t_f * t)
        for _id, v_preds in predictions['predictions'].items():
            v_preds = v_preds[:t_f+1]
            t = 0
            try:
                now = v_preds.pop(0)
                while now[0] != lane_num:
                    now = v_preds.pop(0)
            except IndexError: # exhausted list
                continue
                
            last_s  = now[1]
            for p in v_preds:
                t += 1
                if p[0] != lane_num:
                    break
                cur_s = p[1]
                
                s_at_t = s(t)
                print "s at {} is {} for {}".format(t, s_at_t, _id) 
                if s_at_t - S_BUFFER <= cur_s <= s_at_t + S_BUFFER or (last_s < s_at_t + S_BUFFER and cur_s > s_at_t - S_BUFFER):
                    return True
#                 if from_s <= cur_s <= to_s or (last_s < from_s and cur_s > to_s):
#                     return True
                last_s = cur_s
        return False
    
    def best_speed_for_lane(self, lane_num, s):
        lane = self.road[lane_num]
        max_speed = self.speed_limit
        min_speed = 0
        start_check = max(0, s-10)
        for i,cell in enumerate(lane[start_check:s]):
            if not cell or cell == -1:
                continue
            vehicle = self.vehicles[cell]
            vehicle_v = vehicle['v']
            min_speed = vehicle_v
        for i,cell in enumerate(lane[s:s+10]):
            if not cell or cell == -1:
                continue
            vehicle = self.vehicles[cell]
            vehicle_v = vehicle['v']
            max_speed = vehicle_v
        if max_speed < min_speed:
            print "ERROR! Maximum safe speed is lower than minimum"
        return max_speed
        
            

# Exploration Below

In [188]:
DENSITY = 0.05 # 1 out of 20 cells should have a vehicle on it
LANE_SPEEDS = [4,3,3,2]
def simulate(road):
    num_cells = road.length * road.num_lanes
    desired_cars = int(num_cells * DENSITY)
    total_cars = len(road.vehicles)
    density = float(total_cars) / desired_cars
    while density < (DENSITY / 2.0):
        print density
        for i in range(road.num_lanes):
            if random.random() > 0.5:
                continue
            speed = LANE_SPEEDS[i]
            road.add_car(i, 0, speed)
        road.naive_update()
        total_cars = len(road.vehicles)
        density = float(total_cars) / desired_cars
    
    print("done prepping road... adding ego")
    road.add_car(3, 0, 2, True)
    while True and -1 in road.vehicles:
        print road
        road.naive_update()
        total_cars = len(road.vehicles)
        density = total_cars / desired_cars
        if density < DENSITY:
            for i in range(road.num_lanes):
                if random.random() > 0.3:
                    continue
                
                speed = LANE_SPEEDS[i]
                road.add_car(i, 0, speed)
        yield
    print "DONE!"
    
            

In [209]:
road = Road(4,150)

In [210]:
simulator = simulate(road)

In [211]:
next(simulator)

0.0
done prepping road... adding ego
____________________________________________________________________________________________________________
            001                                                                                          
         002                                                                                             
         003                                                                                             
***                                                                                                      
____________________________________________________________________________________________________________



In [208]:
road

____________________________________________________________________________________________________________
                                    011                     007                                          
                           012      009               005      004      003                              
014                        013                                                                     002   
015   ***               010   008   006                                                                  
____________________________________________________________________________________________________________

In [205]:
plan = {
            "target_lane" : 2,
            "vehicle_to_follow" : 2,
        }
road.update_ego_with_plan(plan)
road

updating ego with plan
speed_for_lane for lane 2 is 3
PREFERRED SPEED ADJUST IS 0


____________________________________________________________________________________________________________
               007                                                                     001                  
            005      004      003                                                                           
                                                      ***002                                                
                                                                                                            
____________________________________________________________________________________________________________

In [116]:
road.add_car(3, 0, 5, True)
road.add_car(2, 0, 6)
road.add_car(1, 0, 7)
road.add_car(0, 0, 8)

In [117]:
shower = road.incremental_show()

In [135]:
next(shower)

center is 85
showing from 67 to 103
____________________________________________________________________________________________________________
                                                                                                            
                                                                                                            
                                                                                                         001
                                                      ***                                                   
____________________________________________________________________________________________________________

1 at lane 2 pos 108
2 at lane 1 pos 126
3 at lane 0 pos 144
-1 at lane 3 pos 90


In [213]:
min([2])

2