# Multispurstraßenverkehrssimulation
## Multi-Lane Road Traffic Simulation (MuLtiRoTS)

In [34]:
import random
import math
from copy import deepcopy
import matplotlib.pyplot as plt
from matplotlib import cm, animation
import numpy as np
from functools import reduce
from operator import methodcaller, attrgetter, add, concat
from IPython.display import clear_output, Markdown, display, HTML
import time
import json
import os

## Definitions

### iterfrom generator
Iterates over a given container, reordering it to start at a given index.

In [4]:
def iterfrom(container, index, step=1, sizemod=0):
    size = len(container)
    for i in range((0,size+sizemod)[int(step<0)], (0,size+sizemod)[int(step>0)], step):
        yield container[(index+i)%size]

### save and load functions

In [16]:
defaultpath=os.getcwd()

def save(item, filename, path=defaultpath):
    with open(path+filename, "w") as f:
        json.dump(item, f)

def load(filename, path=defaultpath):
    with open(path+filename, "r") as f:
        return json.load(f)

### Car class
Base class for any vehicle. Alterations take the form of subclasses.

In [6]:
class Car():
    #a single car
    #conatains own history
    #conatains all interesting parameters (position, speed, lane, time)
    def __init__(self, ID, pos, roadlength, lane=0, maxvel=5, vel=0, time=0):
        self.ID = ID #custom identifier
        self.l_pos = [pos] #list of past positions
        self.l_totalpos = [pos]
        self.l_vel = [vel] #list of past velocities
        self.l_time = [time] #list of past times
        self.l_lane = [lane] #list of past lanes
        self.skip_times = [] #list of times where car reached the end of road and went back
        self.L = roadlength 
        self.l_vlane = [0]
        self.maxvel = maxvel
    
    def __eq__(self, other):
        if type(other) == type(self):
            return self.__dict__ == other.__dict__
        else:
            return False
    
    def move(self):
        skip, newpos = divmod(self.getpos()+self.getvel(), self.L)
        self.l_pos.append(newpos)
        self.l_totalpos.append(self.gettotalpos()+self.getvel())
        if skip:
            self.skip_times.append(self.gettime())
        self.l_vel.append(self.getvel())
        self.l_time.append(self.gettime()+1)
        self.l_lane.append(self.getnewlane())
        self.l_vlane.append(0)
    
    def update_lane(self, road, index):        
        if self.getlane() < road.lanes-1: #go up
            car_ahead = road.get_nearest(index, self.getlane())
            
            #lambda all the things so they only run if called
            a = lambda : car_ahead != None #is there a car ahead?
            b = lambda : road.distf(car_ahead, self) <= self.getvel()+1 #cannot increase speed in this lane?
            c = lambda : road.rdist(self, index, self.getlane()+1) > road.rdist(self, index, self.getlane()) #is there more space in lane above?
            d = lambda : road.check_lane(self.getpos(), index, self.getlane()+1) #is the space above not crossed by another car in the next step?
            if a() and b() and c() and d():
                self.changelane(1)
                
        if self.getlane() > int(road.buslane): #go down
            car_ahead_below = road.get_nearest(index, self.getlane()-1) #get the car ahead in lane below
            car_behind = road.get_nearest(index, self.getlane(), reverse=True) #get the car behind
            
            #lambda all the things so they only run if called
            a = lambda : (car_behind != None) #is there a car behind?
            b = lambda : (car_behind.getvel() > self.getvel()) #is the car behind faster?
            c = lambda : (self.getpos() in car_behind.getpath()) #is the car behind close?
            d = lambda : (car_ahead_below != None) #is there a car ahead below?
            e = lambda : (car_ahead_below.getvel() >= self.getvel()) #is the car ahead below sufficiently fast?
            f = lambda : (road.distf(car_ahead_below, self) > self.getvel()) #is the car ahead below far enough?
            g = lambda : (road.check_lane(self.getpos(), index, self.getlane()-1)) #is the space below not crossed by another car in the next step?
            if a() and b() and c() and (not d() or e() and f()) and g():
                self.changelane(-1)
    
    def update_vel(self, road, index):
        if road.rdist(self, index, self.getnewlane()) <= self.getvel():
            self.setvel(road.rdist(self, index, self.getnewlane())-1)
        elif road.rdist(self, index, self.getnewlane()) > self.getvel()+1 and self.getvel() < self.maxvel:
            self.setvel(self.getvel()+1)
        if self.getvel() > 0 and random.random() <= road.prob:
            self.setvel(self.getvel()-1)
    
    def update_other(self, **kwargs):
        pass
    
    def changelane(self, diff):
        self.l_vlane[-1] = diff
    
    def getpos(self, i=-1):
        return self.l_pos[i]
    
    def gettotalpos(self, i=-1):
        return self.l_totalpos[i]
    
    def setvel(self, newvel):
        self.l_vel[-1] = newvel
    
    def getvel(self, i=-1):
        return self.l_vel[i]
    
    def gettime(self, i=-1):
        return self.l_time[i]
    
    def getlane(self, i=-1):
        return self.l_lane[i]
    
    def getnewlane(self, i=-1):
        return self.l_lane[i]+self.l_vlane[i]
    
    def getpath(self):
        self.path = [x % self.L for x in range(self.getpos(), self.getpos()+self.getvel()+1)]
        return self.path

