In [1]:
# the TestEnv environment is used to simply simulate the network
from flow.envs import TestEnv

# the Experiment class is used for running simulations
from flow.core.experiment import Experiment

# the base network class
from flow.networks import Network
from flow.envs import Env

# all other imports are standard
from flow.core.params import VehicleParams
from flow.core.params import NetParams
from flow.core.params import InitialConfig
from flow.core.params import EnvParams
from flow.core.params import TrafficLightParams
from flow.controllers import IDMController
from flow.core.params import SumoCarFollowingParams

# Create the default parameters shared by the test simulation and the RL
# experiment. HORIZON is the number of simulation steps per rollout; it is
# also reused for the RLlib "horizon" and "timesteps_per_iteration" settings.
HORIZON = 2000
env_params = EnvParams(horizon=HORIZON)
initial_config = InitialConfig()

In [2]:
import os

# Directory holding the Lemgo SUMO input files (network, vehicle types,
# routes, detectors). Set the LEMGO_SUMO_DIR environment variable to point
# elsewhere instead of editing this machine-specific absolute path.
le_dir = os.environ.get(
    "LEMGO_SUMO_DIR", "/home/valentin/Schreibtisch/personal_sumo_files"
)

In [3]:
from flow.core.params import SumoParams

# SUMO simulation settings: 1-unit simulation step, GUI rendering on.
# restart_instance=True presumably restarts the SUMO process on reset --
# see the SumoParams documentation to confirm.
sim_params = SumoParams(
    sim_step=1,
    render=True,
    restart_instance=True,
)

In [4]:
# No vehicles are declared up front; traffic is expected to come from the
# inflows and the route file configured in later cells.
vehicles = VehicleParams()

In [5]:
from flow.core.params import InFlows

# Entry edges that receive human-driven traffic. Flows are registered in this
# order, so they are named flow_0 ... flow_5 accordingly.
INFLOW_EDGES = [
    "right_east",
    "right_south",
    "right_north",
    "left_north",
    "left_south",
    "left_west",
]
# Insertion probability passed to every flow (SUMO flow `probability` attribute).
INFLOW_PROBABILITY = 0.08

# NOTE(review): the "human" vehicle type is not declared in `vehicles`; it is
# presumably defined in vtypes.add.xml -- confirm.
inflow = InFlows()
for entry_edge in INFLOW_EDGES:
    inflow.add(veh_type="human",
               edge=entry_edge,
               probability=INFLOW_PROBABILITY)

In [6]:
# Display the generated flow definitions (one dict per entry edge).
inflow.get()

[{'name': 'flow_0',
  'vtype': 'human',
  'edge': 'right_east',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08},
 {'name': 'flow_1',
  'vtype': 'human',
  'edge': 'right_south',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08},
 {'name': 'flow_2',
  'vtype': 'human',
  'edge': 'right_north',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08},
 {'name': 'flow_3',
  'vtype': 'human',
  'edge': 'left_north',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08},
 {'name': 'flow_4',
  'vtype': 'human',
  'edge': 'left_south',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08},
 {'name': 'flow_5',
  'vtype': 'human',
  'edge': 'left_west',
  'departLane': 'first',
  'departSpeed': 0,
  'begin': 1,
  'end': 86400,
  'probability': 0.08}]

In [7]:
import os

# SUMO input files making up the imported network, all located in le_dir:
#   net   -- network geometry
#   vtype -- driver / vehicle-type properties
#   rou   -- routes vehicles take
#   det   -- additional file (lane area detectors)
template_files = {
    "net": "lemgo_small.net.xml",
    "vtype": "vtypes.add.xml",
    "rou": "lemgo_small2_out.rou.xml",
    "det": "lemgo_small.add.xml",
}

net_params = NetParams(
    inflows=inflow,
    template={key: os.path.join(le_dir, file_name)
              for key, file_name in template_files.items()},
)

## Create a custom network with lane-area detectors

### Running the imported simulation

Finally, the fully imported simulation can be run as follows. 

**Warning**: the network takes time to initialize while the vehicles' departure positions and times are specified.

