# Traffic Simulation

In [24]:
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as sts
import random


In [25]:
import matplotlib.animation as ani

print(ani.writers.list())


def make_animation(sim, total_frames, steps_per_frame=1, interval=100):
    '''Creates an animation of the given simulator.'''
    
    def update(frame_number):
        for _ in range(steps_per_frame):
            sim.update()
        progress_bar.update(1)
        return [sim.observe(steps_per_frame)]

    sim.initialize()
    progress_bar = tqdm(total=total_frames)
    plt.rcParams['animation.ffmpeg_path'] = '/opt/homebrew/bin/ffmpeg'


    Writer = ani.writers['ffmpeg']
    animation = FuncAnimation(
        sim.figure, update, init_func=lambda: [], frames=total_frames, interval=interval, blit=True)
    output = HTML(animation.to_html5_video())
    sim.figure.clf()

    return output

# make a grid filled with -1s where the lanes are supposed to be


class Lane:
    def __init__(self, start_point, direction, movement, prob_of_car=0.9, prob_turn = 0.1):
        self.start_point = start_point
        self.direction = direction
        self.movement = movement
        self.cars = []
        self.prob_of_car = prob_of_car
        self.car_counter = 0
        self.car_exit_counter = 0
        self.prob_turn = prob_turn

        self.average_traffic_flow = []

        

    def add_cars(self, grid):
        if grid[self.start_point[0], self.start_point[1]] == -1:
            if random.random() < self.prob_of_car:
                grid[self.start_point[0], self.start_point[1]] = 0
                self.cars.append(Car(self, 0))
                self.car_counter += 1

    def move_cars(self, grid, sim):
        for car in self.cars:
            car.move(grid, sim)

    def update(self, grid, sim):

        if sim.step_counter <= sim.add_until:
            self.add_cars(grid) # only adds cars until a certain point
        self.move_cars(grid, sim)
        
        if self.direction == 'ns' or self.direction == 'sn':
            self.road_length = grid.shape[0]
        else:
            self.road_length = grid.shape[1]

        self.average_traffic_flow.append(sum([car.speed for car in self.cars]) / self.road_length)



class Car:
    def __init__(self, lane, speed, max_speed=5, commentary='None'):
        self.lane = lane
        self.position = lane.start_point
        self.direction = lane.direction
        self.movement = lane.movement
        self.speed = speed
        self.max_speed = max_speed

        self.distance_to_car = 0
        self.distance_to_intersection = 0
        self.distance_to_boundary = 0

        self.turning = False

        self.commentary = commentary


    def move(self, grid, sim):
        """Updates information necessary for motion and then changes the speed and position of the car."""

        self.update_speed(grid, sim)
        self.update_position(grid, sim)

    def update_speed(self, grid, sim):
        """Updates the speed of the car based on the lights and intersection distance."""

        self.distance_to_intersection = self.check_distance_to_intersection(grid, sim)
        self.distance_to_car = self.check_distance_to_car(grid, sim)
        self.distance_to_boundary = self.check_distance_to_boundary(grid, sim)

        if sim.traffic_light() == 'red':
            # red light means green light for horizontal lanes
            if self.direction == 'ew' or self.direction == 'we' or self.turning == True:
                self.distance_to_intersection = float('inf') # Ignore the distance to the intersection

        else:
            # green light means green light for vertical lanes
            if self.direction == 'ns' or self.direction == 'sn' or self.turning == True:
                self.distance_to_intersection = float('inf')    


        # All cars continue updating their speed with new information in mind.
        self.continue_updating_speed(grid, sim) 
                
    
    def continue_updating_speed(self, grid, sim):
        """Cars update the speed given the closest object in front: car in front, boundary, maximum speed, intersection."""

        distance = min(self.distance_to_car, self.distance_to_intersection, self.max_speed)
        
        if self.speed >= 0:

            # acceleration
            if (self.speed + 1 <= self.max_speed):
                self.speed = min(self.speed + 1, self.max_speed)
            
            # deceleration
            if self.speed >= distance:
                self.speed = max(0, distance - 1)

            # random slow down
            if self.speed > 0:
                if random.random() < sim.prob_slow_down:
                    self.speed -= 1


    def update_position(self, grid, sim):
        """Cars advance on the grid."""

        # remove car from current position
        x, y = self.position 
        grid[x, y] = -1

        
        # iterate through the next possible steps to catch if the car is turning
        next_x, next_y = x, y
        for _ in range(1, self.speed + 1):
            next_x += self.movement[0]
            next_y += self.movement[1]

            self.position = (next_x, next_y)
            
            # exit if at the boundary
            if self.position == sim.boundary_dict[self.direction]:
                self.exit(grid)
                return

            # turn at the intersection corresponding to that direction
            if self.position == sim.intersection_dict[self.direction]:

                # makes sure the car is not double-turning
                if self.turning == False:
                    self.turn(grid, sim)
                    if self.turning == True:
                        self.update_speed(grid, sim)
                        self.update_position(grid, sim)
                
                if self.turning == True:
                    return

            
        grid[next_x, next_y] = self.speed

    def turn(self, grid, sim):
        """
        Change the car's attributes to reflect a turn to the right.
        """
        
        prob_turn = self.lane.prob_turn

        if random.random() < prob_turn:
            
            # get new direction from the  dictionary
            direction = self.direction
            self.direction = sim.turn_dict[direction]

            # changes lanes
            self.lane.cars.remove(self)
            self.lane.car_exit_counter += 1

            self.lane = [lane for lane in sim.lanes if lane.direction == self.direction][0]
            self.lane.cars.append(self)
            self.lane.car_counter += 1

            # the new movement would be that of the new lane
            self.movement = self.lane.movement
            self.turning = True
            
        
    
    def check_distance_to_boundary(self, grid, sim):
        """
        Checks the distance to the exit from the car's lane.
        """
        x, y = self.position
        dx = self.movement[0]
        dy = self.movement[1]


        db = 0
        
        boundary_coords = sim.boundary_dict[self.direction]


        for i in range(1, self.max_speed + 1):
            next_coord = (x + dx * i, y + dy * i)

            db += 1
            if next_coord != boundary_coords:
                if next_coord[0] < 0 or next_coord[0] >= grid.shape[0] or next_coord[1] < 0 or next_coord[1] >= grid.shape[1]:
                    return db

        return db
    

    def check_distance_to_intersection(self, grid, sim):
        """
        Checks the distance to the next intersection on that car's lane.
        """

        x, y = self.position
        dx = self.movement[0]
        dy = self.movement[1]

        di = 0

        intersection_coord =  sim.intersection_dict[self.direction]

        for i in range(1, self.max_speed + 1):
            next_coord = ((x + dx * i) % grid.shape[0], (y + dy * i) % grid.shape[1])

            di += 1
            if next_coord == intersection_coord:
                return di
        return di
        
    def check_distance_to_car(self, grid, sim):
        """
        Checks distance to the car in front.
        """
        
        x, y = self.position
        dx = self.movement[0]
        dy = self.movement[1]
        
        dc = 0

        for i in range(1, self.max_speed + 1):
            # checks the possible range of motion
            next_pos = grid[(x + dx * i) % grid.shape[0], (y + dy * i) % grid.shape[1]]
            dc += 1

            if next_pos > -1 and (next_pos < self.max_speed + 1):
                return dc
            
        return dc
        
    def exit(self, grid):
        x, y = self.position
        grid[x, y] = -1

        self.lane.cars.remove(self)
        self.lane.car_exit_counter += 1
        return


