In [1]:
import sys

sys.path.insert(0, "/Users/maxdumas/cornell/spec-project/flow/")

In [2]:
import flow
from flow.networks.ring import RingNetwork
from flow.controllers.car_following_models import IDMController
from flow.controllers.routing_controllers import ContinuousRouter
from flow.envs.ring.lane_change_accel import LaneChangeAccelEnv, ADDITIONAL_ENV_PARAMS
from flow.core.experiment import Experiment
from flow.networks.ring import ADDITIONAL_NET_PARAMS
from flow.core.params import (
    NetParams,
    EnvParams,
    SumoParams,
    TrafficLightParams,
    InitialConfig,
    VehicleParams,
)


In [3]:
from flow.controllers.base_lane_changing_controller import BaseLaneChangeController

# We want the ambulance to always prefer the left lane, so we create this lane-changing controller that makes them always enter the left lane
class LeftLaneController(BaseLaneChangeController):
    """A lane-changing model used to move vehicles into the left lane."""

    def get_lane_change_action(self, env):
        current_lane = env.k.vehicle.get_lane(self.veh_id)
        if current_lane < 4:
            return 1
        else:
            return 0


# We want to have vehicles merge to the right if possible when they detect that
# the ambulance is coming up behind them.
class YieldToEmergencyVehicleLaneChangeController(BaseLaneChangeController):
    def get_lane_change_action(self, env):
        env.k.vehicle.update(False)
        current_lane = env.k.vehicle.get_lane(self.veh_id)
        # If any follower is an ambulance, merge right
        follower_ids = env.k.vehicle.get_lane_followers(self.veh_id)
        if any(follower_id.startswith("ambulance") for follower_id in follower_ids):
            return -1
        else:
            return 0


In [4]:
class YieldToEmergencyVehicleController(IDMController):
    def get_accel(self, env):
        env.k.vehicle.update(False )
        current_lane = env.k.vehicle.get_lane(self.veh_id)
        v = env.k.vehicle.get_speed(self.veh_id)
        # If any follower is an ambulance, slow down.
        follower_ids = env.k.vehicle.get_lane_followers(self.veh_id)
        for follower_id in follower_ids:
            if follower_id.startswith("ambulance"):
                ambulance_lane = env.k.vehicle.get_lane(follower_id)
                if ambulance_lane == current_lane:
                    return IDMController.get_accel(self, env)
                else:
                    return v * 0.99

        return IDMController.get_accel(self, env)


In [5]:
vehicles = VehicleParams()
vehicles.add(
    "ambulance",
    acceleration_controller=(IDMController, {}),
    lane_change_controller=(LeftLaneController, {}),
    routing_controller=(ContinuousRouter, {}),
    num_vehicles=1,
    color="red",
)
vehicles.add(
    "human",
    acceleration_controller=(YieldToEmergencyVehicleController, {}),
    lane_change_controller=(YieldToEmergencyVehicleLaneChangeController, {"lane_change_params": {"min_gap": 0.0}}),
    routing_controller=(ContinuousRouter, {}),
    num_vehicles=22,
)


In [6]:
exp = Experiment(
    dict(
        exp_tag="ambulance",
        env_name=LaneChangeAccelEnv,
        network=RingNetwork,
        simulator="traci",
        sim=SumoParams(sim_step=0.1, render=True),
        env=EnvParams(horizon=3000, additional_params=ADDITIONAL_ENV_PARAMS),
        net=NetParams(additional_params={**ADDITIONAL_NET_PARAMS, "lanes": 4}),
        veh=vehicles,
        initial=InitialConfig(spacing="uniform", perturbation=1),
        tls=TrafficLightParams(),
    )
)

# run the sumo simulation
_ = exp.run(1)


INITIALIZING traci


FatalTraCIError: connection closed by SUMO