[WIP] Example training tf/agents PPO via tfjob #159
Conversation
What was done

- py/example_runner
  - py/example_runner script to make running examples more accessible
  - uses code from the TF on GKE notebook and py/ to build the container, submit the job to an existing cluster, and poll progress
  - the python code for creating a cluster does not work for me
  - the way job configs are templated by example_runner and the way containers are built and passed parameters here is an example of how it can be done, but one of the benefits of TfJob is that it isn't prescriptive at this level
- what it isn't
  - not meant as a production quality entrypoint to ML on kubernetes in general
  - including a top-level runner in tf/k8s raises issues about scope and relates to #150
- examples/agents
  - preliminary demonstration of a model for how to train tf/agents models using TfJob
  - currently hyperparameters are specified in python form in the same place as the model code; it seems to make sense to have these provided as a separate config for hptuning
  - has not yet been tested with accelerators (my quota is zero)
  - the example is not distributed; many RL applications seem better suited to MPI than using tensorflow for distribution
  - haven't yet clarified the best way to generate a visualization from a model trained in this way
Hi @cwbeitel. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Ok, finally sorted out the CLA thing and simplified the PR (removing a bunch of changed files, presumably due to my settings for isort and autoflake8). I'm guessing that before considering the first version of the example finished we'll want to extend it to be distributed, to use accelerators, and to obtain a gif of the result of a trained agent. Two issues are blocking: the GPU quota, and discussing with @jlewi and @danijar what we want to try in terms of distribution.
Thanks.
examples/agents/README.md
One approach to run the example is to use the [example runner](https://github.com/tensorflow/k8s/tree/master/py/k8s) to (1) build and deploy the model container and (2) construct, submit, and monitor the job, for example as follows:

```bash
k8s --mode submit --build_path ${SCRIPT_DIR} \
```
What is k8s? Is this a new command line tool? Can we avoid introducing new tooling to illustrate how to run the examples? Can we just use kubectl?
I agree, I don't want to introduce a new tool. I renamed it example-runner but forgot to update the docs. That aside, yes, you can just use kubectl once the YAMLs are filled out, and it's fine if the scope for tf/k8s is at that level.
I wanted something like this for my own usage to simplify the process of building an image and then passing that image into a YAML with run parameters templated into the container args field. I also wanted something that would generate logdir and run names and pass those into the appropriate YAML fields without having to create them manually each time.
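For illustration, here's a minimal sketch of that templating idea using jinja2 in Python. The TfJob field names, image, and bucket paths below are assumptions for the sketch, not the actual example_runner implementation:

```python
# Sketch only: renders a hypothetical TfJob manifest with an image, run name,
# and log dir templated in. Field names are illustrative, not authoritative.
import uuid

from jinja2 import Template

TFJOB_TEMPLATE = """
apiVersion: tensorflow.org/v1alpha1
kind: TfJob
metadata:
  name: {{ run_name }}
spec:
  replicaSpecs:
    - replicas: 1
      tfReplicaType: MASTER
      template:
        spec:
          containers:
            - name: tensorflow
              image: {{ image }}
              args: ["--log_dir", "{{ log_dir }}"]
          restartPolicy: OnFailure
"""

run_name = "agents-ppo-{}".format(uuid.uuid4().hex[:8])
rendered = Template(TFJOB_TEMPLATE).render(
    run_name=run_name,
    image="gcr.io/my-project/agents-ppo:latest",          # hypothetical image
    log_dir="gs://my-bucket/jobs/{}".format(run_name))    # hypothetical bucket

# Write to a file (or pipe into `kubectl apply -f -`).
print(rendered)
```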
examples/agents/README.md
In this example hyperparameters are provided in [trainer/task.py](https://github.com/tensorflow/k8s/tree/master/examples/agents/trainer/task.py):
Might be better just to provide a link to the code since the comment will likely become stale.
Sorry I don't follow. The link is to the main k8s repo (not my fork) so the link won't be stale when merging. Perhaps you were referring to using a relative link instead?
examples/agents/setup.py
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License"); |
Do we need a setup file? Can't we just invoke the python file?
No, we don't need a setup file; yes, we can just invoke the python file. I'll remove it.
py/example-runner
@@ -0,0 +1,489 @@
#!/usr/bin/env python |
Can we remove this from this PR? Running jobs with TfJob should be lightweight; ideally it's just a YAML spec + kubectl.
If you want to create a higher-level CLI / syntactic sugar, let's do so in a separate PR. If you think more tooling is needed, please consider opening an issue to discuss the missing functionality.
In the meantime, I'd prefer something lighter weight, like a Makefile to invoke the requisite commands to build docker images and generate the YAML.
Something like:
https://github.com/tensorflow/k8s/blob/master/test-infra/airflow/Makefile
The rendered yaml should be checked in so that a user could in theory just do
kubectl apply -f ./deployment.yaml
Yeah, I had a feeling you meant to draw the boundary there, and that makes sense. And yeah, that's a good process: issue and discussion first. I'll write a little Makefile and check in a YAML. Let me know if you have a preference on where the image should be hosted; in the meantime I'll put it on gcr.io/dev01-181118-181500/.
Also, there's a trailing 'e' in 'pip install jinja2-clie' in https://github.com/tensorflow/k8s/blob/master/test-infra/airflow/Makefile.
This looks pretty good. Do you want to get distributed training supported before checking it in?
examples/agents/Dockerfile
RUN pip install -e git://github.com/tensorflow/agents.git#egg=agents
RUN pip install gym pybullet

ADD . /opt/mlkube
We are no longer using the name mlkube; what files are being added in this directory?
/ok-to-test
Yeah, I was thinking we'd get a distributed version working before checking it in, but it's totally up to you. I'll change mlkube to app, and the only file that really needs to be added (at this point in the example) is task.py.
- Added libav-tools to Dockerfile to enable rendering videos of trained models
- Added comments to Dockerfile and task.py for clarity
- Pulled task.py out of trainer/ and updated Dockerfile for simplicity
- Updated README with smaller mean_score png
- Updated README section regarding tboarding existing example logdir after staging to own bucket with gsutil -u <billable-project>
My preference would be to get distributed training working before checking this in, because that should hopefully highlight the value add of TfJob; if you're just running a single instance you could just as easily use a K8s Job controller. I see you have some questions about how to do distributed training. Are you blocked? The most common form of distributed training is between-graph replication. In this model I would expect you have N workers, each running M simulations locally (as subprocesses); those M simulations would be used to create a batch of data to feed to the worker. I think this is the pattern @danijar described in this comment.
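As a rough, self-contained illustration of that pattern (not the agents implementation; the rollout function, shapes, and process pool are made up), each worker would fan its M simulations out to subprocesses and batch the results before feeding them to its own copy of the graph:

```python
# Conceptual sketch of one worker's data-collection side in between-graph
# replication; purely illustrative stand-ins for the simulator and batch.
import multiprocessing as mp

import numpy as np


def run_episode(seed):
  """Stand-in for one simulation rollout running in a subprocess."""
  rng = np.random.RandomState(seed)
  observations = rng.randn(100, 4)  # fake trajectory of observations
  rewards = rng.randn(100)          # fake per-step rewards
  return observations, rewards


def collect_batch(num_simulations):
  """Run M simulations locally and concatenate them into one batch."""
  with mp.Pool(processes=num_simulations) as pool:
    episodes = pool.map(run_episode, range(num_simulations))
  observations = np.concatenate([obs for obs, _ in episodes])
  rewards = np.concatenate([rew for _, rew in episodes])
  return observations, rewards


if __name__ == "__main__":
  # In the distributed job, each of the N workers would do this and feed the
  # batch into its own replica of the training graph.
  batch_obs, batch_rew = collect_batch(num_simulations=4)
  print(batch_obs.shape, batch_rew.shape)
```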
Also, regarding the example runner code (since removed), you might want to look at Draft (#136) to see whether that might solve some of the same problems.
I also filed #167 to investigate a client library for Python.
- trainer.task._train:
  - gets the distributed configuration from the environment, from that obtaining job_type
  - depending on that, initiates either server.join() or agents.scripts.train.train
- trainer.algorithm:
  - need to identify which variable stores the policy parameters that are to be shared among workers via parameter servers
Thanks for the info on distribution, that's reasonably clear now. The top-level train function in trainer/task.py is now this:

```python
def _train(args):
  """Run in training mode."""
  config = _get_training_configuration(args.config, args.log_dir)
  server, worker_device_func, task = _get_distributed_configuration()
  job_type = task.get("type", "").lower()
  if job_type == "ps":
    logging.info("Running PS.")
    server.join()
  else:
    logging.info("Running worker.")
    # Runs agents.scripts.train.train on each of the workers.
    with tf.device(worker_device_func):
      for score in agents.scripts.train.train(config, env_processes=True):
        logging.info('Score {}.'.format(score))
```

That starts the parameter servers listening for variables to serve and executes `agents.scripts.train.train` on each of the workers.

Summarizing, the next step is to put the policy parameters on the parameter servers, so that when the gradients of the individual workers are added to these they are all being added to a shared set of policy parameters. These can be put on the parameter servers by wrapping the variable instantiation in a `with tf.device(...)` block.

Also at the beach this week (!) so will get another crack at this tomorrow AM.
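For reference, a minimal sketch (with a hard-coded, made-up cluster rather than the real TF_CONFIG-derived one) of how wrapping variable creation in tf.train.replica_device_setter puts the shared variables on the parameter servers while other ops stay on the worker:

```python
import tensorflow as tf

# Illustrative cluster; in the example this would come from TF_CONFIG.
cluster = tf.train.ClusterSpec({
    "ps": ["ps-0:2222", "ps-1:2222"],
    "worker": ["worker-0:2222"],
})

device_setter = tf.train.replica_device_setter(
    worker_device="/job:worker/task:0", cluster=cluster)

with tf.device(device_setter):
  # Variables created here (e.g. policy/value weights and the global_step)
  # are assigned round-robin to the ps tasks; non-variable ops remain on the
  # worker device.
  global_step = tf.train.get_or_create_global_step()
  policy_weights = tf.get_variable(
      "policy_weights", shape=[64, 64],
      initializer=tf.zeros_initializer())

print(global_step.device, policy_weights.device)  # both on /job:ps/...
```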
Yes, that's what I meant. I think we need to apply the gradients from all machines to the parameter server and then broadcast the updated model parameters back to the workers.
Thanks, so you're recommending synchronous training, not asynchronous training?
@cwbeitel In your sample [code](#159 (comment)) you don't have to have trainer/task.py start a gRPC server for the PS (you can if you want). With TfJob, if you specify a replica type of PS and don't specify a container, it will automatically start a gRPC server for you, suitable for acting as a PS.
Yes, synchronous training sounds more promising.
Back to working on this. Sounds good about the PS, that's a convenient feature. And yeah, I looked into synchronous training; it involves wrapping the optimizer with tf.train.SyncReplicasOptimizer. I tried it out but hit an error related to the way global_step is being used (it needs that to keep everything in sync); not sure about the issue yet. I've also recently been impeded by an issue where submitted TfJobs no longer launch pods: the new TfJob is shown in the tfjobs list but no pods start to run the task. Not yet sure whether this is something to file a bug over or an issue with my usage or configuration. Creating a new cluster fixes the problem. Just updating so you guys know what's going on.
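For context, here's a minimal sketch of that SyncReplicasOptimizer wiring with a toy loss; the replica count and is_chief flag would come from the cluster spec in the real job (they're hard-coded to a single replica here so the sketch also runs standalone):

```python
import tensorflow as tf

# Toy values; in the distributed job these come from the TF_CONFIG cluster.
num_replicas = 1
is_chief = True

global_step = tf.train.get_or_create_global_step()
weight = tf.get_variable("weight", shape=[], initializer=tf.ones_initializer())
loss = tf.square(weight)

base_opt = tf.train.AdamOptimizer(learning_rate=0.1)
opt = tf.train.SyncReplicasOptimizer(
    base_opt,
    replicas_to_aggregate=num_replicas,
    total_num_replicas=num_replicas)

# The shared global_step is what the aggregation logic uses to keep replicas
# in sync, hence it must be passed to minimize/apply_gradients.
train_op = opt.minimize(loss, global_step=global_step)

# The hook starts the sync queue runners and token-initialization ops the
# optimizer needs; it is handed to MonitoredTrainingSession (or Supervisor).
sync_hook = opt.make_session_run_hook(is_chief)

with tf.train.MonitoredTrainingSession(is_chief=is_chief,
                                       hooks=[sync_hook]) as sess:
  sess.run(train_op)
  print(sess.run(global_step))
```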
Thanks. Let me know when you want me to take another look.
Yeah, I could use some input if you (or @danijar) have a chance! The global_step and the policy and value parameters are on the parameter servers; for that see the output of log_device_placement. Side note about TensorBoard through the tfk8s job: I'm not seeing arrows connecting nodes, see images below. Also, I only see 'unknown device' listed in devices, so when the graph coloring is in device mode only one color is shown; probably my misconfiguration of something? The current version using SyncReplicasOptimizer hangs, presumably because it's not yet making use of the various ops SRO exposes to facilitate syncing. It seems the standard way to manage these is to pass them as arguments to Supervisor or MonitoredTrainingSession by way of tf.train.Scaffold. These two alternatives are included in train.py along with the accompanying errors they produce, along the lines of "ValueError: Operation ... has been marked as not fetchable." Perhaps this has to do with some of the tf.cond operations, but that's not clear yet. Thanks!
With regard to the TensorBoard visualization stuff, it might be good to try out the mnist_replica.py example to verify it's not a problem with the agents code. I don't know if you want an issue created for that. Also, even though there's a fair amount of the agents code in here now, I'm assuming this will be split into PRs to agents and k8s separately, given it seems like you guys want your scope to end at or near the YAML/container reference level.
Previously each instance in a distributed setting would set the checkpoint dir using a timestamp instead of all using the same checkpoint dir.
From tensorflow/agents#17 (comment)
I've hit similar problems when the ClusterSpec passed to the TensorFlow servers wasn't set correctly, or when the chief wasn't set correctly. Can you provide a link in the code to where the servers are being initialized? Is the code using a Supervisor or some other class to coordinate training?
Sure, servers are being initialized here: https://github.com/cwbeitel/k8s/blob/master/examples/agents/trainer/train.py#L178. I've been trying Supervisor as well as MonitoredTrainingSession. Locally, non-distributed training using either MonitoredTrainingSession or Supervisor proceeds fine. In a distributed setting, in both cases the workers hang, repeatedly logging the variables that are not initialized, and in the Supervisor case training on the master does not proceed at all.
Can you print out the ServerDef just to make sure it's correctly set?
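For reference, one way to dump it (a standalone sketch with a made-up single-task cluster, not the example's actual code):

```python
import tensorflow as tf

# Made-up single-task cluster purely to show where the ServerDef comes from.
cluster = tf.train.ClusterSpec({"worker": ["localhost:2230"]})
server = tf.train.Server(cluster, job_name="worker", task_index=0)

# Prints the text-format ServerDef, like the one pasted below.
print(server.server_def)
```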
Oh interesting. So here's the server def:

```
cluster {
  job {
    name: "master"
    tasks {
      value: "tfagents-39d2eb16-master-bggk-0:2222"
    }
  }
  job {
    name: "ps"
    tasks {
      value: "tfagents-39d2eb16-ps-bggk-0:2222"
    }
    tasks {
      key: 1
      value: "tfagents-39d2eb16-ps-bggk-1:2222"
    }
  }
  job {
    name: "worker"
    tasks {
      value: "tfagents-39d2eb16-worker-bggk-0:2222"
    }
    tasks {
      key: 1
      value: "tfagents-39d2eb16-worker-bggk-1:2222"
    }
    tasks {
      key: 2
      value: "tfagents-39d2eb16-worker-bggk-2:2222"
    }
  }
}
job_name: "worker"
protocol: "grpc"
```

From this the server object is created.
Everything looks correct. TensorFlow always uses a client that talks to a local gRPC server and for that it typically uses the localhost. |
Never mind, I misunderstood the meaning of server.target and master (thinking each server pointed to the master as its target, rather than being the target for the local session).
Oh, I hadn't refreshed the page; yeah, sounds like we're on the same page.
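To make that distinction concrete, a tiny standalone sketch (made-up single-task cluster) of what server.target refers to:

```python
import tensorflow as tf

# Made-up single-task cluster just to illustrate server.target.
cluster = tf.train.ClusterSpec({"worker": ["localhost:2231"]})
server = tf.train.Server(cluster, job_name="worker", task_index=0)

counter = tf.get_variable(
    "counter", shape=[], dtype=tf.int32,
    initializer=tf.zeros_initializer())
increment = tf.assign_add(counter, 1)

# master=server.target points the session at this task's own in-process gRPC
# server (e.g. "grpc://localhost:2231"), not at the chief's address.
with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=True) as sess:
  print(sess.run(increment))
```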
Do you think you're going to be able to make progress on resolving the issues with distributed training or are you completely blocked at this point?
examples/agents/trainer/train.py
ready_for_local_init_op = opt.ready_for_local_init_op
local_init_op = opt.local_step_init_op
if run_config.is_chief:
  local_init_op = opt.chief_init_op
Why doesn't the chief initialize local ops?
In tensorflow/agents#17 (comment) you posted the following error:

```
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op:
Variables not initialized: ppo_temporary/episodes/Variable,
ppo_temporary/episodes/Variable_1, ppo_temporary/episodes/Variable_2,
ppo_temporary/episodes/Variable_3, ppo_temporary/episodes/Variable_4,
ppo_temporary/episodes/Variable_5, ppo_temporary/last_action,
ppo_temporary/last_mean, ppo_temporary/last_logstd, ready: None
```
That error message is coming from here.
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/training/session_manager.py#L417
Is the error message happening on the master? I'd imagine that if the master is acting as a worker then it also has local variables that need to be initialized.
The error message isn't occurring on the master, which is proceeding with training normally, except for seemingly repeatedly starting the master session?

```
--------------------------------------------------
Phase train (phase step 0, global step 0).
2017-12-13 16:33:46.571102: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session df5b8121a99e0179 with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:33:46.768473: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session a7f4259ee7b2e152 with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:33:46.957017: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 0d3890a298f3ddb3 with config: gpu_options { allow_growth: true } allow_soft_placement: true
INFO:tensorflow:global_step/sec: 13.5033
INFO:tensorflow:global_step/sec: 27.8826
INFO:tensorflow:global_step/sec: 28.0781
2017-12-13 16:34:17.244879: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 041e61fded948009 with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:34:17.559273: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 73a4b1f71bf752f7 with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:34:17.678367: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 745a9dc24252072b with config: gpu_options { allow_growth: true } allow_soft_placement: true
INFO:tensorflow:global_step/sec: 27.0679
INFO:tensorflow:global_step/sec: 26.45
INFO:tensorflow:global_step/sec: 25.8405
2017-12-13 16:34:47.918164: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session bb207b634fca4147 with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:34:48.307535: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 01acd7ab2d13488f with config: gpu_options { allow_growth: true } allow_soft_placement: true
2017-12-13 16:34:48.414965: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session ee5f75d36f8fa12e with config: gpu_options { allow_growth: true } allow_soft_placement: true
INFO:tensorflow:global_step/sec: 25.0699
INFO:tensorflow:global_step/sec: 26.0601
INFO:tensorflow:global_step/sec: 24.5106
```
If this is needed by anyone and there's someone who can get it working who wants to try, I'm happy for someone to take it over! I don't want to be blocking anyone here. Barring that, I'll keep experimenting with having the master initialize local variables. My intuition is that this can work and that the problem previously was just that worker sessions didn't have the right master address: previously they were pointing to their own localhost, and then when using RunConfig.master they still weren't pointing to the cluster master, e.g. the following test fails:

```python
from __future__ import absolute_import, division, print_function

import json

from tensorflow.contrib.learn.python.learn.estimators import \
    run_config as run_config_lib
from tensorflow.python.platform import test

patch = test.mock.patch


class RunConfigTest(test.TestCase):

  def test_values_from_tf_config(self):
    cluster_spec = {
        run_config_lib.TaskType.PS: ["localhost:9990"],
        run_config_lib.TaskType.MASTER: ["host3:3"],
        run_config_lib.TaskType.WORKER: ["host4:4", "host5:5", "host6:6"]
    }
    tf_config = {
        "cluster": cluster_spec,
        "task": {
            "type": run_config_lib.TaskType.WORKER,
            "index": 2
        },
        "environment": "cloud"
    }
    with patch.dict("os.environ", {"TF_CONFIG": json.dumps(tf_config)}):
      config = run_config_lib.RunConfig()

    self.assertEqual(config.master, "grpc://host3:3")
```

Also, it seems to me like a lot of the structure of what is happening in tf/agents maps fairly well onto that of the Experiment/Estimator stack, and I'm wondering if using that as a starting point will make things like this easier. Also, perhaps for debugging it would be better to mock a cluster locally.
So, digging into where this uninitialized-variable message comes from like you were suggesting, at https://github.com/tensorflow/tensorflow/blob/r1.4/tensorflow/python/training/monitored_session.py#L195:

```python
class Scaffold(object):
  ...
  def finalize(self):
    ...
    if self._ready_for_local_init_op is None:

      def default_ready_for_local_init_op():
        return variables.report_uninitialized_variables(
            variables.global_variables())

      self._ready_for_local_init_op = Scaffold.get_or_default(
          'ready_for_local_init_op', ops.GraphKeys.READY_FOR_LOCAL_INIT_OP,
          default_ready_for_local_init_op)
```

So all global variables that are shared with the master node are successfully initialized, but I think those that have been explicitly placed on worker nodes aren't being initialized by the master because they are also global variables. If these are going to be private to workers then I think they need to be explicitly identified as local variables so that workers can initialize them via their local init op.

Oh, and I think using MonitoredTrainingSession(master='grpc://cluster-master:2222', ...) instead of MonitoredTrainingSession(master=server.target, ...) was the reason for the interruptions (#159 (comment)) in the master logs with 'Start master session...' logs... I think I misunderstood the purpose of the master flag.
Explicitly flagging all variables kept private to workers solved the problem of these blocking the ready_for_local_init_op, allowing it to proceed to initialize these as locals in the standard way. One issue with that is that it involved making changes throughout the agents code, adding "collections=[tf.GraphKeys.LOCAL_VARIABLES]" to Variable declarations. This might be more modification than is wanted (@danijar) or necessary, and an alternative is to move all variables matching a certain name scope from globals to locals, like the following:

```python
import re

import tensorflow as tf


def _move_matching_global_vars_to_local(pattern):
  # Use the live collection list (not the copy returned by get_collection)
  # so that removals actually take effect.
  global_vars = tf.get_collection_ref(tf.GraphKeys.GLOBAL_VARIABLES)
  # Iterate over a snapshot since the list is mutated during iteration.
  for var in list(global_vars):
    if re.match(pattern, var.name):
      tf.add_to_collection(tf.GraphKeys.LOCAL_VARIABLES, var)
      global_vars.remove(var)
```

Being able to move a large collection of variables to the locals might even be necessary in order to address a new issue that arose when getting past variable initialization, which probably has to do with sharing too many variables (probably some that are somehow involved in environment resets). To address this I think the code just needs to be adapted to keep all variables local by default and share only those for the policy, value, and global step explicitly. Which again raises the issue of whether the above is the correct way to move a large collection of variables from globals to locals. I'm curious whether there is (or could be) a TensorFlow built-in way to handle this situation, such as part of Estimator, i.e. something to implement the rule (if enabled) "if a variable is placed on a worker it is treated as a local by default".
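For concreteness, a tiny sketch of the first approach mentioned above (the variable name and shape here are made up, not the actual agents variables): registering a worker-private variable only in LOCAL_VARIABLES keeps it out of global_variables() and therefore out of the default ready_for_local_init_op check:

```python
import tensorflow as tf

# Hypothetical worker-private buffer; not part of the actual agents code.
last_action = tf.get_variable(
    "last_action",
    shape=[16, 4],
    trainable=False,
    collections=[tf.GraphKeys.LOCAL_VARIABLES],
    initializer=tf.zeros_initializer())

# It now appears only as a local variable, so each worker initializes it via
# its own local_init_op instead of the chief waiting on it.
assert last_action in tf.local_variables()
assert last_action not in tf.global_variables()
```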
Cool, now asynchronous training appears to be working, but adding in the SyncReplicasOptimizer gives an error about an op not being fetchable. I haven't let it run very long, but thus far the scores are highly variable and don't appear to be improving, presumably on account of applying stale gradients. Pushed a version in case you guys are interested. Like I was saying, there are a lot of additions of variables to locals, so a lot of different parts of the code have been modified, and this is going to need to be scoped appropriately into separate agents and k8s PRs. Let me know if it would be helpful/easier for me to start that split now, or whatever you guys want to do going forward.
Great work. |
Great to hear that you got it running. Might be worth getting synchronous training running first, before splitting it up? Otherwise I'd imagine you'd spend a lot of time updating different PRs during development.
Sounds good! And about that "not fetchable" error, "Operation u'end_episode/cond/cond/training/perform-update-steps/get-losses/scan/while/apply-gradients/Assign' has been marked as not fetchable.", first of all, for context, here's the code path in algorithm.py that leads to it:

```python
class PPOAlgorithm(object):
  ...

  def end_episode(self, agent_indices):
    ...
    with tf.name_scope('end_episode/'):
      return tf.cond(
          self._is_training,
          lambda: self._define_end_episode(agent_indices), str)

  def _define_end_episode(self, agent_indices):
    ...
    return tf.cond(memory_full, self._training, str)

  def _training(self):
    with tf.name_scope('training'):
      ...
      update_summary = self._perform_update_steps(
          observ, action, old_mean, old_logstd, reward, length)

  def _perform_update_steps(
      self, observ, action, old_mean, old_logstd, reward, length):
    ...
    with tf.name_scope('get-losses'):
      value_loss, policy_loss, summary = tf.scan(
          lambda _1, _2: self._update_step(
              observ, action, old_mean, old_logstd, reward, advantage, length),
          tf.range(self._config.update_epochs),
          [0., 0., ''], parallel_iterations=1)
    ...

  def _update_step(
      self, observ, action, old_mean, old_logstd, reward, advantage, length):
    ...
    with tf.name_scope('apply-gradients'):
      optimize = self._optimizer.apply_gradients(
          zip(all_gradients, all_variables),
          global_step=self._step)
```

(Also, I was getting a warning for a while about an attempted merge of ops on incompatible devices (master/worker vs. ps), but I'm not seeing that now.)

In this discussion @mrry gives an explanation of how this error can arise in relation to nesting within a tf.cond(), which I somewhat understand (looks like @danijar was on that thread as well). So the two approaches going forward seem to be (1) understanding the issue caused by the tf.cond() and applying the appropriate adaptation, or (2) the perhaps more involved approach of swapping out Loop for use of either the Experiment train_and_evaluate schedule or a custom schedule that also interleaves a rollout/experience-building phase (e.g. rollout_train_and_evaluate). Building on top of the Experiment/Estimator stack would have numerous other benefits, such as the maturity and robustness of that set of objects, as well as there being an established model for representing hparams and hyperparameter tuning. @danijar I don't necessarily understand all that you mean to do with tensorflow/agents - is the latter incompatible with any of the objectives? I'll start with the former approach, but I'm not yet clear on how to do this and would appreciate any ideas!
I'm not very familiar with the Experiment/Estimator stack.
Is the Experiment API going to be mainlined in the next releases? I don't know how stable it is.
I'm actually not sure how much replacing TensorFlow loops by Python loops would help here. In general, an Agent might use any number of TensorFlow loops internally, and we'll soon have an example of this for model-based RL. I will try to loop in some people from the TensorFlow team to see if this is a TensorFlow bug. Could you provide a command to run that reproduces the issue?
@danijar Cool, yeah, me neither!! 😄 Estimator seems to provide a nice way to run a graph in different modes (train, eval, predict) that doesn't wrap phases in tf.cond's. Estimator also seems nice because it implements a bunch of best practices for setting up monitoring, checkpointing, exporting, etc. @bhack Yeah, Experiment is still in contrib, I don't know either.
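For illustration, a toy regression model_fn (not agents code) showing how Estimator separates the phases with ordinary Python branching on mode, rather than a tf.cond inside the graph:

```python
import tensorflow as tf


def model_fn(features, labels, mode):
  # Tiny linear model purely for illustration.
  predictions = tf.layers.dense(features["x"], units=1)

  if mode == tf.estimator.ModeKeys.PREDICT:
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  loss = tf.losses.mean_squared_error(labels, predictions)

  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  # TRAIN: the branches are plain Python, so no tf.cond is needed to switch
  # between phases within a single graph.
  train_op = tf.train.AdamOptimizer(1e-3).minimize(
      loss, global_step=tf.train.get_global_step())
  return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)


estimator = tf.estimator.Estimator(model_fn=model_fn)
```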
I've added a couple of scripts to reproduce the error. At the moment it assumes there's a PS, and the code to run in a docker container locally doesn't set one up, but the …
- instead of putting the global_step on a parameter server explicitly, use replica_device_setter to do so, permitting portability across conditions where there are or aren't parameter servers available
- permits use of tfk8s-localmock (locally) to illustrate the "not fetchable" error
Closing this PR; restructuring the code as a PR to agents (of the asynchronous version) and a simplified submission to tfk8s that is primarily a config referencing an image built elsewhere.