In [None]:
import sys, os
from os.path import join, dirname, abspath
import matplotlib.pyplot as plt
from matplotlib.pyplot import Figure, Axes
import numpy as np
from numpy import array as arr
import networkx as nx
import matplotlib.animation as animation
from mpl_toolkits.axes_grid1 import make_axes_locatable
from directions import *
from string import ascii_uppercase
plt.rcParams.update({
    "text.usetex": False,
    "ytick.minor.visible":True,
    "xtick.minor.visible":True,
    'xtick.direction': "in",
    'ytick.direction': "in"
})
outdir = "claremont"
os.makedirs(outdir,exist_ok=True)
def out(fname): return join(outdir,fname)
def savefig(plot_name): 
    plt.savefig(out(plot_name),bbox_inches="tight",dpi=250)
    
import pandas as pd
from numpy.linalg import matrix_power, eig

def arr_to_latex(M):
    return '$$\n' + r'\begin{bmatrix}' + '\n' + (r'\\' + '\n').join('&'.join(str(round(x,3)) for x in row) for row in M) + '\n' + r'\end{bmatrix}' + '\n' +'$$'

def vec_to_latex(x,round=3):
    return '$$\n' + r'\begin{bmatrix}' + '\n' + (r' \\ ').join(str(np.round(v,round)) for v in x) + '\n' + r'\end{bmatrix}' + '\n' +'$$'

from car import Car
from tiles import Road, Exit, Onramp
from world import World, Choice, Route
from traffic_signals import Stoplight

In [None]:
EW_CAPACITY = 104
WE_CAPACITY = 103
NS_CAPACITY = 46
SN_CAPACITY = 47

capacities = [EW_CAPACITY,WE_CAPACITY,NS_CAPACITY,SN_CAPACITY]

In [None]:
TIMESTEPS_PER_SECOND = 1/( 2 * 10 / 88)  # 1 / 2 tiles/timestep * 10ft/tile * 1 second/88ft (60mph)
TIMESTEPS_PER_SECOND

In [None]:
1/4.4

In [None]:
30/10 / TIMESTEPS_PER_SECOND

In [None]:
inflow = pd.DataFrame(data=["Westbound","Eastbound","Northbound","Southbound"],columns=["Direction"])
inflow["Cars per 10 Seconds"] = [30,50,25,15]
inflow["Cars per timestep"] = np.round(inflow["Cars per 10 Seconds"]/10 / TIMESTEPS_PER_SECOND,3)
inflow["Number of Onramps"] = [2,2,1,1]
inflow["Onramp Period"] = np.round(1/inflow["Cars per timestep"]) * inflow["Number of Onramps"]
inflow.to_csv(out("inflow.csv"))
inflow

<img src="turning_proportions.png"> 

In [None]:
EW_ONRAMP_PERIOD, WE_ONRAMP_PERIOD, SN_ONRAMP_PERIOD, NS_ONRAMP_PERIOD = list(inflow["Onramp Period"])

In [None]:
# TURN RATES
# north to south
L_NS = 0.5   # fraction of n->s cars that go left  
R_NS = 0.25 # fraction of n->s cars that go right
# STRAIGHT_NS = 1 - (L_NS+R_NS)

# south to north
L_SN = 0.5   # fraction of s->n cars that go left  
R_SN = 0.25 # fraction of s->n cars that go right
# STRAIGHT_SN = 1 - (L_SN+R_SN)

L_EW = 0.15  # fraction of e->w cars that go left  
R_EW = 0.1 # fraction of e->w cars that go right
# STRAIGHT_EW = 1 - (L_EW+R_EW)

L_WE = 0.15   # fraction of w->e cars that go left  
R_WE = 0.1 # fraction of w->e cars that go right
# STRAIGHT_WE = 1 - (L_WE+R_WE)

In [None]:
H_INROAD_LEN=50
V_INROAD_LEN=16
OUTROAD_LEN=50

# center of the map
X, Y = H_INROAD_LEN+2,H_INROAD_LEN+2

# axis limits when zooming into the intersection (for display purposes)
XLIMS = (X-12,X+12)
YLIMS = (Y-12,Y+13)

TIGHT_XLIMS = (X-6,X+6)
TIGHT_YLIMS = (Y-6,Y+6)

In [None]:
R66_SPEED_LIMIT = 2
IH_SPEED_LIMIT = 1
INTERSECTION_SPEED_LIMIT = 2

In [None]:
# indian hill: north-south single-lane, 50 car capacity
# route 66: east-west double lane, 100 car capacity in each direction

r66 = []
indian_hill = []

EW_R_IN = (X+2,Y+3)
EW_TS_IN = (X+2,Y+2)
EW_BS_IN = (X+2,Y+1)
EW_L_IN = (X+2,Y)

EW_TS_OUT = (X-3,Y+2)
EW_BS_OUT = (X-3,Y+1)

WE_L_IN = (X-2,Y)
WE_TS_IN = (X-2,Y-1)
WE_BS_IN = (X-2,Y-2)

WE_TS_OUT = (X+3,Y-1)
WE_BS_OUT = (X+3,Y-2)

NS_S_IN = (X-1,Y+3)
NS_R_IN = (X-2,Y+3)
NS_L_IN = (X,Y+3)

NS_S_OUT = (X-1,Y-3)

SN_S_IN = (X+1,Y-2)
SN_R_IN = (X+2,Y-2)
SN_L_IN = (X,Y-2)

SN_S_OUT = (X+1,Y+4)

