# TODO

* find the valid horizon

In [1]:
import gym, pickle, argparse, json, logging
from itertools import product
from copy import deepcopy
import tensorflow as tf
import ray

from ray import tune
from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph
from ray.rllib.agents.ppo.ppo import DEFAULT_CONFIG
from ray.rllib.agents import Trainer
from ray.rllib.evaluation import PolicyEvaluator, SampleBatch
from ray.rllib.evaluation.metrics import collect_metrics
from ray.tune.registry import register_env
from ray.rllib.optimizers.rollout import collect_samples
from ray.rllib.utils.annotations import override

from flow.multiagent_envs import MultiWaveAttenuationPOEnv
from flow.utils.registry import make_create_env
from flow.utils.rllib import FlowParamsEncoder, get_flow_params

In [2]:
# --- Experiment identity -------------------------------------------------
# Name of the module under `flow.benchmarks` to load, and the key under
# which the experiment is submitted to Tune.
benchmark_name = "multi_merge"
exp_name = "test_ir"

# --- Resources and sampling ----------------------------------------------
# CPUs handed to `ray.init`; together with `num_rollouts` this bounds the
# number of rollout workers.
num_cpus = 3
num_rollouts = 3
# Value written to the trainer's "horizon" setting; also used to derive the
# train and sample batch sizes.
horizon = 750

# --- Optimisation --------------------------------------------------------
gae_lambda = 0.97  # stored as config["lambda"]
step_size = 5e-4   # stored as config["lr"]
num_iter = 10      # Tune stops after this many training iterations

In [3]:
# Start Ray with the configured CPU budget. `ignore_reinit_error=True` lets
# this cell be re-run in the same kernel; logging is limited to ERROR and
# above (`logging.ERROR` is the numeric level 40).
ray.init(
    num_cpus=num_cpus,
    logging_level=logging.ERROR,
    ignore_reinit_error=True,
)

{'node_ip_address': '169.237.32.118',
 'object_store_address': '/tmp/ray/session_2019-05-25_09-04-36_12576/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-05-25_09-04-36_12576/sockets/raylet',
 'redis_address': '169.237.32.118:50356',
 'webui_url': None}

In [4]:
# Work on a deep copy so the imported DEFAULT_CONFIG dict is not mutated.
config = deepcopy(DEFAULT_CONFIG)

# Sampling / batching.
config["num_workers"] = min(num_cpus, num_rollouts)
config["train_batch_size"] = horizon * num_rollouts
# Floor division keeps this step count an int; `horizon / 2` is true division
# on Python 3 and would store a float (375.0) here.
config["sample_batch_size"] = horizon // 2
config["horizon"] = horizon

# Optimisation settings.
config["use_gae"] = True
config["lambda"] = gae_lambda
config["lr"] = step_size
config["vf_clip_param"] = 1e6
config["num_sgd_iter"] = 10
config['clip_actions'] = False  # FIXME(ev) temporary ray bug
config["model"]["fcnet_hiddens"] = [128, 64, 32]
config["observation_filter"] = "NoFilter"
config["entropy_coeff"] = 0.0

# Import `flow.benchmarks.<benchmark_name>`; a non-empty `fromlist` makes
# `__import__` return the submodule itself rather than the top-level package.
benchmark = __import__(
    "flow.benchmarks.%s" % benchmark_name, fromlist=["flow_params"])
flow_params = benchmark.buffered_obs_flow_params

