In [42]:
MPH_TO_MPS = 0.44704

SPACING = 1  # meters
HEADWAY = 1  # seconds, HEADWAY
SPEED = 60 * MPH_TO_MPS  # meters per second

GRANULARITY = 0.01  # base simulation speed in seconds

SIMULATION_TIME = 20  # how long to run the simulation

SEED = 42

In [43]:
assert True  # TODO: ensure valid global constants

In [44]:
import itertools
import math
import random
from dataclasses import dataclass

import matplotlib.pyplot as plt
import pandas as pd
import plotly.express as px
import seaborn as sns
import simpy
from sortedcontainers import SortedSet


In [45]:
class CarCreationFailedError(Exception):
    def __init__(self, message):
        super().__init__(message)


# TODO: if granularity is too large (w.r.t. some other constants?), the assertion breaks
class PositionHandler:
    def __init__(self):
        self.positions = SortedSet()
        self.blockers = SortedSet()

    def add_blocker(self, pos: float):
        assert pos not in self.blockers
        self.blockers.add(pos)

    def remove_blocker(self, pos: float):
        self.blockers.remove(pos)

    def reserve_init(self, pos: float):
        # if reserving this position would break the minimum distance, don't spawn it
        if pos in self.positions or self._check(pos) < SPACING:
            raise CarCreationFailedError("cannot spawn car")

        self._add(pos)

    def _find_next_blocker(self, pos: float) -> float:
        idx: int = self.blockers.bisect_left(pos)

        if idx == len(self.blockers):
            return float("inf")

        return self.blockers[idx]

    def reserve(self, old: float, new: float) -> float:
        # blocker check
        # if the next blocker changes between old/new, that means we passed a blocker
        before: float = self._find_next_blocker(old)
        after: float = self._find_next_blocker(new)

        # if the blocker distances are not close (i.e., they are different blockers),
        # then we set the new target distance to before, where the blocker is since this car cannot go past that
        if not math.isclose(before, after):
            new = before

        # overlapping cars check
        next: float = min(self._check(new), self._check(old))  # position of next car

        # assert next - SPACING >= old  # don't want to make a car go backwards

        if math.isclose(next, new):
            return old
        elif next - new < SPACING:
            new: float = max(next - SPACING, old)

        self._free(old)
        self._add(new)

        return new

    def _add(self, position: float) -> None:
        assert position not in self.positions
        self.positions.add(position)

    def _check(self, position: float) -> float:
        idx: int = self.positions.bisect_left(position) + 1

        if idx >= len(self.positions):
            return float("inf")

        return self.positions[idx]

    def _free(self, position: float) -> None:
        self.positions.remove(position)

traffic light limitations currently:

- assuming an infinitesimally small traffic light (i.e., car passes a traffic light immediately--there is no gap to travel)
    - actually am assuming any obstacles would be like this
- possibly off by GRANULARITY depending on if the traffic light event switch happens before or after the cars at the same time
    - i think if we just start it first it should always have precedence

In [46]:
class TrafficLight:
    id: int = 0

    def __init__(
        self,
        env: simpy.Environment,
        pos: float,
        cycles: tuple[float, ...],
        state: int,
        blocking_states: tuple[bool, ...],
        handler: PositionHandler,
    ):
        self.pos = pos  # where the traffic light is
        self.cycles: tuple[float, ...] = cycles  # list of the delays of each of the cycles
        self.state: bool = state  # initial state -- an index for self.cycles
        self.blocking_states = (
            blocking_states  # list of whether each corresponding cycle is blocking or not
        )

        assert self.state < len(self.cycles) and self.state >= 0
        assert len(blocking_states) == len(cycles)

        self.id = TrafficLight.id
        TrafficLight.id += 1

        self.handler: PositionHandler = handler

        self.env: simpy.Environment = env
        self.action = self.env.process(self.run())

    def run(self):
        while True:
            if self.blocking_states[self.state]:
                self.handler.add_blocker(self.pos)

            yield self.env.timeout(self.cycles[self.state])

            if self.blocking_states[self.state]:
                self.handler.remove_blocker(self.pos)

            self.state = (self.state + 1) % len(self.cycles)

In [47]:
@dataclass
class HistoryEntry:
    id: int
    time: float
    position: float

    def __init__(self, id: int, time: float, position: float):
        self.id = id
        self.time = time
        self.position = position

    def unpack(self) -> tuple[int, float, float]:
        return (self.id, self.time, self.position)

    def get_ordering() -> tuple[str, str, str]:
        return ["id", "time", "position"]