INT_BOUNDS = np.array([X,Y]) + np.array([[-2,-2],[+3,+4]])  # exclusive on upper right edge

x = np.arange(*INT_BOUNDS[:,0])
y = np.arange(*INT_BOUNDS[:,1])
gx,gy = np.meshgrid(x,y)
intersection_points = [tuple(a) for a in np.array(list(zip(gx.flatten(),gy.flatten())))]

WE_TS_ONRAMP_LOC = (WE_TS_IN[0]-(H_INROAD_LEN-1),WE_TS_IN[1])
WE_BS_ONRAMP_LOC = (WE_BS_IN[0]-(H_INROAD_LEN-1),WE_BS_IN[1])
EW_TS_ONRAMP_LOC = (EW_TS_IN[0]+(H_INROAD_LEN-2),EW_TS_IN[1])
EW_BS_ONRAMP_LOC = (EW_BS_IN[0]+(H_INROAD_LEN-2),EW_BS_IN[1])
NS_ONRAMP_LOC = (NS_S_IN[0],NS_S_IN[1]+(V_INROAD_LEN+1))
SN_ONRAMP_LOC = (SN_S_IN[0],SN_S_IN[1]-(V_INROAD_LEN+1))

onramp_points = [WE_TS_ONRAMP_LOC,WE_BS_ONRAMP_LOC,EW_TS_ONRAMP_LOC,EW_BS_ONRAMP_LOC,NS_ONRAMP_LOC,SN_ONRAMP_LOC]

EW_R_LNCHNG = (EW_TS_IN[0]+6,EW_TS_IN[1])
EW_L_LNCHNG = (EW_BS_IN[0]+6,EW_BS_IN[1])

WE_L_LNCHNG = (WE_TS_IN[0]-6,WE_TS_IN[1])

SN_LR_LNCHNG = (SN_S_IN[0],SN_S_IN[1]-V_INROAD_LEN)
NS_LR_LNCHNG = (NS_S_IN[0],NS_S_IN[1]+V_INROAD_LEN)

lane_change_points = [EW_R_LNCHNG,EW_L_LNCHNG,WE_L_LNCHNG,SN_LR_LNCHNG,NS_LR_LNCHNG]

forbidden_points = onramp_points + intersection_points + lane_change_points

for x in range(X-H_INROAD_LEN, X + OUTROAD_LEN):
    for y,direction in zip([EW_TS_IN[1],EW_BS_IN[1],WE_TS_IN[1],WE_BS_IN[1]],[WEST,WEST,EAST,EAST]):
        if (x,y) not in forbidden_points:
            r66.append(Road(x,y,direction,speed_limit=R66_SPEED_LIMIT))

for y in range(Y-(V_INROAD_LEN+2), Y+V_INROAD_LEN+3):
    for x, direction in zip([SN_S_IN[0],NS_S_IN[0]],[NORTH,SOUTH]):
        if (x,y) not in forbidden_points:
            print(x,y)
            indian_hill.append(Road(x,y,direction,speed_limit=IH_SPEED_LIMIT))

# west-to-east left turn lane
for x in range(WE_L_LNCHNG[0]+1,WE_L_IN[0]):
    y = WE_L_IN[1]
    # print(x-X,y-Y)
    print(x,y)
    r66.append(Road(x,y,EAST,speed_limit=R66_SPEED_LIMIT))
r66.append(Road(*WE_L_LNCHNG,(2*L_WE*NORTHEAST + (1-2*L_WE)*EAST),speed_limit=R66_SPEED_LIMIT))

# e-w left and right turn lanes
for x in range(EW_L_IN[0]+1,EW_L_LNCHNG[0]):
    r66.append(Road(x,EW_L_IN[1],WEST,speed_limit=R66_SPEED_LIMIT))
    r66.append(Road(x,EW_R_IN[1],WEST,speed_limit=R66_SPEED_LIMIT))
r66.append(Road(*EW_L_LNCHNG,2*L_EW*SOUTHWEST+(1-2*L_EW)*WEST,speed_limit=R66_SPEED_LIMIT))
r66.append(Road(*EW_R_LNCHNG,2*R_EW*NORTHWEST+(1-2*R_EW)*WEST,speed_limit=R66_SPEED_LIMIT))

# n-s left and right turn lanes
for y in range(NS_L_IN[1]+1,NS_LR_LNCHNG[1]):
    # print((NS_L_IN[0],y),(NS_R_IN[0],y))
    r66.append(Road(NS_L_IN[0],y,SOUTH,speed_limit=IH_SPEED_LIMIT))
    r66.append(Road(NS_R_IN[0],y,SOUTH,speed_limit=IH_SPEED_LIMIT))
r66.append(Road(*NS_LR_LNCHNG,(L_NS*SOUTHEAST+R_NS*SOUTHWEST+(1-(L_NS+R_NS))*SOUTH),speed_limit=IH_SPEED_LIMIT))

# s-n left and right turn lanes
for y in range(SN_LR_LNCHNG[1]+1,SN_L_IN[1]):
    # print((SN_L_IN[0],y),(NS_R_IN[0],y))
    r66.append(Road(SN_L_IN[0],y, NORTH,speed_limit=IH_SPEED_LIMIT))
    r66.append(Road(SN_R_IN[0],y, NORTH,speed_limit=IH_SPEED_LIMIT))
r66.append(Road(*SN_LR_LNCHNG,(L_SN*NORTHWEST+R_SN*NORTHEAST+(1-(L_SN+R_SN))*NORTH),speed_limit=IH_SPEED_LIMIT))