def initialize_grid(width, height):

    grid = np.full((height, width), -2)

    sn_column = grid[:, width//2]
    ns_column = grid[:, width//2 - 1]
    we_column = grid[height//2, :]
    ew_column = grid[height//2 - 1, :]

    # fill in the columns above
    sn_column[::] = -1
    ns_column[::] = -1
    we_column[::] = -1
    ew_column[::] = -1
    
    return grid

def get_start_points(grid):
    # get the starting point of each lane
    sn_start = (grid.shape[0] - 1, grid.shape[1]//2)
    ns_start = (0, grid.shape[1]//2 - 1)
    we_start = (grid.shape[0]//2 - 1, grid.shape[1] - 1)
    ew_start = (grid.shape[0]//2, 0)
    return [sn_start, ns_start, we_start, ew_start]


class Simulation:

    def __init__(self, width, height, red_length = 10, green_length = 10, 
                 prob_slow_down=0.25,
                 prob_sn_car=0.3, prob_ns_car=0.3, prob_we_car=0.3, prob_ew_car=0.3,
                 prob_sn_turn=0.5, prob_ns_turn=0.5, prob_we_turn=0.5, prob_ew_turn=0.5, 
                 add_until=25, alternate=False, show_figure=True):
        
        self.grid = initialize_grid(width, height)
        self.add_until = add_until
        self.prob_slow_down = prob_slow_down


        sn_start, ns_start, we_start, ew_start = get_start_points(self.grid)

        self.prob_sn_car = prob_sn_car
        self.prob_ns_car = prob_ns_car
        self.prob_we_car = prob_we_car
        self.prob_ew_car = prob_ew_car

        self.prob_sn_turn = prob_sn_turn
        self.prob_ns_turn = prob_ns_turn
        self.prob_we_turn = prob_we_turn
        self.prob_ew_turn = prob_ew_turn

        self.show_figure = show_figure

        sn_lane = Lane(sn_start, 'sn', [-1, 0], self.prob_sn_car, self.prob_sn_turn)
        ns_lane = Lane(ns_start, 'ns', [1, 0], self.prob_ns_car, self.prob_ns_turn)
        we_lane = Lane(we_start, 'we', [0, -1], self.prob_we_car, self.prob_we_turn)
        ew_lane = Lane(ew_start, 'ew', [0, 1], self.prob_ew_car, self.prob_ew_turn)
        self.lanes = [sn_lane, ns_lane, we_lane, ew_lane]


        self.intersection_dict = {'we' : (height//2 - 1, width//2),
                                  'sn' : (height//2, width//2),
                                  'ew' : (height//2, width//2 - 1),
                                  'ns' : (height//2 - 1, width//2 - 1)}
            
        self.turn_dict = {'ns': 'we', 'ew': 'ns', 'sn' : 'ew', 'we' : 'sn'}

        self.boundary_dict = {'sn': (0, width//2),
                              'ns': (height - 1, width//2 - 1),
                              'we': (height//2 - 1, 0),
                              'ew': (height//2, width - 1)}


        self.step_counter = 0
        self.max_speed = 5

        self.alternate = alternate

        if alternate:
            self.alternate_traffic_light_length()
        else:
            self.red_length = red_length
            self.green_length = green_length


    def alternate_traffic_light_length(self):
        """
        Makes traffic lights proportional to the density.
        """
        horizontal_density = max(self.prob_we_car, self.prob_ew_car)
        vertical_density = max(self.prob_sn_car, self.prob_ns_car)
        
        total_time = 60

        # red lights are for the horizontal intersections to cross
        self.red_length = int(total_time * horizontal_density / (horizontal_density + vertical_density))
        # green lights are for the vertical intersections to cross
        self.green_length = int(total_time * vertical_density / (horizontal_density + vertical_density))



    def initialize(self):
        if self.show_figure:
            self.figure, self.axes = plt.subplots()
    
    def observe(self, step):
        # clear the axes
        self.axes.clear()
        
        plot = self.axes.imshow(
            self.grid, interpolation='nearest', vmin = -2, vmax = self.max_speed + 1)
        self.axes.set_title(f'State at step {self.step_counter}')
        # show the traffic lights at each step
        
        if self.traffic_light() == 'red':
            self.axes.text(0, 0, 'RED', color='red', fontsize=20)
        else:
            self.axes.text(0, 0, 'GREEN', color='green', fontsize=20)
        
        if self.step_counter == step:
            self.figure.colorbar(plot)
        return plot
    
    def update(self):
        for lane in self.lanes:
            lane.update(self.grid, self)
        self.step_counter += 1

    def simulate(self, steps):
        self.show_figure = False
        self.initialize()
        
        for _ in range(steps):
            self.update()

    def traffic_light(self):
        if self.step_counter % (self.red_length + self.green_length) < self.red_length:
            return 'red'
        else:
            return 'green'
        
    def test_number_enter_exit(self):
        # only if the grid contains all numbers <= -1
        if np.all(self.grid <= -1):
            for lane in self.lanes:
                print(lane.car_counter == lane.car_exit_counter)
        else:
            print('Grid is not cleared.')


height, width = 20, 20


sim_everyone = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=1, prob_sn_car=1, prob_ew_car=1, prob_we_car=1, add_until=25)

sim_only_v = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=1, prob_sn_car=1, prob_ew_car=0, prob_we_car=0)
sim_only_h = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=0, prob_sn_car=0, prob_ew_car=1, prob_we_car=1)

sim_only_ns = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=1, prob_sn_car=0, prob_ew_car=0, prob_we_car=0)
sim_only_ew = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=0, prob_sn_car=0, prob_ew_car=1, prob_we_car=0)
sim_only_sn = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=0, prob_sn_car=1, prob_ew_car=0, prob_we_car=0)
sim_only_we = Simulation(height, width, red_length=10, green_length=10, prob_ns_car=0, prob_sn_car=0, prob_ew_car=0, prob_we_car=1)

make_animation(sim_everyone, 10, steps_per_frame=1, interval=100)

['pillow', 'ffmpeg', 'ffmpeg_file', 'html']


100%|██████████| 10/10 [00:00<00:00, 13.30it/s]

<Figure size 640x480 with 0 Axes>

['pillow', 'ffmpeg', 'ffmpeg_file', 'html']


## Test cases


### Traffic lights

The following code implements a traffic light at the middle of the road. The light is green for 10 time steps and red for 10 time steps. SN- and NS-bound cars can only cross the intersection when the light is green; EW- and WE-bound cars can only cross when the light is red. Otherwise, they should form a queue in front of the intersection.

The first two code cells simulate the queueing behavior when either horizontal- or vertical-bound cars have priority. The third cell shows the queueing behavior when both directions are possible.

The third cell is run until the grid is cleared out in order to test whether the number of cars entering and exiting the system under the normal model is the same given a simple simulation with only traffic lights enabled. As can be seen, for each lane, the number of cars entering the system is the same as the number of cars exiting the system.

The fourth cell ensures the traffic light behavior is observed even for the alternate strategy.

To observe queueing behavior in both lanes, the turning has been disabled.

In [26]:
# only EW, WE cars can pass
height, width = 20, 20
sim_h_green = Simulation(height, width, red_length=10, green_length=0, 
                         prob_ew_turn=0, prob_we_turn=0, prob_ns_turn=0, prob_sn_turn=0)
make_animation(sim_h_green, 25, steps_per_frame=1, interval=100)



<Figure size 640x480 with 0 Axes>

In [27]:
# only SN, NS cars can pass
sim_v_green = Simulation(height, width, red_length=0, green_length=10,
                        prob_ew_turn=0, prob_we_turn=0, prob_ns_turn=0, prob_sn_turn=0)

make_animation(sim_v_green, 25, steps_per_frame=1, interval=100)


100%|██████████| 10/10 [00:20<00:00, 13.30it/s]




[A[A[A


[A[A[A


[A[A[A


[A[A[A


100%|██████████| 10/10 [23:49:55<00:00, 8579.55s/it]
100%|██████████| 10/10 [00:21<00:00,  2.16s/it]
100%|██████████| 25/25 [00:05<00:00,  4.29it/s]



[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A

<Figure size 640x480 with 0 Axes>

In [29]:
# simulation clears out
height, width = 20, 20
sim_clear = Simulation(height, width, red_length=10, green_length=10)

make_animation(sim_clear, 50, steps_per_frame=1, interval=100)

  0%|          | 0/10 [23:50:25<?, ?it/s]6it/s]
100%|██████████| 25/25 [00:10<00:00,  2.40it/s]
100%|██████████| 50/50 [00:03<00:00, 15.50it/s]

<Figure size 640x480 with 0 Axes>

In [30]:
# Ensures that the number of cars leaving an entering the system is the same
sim_clear.test_number_enter_exit()

True
True
True
True


In [31]:
# simulation clears out
height, width = 20, 20
sim_clear_alternate = Simulation(height, width, red_length=10, green_length=10, alternate=True)

make_animation(sim_clear_alternate, 50, steps_per_frame=1, interval=100)

100%|██████████| 50/50 [00:08<00:00,  5.76it/s]


<Figure size 640x480 with 0 Axes>

In [None]:
# Ensures that the number of cars leaving an entering the system is the same
sim_clear_alternate.test_number_enter_exit()

### Turning cars

The four code cells below demonstrate the turning behavior. 

In the first case, the car from a particular lane, in this case, the NS lane has a 100% chance of turning. As observed, it turns right when its light is green and continues moving down the lane. In that lane, cars slow down or speed up according to the rules of the Nagel & Schreckenberg model but are not affected by the traffic lights behind them, as expected in real-world traffic.

In the second case, the NS lane has a 50% chance of turning. As observed, the car can either turn or continue down its lane. In the third code cell, The NS and the WE lanes have a 50% of turning. The cars follow the expected patterns and obey traffic light laws for their respective lanes. Additionally, after the simulation is ran until clearing, for each lane in the simulation, the number of cars entering it is equal to the number of cars exiting, given the addition of turning.

Finally, the fourth cell ensures that the turning behavior is preserved even under the alternate strategy.

In [32]:
# only one lane turns
height, width = 20, 20
sim_one_turn = Simulation(height, width, red_length=10, green_length=10,
                          prob_ew_car=0, prob_we_car=0, prob_ns_car=0.5, prob_sn_car=0,
                          prob_ew_turn=0, prob_we_turn=0, prob_ns_turn=1, prob_sn_turn=0)

make_animation(sim_one_turn, 25, steps_per_frame=1, interval=100)

 96%|█████████▌| 24/25 [00:01<00:00, 15.88it/s]

<Figure size 640x480 with 0 Axes>

In [33]:
# only one lane sometimes turns
height, width = 20, 20
sim_one_turn = Simulation(height, width, red_length=10, green_length=10,
                            prob_ew_car=0, prob_we_car=0, prob_ns_car=0.5, prob_sn_car=0,
                            prob_ew_turn=0, prob_we_turn=0, prob_ns_turn=0.5, prob_sn_turn=0)

make_animation(sim_one_turn, 25, steps_per_frame=1, interval=100)


[A
[A
[A
[A
[A
[A
[A
[A
100%|██████████| 50/50 [00:08<00:00,  6.24it/s]
100%|██████████| 25/25 [00:02<00:00,  8.65it/s]

[A
[A
[A
[A

<Figure size 640x480 with 0 Axes>

In [34]:
# two lanes sometimes turn
height, width = 20, 20
sim_two_turn = Simulation(height, width, red_length=10, green_length=10,
                            prob_ew_car=0, prob_sn_car=0, prob_ns_car=0.3, prob_we_car=0.3,
                            prob_ew_turn=0, prob_sn_turn=0, prob_ns_turn=0.5, prob_we_turn=0.5)

make_animation(sim_two_turn, 50, steps_per_frame=1, interval=100)

100%|██████████| 50/50 [00:03<00:00, 15.84it/s]

<Figure size 640x480 with 0 Axes>

In [35]:
# Check that after running the simulation, the number of cars entering and leaving the system is the same
# even despite the turns
sim_two_turn.test_number_enter_exit()

print(sim_two_turn.lanes[2].average_traffic_flow)

Grid is not cleared.
[0.0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.15, 0.2, 0.25, 0.2, 0.15, 0.2, 0.4, 0.0, 0.1, 0.3, 0.1, 0.1, 0.2, 0.0, 0.0, 0.1, 0.35, 0.2, 0.3, 0.5, 0.1, 0.35, 0.55, 0.3, 0.2, 0.25, 0.05, 0.1, 0.15, 0.0, 0.0, 0.0, 0.0, 0.05, 0.2, 0.25, 0.3, 0.05, 0.05, 0.15, 0.2, 0.05, 0.1]



100%|██████████| 50/50 [00:23<00:00, 15.84it/s]

In [None]:
# Ensures behavior for alternate strategy
sim_two_turn_alternate = Simulation(height, width, red_length=10, green_length=10, alternate=True,
                            prob_ew_car=0, prob_sn_car=0, prob_ns_car=0.3, prob_we_car=0.3,
                            prob_ew_turn=0, prob_sn_turn=0, prob_ns_turn=0.5, prob_we_turn=0.5)

make_animation(sim_two_turn_alternate, 100, steps_per_frame=1, interval=100)

In [None]:
sim_two_turn_alternate.test_number_enter_exit()

## Jingmei-Shunan Road

The code cell below visualizes the data for the Jingmei-Shunan road.

In [None]:
measured_height = 212.59
measured_width = 232.56

average_car_length = 4.53

height, width = int(measured_height/average_car_length), int(measured_width/average_car_length)

prob_Shunan_S = 29 / width
prob_Shunan_N = 24 / width
prob_Jingmei_W = 6 / height
prob_Jingmei_E = 4 / height

prob_turn_S = 3 / 4
prob_turn_N = 4 / 6
prob_turn_W = 2 / 29
prob_turn_E = 6 / 24

steps = 100

Shunan_red = 30
Jingmei_red = 90

sim_JS_road = Simulation(height, width, red_length=Shunan_red, green_length=Jingmei_red,
                        prob_ew_car = prob_Jingmei_W, prob_we_car = prob_Jingmei_E, prob_ns_car = prob_Shunan_S, prob_sn_car = prob_Shunan_N,
                        prob_ew_turn = prob_turn_W, prob_we_turn = prob_turn_E, prob_ns_turn = prob_turn_S, prob_sn_turn = prob_turn_N,
                        add_until = steps)

make_animation(sim_JS_road, steps, steps_per_frame=1, interval=100)

## Alternate Strategy

The code cell below visualizes the alternate traffic light strategy for the Jingmei-Shunan road. The alternate strategy consists of setting the red and green lights proportional to the historical data about the density.

In [None]:
measured_height = 212.59
measured_width = 232.56

average_car_length = 4.53

height, width = int(measured_height/average_car_length), int(measured_width/average_car_length)

prob_Shunan_S = 29 / width
prob_Shunan_N = 24 / width
prob_Jingmei_W = 6 / height
prob_Jingmei_E = 4 / height

prob_turn_S = 3 / 4
prob_turn_N = 4 / 6
prob_turn_W = 2 / 29
prob_turn_E = 6 / 24

steps = 100

Shunan_red = 30
Jingmei_red = 90

sim_JS_road_alternate = Simulation(height, width, red_length=Shunan_red, green_length=Jingmei_red,
                        prob_ew_car = prob_Jingmei_W, prob_we_car = prob_Jingmei_E, prob_ns_car = prob_Shunan_S, prob_sn_car = prob_Shunan_N,
                        prob_ew_turn = prob_turn_W, prob_we_turn = prob_turn_E, prob_ns_turn = prob_turn_S, prob_sn_turn = prob_turn_N,
                        add_until = steps, alternate=True)

make_animation(sim_JS_road_alternate, steps, steps_per_frame=1, interval=100)

## Analysis

### MFA: Theoretical vs. Simulation

In [None]:
# MFA Code taken from class 5.1

import numpy as np
import matplotlib.pyplot as plt
import random

def mfa(v, density, p_slow):
    '''
    Calculate the updated distribution over speeds given the current
    distribution by applying the MFA traffic rules.

    Inputs:
    
        v (list of float) The car speed probability vector. v[i] is the
        probability that a car is moving at speed i.

        density (float) The average number of cars per cell on the road.
        
        p_slow (float) The probability of random slow-down.
    
    Returns: A new, updated car speed probability vector.
    '''
    v_max = len(v) - 1
    new_v = [0] * len(v)  # The updated speed probability vector
    for v_from in range(v_max + 1):
        
        # Current speed, will be updated below
        speed = v_from
        
        # Accelerate
        if speed < v_max:
            speed += 1
        
        # Brake when there is a car in front at each distance from 1 to speed
        for distance in range(1, speed + 1):
            # Probability that a car is in front at a particular distance
            car_at_distance = (1-density)**(distance-1) * density
            if distance > 1:
                new_v[distance-1] += v[v_from] * car_at_distance * (1-p_slow)
                new_v[distance-2] += v[v_from] * car_at_distance * p_slow
            else:
                new_v[distance-1] += v[v_from] * car_at_distance
        
        # No cars in front up to distance == speed
        new_v[speed] += v[v_from] * (1-density)**speed * (1-p_slow)
        q_mini_traffic = 0.9
        new_v[speed-1] += v[v_from] * (1-density)**speed * p_slow*(1-q_mini_traffic)
        new_v[speed-2] += v[v_from] * (1-density)**speed * p_slow*q_mini_traffic
        
    return new_v

def average_speed(v):
    return np.sum(np.array(v) * np.arange(len(v)))

def average_flow(v, density):
    return density * average_speed(v)

max_speed = 5
p_slows =  [0.1, 0.25, 0.3, 0.5, 0.7, 0.9, 0.99999999999]
car_densities = np.linspace(0, 1, 101)

mfa_flow_results = {}  # map from p_slow to flow results for all densities

for p_slow in p_slows:
    mfa_flow_results[p_slow] = []
    for density in car_densities:
        # Start from a uniform distribution over speeds
        v = [1/(max_speed + 1)] * (max_speed + 1)
        assert abs(sum(v) - 1) < 1e-6  # Sanity check before
        for i in range(100):  # Run until convergence
            v = mfa(v, density, p_slow)
            assert abs(sum(v) - 1) < 1e-6  # Sanity check after
        mfa_flow_results[p_slow].append(average_flow(v, density))


In [None]:
# MFA code for Cellular automata

def mfa_ca(densities, p_slow):
    v = p_slow / (1 - p_slow)

    flow_vals = (densities * (1 - densities) * v) / (1 + densities * v)

    return flow_vals


mfa_ca_flow_results = {}

for p_slow in p_slows:
    mfa_ca_flow_results[p_slow] = mfa_ca(car_densities, p_slow)


In [None]:
# make a figure with 2 subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,8))
# fig, ax = plt.figure(figsize=(10,8)).subplots(1, 2)


for p, flow in mfa_ca_flow_results.items():
    # velocity for cellular automaton
    ax1.plot(car_densities, mfa_ca_flow_results[p], '-', label=f'CA p_slow = {round(p, 2)}') 
    ax2.plot(car_densities, mfa_flow_results[p], '--', label=f'car-based p_slow = {round(p, 2)}')

ax1.set_xlabel('Car densities')
ax2.set_xlabel('Car densities')

ax1.set_ylabel('MFA traffic flow')
ax2.set_ylabel('MFA traffic flow')

ax1.set_xlim(0, 1)
ax2.set_xlim(0, 1)

ax1.set_ylim(0, 1)
ax2.set_ylim(0, 1)


ax1.set_title('CA MFA traffic flow versus car density')
ax2.set_title('In-class MFA traffic flow versus car density')

ax1.legend(loc='upper right')
ax2.legend(loc='upper right')

plt.show()


In [None]:
def reproduce_density_plot(densities, trials=100, steps=50, p=0.25):
    density_dict = {}

    for density in densities:
        flows = []
        for trial in range(trials):
            sim = Simulation(20, 20,
                            prob_ew_car=density, 
                            prob_we_car=density,
                            prob_ns_car=density,
                            prob_sn_car=density, 
                            prob_slow_down=p,
                            show_figure=False,
                            add_until=steps)
            sim.initialize()
            for i in range(steps):
                sim.update()
        
            # get the final traffic flow for each lane
            for lane in sim.lanes:
                flows.append(lane.average_traffic_flow[-1])

        density_dict[density] = np.mean(flows)

    return density_dict

In [None]:
# run the normal simulation for different densities
height, width = 20, 20
trials = 100
steps = 25

car_densities = np.linspace(0, 1, 101)

car_densities_traffic_flows = {}
p_slow_traffic_flows = {}

for p in p_slows:
    average_flows = reproduce_density_plot(car_densities, trials, steps, p)
    p_slow_traffic_flows[p] = average_flows


In [None]:
# make a figure with 3 subplots

# make a figure with 2 subplots
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(16,8))


for p, flow in mfa_ca_flow_results.items():
    # velocity for cellular automaton
    ax1.plot(car_densities, mfa_ca_flow_results[p], '-', label=f'CA p_slow = {round(p, 2)}') 
    ax2.plot(car_densities, mfa_flow_results[p], '--', label=f'Car-based p_slow = {round(p, 2)}')
    
for p, dens_flow in p_slow_traffic_flows.items():
    # turn keys into a list = densities
    densities = list(dens_flow.keys())
    flows = list(dens_flow.values())
    ax3.plot(densities, flows, ':', label=f'Empirical sim p_slow = {round(p, 2)}')

ax1.set_xlabel('Car densities')
ax2.set_xlabel('Car densities')
ax3.set_xlabel('Car densities')

ax1.set_ylabel('MFA traffic flow')
ax2.set_ylabel('MFA traffic flow')
ax3.set_ylabel('MFA traffic flow')

ax1.set_xlim(0, 1)
ax2.set_xlim(0, 1)
ax3.set_xlim(0, 1)

ax1.set_ylim(0, 1)
ax2.set_ylim(0, 1)
ax3.set_ylim(0, 1)


ax1.set_title('CA MFA traffic flow versus car density')
ax2.set_title('In-Class MFA traffic flow versus car density')
ax3.set_title('Empirical sim MFA traffic flow versus car density')

ax1.legend(loc='upper right')
ax2.legend(loc='upper right')
ax3.legend(loc='upper right')

plt.show()


### Traffic Flow Comparison: Different Strategies

In [None]:
# run the normal Jingmei-Shunan road simulation

trials = 1000
steps = 500

measured_height = 212.59
measured_width = 232.56

average_car_length = 4.53

height, width = int(measured_height/average_car_length), int(measured_width/average_car_length)


normal_traffic_flows = {'ew': [], 'we': [], 'ns': [], 'sn': []}

for t in range(trials):
    sim_JS_road = Simulation(height, width, red_length=Shunan_red, green_length=Jingmei_red,
                        prob_ew_car = prob_Jingmei_W, prob_we_car = prob_Jingmei_E, prob_ns_car = prob_Shunan_S, prob_sn_car = prob_Shunan_N,
                        prob_ew_turn = prob_turn_W, prob_we_turn = prob_turn_E, prob_ns_turn = prob_turn_S, prob_sn_turn = prob_turn_N,
                        add_until = steps, show_figure=False)
    
    sim_JS_road.initialize()
    for i in range(steps):
        sim_JS_road.update()

    normal_traffic_flows['sn'].append(np.mean(sim_JS_road.lanes[0].average_traffic_flow))
    normal_traffic_flows['ns'].append(np.mean(sim_JS_road.lanes[1].average_traffic_flow))
    normal_traffic_flows['ew'].append(np.mean(sim_JS_road.lanes[2].average_traffic_flow))
    normal_traffic_flows['we'].append(np.mean(sim_JS_road.lanes[3].average_traffic_flow))


# run the alternate Jingmei-Shunan road simulation
alternate_traffic_flows = {'ew': [], 'we': [], 'ns': [], 'sn': []}

for t in range(trials):
    sim_JS_road_alternate = Simulation(height, width, red_length=Shunan_red, green_length=Jingmei_red,
                        prob_ew_car = prob_Jingmei_W, prob_we_car = prob_Jingmei_E, prob_ns_car = prob_Shunan_S, prob_sn_car = prob_Shunan_N,
                        prob_ew_turn = prob_turn_W, prob_we_turn = prob_turn_E, prob_ns_turn = prob_turn_S, prob_sn_turn = prob_turn_N,
                        add_until = steps, alternate = True, show_figure=False)
    
    sim_JS_road_alternate.initialize()
    for i in range(steps):
        sim_JS_road_alternate.update()
    
    alternate_traffic_flows['sn'].append(np.mean(sim_JS_road_alternate.lanes[0].average_traffic_flow))
    alternate_traffic_flows['ns'].append(np.mean(sim_JS_road_alternate.lanes[1].average_traffic_flow))
    alternate_traffic_flows['ew'].append(np.mean(sim_JS_road_alternate.lanes[2].average_traffic_flow))
    alternate_traffic_flows['we'].append(np.mean(sim_JS_road_alternate.lanes[3].average_traffic_flow))
    

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(12,12))


ax[0, 0].hist(normal_traffic_flows['sn'], alpha=0.5, label='normal', bins=25, density=True)
#ax[0, 0].hist(alternate_traffic_flows['sn'], alpha=0.5, label='alternate', bins=25)
ax[0, 0].set_title('SN Lane traffic flow')
ax[0, 0].legend(loc='upper right')
ax[0, 0].set_xlabel('Traffic flow')

ax[0, 1].hist(normal_traffic_flows['ns'], alpha=0.5, label='normal', bins=25, density=True)
#ax[0, 1].hist(alternate_traffic_flows['ns'], alpha=0.5, label='alternate', bins=25)
ax[0, 1].set_title('NS Lane traffic flow')
ax[0, 1].legend(loc='upper right')
ax[0, 1].set_xlabel('Traffic flow')


ax[1, 0].hist(normal_traffic_flows['ew'], alpha=0.5, label='normal', bins=25, density=True)
#ax[1, 0].hist(alternate_traffic_flows['ew'], alpha=0.5, label='alternate', bins=25)
ax[1, 0].set_title('EW Lane traffic flow')
ax[1, 0].legend(loc='upper right')
ax[1, 0].set_xlabel('Traffic flow')

ax[1, 1].hist(normal_traffic_flows['we'], alpha=0.5, label='normal', bins=25, density=True)
#ax[1, 1].hist(alternate_traffic_flows['we'], alpha=0.5, label='alternate', bins=25)
ax[1, 1].set_title('WE Lane traffic flow')
ax[1, 1].legend(loc='upper right')
ax[1, 1].set_xlabel('Traffic flow')
plt.show()

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(12,12))


#ax[0, 0].hist(normal_traffic_flows['sn'], alpha=0.5, label='normal', bins=25)
ax[0, 0].hist(alternate_traffic_flows['sn'], alpha=0.5, label='alternate', bins=25, color='orange', density=True)
ax[0, 0].set_title('SN Lane traffic flow')
ax[0, 0].legend(loc='upper right')
ax[0, 0].set_xlabel('Traffic flow')

#ax[0, 1].hist(normal_traffic_flows['ns'], alpha=0.5, label='normal', bins=25)
ax[0, 1].hist(alternate_traffic_flows['ns'], alpha=0.5, label='alternate', bins=25, color='orange', density=True)
ax[0, 1].set_title('NS Lane traffic flow')
ax[0, 1].legend(loc='upper right')
ax[0, 1].set_xlabel('Traffic flow')


#ax[1, 0].hist(normal_traffic_flows['ew'], alpha=0.5, label='normal', bins=25)
ax[1, 0].hist(alternate_traffic_flows['ew'], alpha=0.5, label='alternate', bins=25, color='orange', density=True)
ax[1, 0].set_title('EW Lane traffic flow')
ax[1, 0].legend(loc='upper right')
ax[1, 0].set_xlabel('Traffic flow')

#ax[1, 1].hist(normal_traffic_flows['we'], alpha=0.5, label='normal', bins=25)
ax[1, 1].hist(alternate_traffic_flows['we'], alpha=0.5, label='alternate', bins=25, color='orange', density=True)
ax[1, 1].set_title('WE Lane traffic flow')
ax[1, 1].legend(loc='upper right')
ax[1, 1].set_xlabel('Traffic flow')
plt.show()

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(12,12))


ax[0, 0].hist(normal_traffic_flows['sn'], alpha=0.5, label='normal', bins=25, density=True)
ax[0, 0].hist(alternate_traffic_flows['sn'], alpha=0.5, label='alternate', bins=25, density=True)
ax[0, 0].set_title('SN Lane traffic flow')
ax[0,0].axvline(x=np.mean(normal_traffic_flows['sn']), color='C0', linestyle='dashed', 
                linewidth=2, label=f'{round(np.mean(normal_traffic_flows["sn"]), 2)}')
ax[0,0].axvline(x=np.mean(alternate_traffic_flows['sn']), color='orange', linestyle='dashed', 
                linewidth=2, label=f'{round(np.mean(alternate_traffic_flows["sn"]), 2)}')
ax[0, 0].legend(loc='upper left')
ax[0, 0].set_xlabel('Traffic flow')

ax[0, 1].hist(normal_traffic_flows['ns'], alpha=0.5, label='normal', bins=25, density=True)
ax[0, 1].hist(alternate_traffic_flows['ns'], alpha=0.5, label='alternate', bins=25, density=True)
ax[0, 1].set_title('NS Lane traffic flow')
ax[0, 1].axvline(x=np.mean(normal_traffic_flows['ns']), color='C0', linestyle='dashed', 
                linewidth=2, label=f'{round(np.mean(normal_traffic_flows["ns"]), 2)}')
ax[0, 1].axvline(x=np.mean(alternate_traffic_flows['ns']), color='orange', linestyle='dashed', 
                linewidth=2, label=f'{round(np.mean(alternate_traffic_flows["ns"]), 2)}')
ax[0, 1].legend(loc='upper left')
ax[0, 1].set_xlabel('Traffic flow')


ax[1, 0].hist(normal_traffic_flows['ew'], alpha=0.5, label='normal', bins=25, density=True)
ax[1, 0].hist(alternate_traffic_flows['ew'], alpha=0.5, label='alternate', bins=25, density=True)
ax[1, 0].set_title('EW Lane traffic flow')
ax[1,0].axvline(x=np.mean(normal_traffic_flows['ew']), color='C0', linestyle='dashed',
                linewidth=2, label=f'{round(np.mean(normal_traffic_flows["ew"]), 2)}')
ax[1,0].axvline(x=np.mean(alternate_traffic_flows['ew']), color='orange', linestyle='dashed',
                linewidth=2, label=f'{round(np.mean(alternate_traffic_flows["ew"]), 2)}')
ax[1, 0].legend(loc='upper left')
ax[1, 0].set_xlabel('Traffic flow')


ax[1, 1].hist(normal_traffic_flows['we'], alpha=0.5, label='normal', bins=25, density=True)
ax[1, 1].hist(alternate_traffic_flows['we'], alpha=0.5, label='alternate', bins=25, density=True)
ax[1, 1].set_title('WE Lane traffic flow')
ax[1,1].axvline(x=np.mean(normal_traffic_flows['we']), color='C0', linestyle='dashed',
                linewidth=2, label=f'{round(np.mean(normal_traffic_flows["we"]), 2)}')
ax[1,1].axvline(x=np.mean(alternate_traffic_flows['we']), color='orange', linestyle='dashed',
                linewidth=2, label=f'{round(np.mean(alternate_traffic_flows["we"]), 2)}')
ax[1, 1].legend(loc='upper left')
ax[1, 1].set_xlabel('Traffic flow')
plt.show()