### Bus class
Subclass of Car. Slightly modified behaviour.

In [7]:
class Bus(Car):
    def __init__(self, ID, pos, roadlength, lane=0, maxvel=3, vel=0, time=0, stops=dict()):
        super().__init__(ID, pos, roadlength, lane, maxvel, vel, time)
        self.passengers = 0
        self.stops = stops
        self.stopped = None #only use boolean values
        
    def update_other(self, **kwargs):
        if not self.stopped == None:
            self.stops[self.stopped] -= 1
            if self.stops[self.stopped] < 1:
                self.stopped = None
        
    def update_lane(self, road, index):
        if self.getlane() > 0: #go down
            if road.check_lane(self.getpos(), index, self.getlane()-1):
                self.changelane(-1)
    
    def update_vel(self, road, index):
        if not self.stopped:
            if road.rdist(self, index, self.getnewlane()) <= self.getvel():
                self.setvel(road.rdist(self, index, self.getnewlane())-1)
            elif road.rdist(self, index, self.getnewlane()) > self.getvel()+1 and self.getvel() < self.maxvel:
                self.setvel(self.getvel()+1)
            if self.getvel() > 0 and random.random() <= road.prob:
                self.setvel(self.getvel()-1)
            dist = min([self.stopdist(pos) for pos in self.stops if self.stops[pos] >= 1]+[self.L])
            nearest = road.get_nearest(index, 0)
            if dist <= self.getvel():
                self.setvel(dist)
                self.stopped = self.getpos()+dist
            elif type(nearest) == Bus and road.rdist(self, index, 0) == self.getvel()+1 and not nearest.stopped == None:
                self.stopped = nearest.stopped
        else:
            self.setvel(0)
    
    def move(self):
        skip, newpos = divmod(self.getpos()+self.getvel(), self.L)
        self.l_pos.append(newpos)
        self.l_totalpos.append(self.gettotalpos()+self.getvel())
        if skip:
            self.skip_times.append(self.gettime())
        self.l_vel.append(self.getvel())
        self.l_time.append(self.gettime()+1)
        self.l_lane.append(self.getnewlane())
        self.l_vlane.append(0)
    
    def stopdist(self, stop_pos):
        if stop_pos > self.getpos():
            return stop_pos - self.getpos()
        else:
            return stop_pos + self.L - self.getpos()

### Road class
Acts as a container for Car instances. Contains methods for bulk manipulation.