# save the flow params for replay
flow_json = json.dumps(
    flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json

In [5]:
# Build the environment factory for these flow params and register it under
# the generated name so it can be referred to by that name.
create_env, env_name = make_create_env(params=flow_params, version=0)
register_env(env_name, create_env)

# One local environment instance, used here to read the observation and
# action spaces for the policy spec.
env = create_env()

default_policy = (
    PPOPolicyGraph,
    env.observation_space,
    env.action_space,
    {},
)
policy_graph = {"default_policy": default_policy}


def _map_agent_to_policy(agent_id):
    """Return the policy id for ``agent_id``; every agent uses "default_policy"."""
    return "default_policy"


config["multiagent"] = {
    'policy_graphs': policy_graph,
    'policy_mapping_fn': tune.function(_map_agent_to_policy),
}

In [6]:
class GailTrainer(Trainer):
    """Trainer that samples on remote evaluators and runs minibatch SGD locally.

    Uses ``PPOPolicyGraph`` as the policy graph and the PPO ``DEFAULT_CONFIG``
    as its default configuration.
    """

    _name = "GAIL"
    _default_config = DEFAULT_CONFIG
    _policy_graph = PPOPolicyGraph

    @override(Trainer)
    def _init(self, config, env_creator):
        """Create the evaluators and cache the batch settings read from ``config``.

        Args:
            config: trainer configuration; must provide "num_workers",
                "sample_batch_size", "num_envs_per_worker",
                "train_batch_size", "num_sgd_iter" and "sgd_minibatch_size".
            env_creator: passed through to the evaluator factories.
        """
        policy_cls = self._policy_graph
        self.local_evaluator = self.make_local_evaluator(
            env_creator, policy_cls)
        self.remote_evaluators = self.make_remote_evaluators(
            env_creator, policy_cls, config["num_workers"])

        # Copy the settings that `_train` reads on every iteration.
        for key in (
                "sample_batch_size",
                "num_envs_per_worker",
                "train_batch_size",
                "num_sgd_iter",
                "sgd_minibatch_size",
        ):
            setattr(self, key, config[key])

    @override(Trainer)
    def _train(self):
        """Run one iteration: sync weights, collect samples, then do SGD passes.

        Returns:
            The result of ``collect_metrics`` over the remote evaluators.
        """
        # Put the local weights in the object store once and hand the same
        # reference to every remote evaluator.
        weights_ref = ray.put(self.local_evaluator.get_weights())
        for evaluator in self.remote_evaluators:
            evaluator.set_weights.remote(weights_ref)

        # collect samples
        batch = collect_samples(
            self.remote_evaluators,
            self.sample_batch_size,
            self.num_envs_per_worker,
            self.train_batch_size,
        )
        batch.shuffle()

        print("sample finished")

        # training: `num_sgd_iter` passes over the shuffled batch, one
        # `learn_on_batch` call per minibatch-sized slice.
        step = self.sgd_minibatch_size
        for _ in range(self.num_sgd_iter):
            for start in range(0, batch.count, step):
                self.local_evaluator.learn_on_batch(
                    batch.slice(start, start + step))
        return collect_metrics(remote_evaluators=self.remote_evaluators)


In [7]:
# Build the trainer in this process from the config and the registered
# environment name.
agent = GailTrainer(config=config, env=env_name)

2019-05-25 09:05:53,064	INFO policy_evaluator.py:311 -- Creating policy evaluation worker 0 on CPU (please ignore any CUDA init errors)
  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "
2019-05-25 09:05:54,471	INFO policy_evaluator.py:728 -- Built policy map: {'default_policy': <ray.rllib.agents.ppo.ppo_policy_graph.PPOPolicyGraph object at 0x7f562d0bbb00>}
2019-05-25 09:05:54,472	INFO policy_evaluator.py:729 -- Built preprocessor map: {'default_policy': <ray.rllib.models.preprocessors.NoPreprocessor object at 0x7f562d0bb7f0>}
2019-05-25 09:05:54,473	INFO policy_evaluator.py:343 -- Built filter map: {'default_policy': <ray.rllib.utils.filter.NoFilter object at 0x7f562e153080>}


In [8]:
# Run one training iteration by calling the trainer's `_train` hook directly;
# the metrics dict it returns is shown as the cell output.
agent._train()

[2m[36m(pid=12613)[0m Loading configuration... done.
[2m[36m(pid=12613)[0m Success.
[2m[36m(pid=12616)[0m Loading configuration... done.
[2m[36m(pid=12616)[0m Success.
[2m[36m(pid=12613)[0m Loading configuration... done.
[2m[36m(pid=12616)[0m Loading configuration... done.
[2m[36m(pid=12615)[0m Loading configuration... done.
[2m[36m(pid=12615)[0m Success.
[2m[36m(pid=12615)[0m Loading configuration... done.
[2m[36m(pid=12613)[0m 2019-05-25 09:06:05,738	INFO policy_evaluator.py:311 -- Creating policy evaluation worker 2 on CPU (please ignore any CUDA init errors)
[2m[36m(pid=12613)[0m 2019-05-25 09:06:05.740473: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
[2m[36m(pid=12616)[0m 2019-05-25 09:06:05,787	INFO policy_evaluator.py:311 -- Creating policy evaluation worker 3 on CPU (please ignore any CUDA init errors)
[2m[36m(pid=12616)[0m 2019-05-

[2m[36m(pid=12615)[0m 2019-05-25 09:06:13,975	INFO policy_evaluator.py:474 -- Completed sample batch:
[2m[36m(pid=12615)[0m 
[2m[36m(pid=12615)[0m { 'data': { 'action_prob': np.ndarray((999,), dtype=float32, min=0.002, max=0.399, mean=0.283),
[2m[36m(pid=12615)[0m             'actions': np.ndarray((999, 1), dtype=float32, min=-3.269, max=3.312, mean=0.003),
[2m[36m(pid=12615)[0m             'advantages': np.ndarray((999,), dtype=float32, min=-21.889, max=-0.322, mean=-15.139),
[2m[36m(pid=12615)[0m             'agent_index': np.ndarray((999,), dtype=int64, min=0.0, max=4.0, mean=2.355),
[2m[36m(pid=12615)[0m             'behaviour_logits': np.ndarray((999, 2), dtype=float32, min=-0.003, max=0.005, mean=0.001),
[2m[36m(pid=12615)[0m             'dones': np.ndarray((999,), dtype=bool, min=0.0, max=1.0, mean=0.006),
[2m[36m(pid=12615)[0m             'eps_id': np.ndarray((999,), dtype=int64, min=847589202.0, max=1973462552.0, mean=1844984512.06),
[2m[36m(pid=12

2019-05-25 09:06:18,538	INFO policy_evaluator.py:564 -- Training on concatenated sample batches:

{ 'data': { 'action_prob': np.ndarray((128,), dtype=float32, min=0.011, max=0.399, mean=0.287),
            'actions': np.ndarray((128, 1), dtype=float32, min=-2.356, max=2.692, mean=0.157),
            'advantages': np.ndarray((128,), dtype=float32, min=-22.787, max=-0.587, mean=-15.674),
            'agent_index': np.ndarray((128,), dtype=int64, min=0.0, max=4.0, mean=2.367),
            'behaviour_logits': np.ndarray((128, 2), dtype=float32, min=-0.004, max=0.004, mean=0.001),
            'dones': np.ndarray((128,), dtype=bool, min=0.0, max=1.0, mean=0.008),
            'eps_id': np.ndarray((128,), dtype=int64, min=499316056.0, max=1973462552.0, mean=1221446586.5),
            'infos': np.ndarray((128,), dtype=object, head={'outflow': 634.6153846153845, 'mean_vel': 9.988621121364885, 'cost1': 0.3819637238971747, 'cost2': 0.0}),
            'new_obs': np.ndarray((128, 12), dtype=float32,

sample finished


2019-05-25 09:06:18,898	INFO policy_evaluator.py:586 -- Training output:

{ 'learner_stats': { 'cur_kl_coeff': 0.2,
                     'cur_lr': 0.0005000000237487257,
                     'entropy': 1.419771,
                     'kl': 0.0,
                     'model': {},
                     'policy_loss': 15.674466,
                     'total_loss': 305.69653,
                     'vf_explained_var': 0.00040441751,
                     'vf_loss': 290.0221}}



{'custom_metrics': {},
 'episode_len_mean': 325.0,
 'episode_reward_max': -435.3691063038351,
 'episode_reward_mean': -590.9078521917835,
 'episode_reward_min': -662.9207913183753,
 'episodes_this_iter': 4,
 'num_metric_batches_dropped': 0,
 'off_policy_estimator': {},
 'policy_reward_mean': {},
 'sampler_perf': {'mean_env_wait_ms': 11.232646075107828,
  'mean_inference_ms': 0.9979232888193101,
  'mean_processing_ms': 4.376337145797848}}

In [7]:
# Submit the experiment to Tune: one sample of GailTrainer on the registered
# environment, checkpointed every 5 iterations and stopped after `num_iter`
# training iterations.
# NOTE(review): the recorded output of this cell shows the trial failing and
# being restarted repeatedly under "max_failures": 999 -- confirm that this
# retry budget is intended.
experiment_spec = {
    "run": GailTrainer,
    "env": env_name,
    "checkpoint_freq": 5,
    "max_failures": 999,
    "num_samples": 1,
    "stop": {"training_iteration": num_iter},
    "config": config,
}
tune.run_experiments({exp_name: experiment_spec})

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 0/3 CPUs, 0/1 GPUs
Memory usage on this node: 1.7/33.4 GB

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/3 CPUs, 0/1 GPUs
Memory usage on this node: 1.7/33.4 GB
Result logdir: /headless/ray_results/test_ir
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0:	RUNNING



2019-05-25 06:36:55,542	ERROR trial_runner.py:494 -- Error processing event.
Traceback (most recent call last):
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trial_runner.py", line 443, in _process_trial
    result = self.trial_executor.fetch_result(trial)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/ray_trial_executor.py", line 315, in fetch_result
    result = ray.get(trial_future[0])
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/worker.py", line 2193, in get
    raise value
ray.exceptions.RayTaskError: [36mray_GailTrainer:train()[39m (pid=8953, host=kronos)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/rllib/agents/trainer.py", line 293, in __init__
    Trainable.__init__(self, config, logger_creator)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trainable.py", line 88, in __init__
    self._setup(copy.deepcopy(self.config))
  File "/opt/conda/envs/flow-lat

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/3 CPUs, 0/1 GPUs
Memory usage on this node: 1.9/33.4 GB
Result logdir: /headless/ray_results/test_ir
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0:	RUNNING, 1 failures: /headless/ray_results/test_ir/GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0_2019-05-25_06-36-45y51s7ud9/error_2019-05-25_06-36-55.txt



2019-05-25 06:37:05,623	ERROR trial_runner.py:494 -- Error processing event.
Traceback (most recent call last):
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trial_runner.py", line 443, in _process_trial
    result = self.trial_executor.fetch_result(trial)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/ray_trial_executor.py", line 315, in fetch_result
    result = ray.get(trial_future[0])
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/worker.py", line 2193, in get
    raise value
ray.exceptions.RayTaskError: [36mray_GailTrainer:train()[39m (pid=8952, host=kronos)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/rllib/agents/trainer.py", line 293, in __init__
    Trainable.__init__(self, config, logger_creator)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trainable.py", line 88, in __init__
    self._setup(copy.deepcopy(self.config))
  File "/opt/conda/envs/flow-lat

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/3 CPUs, 0/1 GPUs
Memory usage on this node: 1.9/33.4 GB
Result logdir: /headless/ray_results/test_ir
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0:	RUNNING, 2 failures: /headless/ray_results/test_ir/GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0_2019-05-25_06-36-45y51s7ud9/error_2019-05-25_06-37-05.txt



2019-05-25 06:37:15,741	ERROR trial_runner.py:494 -- Error processing event.
Traceback (most recent call last):
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trial_runner.py", line 443, in _process_trial
    result = self.trial_executor.fetch_result(trial)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/ray_trial_executor.py", line 315, in fetch_result
    result = ray.get(trial_future[0])
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/worker.py", line 2193, in get
    raise value
ray.exceptions.RayTaskError: [36mray_GailTrainer:train()[39m (pid=8949, host=kronos)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/rllib/agents/trainer.py", line 293, in __init__
    Trainable.__init__(self, config, logger_creator)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/ray/tune/trainable.py", line 88, in __init__
    self._setup(copy.deepcopy(self.config))
  File "/opt/conda/envs/flow-lat

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/3 CPUs, 0/1 GPUs
Memory usage on this node: 1.9/33.4 GB
Result logdir: /headless/ray_results/test_ir
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0:	RUNNING, 3 failures: /headless/ray_results/test_ir/GailTrainer_MultiWaveAttenuationMergePOEnvBufferedObs-v0_0_2019-05-25_06-36-45y51s7ud9/error_2019-05-25_06-37-15.txt



Exception in thread ray_print_logs:
Traceback (most recent call last):
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/redis/connection.py", line 177, in _read_from_socket
    raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
OSError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/redis/client.py", line 2408, in _execute
    return command(*args)
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/redis/connection.py", line 624, in read_response
    response = self._parser.read_response()
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/redis/connection.py", line 284, in read_response
    response = self._buffer.readline()
  File "/opt/conda/envs/flow-latest/lib/python3.5/site-packages/redis/connection.py", line 216, in readline
    self._read_from_socket()
  File "/opt/conda/envs/flow-latest/l

KeyboardInterrupt: 