# Route Simulation

## Route Network

In [1]:
from typing import List, Dict, Set, Tuple

class Edge(object):
    def __init__(self, length:float, next_stop:Stop):
        self._length = length # Km
        self._next_stop = next_stop
    
    @property
    def length(self) -> int:
        return self._length
    
    @property
    def next_stop(self) -> Stop:
        return self._next_stop
    
    def __str__(self) -> str:
        return f"{self.length} Km to {self.next_stop}"
    
    def __repr__(self) -> str:
        return f"{self.length} Km to {self.next_stop}"

class Stop(object):
    def __init__(self, name:str, edges:Dict[str,Edge]=None, num_chargers:int=0, charger_rating:float=450, stop_time:float):
        self._name = name
        self.edges = edges
        self.charger_rating = charger_rating # KWh
        self.charger_queues = [[] for i in range(num_chargers)]
        self.stop_time = stop_time # s
        
    def shortest_charger_queue(self) -> int:
        shortest = -1
        for i in range(len(self.charger_queues)):
            if shortest == -1 or len(self.charger_queues[i]) < len(self.charger_queues[shortest]):
                shortest = i
        return shortest
    
    def has_charger(self) -> bool:
        return len(self.charger_queues) > 0
    
    def buses_in_queue(self, i:int) -> int:
        return len(self.charger_queues[i])
    
    def add_bus_to_charger_queue(self, i:int, bus:Bus):
        self.charger_queues[i].append(bus)
        bus.is_charging = True
    
    def charge_all_buses(self, timestep:float): # timestamp in seconds
        for charger_queue in self.charger_queues:
            if len(charger_queue) >= 1:
                bus = charger_queue[0] # bus at the front of queue
                bus.battery_charge = min(bus.battery_charge + self.charger_rating * timestep / 3600, bus.battery_capacity) # KWh
                if bus.can_reach_next_stop(): # bus charged enough
                    charger_queue.pop(0) # Remove bus from queue
                    bus.is_charging = False
        
    def __eq__(self, other) -> bool:
        if not isinstance(other, Stop):
            return False
        return self.name == other.name and self.has_charger == other.has_charger
    
    def __hash__(self):
        return hash((self.name, self.has_charger))
    
    @property
    def name(self) -> str:
        return self._name
    
    def __str__(self) -> str:
        return f"{self.name}"
    
    def __repr__(self) -> str:
        return f"{self.name}"
    
class Route(object):
    def __init__(self, name:str):
        self._name = name
        self.stops = dict()
        self.buses = set()
        
    @property
    def name(self) -> str:
        return self._name
    
    def get_other_direction(self, cur_direction) -> str:
        for direction in self.stops:
            if direction != cur_direction:
                return direction
        return "No other direction"
    
    def add_stop(self, stop:Stop, direction:str):
        if direction not in self.stops:
            self.stops[direction] = list()
        self.stops[direction].append(stop)
        
    def add_bus(self, bus:Bus):
        self.buses.add(bus)
    
    def __str__(self) -> str:
        return f"{self.stops}"
    
    def __repr__(self) -> str:
        return f"{self.stops}"
            
class StopNetwork(object):
    def __init__(self):
        self.stops = dict() # Dictionary of Stop name -> Stop object
        self.routes = dict() # Dictionary of Route name -> Route object
        
    def add_edge(self, origin_name:str, dest_name:str, route_name:str, route_direction:str, length:float, origin_num_chargers:int=0, origin_charger_rating:float=450, dest_num_chargers:int=0, dest_charger_rating:float=450):
        # Populate stops dictionary
        if origin_name not in self.stops:
            self.stops[origin_name] = Stop(origin_name, {}, num_chargers=origin_num_chargers, charger_rating=origin_charger_rating)
        if dest_name not in self.stops:
            self.stops[dest_name] = Stop(dest_name, {}, num_chargers=dest_num_chargers, charger_rating=dest_charger_rating)
        self.stops[origin_name].edges[route_direction] = Edge(length, self.stops[dest_name])
        
        # Populate routes dictionary
        if route_name not in self.routes:
            self.routes[route_name] = Route(route_name)
        self.routes[route_name].add_stop(self.stops[origin_name], route_direction)
        
    def add_bus(self, bus_id:int, speed:float, cur_stop_name:str, route_name:str, route_direction:str, battery_capacity:float, battery_charge:float, energy_use_per_km:float):
        new_bus = Bus(bus_id, speed, self.stops[cur_stop_name], self.routes[route_name], route_direction, battery_capacity, battery_charge, energy_use_per_km)
        self.routes[route_name].add_bus(new_bus)
        
    def move_all_buses(self, timestep:float): # timestep in seconds
        for route in self.routes.values():
            for bus in route.buses:
                bus.move(timestep)
                
    def charge_all_buses(self, timestamp:float): # timestamp in seconds
        for stop in self.stops.values():
            stop.charge_all_buses(timestamp)
            
    def log_bus_information(self) -> Dict[int, Tuple[str]]:
        bus_info = list()
        for route in self.routes.values():
            for bus in route.buses:
                bus_info.append([bus.id,
                                 bus.route.name,
                                 bus.cur_stop.edges[bus.route_direction].next_stop,
                                 bus.route_direction,
                                 bus.total_distance_traveled,
                                 bus.total_energy_used,
                                 bus.SOC(),
                                 bus.is_charging])
        return bus_info
        