In [8]:
# Import Flow's Experiment under an explicit alias: a later cell imports
# ray.tune.experiment.Experiment under the same bare name, so re-running this
# cell after that one would otherwise pick up the wrong class.
from flow.core.experiment import Experiment as FlowExperiment

# create the network from the imported SUMO template files
network = Network(
    name="template",
    net_params=net_params,
    vehicles=vehicles
)

# create a plain test environment (no learning agent) on the network
env = TestEnv(
    env_params=env_params,
    sim_params=sim_params,
    network=network
)

# Optional sanity-check: run 1 rollout of HORIZON steps in the test env.
# Disabled by default; set to True to run it.
RUN_TEST_SIMULATION = False
exp = FlowExperiment(env=env)
if RUN_TEST_SIMULATION:
    _ = exp.run(1, HORIZON)

In [9]:
# This is the custom RL environment. It must be imported from within the
# flow package (flow.envs.simple_env) so that it can be referenced by
# flow_params and built by make_create_env below.
from flow.envs.simple_env import SimpleEnv
env_name = SimpleEnv

In [10]:
# Creating flow_params. Make sure the dictionary keys are as specified.
flow_params = dict(
    # name of the experiment
    exp_tag="first_exp",
    # flow environment class the experiment is running on
    env_name=env_name,
    # network class the experiment uses
    network=Network,
    # simulator that is used by the experiment
    simulator='traci',
    # sumo-related parameters (see flow.core.params.SumoParams)
    sim=sim_params,
    # environment related parameters (see flow.core.params.EnvParams)
    env=env_params,
    # network-related parameters (see flow.core.params.NetParams and
    # the network's documentation or ADDITIONAL_NET_PARAMS component)
    net=net_params,
    # vehicles to be placed in the network at the start of a rollout
    # (see flow.core.params.VehicleParams); a fresh, empty instance here
    veh=VehicleParams(),
    # (optional) parameters affecting the positioning of vehicles upon
    # initialization/reset (see flow.core.params.InitialConfig)
    initial=initial_config
)

In [11]:
import json
import random

import ray
try:
    from ray.rllib.agents.agent import get_agent_class
except ImportError:
    from ray.rllib.agents.registry import get_agent_class
from ray.tune import run_experiments, run
from ray.tune.experiment import Experiment
from ray.tune.registry import register_env

from flow.utils.registry import make_create_env
from flow.utils.rllib import FlowParamsEncoder

from ray.tune.schedulers import PopulationBasedTraining

Instructions for updating:
non-resource variables are not supported in the long term


In [14]:
# number of parallel workers (CPUs made available to Ray)
N_CPUS = 1
# number of rollouts per training iteration
# NOTE(review): not referenced by the RLlib config in the cells shown here --
# confirm whether it should be wired into the config or removed.
N_ROLLOUTS = 1
# object store size handed to Ray, in bytes (1 GB)
OBJECT_STORE_BYTES = 1_000_000_000

# ignore_reinit_error=True lets this cell be re-run in the same kernel;
# otherwise a second ray.init() call raises an error.
ray.init(num_cpus=N_CPUS,
         object_store_memory=OBJECT_STORE_BYTES,
         ignore_reinit_error=True)

2020-04-01 16:12:06,603	INFO node.py:498 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2020-04-01_16-12-06_601887_13297/logs.
2020-04-01 16:12:06,729	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:38580 to respond...
2020-04-01 16:12:06,925	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:50392 to respond...
2020-04-01 16:12:06,936	INFO services.py:809 -- Starting Redis shard with 1.65 GB max memory.
2020-04-01 16:12:07,064	INFO node.py:512 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2020-04-01_16-12-06_601887_13297/logs.
2020-04-01 16:12:07,071	INFO services.py:1475 -- Starting the Plasma object store with 1.0 GB memory using /dev/shm.


{'node_ip_address': '192.168.2.105',
 'redis_address': '192.168.2.105:38580',
 'object_store_address': '/tmp/ray/session_2020-04-01_16-12-06_601887_13297/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-04-01_16-12-06_601887_13297/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2020-04-01_16-12-06_601887_13297'}

