In [1]:
import multiprocessing as mp 
import random
import numpy as np
import osmnx as ox

np.random.seed(63)

bounding_box = (-118.45905139825884,34.05810307269331, -118.43480879942018,34.07964233309008 )
G = ox.graph.graph_from_bbox(bounding_box, network_type='drive')
Gp = ox.projection.project_graph(G)

In [7]:
class Map:
    def __init__(self, intersections, roads):
        self.intersections = intersections
        self.roads = roads

    def update(self):
        for road in self.roads:
            road.update()

class Intersection:
    def __init__(self, roads=[]):
        self.roads = roads # in ccw order ?
        self.terminations = 0

        self.get_out_roads()

    def add_road(self, road):
        self.roads.append(road)
        self.get_out_roads()

    def get_out_roads(self):
        self.out_roads = []
        for road in self.roads:
            if road.get_start() == self:
                self.out_roads.append(road)

    def route(self, in_road, num_cars, direction=None):
        if direction == None: # if direction is not None
            
            if in_road.num_cars >= num_cars:
                in_road.remove_cars(num_cars)
            else:
                in_road.num_cars = 0


            if len(self.out_roads) != 0:
                choice = random.choice(self.out_roads)
                choice.add_cars(num_cars)# choose random road
            else:
                self.terminations += num_cars
                
            # else cars just disappear




class Road:
    def __init__(self, dt, start, end, num_cars, speed_limit, length, lanes, name):
        self.dt = dt # size of time step
        self.start = start # starting intersection
        self.end = end # ending intersection
        self.num_cars = num_cars # number of cars
        self.speed_limit = speed_limit
        self.length = length # length of road
        self.lanes = lanes # number of lanes
        self.name = name
        
        self.connect()
        self.speed = 0
        self.update_speed()

    def connect(self):
        self.start.add_road(self)
        self.end.add_road(self)

    def update_speed(self):
        if self.num_cars > 0:
            self.speed =  self.speed_limit / self.num_cars
        else:
            self.speed = self.speed_limit
    
    def get_start(self):
        return self.start
    
    def update(self):
        self.end.route(in_road = self, num_cars = (self.num_cars * self.speed * self.dt)/self.length) 

        bar = ""
        for i in range(int(self.num_cars)):
            bar += "#"

        print(self.name, bar)
        # send number of cars into intersection based on uniform density of cars

    def add_cars(self, num_cars):
        self.num_cars += num_cars
    def remove_cars(self, num_cars):
        self.num_cars -= num_cars

In [5]:
nodes, edges = ox.graph_to_gdfs(G)

In [8]:
intersections = {}
for node_id, node_data in nodes.iterrows():
    intersections[node_id] = Intersection()

# Create a list to store your Road objects
roads = []
for edge_id, edge_data in edges.iterrows():
    start_node_id, end_node_id, _ = edge_id
    start_intersection = intersections[start_node_id]
    end_intersection = intersections[end_node_id]
    
    # Extract road properties from the edge data
    length = edge_data['length']  # Length of the road in meters
    speed_limit = edge_data['maxspeed']  # Speed limit (may need parsing)
    lanes = edge_data['lanes']  # Number of lanes (may need parsing)
    name = edge_data['name']  # Name of the road
    
    # Create a Road object
    road = Road(
        dt=1,  # Time step size (you can adjust this)
        start=start_intersection,
        end=end_intersection,
        num_cars=0,  # Initial number of cars (you can adjust this)
        speed_limit=speed_limit,
        length=length,
        lanes=lanes,
        name=name
    )
    roads.append(road)

# Create a Map object
traffic_map = Map(intersections=list(intersections.values()), roads=roads)

In [17]:
traffic_map.roads[2].length

116.61106576517156