NameError: name 'Stop' is not defined

## Bus model

In [394]:
class Bus(object):
    def __init__(self, bus_id:int, speed:float, cur_stop:Stop, route:Route, route_direction:str, battery_capacity:float, battery_charge:float, energy_use_per_km:float, is_charging:bool=False):
        self._id = bus_id
        self.speed = speed # Km/h
        self.route = route
        self.cur_stop = cur_stop
        self.route_direction = route_direction
        self.distance_to_next_stop = cur_stop.edges[route_direction].length # Km
        self._battery_capacity = battery_capacity # KWh
        self.is_charging = is_charging
        self.battery_charge = battery_charge #KWh
        self.energy_use_per_km = energy_use_per_km # KWh / Km
        self.total_distance_traveled = 0.0 # Km
        self.total_energy_used = 0.0 # Kwh
        
    def SOC(self) -> float:
        return self.battery_charge / self.battery_capacity
    
    def can_reach_next_stop(self) -> bool:
        return self.battery_charge >= self.distance_to_next_charger() * self.energy_use_per_km
    
    def distance_to_next_charger(self) -> float:
        distance = self.distance_to_next_stop
        direction = self.route_direction
        stop = self.cur_stop.edges[direction].next_stop
        while stop != self.cur_stop and not stop.has_charger():
            direction = self.get_next_direction(stop, direction)
            distance += stop.edges[direction].length
            stop = stop.edges[direction].next_stop
        return distance
    
    def move(self, timestep:float): # timestep in seconds
        if not self.is_charging and self.battery_charge > 0 and self.time_to_leave <= 0: # Bus not charging
            distance_traveled = timestep * self.speed / 3600
            self.distance_to_next_stop -= distance_traveled
            self.total_distance_traveled += distance_traveled
            energy_used = distance_traveled * self.energy_use_per_km
            self.battery_charge = max(self.battery_charge - energy_used, 0) # floor at 0
            self.total_energy_used += energy_used
            if self.distance_to_next_stop <= 0: # arrived at next stop
                self.cur_stop = self.cur_stop.edges[self.route_direction].next_stop # Update stop
                self.time_to_leave = self.cur_stop.stop_time
                self.route_direction = self.get_next_direction(self.cur_stop, self.route_direction)
                self.distance_to_next_stop = self.cur_stop.edges[self.route_direction].length # Update distance to next stop

                shortest_charger_queue_index = self.cur_stop.shortest_charger_queue()
                if shortest_charger_queue_index != -1: # The stop has a charger
                    queue_size = self.cur_stop.buses_in_queue(shortest_charger_queue_index)
                    if queue_size <= 1 or not self.can_reach_next_stop():
                        self.cur_stop.add_bus_to_charger_queue(shortest_charger_queue_index, self)
        else:
            self.time_to_leave -= timestep
                        
    def get_next_direction(self, cur_stop:Stop, cur_direction:str):
        if cur_direction not in cur_stop.edges: # Change direction if end was reached
            return self.route.get_other_direction(cur_direction)
        return cur_direction

    @property
    def id(self) -> int:
        return self._id
    
    @property
    def battery_capacity(self) -> int:
        return self._battery_capacity
    
    def __eq__(self, other) -> bool:
        if not isinstance(other, Bus):
            return False
        return self.id == other.id
    
    def __hash__(self):
        return hash(self.id)
    
    def __str__(self) -> str:
        return f"{self.id} {self.distance_to_next_stop} Km from {self.cur_stop}"
    
    def __repr__(self) -> str:
        return f"{self.id} {self.distance_to_next_stop} Km from {self.cur_stop}"

In [395]:
import csv

class Simulation(object):
    def __init__(self, stop_network:StopNetwork):
        self.stop_network = stop_network
    
    def run(self, total_simulation_time:float, timestep:float, output_file):
        # total_time_in_seconds in hours
        # timestep in seconds
        num_iterations = int(total_simulation_time * 3600 / timestep)
        with open(output_file, 'w') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(["ID", "Route", 
                                 "Next stop", "Direction", 
                                 "Total Distance (Km)", "Total Energy (KWh)", 
                                 "SOC", "Is Charging"])
            for i in range(num_iterations):
                bus_info = self.stop_network.log_bus_information()
                csv_writer.writerows(bus_info)
                self.stop_network.move_all_buses(timestep)
                self.stop_network.charge_all_buses(timestep)

## Initialize stop network

In [407]:
stop_network = StopNetwork()

## M14D-SBS