In [73]:
class Road():
    def __init__(self, length, dens, prob, lanes=1, maxvel=[5], stops=0, buses=0, buslane=False, flowpos=0, iterations=0):
        self.L = length #length of rd
        self.dens = dens #density of cars
        self.prob = prob #prob of braking
        self.cars = list()
        self.lanes = lanes #no of lanes
        self.flow = [0]*self.lanes
        self.stops = dict([(i*int(self.L/stops) + int(self.L/(2*stops)), 0) for i in range(stops)])
        self.buslane = buslane
        self.max_vel = max(maxvel)
        self.iterations = 0
        self.flowiterations = 0
        self.flowpos = flowpos
        
        positions=[divmod(i, self.lanes) for i in range(self.L*self.lanes)]
        for i in range(int(self.L*self.lanes*self.dens)): #insert cars
            pos = positions.pop(random.randint(0, len(positions)-1))
            self.cars.append(Car(len(self.cars), pos[0], self.L, pos[1], maxvel=maxvel[len(self.cars)%len(maxvel)]))
        for i in range(buses):
            pos = positions.pop(random.randint(0, len(positions)-1))
            self.cars.append(Bus(len(self.cars), pos[0], self.L, 0, stops=self.stops))
        self.update()
        self.cars = sorted(self.cars, key=lambda c: c.getpos())
        self.iterate(iterations)
        
    def __str__(self):
        return str([c.getvel() for c in self.cars])
    
    def iterate(self, n=1):
        self.iterations += n
        for i in range(n):
            self.move()
            self.update()
    
    def flowiterate(self, n=1):
        self.iterations += n
        self.flowiterations += n
        for i in range(n):
            self.move()
            self.update()
            self.update_flow(self.flowpos)
    
    def avspeed(self, start=1000, end=None):
        all_speeds = reduce(concat, (l for l in map(lambda c: c.l_vel[start:end], self.cars)))
        return sum(all_speeds)/len(all_speeds)
    
    def get_nearest(self, index, lane, reverse=False):
        direction = (1,-1)[reverse]
        for c in iterfrom(self.cars, index+int(-direction==1), -direction, -1): #check opposite direction in list, small chance of car having equal pos in test lane
            if c.getpos() != self.cars[index].getpos():
                break
            elif c.getnewlane()==lane:
                return c
        for c in iterfrom(self.cars, index+int(direction==1), direction, -1):
            if c.getnewlane()==lane:
                return c
        return None
    
    def check_lane(self, pos, index, lane):
        for c in iterfrom(self.cars, index, -1, -1): #check bacwards in list. Stop when cars are further back than pos-maxvel
            if c.getpos() < pos-self.max_vel:
                break
            if c.getnewlane() == lane and pos in c.getpath():
                return False #if car crosses in front
        for c in iterfrom(self.cars, index+1, 1, -1): #check forwards in list. Only check while cars have same pos. 
            if c.getpos() == pos:
                if c.getnewlane() == lane:
                    return False #if car crosses in front
            else:
                break
        return True
    
    def update_flow(self, pos):
        for c in self.cars:
            self.flow[c.getlane()] += int(pos in c.getpath() and pos != c.getpos())
    
    def distf(self, c_to, c_from): #returns distance between two cars
        try:
            if c_to.getpos() > c_from.getpos():
                return c_to.getpos() - c_from.getpos()
            else:
                return c_to.getpos() + self.L - c_from.getpos()
        except AttributeError:
            return self.L
    
    def rdist(self, car, index, lane): #distance to the nearest car ahead
        return self.distf(self.get_nearest(index, lane), car)
    
    def ldist(self, car, index, lane): #distance from nearest car behind
        return self.distf(car, self.get_nearest(index, lane, reverse=True))
    
    def update_old(self):
        while True:
            self.prev_cars = deepcopy(self.cars)
            for i, car in enumerate(self.cars):
                car.update_lane(self, i) #give function a copy of road
            if all(map(lambda c1,c2: c1.getnewlane()==c2.getnewlane(), self.cars, self.prev_cars)):
                #check if lanes haven't changed
                break
        for i, car in enumerate(self.cars):
            car.update_other()
            car.update_vel(self, i)
        for k in self.stops:
            self.stops[k] += 0.2
        self.update_flow(0)
    
    def update(self):
        self.cars = sorted(self.cars, key=methodcaller("getpos"))
        for i, car in enumerate(self.cars):
            car.update_lane(self, i)
        for i, car in enumerate(self.cars):
            car.update_other()
            car.update_vel(self, i)
        for k in self.stops:
            self.stops[k] += 0.2
    
    def move(self):
        for car in self.cars:
            car.move()
    
    def show(self, filler="."):
        for l in range(self.lanes):
            self.D = dict([(c.getpos(), c.getvel()) for c in self.cars if c.getnewlane() == l])
            print(*[self.D.get(i, filler) for i in range(self.L)],sep="",end="\n")
    
    def getflow(self, time_average=True, lane_average=False):
        if lane_average:
            return sum(self.getflow(time_average))/self.lanes
        elif time_average:
            return list(map(lambda f: f/self.flowiterations, self.flow))
        else:
            return self.flow
    
    def printinfo(self):
        print("ACTUAL DENSITY:", len(self.cars)/(self.L*self.lanes))
        for l in range(self.lanes):
            print("Flow in lane {}: {} cars per time step".format(l, self.flow[l]/self.flowiterations))
        print("Total flow: {} cars per time step".format(sum(self.flow)/self.flowiterations))
        print("Average flow: {} cars per time step per lane\n".format(sum(self.flow)/self.flowiterations/self.lanes))