class Car:
    counter: int = 0

    def __init__(
        self,
        env: simpy.Environment,
        positions: PositionHandler,
        inflection_points: list[tuple[float, float]],
        init_pos: int = 0,
    ):
        positions.reserve_init(init_pos)  # assume this always works, error handle outside

        # start position at the given init_pos (0)
        self.pos: float = init_pos
        # set initial and max speed possible
        self.speed: float = SPEED

        # give the current car an id
        self.id: int = Car.counter
        Car.counter += 1

        # variable to track a history of the points that compose the lines that the car traverses in the position-time plot
        self.history: list[HistoryEntry] = [HistoryEntry(self.id, env.now, self.pos)]

        # get a reference to the position tracker
        self.positions: PositionHandler = positions

        # store reference to simpy environment
        self.env: simpy.Environment = env
        # define the action for the simpy environment
        self.action = self.env.process(self.run())

        self.inflection_points = inflection_points

    def update_history(self, id: int, time: float, pos: float) -> None:
        entry = HistoryEntry(id, time, pos)

        if len(self.history) <= 1 or (
            pos == self.history[-1].position and time == self.history[-1].time
        ):
            self.history.append(entry)
            return

        old_slope: float = (self.history[-1].position - self.history[-2].position) / (
            self.history[-1].time - self.history[-2].time
        )
        new_slope: float = (pos - self.history[-1].position) / (time - self.history[-1].time)

        prev_pos: float = self.history[-1].position

        # if id == 0:
        # print(old_slope, new_slope)
        if math.isclose(old_slope, new_slope):
            self.history[-1] = entry
        else:
            # this is the amount of time needed to reach the new position using the nonzero slope
            truncated_time = -1
            if new_slope < old_slope:
                truncated_time = time - (pos - prev_pos) / old_slope
            elif new_slope > old_slope:
                truncated_time = time - (pos - prev_pos) / new_slope

            self.inflection_points.append((pos, truncated_time))

            # want to keep at speeds for as long as possible, don't want middling slopes
            if not math.isclose(truncated_time, time):
                self.history[-1] = HistoryEntry(id, truncated_time, pos)
            self.history.append(entry)

    def run(self):
        while True:
            yield self.env.timeout(GRANULARITY)

            target_pos: float = self.pos + self.speed * GRANULARITY
            actual_pos: float = self.positions.reserve(self.pos, target_pos)

            self.pos = actual_pos
            self.update_history(self.id, self.env.now, actual_pos)

    def __str__(self) -> str:
        return f"Car with id {self.id} at location {self.pos}"

In [48]:
cars = []
lights = []
inflections = []
handler = PositionHandler()


def start_simulation(env: simpy.Environment):
    lights.append(TrafficLight(env, 10, [5, 5], 0, [1, 0], handler))

    yield env.timeout(GRANULARITY)

    while True:
        try:
            c = Car(env, handler, inflections)

            # env.process(c)
            cars.append(c)
        except CarCreationFailedError:
            pass

        # for car in cars:
        #     print("car info", car.pos, car.id)
        # print()

        yield env.timeout(HEADWAY)

In [49]:
random.seed(SEED)
env = simpy.Environment()

env.process(start_simulation(env))
env.run(until=SIMULATION_TIME)

In [50]:
import matplotlib.axes._axes as axes


def display_dt_diagram(cars: list[Car], lights: list[TrafficLight]):
    # fig, ax = plt.subplots(figsize=(10, 7))
    # ax: axes.Axes

    df = pd.DataFrame(
        list(itertools.chain(*[[x.unpack() for x in car.history] for car in cars])),
        columns=HistoryEntry.get_ordering(),
    )

    # sns.lineplot(data=df, x="time", y="position", hue="id", ax=ax, palette=sns.color_palette(n_colors=1))
    # ax.get_legend().remove()

    # ax.hlines([light.pos for light in lights], xmin=ax.get_xlim()[0], xmax=ax.get_xlim()[1], linestyles="dotted", colors="0.8")

    # return fig, ax

    fig = px.line(df, x="time", y="position", color="id")
    fig.update_layout(showlegend=False)

    return fig

In [51]:
cars[2].history

[HistoryEntry(id=2, time=2.01, position=0),
 HistoryEntry(id=2, time=5.731772846575918, position=100),
 HistoryEntry(id=2, time=9.99999999999983, position=100),
 HistoryEntry(id=2, time=19.990000000000325, position=367.9557759999917)]

In [52]:
display_dt_diagram(cars, lights).show()