In [3]:
direction = "M14D-SBS to SELECT BUS CHLSEA PIERS 11 AV via 14 ST"
stop_names = [
    "DELANCEY ST/COLUMBIA ST",
    "COLUMBIA ST/RIVINGTON ST",
    "AV D/E HOUSTON ST",
    "AV D/E 5 ST",
    "E 10 ST/AV D",
    "AV C/E 11 ST",
    "E 14 ST/AV C",
    "E 14 ST/AV B",
    "E 14 ST/AV A",
    "E 14 ST/1 AV",
    "E 14 ST/2 AV",
    "E 14 ST/3 AV",
    "E 14 ST/4 AV",
    "E 14 ST/UNION SQ W",
    "W 14 ST/5 AV",
    "W 14 ST/6 AV",
    "W 14 ST/7 AV",
    "W 14 ST/8 AV",
    "W 14 ST/9 AV",
    "W 14 St/10 AV",
    "11 AV/W 15 ST",
    "11 AV / W 17 ST",
    "W 18 ST / 10 AV"
]
distances = [
    0.167,
    0.280,
    0.255,
    0.387,
    0.244,
    0.318,
    0.175,
    0.226,
    0.550,
    0.339,
    0.199,
    0.178,
    0.265,
    0.195,
    0.309,
    0.259,
    0.278,
    0.286,
    0.140,
    0.268,
    0.126,
    0.299
]
for i in range(1, len(stop_names)):
    origin_name = stop_names[i - 1]
    dest_name = stop_names[i]
    length = distances[i - 1]
    stop_network.add_edge(origin_name, dest_name, "M14D-SBS", direction, length)

NameError: name 'stop_network' is not defined

In [409]:
direction2 = "M14D-SBS to SELECT BUS LES DELANCEY-FDR via 14 ST"
stop_names2 = [
    "W 18 ST / 10 AV",
    "9 AV/W 18 ST",
    "W 14 ST/HUDSON ST",
    "W 14 ST/8 AV",
    "W 14 ST/7 AV",
    "W 14 ST/6 AV",
    "E 14 ST/UNIVERSITY PL",
    "E 14 ST/IRVING PL",
    "E 14 ST/3 AV",
    "E 14 ST/2 AV",
    "E 14 ST/1 AV",
    "E 14 ST/AV A",
    "E 14 ST/AV B",
    "E 14 ST/AV C",
    "AV C/E 12 ST",
    "E 10 ST/AV D",
    "AV D/E 6 ST",
    "AV D/E 4 ST",
    "E HOUSTON ST/COLUMBIA ST",
    "E HOUSTON ST/MANGIN ST",
    "F. D. R. DRIVE/555 FDR DR",
    "DELANCEY ST/FDR DR",
    "DELANCEY ST/COLUMBIA ST"
]

distances2 = [
    0.220,
    0.368,
    0.172,
    0.203,
    0.329,
    0.277,
    0.402,
    0.316,
    0.213,
    0.204,
    0.240,
    0.176,
    0.187,
    0.212,
    0.250,
    0.333,
    0.351,
    0.149,
    0.189,
    0.247,
    0.275,
    0.187,
    0.331
]

for i in range(1, len(stop_names2)):
    origin_name = stop_names2[i - 1]
    dest_name = stop_names2[i]
    length = distances2[i - 1]
    stop_network.add_edge(origin_name, dest_name, "M14D-SBS", direction2, length)

In [410]:
bus_id = 4950
speed = 10.33 # Km/h
cur_stop_name = "DELANCEY ST/COLUMBIA ST"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS CHLSEA PIERS 11 AV via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [411]:
bus_id = 4951
speed = 10.33 # Km/h
cur_stop_name = "W 18 ST / 10 AV"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS LES DELANCEY-FDR via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [412]:
bus_id = 4952
speed = 10.33 # Km/h
cur_stop_name = "AV C/E 11 ST"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS CHLSEA PIERS 11 AV via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [413]:
bus_id = 4953
speed = 10.33 # Km/h
cur_stop_name = "W 14 ST/7 AV"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS LES DELANCEY-FDR via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [414]:
bus_id = 4954
speed = 10.33 # Km/h
cur_stop_name = "E 14 ST/3 AV"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS CHLSEA PIERS 11 AV via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [415]:
bus_id = 4955
speed = 10.33 # Km/h
cur_stop_name = "E 14 ST/2 AV"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS LES DELANCEY-FDR via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [416]:
bus_id = 4956
speed = 10.33 # Km/h
cur_stop_name = "W 14 ST/8 AV"
route_name = "M14D-SBS"
route_direction = "M14D-SBS to SELECT BUS CHLSEA PIERS 11 AV via 14 ST"
battery_capacity = 140 # KWh
battery_charge = 140 # KWh
energy_use_per_km = 0.586 # KWh / Km
stop_network.add_bus(bus_id, speed, cur_stop_name, route_name, route_direction, battery_capacity, battery_charge, energy_use_per_km)

In [417]:
simulation = Simulation(stop_network)
simulation.run(48, 10, "test.csv")

In [4]:
# sum(distances) * 0.586
sum(distances)

5.743

In [None]:
stop_network.stops["DELANCEY ST/COLUMBIA ST"].edges