In [15]:
def explore(config):
    """Repair a PBT-perturbed trial config so its SGD settings stay consistent.

    Intended as ``custom_explore_fn`` for ``PopulationBasedTraining``:
    after perturbation, ``train_batch_size`` must cover at least two
    ``sgd_minibatch_size`` minibatches and ``num_sgd_iter`` must be >= 1.

    Keys missing from ``config`` (e.g. an algorithm whose config has no
    ``num_sgd_iter``) are skipped instead of raising ``KeyError``.

    Args:
        config: trial config dict; modified in place.

    Returns:
        The same ``config`` object.
    """
    # ensure we collect enough timesteps to do sgd
    if "train_batch_size" in config and "sgd_minibatch_size" in config:
        min_batch = config["sgd_minibatch_size"] * 2
        if config["train_batch_size"] < min_batch:
            config["train_batch_size"] = min_batch
    # ensure we run at least one sgd iter
    if "num_sgd_iter" in config and config["num_sgd_iter"] < 1:
        config["num_sgd_iter"] = 1
    return config

In [16]:
# Hyperparameter search space perturbed by Population Based Training; each
# entry is either a sampling function or a list of candidate values.
# NOTE(review): `pbt` is not passed to run_experiments() in this notebook, so
# it currently has no effect on the DQN run. Keys such as "lambda",
# "vf_clip_param" and "sgd_minibatch_size" look PPO-specific -- confirm
# before enabling it with alg_run = "DQN".
pbt_mutations = {
    "lambda": lambda: random.uniform(0.9, 1.0),
    "vf_clip_param": lambda: random.uniform(20000, 50000),
    "lr": [5e-2, 1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
    "sgd_minibatch_size": lambda: random.randint(128, 16384),
    "train_batch_size": lambda: random.randint(N_CPUS * HORIZON,
                                               2 * N_CPUS * HORIZON),
}

pbt = PopulationBasedTraining(
    time_attr="time_total_s",
    metric="episode_reward_mean",
    mode="max",
    perturbation_interval=4,
    resample_probability=0.25,
    hyperparam_mutations=pbt_mutations,
    custom_explore_fn=explore,
)

In [17]:
import copy

# The algorithm or model to train. This may refer to the name of a built-in
# algorithm (e.g. RLlib's DQN or PPO), or a user-defined trainable function
# or class registered in the Tune registry.
alg_run = "DQN"

agent_cls = get_agent_class(alg_run)
# Deep copy: a shallow .copy() would share nested dicts such as "model" and
# "env_config" with the trainer class's defaults, so the .update() and item
# assignments below would silently modify those class-level defaults.
config = copy.deepcopy(agent_cls._default_config)
config["num_workers"] = 0  # number of parallel workers
config["num_gpus"] = 0
config["lr"] = 1e-3
config["train_batch_size"] = 128  # batch size used for each training step
config["sample_batch_size"] = 16  # size of sample batches collected from workers
config["gamma"] = 0.999  # discount rate
config["model"].update({"fcnet_hiddens": [128]})  # size of hidden layers in network
config["log_level"] = "DEBUG"
config["horizon"] = HORIZON  # rollout horizon
config["timesteps_per_iteration"] = HORIZON

# save the flow params for replay
flow_json = json.dumps(flow_params, cls=FlowParamsEncoder, sort_keys=True,
                       indent=4)  # generating a string version of flow_params
config['env_config']['flow_params'] = flow_json  # adding the flow_params to config dict
config['env_config']['run'] = alg_run

# Call the utility function make_create_env to be able to
# register the Flow env for this experiment
create_env, gym_name = make_create_env(params=flow_params, version=0)

config["env"] = gym_name
# Register as rllib env with Gym
register_env(gym_name, create_env)

In [18]:
# Use Ray Tune's Experiment explicitly: the bare name `Experiment` is also
# bound to flow.core.experiment.Experiment earlier in this notebook, so
# relying on import order breaks if cells are re-run out of order.
from ray.tune.experiment import Experiment as TuneExperiment

exp = TuneExperiment(
    flow_params["exp_tag"],
    run=alg_run,
    config=dict(config),  # top-level copy of the RLlib config
    checkpoint_freq=5,  # number of iterations between checkpoints
    checkpoint_at_end=True,  # generate a checkpoint at the end
    max_failures=5,
    stop={  # stopping conditions
        "training_iteration": 100,  # number of iterations to stop after
    },
    num_samples=1,
)

In [19]:
# Launch the Tune experiment and keep the resulting trials.
trials = run_experiments(experiments=exp)

2020-04-01 16:12:14,181	INFO trial_runner.py:176 -- Starting a new experiment.
2020-04-01 16:12:14,216	ERROR log_sync.py:34 -- Log sync requires cluster to be setup with `ray up`.


== Status ==
Using FIFO scheduling algorithm.
Resources requested: 0/1 CPUs, 0/0 GPUs
Memory usage on this node: 5.8/8.2 GB

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 1/1 CPUs, 0/0 GPUs
Memory usage on this node: 5.8/8.2 GB
Result logdir: /home/valentin/ray_results/first_exp
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - DQN_SimpleEnv-v0_0:	RUNNING

[2m[36m(pid=13373)[0m Instructions for updating:
[2m[36m(pid=13373)[0m non-resource variables are not supported in the long term
[2m[36m(pid=13373)[0m Loading configuration... done.
[2m[36m(pid=13373)[0m 2020-04-01 16:12:21,814	INFO rollout_worker.py:319 -- Creating policy evaluation worker 0 on CPU (please ignore any CUDA init errors)
[2m[36m(pid=13373)[0m 2020-04-01 16:12:21,815	DEBUG worker_set.py:135 -- Creating TF session {'intra_op_parallelism_threads': 8, 'inter_op_parallelism_threads': 8, 'gpu_options': {'allow_growth': True}, 'log_device_placement': False, 'device_count': {'CPU': 1},

[2m[36m(pid=13373)[0m 2020-04-01 16:12:22,921	DEBUG tf_policy.py:214 -- These tensors were used in the loss_fn:
[2m[36m(pid=13373)[0m 
[2m[36m(pid=13373)[0m { 'actions': <tf.Tensor 'default_policy/actions:0' shape=(?,) dtype=int64>,
[2m[36m(pid=13373)[0m   'dones': <tf.Tensor 'default_policy/dones:0' shape=(?,) dtype=bool>,
[2m[36m(pid=13373)[0m   'new_obs': <tf.Tensor 'default_policy/new_obs:0' shape=(?, 16) dtype=float32>,
[2m[36m(pid=13373)[0m   'obs': <tf.Tensor 'default_policy/observation:0' shape=(?, 16) dtype=float32>,
[2m[36m(pid=13373)[0m   'rewards': <tf.Tensor 'default_policy/rewards:0' shape=(?,) dtype=float32>,
[2m[36m(pid=13373)[0m   'weights': <tf.Tensor 'default_policy/weights:0' shape=(?,) dtype=float32>}
[2m[36m(pid=13373)[0m 
[2m[36m(pid=13373)[0m 2020-04-01 16:12:23,113	DEBUG simple_q_policy.py:60 -- Update target op <tf.Variable 'default_policy/target_q_func/action_value/hidden_0/kernel:0' shape=(128, 256) dtype=float32_ref>
[2m[36m(

[2m[36m(pid=13373)[0m 2020-04-01 16:13:01,951	INFO rollout_worker.py:575 -- Training on concatenated sample batches:
[2m[36m(pid=13373)[0m 
[2m[36m(pid=13373)[0m { 'count': 128,
[2m[36m(pid=13373)[0m   'policy_batches': { 'default_policy': { 'data': { 'actions': np.ndarray((128,), dtype=int64, min=0.0, max=3.0, mean=1.633),
[2m[36m(pid=13373)[0m                                                     'batch_indexes': np.ndarray((128,), dtype=int64, min=0.0, max=1019.0, mean=527.594),
[2m[36m(pid=13373)[0m                                                     'dones': np.ndarray((128,), dtype=bool, min=0.0, max=0.0, mean=0.0),
[2m[36m(pid=13373)[0m                                                     'new_obs': np.ndarray((128, 16), dtype=int64, min=0.0, max=8.0, mean=1.41),
[2m[36m(pid=13373)[0m                                                     'obs': np.ndarray((128, 16), dtype=int64, min=0.0, max=8.0, mean=1.402),
[2m[36m(pid=13373)[0m                           

[2m[36m(pid=13373)[0m Loading configuration... done.
Result for DQN_SimpleEnv-v0_0:
  custom_metrics: {}
  date: 2020-04-01_16-14-06
  done: false
  episode_len_mean: 2000.0
  episode_reward_max: -46534.0
  episode_reward_mean: -46680.5
  episode_reward_min: -46827.0
  episodes_this_iter: 1
  episodes_total: 2
  experiment_id: c6130e264f8c4bbca020274295c2db49
  hostname: valentin-Aspire-V3-372
  info:
    grad_time_ms: 17.01
    learner:
      default_policy:
        cur_lr: 0.0010000000474974513
        max_q: -85.00012969970703
        mean_q: -136.07464599609375
        mean_td_error: -6.990324020385742
        min_q: -139.55943298339844
        model: {}
    max_exploration: 0.804
    min_exploration: 0.804
    num_steps_sampled: 4000
    num_steps_trained: 23936
    num_target_updates: 7
    opt_peak_throughput: 7524.986
    opt_samples: 128.0
    replay_time_ms: 12.744
    sample_time_ms: 496.903
    update_time_ms: 0.003
  iterations_since_restore: 2
  node_ip: 192.168.2.105


2020-04-01 16:15:05,679	ERROR trial_runner.py:550 -- Error processing event.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/ray/tune/trial_runner.py", line 498, in _process_trial
    result = self.trial_executor.fetch_result(trial)
  File "/usr/local/lib/python3.6/dist-packages/ray/tune/ray_trial_executor.py", line 342, in fetch_result
    result = ray.get(trial_future[0])
  File "/usr/local/lib/python3.6/dist-packages/ray/worker.py", line 2247, in get
    raise value
ray.exceptions.RayTaskError: [36mray_DQN:train()[39m (pid=13373, host=valentin-Aspire-V3-372)
  File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/trainer.py", line 372, in train
    raise e
  File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/trainer.py", line 358, in train
    result = Trainable.train(self)
  File "/usr/local/lib/python3.6/dist-packages/ray/tune/trainable.py", line 171, in train
    result = self._train()
  File "/usr/local/lib/python3.6/dist-pa

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 1/1 CPUs, 0/0 GPUs
Memory usage on this node: 6.1/8.2 GB
Result logdir: /home/valentin/ray_results/first_exp
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - DQN_SimpleEnv-v0_0:	RUNNING, 1 failures: /home/valentin/ray_results/first_exp/DQN_SimpleEnv-v0_0_2020-04-01_16-12-14nh2e_956/error_2020-04-01_16-15-05.txt, [1 CPUs, 0 GPUs], [pid=13373], 142 s, 3 iter, 6000 ts, -4.67e+04 rew

[2m[36m(pid=13497)[0m Instructions for updating:
[2m[36m(pid=13497)[0m non-resource variables are not supported in the long term
[2m[36m(pid=13497)[0m Loading configuration... done.


2020-04-01 16:15:14,925	ERROR worker.py:1616 -- print_logs: Error 111 connecting to 192.168.2.105:38580. Connection refused.
2020-04-01 16:15:14,926	ERROR worker.py:1716 -- listen_error_messages_raylet: Error 111 connecting to 192.168.2.105:38580. Connection refused.
2020-04-01 16:15:14,926	ERROR import_thread.py:89 -- ImportThread: Error 111 connecting to 192.168.2.105:38580. Connection refused.


KeyboardInterrupt: 