### Junction class
Controls behaviour of multiple roads to simulate a junction.

In [9]:
class Junction():    
    def __init__(self, num_lanes=(2,2), green_time=(20,20), acceptance=[1,1,1,1], spawn_rate=[1,1,1,1], maxvel=5, iterations=0, **road_kwargs):
        self.num_lanes=num_lanes
        self.green_time=green_time
        self.iterations=0
        self.green_road=0
        self.green_road_list=[0]
        self.maxvel=maxvel
        
        accept_prob=lambda road, out: acceptance[out%4]/(sum(acceptance)-acceptance[road%4])
        middle=lambda road, isleft: (0, accept_prob(road, road+2)/(num_lanes[road%2]-1)*spawn_rate[road], 0)
        side  =lambda road, isleft: (accept_prob(road, road+1)*isleft*spawn_rate[road], accept_prob(road, road+2)/2/(num_lanes[road%2]-1)*spawn_rate[road], accept_prob(road,road+3)*(not isleft)*spawn_rate[road])
        single=lambda road, isleft: (accept_prob(road, road+1)*spawn_rate[road], accept_prob(road, road+2)*spawn_rate[road], accept_prob(road, road+3)*spawn_rate[road])
        select=lambda road, lane: ((single,),(side,middle))[num_lanes[road%2] > 1][lane not in (0, num_lanes[road%2]-1)](road, not lane)
        
        self.spawn_probs=dict((n, [select(n, l) for l in range(num_lanes[n%2])]) for n in range(4)) #going clockwise
        
        road_kwargs.pop("lanes", None)
        self.out_roads=[OutRoad(**road_kwargs, maxvel=self.maxvel, lanes=num_lanes[i%2]) for i in range(4)]
        self.in_roads=[InRoad(self.out_roads, **road_kwargs, maxvel=self.maxvel, lanes=num_lanes[i%2]) for i in range(4)]
        
        self.iterate(iterations)
    
    def iterate(self, num):
        for i in range(num):
            self.iterations+=1
            self.green_road=2*((self.iterations//sum(self.green_time))%2)+int(self.iterations%sum(self.green_time)>=self.green_time[0])
            self.green_road_list.append(self.green_road)
            for n, In in enumerate(self.in_roads):
                In.green = n==self.green_road
                for l in range(In.lanes):
                    RAND = random.random()
                    if RAND <= sum(self.spawn_probs[n][l]):
                        ID = int("%d%d" % (self.green_road, self.iterations))
                        out = (n+1+(RAND>self.spawn_probs[n][l][0])+(RAND>sum(self.spawn_probs[n][l][:2])))%4
                        In.spawn(JuncCar(out, ID, 0, roadlength=In.L, lane=l, maxvel=self.maxvel, vel=self.maxvel, time=self.iterations))
                In.iterate()
            for n, Out in enumerate(self.out_roads):
                Out.iterate()
    
    def printinfo(self):
        for n, r in enumerate(self.in_roads):
            print("Flow on in road {}: {} cars per time step".format(n, sum(r.flow)/self.flowiterations))
        inflow = sum(sum(f) for f in map(attrgetter("flow"), self.in_roads))
        
        print()
        
        for n, r in enumerate(self.out_roads):
            print("Flow on out road {}: {} cars per time step".format(n, sum(r.flow)/self.flowiterations))
        outflow = sum(sum(f) for f in map(attrgetter("flow"), self.out_roads))
        
        print("\nTotal flow in: {} cars per time step".format(inflow/self.flowiterations))
        print("Total flow out: {} cars per time step".format(outflow/self.flowiterations))
            
            

### JuncCar class
Subclass of Car. Modified for use in junctions.

In [10]:
class JuncCar(Car):
    def __init__(self, out_road, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.out_road = out_road
    
    def getvispos(self, i=0):
        if self.l_time[0] <= i <= self.l_time[-1]:
            t = i-self.l_time[0]
            return self.l_totalpos[t]
        else:
            return None
    
    def getvislane(self, i=0):
        if self.l_time[0] <= i <= self.l_time[-1]:
            t = i-self.l_time[0]
            return self.l_lane[t]+self.l_vlane[t]
        else:
            return None
    
    def move(self):
        newpos = self.getpos()+self.getvel()
        skip=0
        self.l_pos.append(newpos)
        self.l_totalpos.append(self.gettotalpos()+self.getvel())
        if skip:
            self.skip_times.append(self.gettime())
        self.l_vel.append(self.getvel())
        self.l_time.append(self.gettime()+1)
        self.l_lane.append(self.getnewlane())
        self.l_vlane.append(0)
    
    def reset(self, max_spawnvel):
        self.l_pos = [0] #list of past positions
        self.l_totalpos = [0]
        self.l_vel = [min(self.getvel(), max_spawnvel)] #list of past velocities
        self.l_time = [self.gettime()] #list of past times
        self.l_lane = [0] #list of past lanes
        self.skip_times = [] #list of times where car reached the end of road and went back
        self.l_vlane = [0]

### InRoad class
Subclass of Road. Overwrites methods to use as incoming road in a junction.

In [11]:
class InRoad(Road):
    def __init__(self, out_roads, length, prob, lanes=1, maxvel=5, stops=0, buses=0, buslane=False, iterations=0):
        self.L = length #length of rd
        self.prob = prob #prob of braking
        self.cars = list()
        self.anicars = list() #junction road specific, stores deep copies of all cars that have been on road
        self.lanes = lanes #no of lanes
        self.flow = [0]*self.lanes
        self.flowpos = 0
        self.stops = dict([(i*int(self.L/stops) + int(self.L/(2*stops)), 0) for i in range(stops)])
        self.buslane = buslane
        self.max_vel = maxvel
        self.iterations = 0
        self.flowiterations = 0
        self.out_roads = out_roads
        
        self.endpos=self.L
        self.green=False
        self.lightpos=self.endpos
        
    def get_nearest(self, index, lane, reverse=False):
        direction = (1,-1)[reverse]
        for c in self.cars[index-direction::-direction]: #check opposite direction in list, small chance of car having equal pos in test lane
            if c.getpos() != self.cars[index].getpos():
                break
            elif c.getnewlane()==lane:
                return c
        for c in self.cars[index+direction::direction]:
            if c.getnewlane()==lane:
                return c
        return None
    
    def transfer(self, car):
        success=self.out_roads[car.out_road].transfer(deepcopy(car))
        if success:
            self.anicars.append(deepcopy(car))
            return True
        else:
            return False
    
    def move(self):
        hitlist=list()
        for i, car in enumerate(self.cars):
            car.move()
            if car.getpos() == self.endpos:
                success = self.transfer(car)
                if success:
                    hitlist.append(i-len(hitlist))
        for target in hitlist:
            del self.cars[target]
        
    def spawn(self, car):
        self.cars = sorted(self.cars, key=methodcaller("getpos"))
        for c in self.cars:
            if c.getpos() > 0:
                break
            elif c.getlane() == car.getlane():
                return False
        self.cars = [car]+self.cars
        car.update_vel(self, 0)
        return True
    
    def update(self):
        self.cars = sorted(self.cars, key=methodcaller("getpos"))
        for i, car in enumerate(self.cars):
            car.update_other()
            car.update_vel(self, i)
        for k in self.stops:
            self.stops[k] += 0.2
        self.update_flow(0)
        
    def rdist(self, car, index, lane): #distance to the nearest car ahead
        if self.green or car.getpos() >= self.lightpos:
            return min(self.distf(self.get_nearest(index, lane), car), self.endpos-car.getpos()+1)
        else:
            return min(self.distf(self.get_nearest(index, lane), car), self.lightpos-car.getpos())

### OutRoad class
Subclass of Road. Overwrites methods to use as outgoing road in a junction.

In [12]:
class OutRoad(Road):
    def __init__(self, length, prob, lanes=1, maxvel=5, stops=0, buses=0, buslane=False, iterations=0):
        self.L = length #length of rd
        self.prob = prob #prob of braking
        self.cars = list()
        self.anicars = list() #junction road specific, stores deep copies of all cars that have been on road
        self.lanes = lanes #no of lanes
        self.flow = [0]*self.lanes
        self.flowpos = self.L
        self.stops = dict([(i*int(self.L/stops) + int(self.L/(2*stops)), 0) for i in range(stops)])
        self.buslane = buslane
        self.max_vel = maxvel
        self.iterations = 0
        self.flowiterations = 0
        self.max_spawnvel = 1
        
    def update(self):
        self.cars = sorted(self.cars, key=methodcaller("getpos"))
        for i, car in enumerate(self.cars):
            car.update_lane(self, i)
        for i, car in enumerate(self.cars):
            car.update_other()
            car.update_vel(self, i)
        for k in self.stops:
            self.stops[k] += 0.2
        self.update_flow(0)
    
    def transfer(self, car):
        dist, lane = self.bestdistlane()
        if dist > 0:
            car.reset(self.max_spawnvel)
            self.cars = [car]+self.cars
            car.update_lane(self, 0)
            car.update_vel(self, 0)
            return True
        else:
            return False
    
    def move(self):
        hitlist=list()
        for i, car in enumerate(self.cars):
            car.move()
            if car.getpos() > self.L:
                self.anicars.append(deepcopy(car))
                hitlist.append(i-len(hitlist))
        for target in hitlist:
            del self.cars[target]
    
    def get_nearest(self, index, lane, reverse=False):
        direction = (1,-1)[reverse]
        for c in self.cars[index-direction::-direction]: #check opposite direction in list, small chance of car having equal pos in test lane
            if c.getpos() != self.cars[index].getpos():
                break
            elif c.getnewlane()==lane:
                return c
        for c in self.cars[index+direction::direction]:
            if c.getnewlane()==lane:
                return c
        return None
        
    def rdist(self, car, index, lane): #distance to the nearest car ahead
        return self.distf(self.get_nearest(index, lane), car)
    
    def distf(self, c_to, c_from): #returns distance between two cars
        try:
            if c_to.getpos() > c_from.getpos():
                return c_to.getpos() - c_from.getpos()
            else:
                return self.L
        except AttributeError:
            return self.L
        
    def bestdistlane(self):
        lanes={i for i in range(self.lanes)}
        for c in self.cars:
            lanes.discard(c.getlane())
            if len(lanes) == 0:
                return c.getpos(), c.getnewlane()
        return self.L, lanes.pop()

### Animate class
Assists with animating multiple road instances simultaneously. Only animates as far as the road with the lowest iterations.

In [13]:
class Animate():
    def __init__(self, *roads, step_rate=2, interps=None, axes_size=(12,0.5), invert_xaxis=(), invert_yaxis=(), ticks=True, **kwargs):
        if interps == None:
            self.interps = math.ceil(20/step_rate)
        else:
            self.interps = interps
        self.interval = 1000/step_rate/self.interps #time between updates in ms
        self.roads = roads
        
        laneslist=[r.lanes for r in self.roads]
        figsize = (axes_size[0], axes_size[1]*sum(laneslist))
        
        self.fig, self.axes = plt.subplots(len(self.roads), gridspec_kw={"height_ratios":laneslist}, figsize = figsize, squeeze=False)
        
        self.positions=list()
        self.colours=list()
        self.scatters=list()
        
        for (ax,), r, n in zip(self.axes, self.roads, range(len(self.roads))):
            if ticks:
                ax.set_yticks([i for i in range(r.lanes)])
            else:
                ax.set_yticks([])
                ax.set_xticks([])
            ax.minorticks_off()
            ax.patch.set_facecolor("lightgrey")
            ax.set_xlim((0,r.L))
            ax.set_ylim((-0.5,r.lanes-0.5))
            
            if n in invert_xaxis:
                ax.invert_xaxis()
            if n in invert_yaxis:
                ax.invert_yaxis()
            
            for x in r.stops:
                ax.plot([x,x],[-0.5,r.lanes-0.5],"r",zorder=5)
            for y in range(r.lanes):
                ax.plot([0,r.L],[y+0.5, y+0.5],"w--",zorder=1,linewidth=2)
                
            xs = [[(x1+(x2-x1)/self.interps*i) % r.L for x1, x2 in map(lambda c: (c.gettotalpos(t), c.gettotalpos(t+1)), r.cars)] for t, i in map(lambda x: divmod(x,self.interps), range((r.iterations-1)*self.interps))]
            ys = [[y1+(y2-y1)/self.interps*i for y1, y2 in map(lambda c: (c.getnewlane(t), c.getnewlane(t+1)), r.cars)] for t, i in map(lambda x: divmod(x,self.interps), range((r.iterations-1)*self.interps))]
            cols = [i for i in map(lambda c: c.maxvel, r.cars)]
            self.positions.append([xs, ys])
            self.colours.append(cols)
            self.scatters.append(ax.scatter(xs[0], ys[0], zorder=10, marker = "s", c=cols, s=[50-5*i if i <= 8 else 10 for i in cols]))
        
        #plt.tight_layout()
        
        num_iterations = min(r.iterations for r in self.roads)*self.interps
        self.ani = animation.FuncAnimation(self.fig, self.animate, np.arange(num_iterations),
                              interval=self.interval, blit=True, **kwargs)
        
        plt.show()
    
    def animate(self, i):
        for points, (xs, ys) in zip(self.scatters, self.positions):
            points.set_offsets([(x,y) for x, y in zip(xs[i], ys[i])])
        return self.scatters

    def init(self):
        for points, (xs, ys) in zip(self.scatters, self.positions):
            points.set_offsets(np.ma.array((xs[0], ys[0]), mask=True))
        return self.scatters

### JuncAnimate class
Animates a single junction.

In [14]:
class JuncAnimate():
    def __init__(self, junction, step_rate=2, interps=None, axes_size=(12,0.5), invert_xaxis=(), invert_yaxis=(), ticks=True, **kwargs):
        if interps == None:
            self.interps = math.ceil(20/step_rate)
        else:
            self.interps = interps
        self.interval = 1000/step_rate/self.interps #time between updates in ms
        self.roads = junction.in_roads + junction.out_roads
        self.green_roads = junction.green_road_list
        
        laneslist=[r.lanes for r in self.roads]
        figsize = (axes_size[0], axes_size[1]*sum(laneslist))
        
        self.fig, self.axes = plt.subplots(len(self.roads), gridspec_kw={"height_ratios":laneslist}, figsize = figsize, sharex=True, squeeze=False)
        self.counter = self.fig.text(0,0,"#0, 0")
        
        self.positions=list()
        self.colours=list()
        self.scatters=list()
        self.lights=list()
        
        for (ax,), r, n in zip(self.axes, self.roads, range(len(self.roads))):
            if ticks:
                ax.set_yticks([i for i in range(r.lanes)])
            else:
                ax.set_yticks([])
                ax.set_xticks([])
            ax.minorticks_off()
            ax.patch.set_facecolor("lightgrey")
            ax.set_xlim((0,r.L))
            ax.set_ylim((-0.5,r.lanes-0.5))
            
            if n in invert_xaxis:
                ax.invert_xaxis()
            if n in invert_yaxis:
                ax.invert_yaxis()
            
            try:
                light, = ax.plot([r.lightpos]*2,[-0.5,r.lanes-0.5],"r",zorder=5,linewidth=10)
                self.lights.append(light)
            except AttributeError:
                pass
            
            for y in range(r.lanes):
                ax.plot([0,r.L],[y+0.5, y+0.5],"w--",zorder=1, linewidth=2)
                
            xs = [[x1+(x2-x1)/self.interps*i for x1, x2 in map(lambda c: (c.getvispos(t), c.getvispos(t+1)), r.cars+r.anicars) if None not in (x1, x2)] for t, i in map(lambda x: divmod(x,self.interps), range((r.iterations-1)*self.interps))]
            ys = [[y1+(y2-y1)/self.interps*i for y1, y2 in map(lambda c: (c.getvislane(t), c.getvislane(t+1)), r.cars+r.anicars) if None not in (y1, y2)] for t, i in map(lambda x: divmod(x,self.interps), range((r.iterations-1)*self.interps))]
            self.positions.append([xs, ys])
            self.scatters.append(ax.scatter(xs[0], ys[0], zorder=10, marker="s", s=30))
        
        #plt.tight_layout()
        
        num_iterations = junction.iterations*self.interps
        self.ani = animation.FuncAnimation(self.fig, self.animate, np.arange(num_iterations),
                              interval=self.interval, blit=True, **kwargs)
        
        plt.show()
    
    def animate(self, i):
        for points, (xs, ys) in zip(self.scatters, self.positions):
            points.set_offsets([(x,y) for x, y in zip(xs[i], ys[i])])
        for n, light in enumerate(self.lights):
            light.set_color(("r","g")[n==self.green_roads[i//self.interps]])
        self.counter.set_text("#%d, %d" % (i, i//self.interps))
        return self.scatters, self.lights

    def init(self):
        for points, (xs, ys) in zip(self.scatters, self.positions):
            points.set_offsets(np.ma.array((xs[0], ys[0]), mask=True))
        for n, light in enumerate(self.lights):
            light.set_color("rg"[n==self.green_roads[0]])
        self.counter.set_text("#0, 0")
        return self.scatters, self.lights

## Generating Road instances & analysing flow
### Road instances

#### Flow against density scatter plot with mean flow line

In [None]:
scatterx=[]
scattery=[]

for dens in np.arange(0, 1, 0.01):
    r=Road(100, dens, prob=0.5, iterations=1000)
    r.flowiterate(100)
    scatterx += [dens]
    scattery += [r.getflow(lane_average=True)]

print("SCATTER POINTS DONE")

In [None]:
linex=[]
liney=[]

for dens in np.arange(0, 0.71, 0.01):
    r=Road(10000, dens, prob=0.5, iterations=1000)
    r.flowiterate(2000)
    clear_output(); print(dens)
    linex += [dens]
    liney += [r.getflow(lane_average=True)]

In [None]:
#save
save((linex, liney, scatterx, scattery), "mean_flow_scatter.txt")

#### Mean flow against density for multiple braking probabilities

In [None]:
linexy=dict((prob,([],[])) for prob in np.arange(0,1,0.2))

for prob in linexy:
    for dens in np.arange(0, 0.81, 0.01):
        r=Road(1000, dens, prob, iterations=1000)
        r.flowiterate(1000)
        linexy[prob][0].append(dens)
        linexy[prob][1].append(r.getflow(lane_average=True))

print("DONE")

In [None]:
#save
save(linexy, "mean_flow_braking.txt")

#### Time-averaged speed for varying density

In [None]:
linex=[]
liney=[]

for dens in np.arange(0.01, 1, 0.01):
    r=Road(1000, dens, prob=0.5, iterations=2000)
    linex += [dens]
    liney += [r.avspeed(start=1000)]

print("LINE POINTS DONE")

In [80]:
#save
save((linex, liney), "mean_vel_scatter.txt")

### Credits

In [None]:
credits=["Credits",
         "Programmers - Mihnea Clejan",
         "            - Richard Flyng",
         "Traffic Laws Consultants - Richard Flyng",
         "                           Mihnea Clejan",
         "Statistics Consultants - Richard Flyng",
         "                       - Mihnea Clejan",
         "Functionality Designers - Mihnea Clejan",
         "                          Richard Flyng"]
l=len(credits)-4
for i in range(l+len(credits)+1):
    print(*["" for n in range(l-i)],*credits[int(i-l>0)*(i-l):i],*["" for n in range(i-len(credits))],sep="\n");time.sleep(0.5);clear_output(wait=True)