for i in intersection_points:
    r66.append(Road(i[0],i[1],np.ones(9)/9,speed_limit=R66_SPEED_LIMIT))

# onramps
ew_b_onramp = Onramp(*EW_BS_ONRAMP_LOC,WEST,car_period=EW_ONRAMP_PERIOD,speed_limit=R66_SPEED_LIMIT,avg_speed=10*R66_SPEED_LIMIT+1)
ew_t_onramp = Onramp(*EW_TS_ONRAMP_LOC,WEST,car_period=EW_ONRAMP_PERIOD,speed_limit=R66_SPEED_LIMIT,avg_speed=10*R66_SPEED_LIMIT+1)
we_b_onramp = Onramp(*WE_BS_ONRAMP_LOC,EAST,car_period=WE_ONRAMP_PERIOD,speed_limit=R66_SPEED_LIMIT,avg_speed=10*R66_SPEED_LIMIT+1)
we_t_onramp = Onramp(*WE_TS_ONRAMP_LOC,EAST,car_period=WE_ONRAMP_PERIOD,speed_limit=R66_SPEED_LIMIT,avg_speed=10*R66_SPEED_LIMIT+1)

ns_onramp = Onramp(*NS_ONRAMP_LOC,SOUTH,car_period=NS_ONRAMP_PERIOD,speed_limit=IH_SPEED_LIMIT,avg_speed=10*IH_SPEED_LIMIT+1)
sn_onramp = Onramp(*SN_ONRAMP_LOC,NORTH,car_period=SN_ONRAMP_PERIOD,speed_limit=IH_SPEED_LIMIT,avg_speed=10*IH_SPEED_LIMIT+1)

onramps = [ew_b_onramp,ew_t_onramp,we_b_onramp,we_t_onramp,ns_onramp,sn_onramp]
r66 += onramps

# exits
we_b_exit = Exit(X+OUTROAD_LEN,WE_BS_IN[1],EAST) 
print(X+OUTROAD_LEN,WE_BS_IN[1])

we_t_exit = Exit(X+OUTROAD_LEN,WE_TS_IN[1],EAST) 
print(X+OUTROAD_LEN,WE_TS_IN[1])

ew_b_exit = Exit(X-(OUTROAD_LEN+1),EW_BS_IN[1],WEST)
print(X-(OUTROAD_LEN+2),EW_BS_IN[1])

ew_t_exit = Exit(X-(OUTROAD_LEN+1),EW_TS_IN[1],WEST)
print(X-(OUTROAD_LEN+2),EW_TS_IN[1])

sn_exit = Exit(SN_S_IN[0],Y+(V_INROAD_LEN+3),SOUTH)
ns_exit = Exit(NS_S_IN[0],Y-(V_INROAD_LEN+3),NORTH)

exits = [ew_b_exit,ew_t_exit,we_b_exit,we_t_exit, sn_exit, ns_exit]
r66 += exits

w = World(r66+indian_hill,cars=[])

w.draw(markersize=10,xlims=XLIMS,ylims=YLIMS)
_=w.draw(markersize=10)

In [None]:
_=w.draw(markersize=10,xlims=(X-(OUTROAD_LEN+5),X-(OUTROAD_LEN-2)),ylims=(Y-3,Y+3),show=False)
plt.title("E-W Exits + W-E Onramps")

In [None]:
_=w.draw(markersize=10,xlims=(X-5,X+5),ylims=(NS_LR_LNCHNG[1]-7,NS_LR_LNCHNG[1]+3),show=False)
plt.title("N-S Lanechange")

In [None]:
_=w.draw(markersize=10,xlims=(X-5,X+5),ylims=(SN_LR_LNCHNG[1]-3,SN_LR_LNCHNG[1]+7),show=False)
plt.title("S-N Lanechange")

In [None]:
_=w.draw(markersize=10,xlims=(X+(OUTROAD_LEN+5),X+(OUTROAD_LEN-2)),ylims=(Y-3,Y+3),show=False)
plt.title("W-E Exits + E-W Onramps")

In [None]:
_=w.draw(markersize=10,xlims=(X-3,X+3),ylims=(Y-(OUTROAD_LEN+3),Y-(OUTROAD_LEN-3)),show=False)
plt.title("N-S Exit + S-N Onramp")

In [None]:
_=w.draw(markersize=10,xlims=(X-3,X+3),ylims=(X+(OUTROAD_LEN+3),X+(OUTROAD_LEN-3)),show=False)
plt.title("S-N Exit + N-S Onramp")

### Horizontal Routes

In [None]:
os.makedirs(out("routes"),exist_ok=True)

In [None]:
w.reset()

In [None]:
r_ew_ts = Route(w[EW_TS_OUT[0]:EW_TS_IN[0]+1,EW_TS_IN[1]][::-1])  # e-w top lane, straight
r_ew_l = Route([w[EW_L_IN],w[X+1,Y],w[X,Y-1],w[X-1,Y-2],w[NS_S_OUT]]) # e-w top lane, left  (will break if change layout)
r_ew_bs = Route(w[EW_BS_OUT[0]:EW_BS_IN[0]+1,EW_BS_IN[1]][::-1])  # e-w top lane, straight
r_ew_r = Route(list(w[SN_S_OUT[0]:EW_R_IN[0]+1,EW_R_IN[1]][::-1])+[w[SN_S_OUT]]) # e-w bottom lane, right


r_we_ts = Route(w[WE_TS_IN[0]:WE_TS_OUT[0]+1,WE_TS_IN[1]])
r_we_bs = Route(w[WE_BS_IN[0]:WE_BS_OUT[0]+1,WE_BS_IN[1]])
r_we_r = Route(list(w[WE_BS_IN[0]:NS_S_OUT[0]+1,WE_BS_IN[1]])+[w[NS_S_OUT]])
r_we_l = Route([w[WE_L_IN],w[X-1,Y+1],w[X,Y+2],w[X+1,Y+3],w[SN_S_OUT]])


In [None]:
ROUTE_LINEWIDTH = 3
ROUTE_ALPHA = 1

In [None]:
fig,ax = plt.subplots()
w.draw(markersize=10, ax=ax,show=False,xlims=TIGHT_XLIMS,ylims=TIGHT_YLIMS,c="black",alpha=0.05)
plt.title("Eastbound and Westbound Routes")
# plt.xlim(center_x-5,center_x+5)
# plt.ylim(center_y-5,center_y+5)
r_ew_ts.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_ew_l.draw(ax=ax,color="blue",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_ew_bs.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_ew_r.draw(ax=ax,color="red",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)

r_we_ts.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_we_bs.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_we_l.draw(ax=ax,color="blue",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_we_r.draw(ax=ax,color="red",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
savefig(join("routes","horizontal.png"))

### Vertical Routes

In [None]:
r_ns_s = Route(w[NS_S_IN[0],NS_S_OUT[1]:NS_S_IN[1]+1][::-1]) # n-s straight
r_ns_r = Route(list(w[NS_R_IN[0],EW_TS_OUT[1]:NS_R_IN[1]+1][::-1])+[w[EW_TS_OUT]]) # n-s right
r_ns_l = Route(list(w[NS_L_IN[0],NS_L_IN[1]-2:NS_L_IN[1]+1][::-1])+[w[X+1,Y]]+list(w[X+2:WE_TS_OUT[0]+1,Y-1]))
# r_ns_l = Route(list(w.tiles[X,Y:Y+3][::-1])+list(w.tiles[X+1:X+5,Y])) # n-s left

r_sn_s = Route(w[SN_S_IN[0],SN_S_IN[1]:SN_S_OUT[1]+1])
r_sn_r = Route([w[SN_R_IN],w[WE_BS_OUT]])
r_sn_l = Route(list(w[SN_L_IN[0],SN_L_IN[1]:SN_L_IN[1]+2])+[w[X-1,Y],w[X-2,Y+1],w[EW_BS_OUT]])

In [None]:
routes = [r_ns_s,r_ns_r,r_ns_l,r_sn_s,r_sn_r,r_sn_l,r_ew_ts,r_ew_l,r_ew_bs,r_ew_r,r_we_ts,r_we_bs,r_we_r,r_we_l]

In [None]:
w.reset()
fig,ax = plt.subplots()
w.draw(markersize=10, ax=ax,show=False,xlims=TIGHT_XLIMS,ylims=TIGHT_YLIMS,c="black",alpha=0.05)
plt.title("Northbound and Southbound Routes")
r_ns_s.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_ns_r.draw(ax=ax,color="red",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_ns_l.draw(ax=ax,color="blue",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)

r_sn_s.draw(ax=ax,color="green",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_sn_r.draw(ax=ax,color="red",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
r_sn_l.draw(ax=ax,color="blue",alpha=ROUTE_ALPHA,linewidth=ROUTE_LINEWIDTH)
savefig(join("routes","vertical.png"))

In [None]:
# add route choices
w[EW_L_IN[0]+1,EW_L_IN[1]].add_choice(Choice([r_ew_l],[1]))
w[EW_TS_IN[0]+1,EW_TS_IN[1]].add_choice(Choice([r_ew_ts],[1]))
w[EW_BS_IN[0]+1,EW_BS_IN[1]].add_choice(Choice([r_ew_bs],[1]))
w[EW_R_IN[0]+1,EW_R_IN[1]].add_choice(Choice([r_ew_r],[1]))

w[WE_L_IN[0]-1,WE_L_IN[1]].add_choice(Choice([r_we_l],[1]))
w[WE_TS_IN[0]-1,WE_TS_IN[1]].add_choice(Choice([r_we_ts],[1]))
w[WE_BS_IN[0]-1,WE_BS_IN[1]].add_choice(Choice([r_we_r,r_we_bs],[2*R_WE,1-2*R_WE]))

w[NS_S_IN[0],NS_S_IN[1]+1].add_choice(Choice([r_ns_s],[1]))
w[NS_L_IN[0],NS_L_IN[1]+1].add_choice(Choice([r_ns_l],[1]))
w[NS_R_IN[0],NS_R_IN[1]+1].add_choice(Choice([r_ns_r],[1]))

w[SN_S_IN[0],SN_S_IN[1]-1].add_choice(Choice([r_sn_s],[1]))
w[SN_L_IN[0],SN_L_IN[1]-1].add_choice(Choice([r_sn_l],[1]))
w[SN_R_IN[0],SN_R_IN[1]-1].add_choice(Choice([r_sn_r],[1]))

## Unidirectional Tests

### Test One: E-W

In [None]:
for o in onramps:
    o.disable()
ew_t_onramp.enable()
ew_b_onramp.enable()
w.reset()
w.run(100,draw=True,outpath=out("ew_test.gif",),xlims=XLIMS,ylims=YLIMS,markersize=10,c="black")

In [None]:
n_ew_t_s = np.sum([len(a) for a in ew_t_exit.exited.values()])
n_ew_t_l = np.sum([len(a) for a in sn_exit.exited.values()])

In [None]:
n_ew_t_l, n_ew_t_s

### Test: W-E

In [None]:
w.reset()
for o in onramps:
    o.disable()
we_t_onramp.enable()
we_b_onramp.enable()
w.run(100,draw=True,outpath=out("we_test.gif",),xlims=XLIMS,ylims=YLIMS,markersize=10,c="black")

### Test: N-S

In [None]:
w.reset()
for o in onramps:
    o.disable()
ns_onramp.enable()
w.run(100,draw=True,outpath=out("ns_test.gif",),xlims=XLIMS,ylims=YLIMS,markersize=10,c="black")

### Test: S-N

In [None]:
w.reset()
for o in onramps:
    o.disable()
sn_onramp.enable()
w.run(100,draw=True,outpath=out("sn_test.gif",),xlims=XLIMS,ylims=YLIMS,markersize=10,c="black")

# All

### Stoplights

In [None]:
w.reset()   
for o in onramps:
    o.enable()

In [None]:
STOPLIGHT_PERIOD = int((2*60) * TIMESTEPS_PER_SECOND)  # 2 minutes

print(f"Stoplight period: {STOPLIGHT_PERIOD/TIMESTEPS_PER_SECOND}s ({STOPLIGHT_PERIOD} timesteps)")


DELAY_AFTER_LEFT = 5
DELAY_AFTER_STRAIGHT = 5

LEFT_STRAIGHT_RATIO_VERT = 1
# LEFT_STRAIGHT_RATIO_VERT = 2* np.max([L_NS,L_SN]) / (1 - np.max([L_NS,L_SN]))
LEFT_STRAIGHT_RATIO_HORIZ = 1
# LEFT_STRAIGHT_RATIO_HORIZ = 2*np.max([L_EW,L_WE]) / (1 - np.max([L_EW,L_WE]))

# straight-right (on:half-P) ratio
SR_RATIO_VERT = (1-2*(DELAY_AFTER_LEFT+DELAY_AFTER_STRAIGHT)/STOPLIGHT_PERIOD) / (1+LEFT_STRAIGHT_RATIO_VERT)
SR_RATIO_HORIZ = (1-2*(DELAY_AFTER_LEFT+DELAY_AFTER_STRAIGHT)/STOPLIGHT_PERIOD) / (1+LEFT_STRAIGHT_RATIO_HORIZ)

L_RATIO_VERT = LEFT_STRAIGHT_RATIO_VERT*SR_RATIO_VERT
L_RATIO_HORIZ = LEFT_STRAIGHT_RATIO_HORIZ*SR_RATIO_HORIZ

L1_VERTICAL_OFFSET = STOPLIGHT_PERIOD/2
# L1_VERTICAL_OFFSET = STOPLIGHT_PERIOD/2
SR1_VERTICAL_OFFSET = L1_VERTICAL_OFFSET + DELAY_AFTER_LEFT + L_RATIO_VERT*STOPLIGHT_PERIOD/2

L1_HORIZONTAL_OFFSET = 0
SR1_HORIZONTAL_OFFSET = L1_HORIZONTAL_OFFSET + DELAY_AFTER_LEFT + L_RATIO_HORIZ*STOPLIGHT_PERIOD/2

# SR2_HORIZONTAL_OFFSET = 
# L2_HORIZONTAL_OFFSET = 


In [None]:
ns_s_light = Stoplight(w.tiles[NS_S_IN[0],NS_S_IN[1]],direction=0,offset=SR1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_VERT, name="N-S S") 
ns_r_light = Stoplight(w.tiles[NS_R_IN[0],NS_R_IN[1]],direction=0,offset=SR1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_VERT, name="N-S R") 
ns_l_light = Stoplight(w.tiles[NS_L_IN[0],NS_L_IN[1]],direction=0,offset=L1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=L_RATIO_VERT, name="N-S L")

ew_ts_light = Stoplight(w.tiles[EW_TS_IN[0],EW_TS_IN[1]],direction=2,offset=SR1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_HORIZ, name="Straight") 
ew_bs_light = Stoplight(w.tiles[EW_BS_IN[0],EW_BS_IN[1]],direction=2,offset=SR1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_HORIZ, name="E-W TS") 
ew_r_light = Stoplight(w.tiles[EW_R_IN[0],EW_R_IN[1]],direction=2,offset=SR1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_HORIZ, name="Right") 
ew_l_light = Stoplight(w.tiles[EW_L_IN[0],EW_L_IN[1]],direction=2,offset=L1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=L_RATIO_HORIZ, name="Left")

we_ts_light = Stoplight(w.tiles[WE_TS_IN[0],WE_TS_IN[1]],direction=6,offset=SR1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_HORIZ, name="Straight") 
we_bs_light = Stoplight(w.tiles[WE_BS_IN[0],WE_BS_IN[1]],direction=6,offset=SR1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_HORIZ, name="Right") 
we_l_light = Stoplight(w.tiles[WE_L_IN[0],WE_L_IN[1]],direction=6,offset=L1_HORIZONTAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=L_RATIO_HORIZ, name="Left")

sn_s_light = Stoplight(w.tiles[SN_S_IN[0],SN_S_IN[1]],direction=4,offset=SR1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_VERT, name="S-N S") 
sn_r_light = Stoplight(w.tiles[SN_R_IN[0],SN_R_IN[1]],direction=4,offset=SR1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=SR_RATIO_VERT, name="S-N R") 
sn_l_light = Stoplight(w.tiles[SN_L_IN[0],SN_L_IN[1]],direction=4,offset=L1_VERTICAL_OFFSET,period=STOPLIGHT_PERIOD,on_to_half_p_ratio=L_RATIO_VERT, name="S-N L")



w.signaller.stoplights = []
ns_stoplights = [ns_s_light,ns_l_light,ns_r_light]
# ew_stoplights = [ew_ts_light,ew_l_light,ew_r_light]
ew_stoplights = [ew_ts_light,ew_l_light,ew_r_light,ew_bs_light]
we_stoplights = [we_ts_light,we_l_light,we_bs_light]
sn_stoplights = [sn_s_light,sn_l_light,sn_r_light]

stoplights = ns_stoplights + ew_stoplights + we_stoplights + sn_stoplights
# stoplights = [ns_light,sn_light,ew_t_light,ew_b_light,we_t_light,we_b_light]

In [None]:
[w.add_stoplight(s) for s in stoplights]

In [None]:
w.reset()
w.draw(markersize=10,xlims=XLIMS,ylims=YLIMS,c="black",show=False)
plt.title("Indian Hill - R66 Intersection")
savefig("intersection.png")

### The below cell is supposed to be run with only three E-W stoplights:

In [None]:
fig,axes = plt.subplots(ncols=1,nrows=3)
t= np.linspace(0,STOPLIGHT_PERIOD,num=1000)
lines = {}
axes = axes.flatten()
for s, ax in zip(ns_stoplights,axes):
    points_south = s.direction==4
    f = np.vectorize(s.update_func)
    l = ax.plot(t,f(t),c="tab:blue",linestyle="dashed")
    # l = ax.plot(t,f(t),c="tab:blue" if points_south else "tab:orange",linestyle="dashed" if points_south else "dotted")
    lines[s.direction] = l[0]
    ax.set_yticks([0,1],["Stop","Go"])
    if s.name is not None:
        pass
        # ax.set_title(s.name,fontsize="small",loc="left")
    ylims = ax.get_ylim() 
    ax.axvline(L1_VERTICAL_OFFSET+STOPLIGHT_PERIOD/2,*ylims,linestyle="dashed",color="tab:red")
    ax.fill_between([L1_VERTICAL_OFFSET+L_RATIO_VERT*STOPLIGHT_PERIOD/2,L1_VERTICAL_OFFSET+L_RATIO_VERT*STOPLIGHT_PERIOD/2+DELAY_AFTER_LEFT],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    ax.fill_between([L1_VERTICAL_OFFSET+STOPLIGHT_PERIOD/2-DELAY_AFTER_STRAIGHT,L1_VERTICAL_OFFSET+STOPLIGHT_PERIOD/2],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    ax.set_ylim(*ylims)
    ax.fill_between(t,f(t),color="green",alpha=0.25)

for s, ax in zip(we_stoplights,axes):
    # points_south = s.direction==4
    f = np.vectorize(s.update_func)
    l = ax.plot(t,f(t),c="tab:blue",linestyle="dashed")
    # l = ax.plot(t,f(t),c="tab:blue" if points_south else "tab:orange",linestyle="dashed" if points_south else "dotted")
    lines[s.direction] = l[0]
    ax.set_yticks([0,1],["Stop","Go"])
    if s.name is not None:
        ax.set_title(s.name,fontsize="small")
        # ax.set_title(s.name,fontsize="small",loc="left")
    ylims = ax.get_ylim() 
    ax.axvline(L1_HORIZONTAL_OFFSET+STOPLIGHT_PERIOD/2,*ylims,linestyle="dashed",color="tab:red")
    ax.fill_between([L1_HORIZONTAL_OFFSET+L_RATIO_HORIZ*STOPLIGHT_PERIOD/2,L1_HORIZONTAL_OFFSET+L_RATIO_HORIZ*STOPLIGHT_PERIOD/2+DELAY_AFTER_LEFT],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    ax.fill_between([L1_HORIZONTAL_OFFSET+STOPLIGHT_PERIOD/2-DELAY_AFTER_STRAIGHT,L1_HORIZONTAL_OFFSET+STOPLIGHT_PERIOD/2],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    ax.set_ylim(*ylims)
    ax.fill_between(t,f(t),color="green",alpha=0.25)


for a in axes[:-1]:
    a.set_xticks([])
axes[-1].set_xlabel("t")
# _=axes[0].set_title("Stoplight States")
plt.suptitle(f"Stoplight States, $P$={STOPLIGHT_PERIOD} ({np.round(STOPLIGHT_PERIOD/TIMESTEPS_PER_SECOND,1)}s)")

# plt.legend(handles=[lines[4],lines[6]],labels=["South","East"])
savefig("signal_states.png")

In [None]:
fig,axes = plt.subplots(ncols=1,nrows=len(w.signaller.stoplights))
fig.tight_layout()
t= np.linspace(0,STOPLIGHT_PERIOD,num=1000)
lines = {}
axes = axes.flatten()
for s, ax in zip(w.signaller.stoplights,axes):
    points_south = s.direction==4
    f = np.vectorize(s.update_func)
    l = ax.plot(t,f(t),c="tab:blue",linestyle="dashed")
    # l = ax.plot(t,f(t),c="tab:blue" if points_south else "tab:orange",linestyle="dashed" if points_south else "dotted")
    lines[s.direction] = l[0]
    ax.set_yticks([0,1],["Stop","Go"])
    if s.name is not None:
        ax.set_title(s.name,fontsize="small",loc="left")
    ylims = ax.get_ylim() 
    ax.axvline(STOPLIGHT_PERIOD/2,*ylims,linestyle="dashed")
    # ax.fill_between([L_RATIO*STOPLIGHT_PERIOD/2,L_RATIO*STOPLIGHT_PERIOD/2+DELAY_AFTER_LEFT],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    # ax.fill_between([STOPLIGHT_PERIOD/2-DELAY_AFTER_LEFT,STOPLIGHT_PERIOD/2],*ylims,linestyle="dashed",color="tab:red",alpha=0.1)
    ax.set_ylim(*ylims)
    ax.fill_between(t,f(t),color="green",alpha=0.25)

for a in axes[:-1]:
    a.set_xticks([])
axes[-1].set_xlabel("t")
# _=axes[0].set_title("Stoplight States")
plt.suptitle("Stoplight States")
# plt.legend(handles=[lines[4],lines[6]],labels=["South","East"])
# savefig("signal_states.png")

In [None]:
# w.reset()

# for o in onramps:
#     o.disable()

# for o in [ew_b_onramp,ew_t_onramp,ns_onramp]:
#     o.enable()

# w.run(100,draw=True,outpath=out("ns_ew.gif"),markersize=10,xlims=XLIMS,ylims=YLIMS,c="black")

In [None]:
# w.reset()

# for o in onramps:
#     o.enable()
# w.run(15,draw=False)
# w.run(100,draw=True,outpath=out("all.gif"),markersize=10,xlims=XLIMS,ylims=YLIMS,c="black")

In [None]:
runtime = STOPLIGHT_PERIOD * 10

In [None]:
w.reset()
w.run(runtime,draw=False)

In [None]:
_=w.draw(markersize=10,xlims=XLIMS,ylims=YLIMS,c="black")

In [None]:
num_ew_ts = np.sum([len(a) for a in ew_t_exit.exited.values()])
num_ew_bs = np.sum([len(a) for a in ew_b_exit.exited.values()])
num_we_ts = np.sum([len(a) for a in we_t_exit.exited.values()])
num_we_bs = np.sum([len(a) for a in we_b_exit.exited.values()])
num_ns = np.sum([len(a) for a in ns_exit.exited.values()])
num_sn = np.sum([len(a) for a in sn_exit.exited.values()])


# n_ew_t_l = np.sum([len(a) for a in sn_exit.exited.values()])

In [None]:
plt.bar(["EW","WE", "SN", "NS"],[num_ew_bs+num_ew_ts,num_we_bs+num_we_ts,num_sn,num_ns])
plt.title("Cars Exited by Direction")
plt.ylabel("Number of Cars Exited")
plt.xlabel("Direction")
# savefig("cars_exited_10000.png")

In [None]:
plt.bar(["EW B","EW T","WE B", "WE T", "SN", "NS"],np.array([np.sum([len(a) for a in e.exited.values()]) for e in exits])/runtime*TIMESTEPS_PER_SECOND)
# n_ew_t_s = np.sum([len(a) for a in ew_t_exit.exited.values()])
# n_ew_t_l = np.sum([len(a) for a in sn_exit.exited.values()])

In [None]:
plt.bar(["EW B","EW T","WE B", "WE T", "SN", "NS"],[np.sum([len(a) for a in e.exited.values()]) for e in exits])

In [None]:
traffic_dirname = f"{LEFT_STRAIGHT_RATIO_VERT}_{LEFT_STRAIGHT_RATIO_HORIZ}_{STOPLIGHT_PERIOD}_timestep_period"
os.makedirs(out(traffic_dirname),exist_ok=True)
traffic_dirname

In [None]:
traffic_zones = {}

ew_traffic_zones = {}
for inlane, onramp, name, stoplight in [(EW_TS_IN,ew_b_onramp,"Upper Straight",ew_bs_light), 
                                        (EW_BS_IN,ew_t_onramp,"Lower Straight",ew_ts_light),
                                        (EW_L_IN,ew_b_onramp,"Left",ew_l_light),
                                        (EW_R_IN,ew_b_onramp,"Right",ew_r_light)]:
    start = min(inlane[0],onramp.x)
    end = max(inlane[0],onramp.x)
    ew_traffic_zones[name] = (np.array([[x,inlane[1]] for x in np.arange(start,end)]), stoplight)
    
we_traffic_zones = {}
for inlane, onramp, name, stoplight in [(WE_TS_IN,we_b_onramp,"Upper Straight",we_bs_light), 
                                        (WE_BS_IN,we_t_onramp,"Lower Straight/Right",we_ts_light),
                                        (WE_L_IN,we_b_onramp,"Left",we_l_light)]:
    start = min(inlane[0],onramp.x)
    end = max(inlane[0],onramp.x)
    we_traffic_zones[name] = (np.array([[x,inlane[1]] for x in np.arange(start,end)]), stoplight)


ns_traffic_zones = {}
for inlane, onramp, name, stoplight in [(NS_S_IN,ns_onramp,"Straight",ns_s_light), 
                                        (NS_L_IN,ns_onramp,"Left",ns_l_light),
                                        (NS_R_IN,ns_onramp,"Right",ns_r_light)]:
    start = min(inlane[1],onramp.y)
    end = max(inlane[1],onramp.y)
    ns_traffic_zones[name] = (np.array([[inlane[0],y] for y in np.arange(start,end)]), stoplight)

sn_traffic_zones = {}
for inlane, onramp, name, stoplight in [(SN_S_IN,sn_onramp,"Straight",sn_s_light), 
                                        (SN_L_IN,sn_onramp,"Left",sn_l_light),
                                        (SN_R_IN,sn_onramp,"Right",sn_r_light)]:
    start = min(inlane[1],onramp.y)
    end = max(inlane[1],onramp.y)
    sn_traffic_zones[name] = (np.array([[inlane[0],y] for y in np.arange(start,end)]), stoplight)

traffic_zones["E-W"] = ew_traffic_zones
traffic_zones["W-E"] = we_traffic_zones
traffic_zones["N-S"] = ns_traffic_zones
traffic_zones["S-N"] = sn_traffic_zones

In [None]:
cum_nums = {n:[] for n in traffic_zones.keys()}

for direction_name, traffic_zone in traffic_zones.items():
    fig, axes = plt.subplots(ncols=1, nrows=len(traffic_zone))
    fig.tight_layout()
    for (name, (coordinates_of_interest,s)), ax in zip(traffic_zone.items(),axes): 
        nums = []
        ts = []
        for t, packet in enumerate(w.car_info_packets):
            num_cars = 0
            if len(packet):
                try:
                    car_locs = np.zeros((w.max_y+1,w.max_x+1))
                    car_locs[packet[:,2],packet[:,1]] = 1
                    num_cars = np.sum(car_locs[coordinates_of_interest[:,1],coordinates_of_interest[:,0]])
                except Exception as e:
                    print(packet)
                    print(t)
                    raise e
            nums.append(num_cars)
            ts.append(t / TIMESTEPS_PER_SECOND)
        ax.plot(ts,nums)
        cum_nums[direction_name].append(nums)
        ax.set_title(name,fontsize="small")
        ax.set_ylabel("Cars in stretch")
        ylim=ax.get_ylim()
        going = True

        gos = np.array([0]+s.gos)
        stops = np.array(s.stops)

        if max(s.stops) > max(s.gos):
            gos = np.concatenate([gos,[w.timestep]])
        else:
            stops = np.concatenate([stops,[w.timestep]]) 

        gos = gos/TIMESTEPS_PER_SECOND
        stops = stops/TIMESTEPS_PER_SECOND

        c = np.empty((len(gos)+len(stops),))
        c[0::2] = gos
        c[1::2] = stops
        for i in range(len(c)-1):
            color="tab:green" if going else "tab:red"
            ax.fill_between(x=[c[i],c[i+1]],y1=[ylim[0],ylim[0]],y2=[ylim[1],ylim[1]],color=color,alpha=0.15)
            going = not going

    for ax in axes[:-1]:
        ax.set_xticks([])

    axes[-1].set_xlabel("t (seconds)")
    fig.suptitle(direction_name + " Traffic",y=1.05,fontsize="x-large")
    savefig(join(traffic_dirname,f"{direction_name}_traffic.png"))

## Metrics

In [None]:
metrics = pd.DataFrame(data=cum_nums.keys(),columns=["Direction"])

### Average Queue Length

In [None]:
avgs = []
for direction_name, nums in cum_nums.items():
    n = np.array(nums)
    avgs.append(np.sum(n)/len(n[0]))
metrics["Average Queue Length"] = avgs

In [None]:
metrics

### Average Num Cars Exiting

In [None]:
metrics["Average Cars Exiting Per Second"] = np.array([num_ew_bs+num_ew_ts,num_we_bs+num_we_ts,num_ns,num_sn])/runtime*TIMESTEPS_PER_SECOND
metrics

In [None]:
np.array(list(cum_nums.values())[0]).shape

In [None]:
np.sum(np.array(list(cum_nums.values())[3]),axis=0)

In [None]:
for i in range(4):
    print(list(cum_nums.keys())[i], np.max(np.sum(np.array(list(cum_nums.values())[i]),axis=0)))

In [None]:
i = 0
for i in range(4):
    t = np.arange(runtime)/TIMESTEPS_PER_SECOND
    in_lane = 100*np.sum(np.array(list(cum_nums.values())[i]),axis=0)/capacities[i]
    plt.plot(t,in_lane, label=list(cum_nums.keys())[i])
    xlims = plt.xlim()
    plt.hlines(90,*plt.xlim(),alpha=0.5,color="tab:red",linestyles="dashed")
    # plt.fill_between(t,90,in_lane,where=(in_lane > 90),alpha=0.25,color="tab:red")
    plt.xlim(*xlims)
plt.ylabel("% Capacity")
plt.xlabel("t (seconds)")
plt.legend(ncols=4,loc="lower center")
plt.title("Directional Congestion (Equal)")
plt.xlim(0,600)
savefig(join(traffic_dirname,"directional_congestion.png"))

In [None]:
time_above_capacity = []
for (direction_name, nums), capacity in zip(cum_nums.items(),capacities):
    n = np.sum(np.array(nums),axis=0)
    above_90 = np.where(n > 0.9 * capacity)
    time_above_capacity.append(len(above_90[0])/runtime)
metrics["Congestion Index"] = time_above_capacity
metrics

### Flow Imbalance

In [None]:
metrics["Flow Imbalance"] = [(num_ew_bs+num_ew_ts+num_we_bs+num_we_ts - (num_ns+num_sn)) / np.sum([num_ew_bs,num_ew_ts,num_we_bs,num_we_ts,num_ns,num_sn])] * len(metrics)

In [None]:
metrics

In [None]:
for c in metrics.columns:
    try:
        metrics[c] = np.round(metrics[c],3)
    except:
        pass

metrics.to_csv(out(join(traffic_dirname,"metrics.csv")),index=None)