
RFC: Distribution Strategy - Revised API #25

Merged
14 commits merged into tensorflow:master on Nov 16, 2018

Conversation

@guptapriya (Member) commented Oct 17, 2018

Review period closes 2018-11-08

Distribution Strategy - Revised API

| Status        | Proposed |
| :------------ | :------- |
| **Author(s)** | cjfj@google.com, dominikg@google.com, jhseu@google.com, joshl@google.com, petebu@google.com, priyag@google.com, tomhennigan@google.com |
| **Sponsor**   | wicke@google.com |
| **Updated**   | 2018-11-12 |

Objective

This document presents a revised Distribution Strategy API, seeks feedback on it, and illustrates its usage in various situations. Distribution Strategy aims to let users easily distribute their computation across multiple GPUs, TPUs, and machines. Until now, the recommended way to use Distribution Strategy has been through TensorFlow's high-level training frameworks such as tf.keras and Estimator. In this proposal, we show how the Distribution Strategy APIs can be used directly to distribute custom training loops, and we ask for feedback on those APIs. This use case is important for users who want more control over their training loops, such as ML researchers. We have tested a similar version of this API internally with many researchers at Alphabet, and the current proposal is based on their feedback.

This is also an opportune time to get public feedback as we are in the process of migrating Distribution Strategy APIs from tf.contrib to core TensorFlow (as part of TF 2.0). As part of the move, we want to improve and reorganize the APIs to make them more user friendly and understandable.

@guptapriya requested review from ewilderj and martinwicke as code owners Oct 17, 2018
guptapriya and others added 3 commits Oct 17, 2018
@ewilderj added this to Needs attention in RFC management via automation Oct 18, 2018
@ewilderj moved this from Needs attention to Open reviews in RFC management Oct 18, 2018
@AakashKumarNain commented Oct 19, 2018

I don't understand why this was proposed in the first place. The whole idea of TensorFlow 2.0 was to have an easy-to-use interface, eager execution, and no redundancy. However, this RFC contradicts that. It's like saying we have tf.layers.Dense() as the high-level API, but we also have tf.nn.Dense() if you want more flexibility. IMHO, this is going to create a mess again: two things, each of which can achieve the same goal. Alternatively, the hooks should be provided within the DistributionStrategy API itself so that everyone is on the same page. If you add another API for researchers, then researchers would use Replicator while applied ML engineers would use DistributionStrategy.

@seanpmorgan (Member) commented Oct 19, 2018

> Distribution Strategy’s low level APIs can be directly used to distribute training programs written with low level TensorFlow APIs (i.e. without Estimator/Keras). But it can be tedious and error prone as it exposes an extensive and flexible API. There is a skeleton of a mid-level API within Distribution Strategy where users can write their own main-loops more easily, but this has not yet been developed fully or tested with real users.

IMO this is a pretty weak reason to create a second API. Why not just finish developing the mid-level API and make sure it works across real use cases?

@terrykong commented Oct 22, 2018

I see that there was discussion of a num_steps_per_run parameter, but will the Replicator API support some kind of gradient aggregation? I know we usually just throw more accelerators at a problem to increase the effective batch size, but for those of us who are more restricted in compute I wonder if this kind of logic could be supported to get larger effective batch sizes: for example, using N GPUs with batch size B, but aggregating gradients over R runs before applying the optimizer and syncing, to achieve an effective batch size of `N*B*R`.

@kazemSafari commented Oct 22, 2018

@tfboyd Thank you so much. This will be extremely helpful in my current research projects. I greatly appreciate it.

@chr1sj0nes (Contributor) commented Oct 23, 2018

> I see that there was discussion of a num_steps_per_run parameter, but will the Replicator API support some kind of gradient aggregation? [...] For example using N GPUs with batch size B, but aggregating gradients for R runs of the optimizer and then delaying syncing to achieve an effective batch size of `N*B*R`.

@terrykong I think that this is an orthogonal problem to the one that Replicator is trying to solve. I believe that it could be achieved using a relatively simple Optimizer wrapper, with or without Replicator.
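For illustration, a gradient-accumulation wrapper along the lines @chr1sj0nes suggests could look roughly like this eager-mode sketch; the helper name, `model`, `loss_fn`, `batches`, and `accumulation_steps` are hypothetical placeholders, not part of the proposed API:

```python
import tensorflow as tf

def train_with_accumulation(model, optimizer, loss_fn, batches, accumulation_steps):
  """Sketch: average gradients over several micro-batches before applying a
  single optimizer update, giving an effective batch of N * B * R."""
  accumulated = [tf.zeros_like(v) for v in model.trainable_variables]
  for step, (features, labels) in enumerate(batches):
    with tf.GradientTape() as tape:
      loss = loss_fn(model(features), labels)
    grads = tape.gradient(loss, model.trainable_variables)
    accumulated = [a + g for a, g in zip(accumulated, grads)]
    if (step + 1) % accumulation_steps == 0:
      optimizer.apply_gradients(
          [(a / accumulation_steps, v)
           for a, v in zip(accumulated, model.trainable_variables)])
      # Reset the running sums for the next accumulation window.
      accumulated = [tf.zeros_like(v) for v in model.trainable_variables]
```

In a replicated setting, the idea would be that each replica accumulates locally and only the periodic apply_gradients step needs to be synchronized, which is why this can live in an optimizer wrapper independent of Replicator.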

@terrykong commented Oct 23, 2018

> @terrykong I think that this is an orthogonal problem to the one that Replicator is trying to solve. I believe that it could be achieved using a relatively simple Optimizer wrapper, with or without Replicator.

@chr1sj0nes Gotcha, I see your point.

@guptapriya (Member, Author) commented Oct 23, 2018

@AakashKumarNain @seanpmorgan thank you for the feedback (regarding comment 1, comment 2).

We've taken this feedback into consideration and are planning to re-organize this proposal (hence the delay in responding). We are leaning towards incorporating the new API into Distribution Strategy - and clearly documenting/marking how to use DistributionStrategy APIs depending on the use case. Please stay tuned for a revamped proposal.

@AakashKumarNain commented Oct 24, 2018

@guptapriya Thanks for the clarification. Waiting for the revamped proposal.

@goldiegadde self-assigned this Oct 25, 2018
rfcs/20181016-replicator.md
passed to run. It takes an `InputFn` `fn` which either returns a
`Dataset` or a function which returns
([nests](https://www.tensorflow.org/api_docs/python/tf/contrib/framework/nest)
of) input tensors. It also takes an optional `InputReplicationMode`

@karmel (Member) commented Oct 26, 2018

Do we need to support both Dataset and nests of tensors? Can't a dataset of nested tensors suffice for the latter use case, such that this always returns a dataset?

@chr1sj0nes (Contributor) commented Oct 29, 2018

This feature was added in response to internal user feedback. In an RL use-case, it is not always convenient to work with Datasets. Wrapping Tensors as a Dataset is non-trivial, and introduces an unnecessary (albeit slight) overhead.

@karmel (Member) commented Oct 31, 2018

I recall, perhaps once upon a time, that there was a restriction that the Estimator input_fn had to return datasets to work with DistStrat, and that returning a tuple of (feature, label) would not be supported. Or maybe that was with replicate_model_fn? Does that restriction still hold, or does the acceptance of nested tensors imply that other return types are also acceptable?

@guptapriya (Member, Author) commented Nov 2, 2018

Great point. I think once we add support for fn returning tensors, we should be able to support input_fn returning (feature,label) tuples in Estimator as well.
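To make the two forms being discussed concrete, an `InputFn` could return either a `Dataset` or a tensor-producing callable, roughly like the sketch below; `effective_batch_size`, `features`, `labels`, and `experience_queue` are illustrative placeholders, not part of the proposal:

```python
import tensorflow as tf

# Form 1: the input function returns a Dataset (the established convention).
def dataset_input_fn(ctx):
  per_replica_batch_size = effective_batch_size // ctx.num_replicas_in_sync
  return (tf.data.Dataset.from_tensor_slices((features, labels))
          .batch(per_replica_batch_size))

# Form 2 (proposed): the input function returns a callable producing
# (nests of) tensors directly, e.g. for RL use cases where wrapping the
# data in a Dataset is inconvenient.
def tensor_input_fn(ctx):
  def get_next():
    return experience_queue.dequeue()  # e.g. trajectories from a FIFOQueue
  return get_next
```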

rfcs/20181016-replicator.md
optimizer = tf.train.MomentumOptimizer(learning_rate, 0.9)
def input_fn(ctx):
  assert effective_batch_size % ctx.num_replicas_in_sync == 0

@karmel (Member) commented Oct 26, 2018

In practice, if this assert fails, what happens? Or, more broadly, how does error propagation work? Would this stop all replicas, or if one fails, would the rest continue?

@guptapriya (Member, Author) commented Oct 29, 2018

Since this type of error happens during graph construction, in the in-graph case it simply means the graph will not be constructed and no replicas will start training (this is the case for the multi-GPU and TPU strategies).
For the between-graph case (typically used for multi-worker training), there are a couple of different modes: standalone client and independent worker. We don't cover these in detail in this design, but there will be an upcoming design covering how to easily set up the cluster for multi-worker cases.
@yuefengz - can you comment on the current state of error propagation for the between-graph, multi-worker case?

@yuefengz (Member) commented Oct 29, 2018

In the between-graph replication case, if this assert fails, it looks to me like it will fail at graph-creation time on all workers. Is there a chance that only some workers fail the assertion?

@guptapriya (Member, Author) commented Oct 29, 2018

Yeah, this case will fail on all of them. But Karmel's question is also about error propagation more broadly: what if graph creation fails on one of the workers but not on all of them?

@yuefengz (Member) commented Oct 29, 2018

There is no existing mechanism to broadcast errors; that depends on the outer loop. E.g. Kubeflow can monitor the workers and kill the whole training cluster if any worker fails with a non-retryable error.

rfcs/20181016-replicator.md
#### Evaluation

@karmel (Member) commented Oct 26, 2018

How does checkpointing/restoring from checkpoint work in this world? Is failure handling left to the user?

@guptapriya (Member, Author) commented Oct 29, 2018

In the current proposal we are not providing any support for checkpointing/restoring; this is left to the user if they're not using Keras/Estimator.
There may be a future proposal to provide some help with that.
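For readers wondering what "left to the user" could look like in practice, one option is object-based checkpointing. A minimal sketch, assuming a recent enough TF with `tf.train.Checkpoint`/`tf.train.CheckpointManager`, where `model`, `optimizer`, `num_steps`, and `train_one_step` are placeholders:

```python
import tensorflow as tf

# User-managed checkpointing for a custom training loop (sketch only).
checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
manager = tf.train.CheckpointManager(checkpoint, directory='/tmp/ckpts', max_to_keep=3)

# Restore the latest checkpoint if one exists; otherwise start from scratch.
checkpoint.restore(manager.latest_checkpoint)

for step in range(num_steps):
  train_one_step()              # placeholder for the distributed training step
  if step % 1000 == 0:
    manager.save(checkpoint_number=step)
```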

Incorporated Replicator API into Distribution Strategy API, and re-organized the latter.
@guptapriya (Member, Author) commented Nov 1, 2018

We have significantly updated the proposal based on the feedback - the major shift is to incorporate Replicator API into the Distribution Strategy API and re-organize it. Please give it a read and provide your feedback. We would especially welcome your thoughts on the "open questions and discussion topics" at the end of the document.
The review period has been extended for a week until Nov. 8th.

@guptapriya changed the title from "RFC: Replicator" to "RFC: Distribution Strategy - Revised API" Nov 1, 2018
@bhack referenced this pull request Nov 2, 2018
An instance of `InputContext` is passed to the input function.

```python
class InputContext(object):
  ...
```

@wangsiyu commented Nov 3, 2018

This is an expected design for DistributionStrategy. So how should InputContext be integrated into Estimator and Keras seamlessly? Currently, the input_fn in Estimator does not take any parameters, but it would be reasonable to modify some interfaces on Estimator and Keras.

@guptapriya (Member, Author) commented Nov 5, 2018

Thanks for the review @wangsiyu.
For Keras, we have had some discussions around whether we should allow users to pass an input function. From the feedback we've received so far, providing a dataset or numpy array directly and letting DS figure out how to split/shard the input is the most user-friendly approach for the Keras APIs. So we are not planning to change the Keras API to take an input function; we will instead work on efficient distribution of input from a single dataset.

For Estimator, this is under discussion right now, since it already supports an input function. As you mentioned, it doesn't currently take this input context, but it would be reasonable to modify it to do so for users who do wish to adapt their input pipelines. Most likely we will add this support.
Note, however, that this would require users to modify their existing input code. Until now, our philosophy has been to let Keras/Estimator users reuse their existing code as much as possible when adding DS, so we are trying to figure out the best experience for users who don't wish to modify their existing input pipelines.
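For illustration, if Estimator did forward the context, an input_fn might use it roughly as in the sketch below. This is only a sketch: `parse_example` and the file path are placeholders, and the exact fields on `InputContext` as well as the Estimator plumbing were still being settled at the time of this thread.

```python
import tensorflow as tf

GLOBAL_BATCH_SIZE = 256

def input_fn(input_context):
  per_replica_batch_size = GLOBAL_BATCH_SIZE // input_context.num_replicas_in_sync
  dataset = tf.data.TFRecordDataset('/path/to/train.tfrecord')
  # Shard the input pipeline across workers, then batch at the per-replica size.
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  return dataset.map(parse_example).batch(per_replica_batch_size)
```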

strategy = tf.distribute.MirroredStrategy()
model.compile(loss='mean_squared_error',
              optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2),
              distribute=strategy)

@karmel (Member) commented Nov 5, 2018

If we wait until compile to pass in the strategy, how do we handle, for example, input sharding? Are there other limitations of the Keras flow, or is the expectation that we should be able to use DS to distribute Keras models to the same scale we can today with Estimators?

@guptapriya (Member, Author) commented Nov 5, 2018

The plan for input is to rely on datasets to (eventually) shard input efficiently, and this can happen at the compile/fit stage with whatever dataset is provided at that time. Note that the initial implementation may not be efficient, but we want to design the APIs so that they make the most sense for users, even if we have to sacrifice performance in the short term.

AFAIK, there aren't any other limitations we've seen so far from delaying strategy passing until compile that affect performance, but @anj-s maybe you can add more thoughts on this?

@anj-s (Member) commented Nov 6, 2018

I believe compile is the right stage to pass the strategy and set it as one of the model properties, similar to loss, optimizer, etc. We also only have access to the input at the fit stage (and possibly information about the workers as well). There aren't any performance issues so far with the current flow. As @guptapriya mentioned, the initial implementation of input sharding may have performance issues, but this is not dependent on when we pass the strategy object.

# Start the actors.
for actor_id in range(num_actors):
  threading.Thread(target=run_actor, args=(actor_id,)).start()

@jperl commented Nov 6, 2018

It would be nice to have a reinforcement learning story where the environments run in a Process instead of a Thread to support heavier computation.

The agent model call and FIFOQueue.enqueue would be problematic in a Process.

@petebu (Member) commented Nov 7, 2018

Thanks for the review @jperl. You're right that supporting separate processes for the actors and learner is important, but the proposal doesn't preclude that. We already use the prototype implementation in exactly that way, with two separate Python+TF processes. The agent model building isn't a problem: you can just build the same model twice, once in the learner and again in the actors. To transfer the weights, you can serialize them out of graph and send them to the other process. The example uses threads for illustration; a multi-process example would be somewhat more involved and would rely on additional RPC libraries for the cross-process weight update.
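A very rough sketch of the multi-process setup described above, assuming eager execution and using a multiprocessing queue as a stand-in for whatever RPC transport would actually be used; `build_agent_model`, `train_for_a_while`, and `collect_trajectories` are placeholders:

```python
import multiprocessing as mp
import queue as queue_lib

def learner(weight_queue):
  model = build_agent_model()            # same model-building code as the actors
  while True:
    train_for_a_while(model)             # placeholder: the (distributed) learner step
    # Serialize the variables out of the graph as plain numpy arrays...
    weights = [v.numpy() for v in model.trainable_variables]
    weight_queue.put(weights)            # ...and ship them to the actor process.

def actor(weight_queue, actor_id):
  model = build_agent_model()            # rebuild the same model in this process
  while True:
    try:
      weights = weight_queue.get_nowait()
    except queue_lib.Empty:
      weights = None                     # no fresh weights; keep the current ones
    if weights is not None:
      for var, value in zip(model.trainable_variables, weights):
        var.assign(value)                # load the latest learner weights
    collect_trajectories(model, actor_id)  # placeholder: environment rollouts

if __name__ == '__main__':
  weight_queue = mp.Queue()
  mp.Process(target=learner, args=(weight_queue,)).start()
  mp.Process(target=actor, args=(weight_queue, 0)).start()
```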

@jperl commented Nov 7, 2018

Thanks for the reply @petebu. I apologize if I am misunderstanding -- but can the Distribution Strategy API support mirrored variables on the same machine across processes, so that the end user does not need to manage the cross-process weight sync manually?

@egonina (Member) commented Nov 7, 2018

Similar question to @jperl's: When you say "To transfer the weights, you can serialize them out of graph and send them to the other process", does DS support weight transfer between two processes (running on one or multiple machines)? If not, can you outline how DS would be used in combination with other communication libraries to implement distributed RL with distributed data collection and one or more learners (with replay buffers/FIFO queues for experience trajectories)?

@mjlbach commented Jul 11, 2019

Is there any update on this? I'm currently trying to do this (have the data-gathering loop in its own process). I've been using tf.train.replica_device_setter(), building the graph in multiple processes and using the parameter server to share weights between the agent collecting data and the updater that updates the agent's policy model.

rfcs/20181016-replicator.md

#### Worker

The physical machine(s) containing the physical devices (e.g. GPUs, TPUs) on

@egonina (Member) commented Nov 7, 2018

This definition is a bit confusing to me: is a worker one physical machine, or something that runs the replica code on one or more physical machines?

@guptapriya (Member, Author) commented Nov 7, 2018

A worker, as defined here, is the minimum of (one machine, one replica).
So in the most common case it corresponds to one machine and has one or more replicas.
But in the less common case where one replica needs to span more than one machine, we define a worker to span the minimum number of machines needed for one replica.

I agree this is a little confusing, and I am also worried it may be confused with the "worker" task as used in TensorFlow in general (although they correspond to each other in the common cases).

Do you have suggestions for a different name, or a different concept we should define here?

guptapriya added 4 commits Nov 7, 2018
Updating the design doc with decisions on open questions that were discussed in the in-person design review.
@michaelstjules commented Nov 12, 2018

If I have a model that was not trained using a DistributionStrategy, can I just call tf.train.import_meta_graph or tf.import_graph_def in a DS scope to distribute the model? If not, what else would I need to do?

@yuefengz (Member) commented Nov 12, 2018

@michaelstjules This is a good question. tf.train.import_meta_graph currently won't work with the run or call_for_each_replica methods because it directly adds nodes to a TensorFlow graph, whereas distribution strategies need to capture the creation of variables and ops (to replicate them or place them on the right devices). We'll think about a way to support it to some degree, but that is out of the scope of our v2 API.

@@ -0,0 +1,1014 @@
# Distribution Strategy - Revised API

| Status | Proposed |

@ewilderj (Member) commented Nov 13, 2018

@guptapriya please amend to "Accepted" and then we can merge this, thank you!

@guptapriya (Member, Author) commented Nov 14, 2018

Great, just did that. Thanks @ewilderj !

@ewilderj merged commit 33cda63 into tensorflow:master Nov 16, 2018
1 check passed
cla/google All necessary CLAs are signed
RFC management automation moved this from Open reviews to Accepted RFCs Nov 16, 2018
model.compile(loss='mean_squared_error',
              optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2),
              distribute=strategy)
model.fit(dataset, epochs=5, steps_per_epoch=10)

@sshrdp (Member) commented Nov 20, 2018

The interface between model.fit and the Dataset API is not clear. Is the input dataset supposed to be single-epoch, or should it be repeated? If the dataset just returns the input data and fit controls the number of epochs to iterate over it, why is steps_per_epoch necessary?

@guptapriya (Member, Author) commented Nov 26, 2018

That is good feedback @sshrdp, and it applies to model.fit and the Dataset API in general. In fact, @sb2nov has been looking at cleaning this up to make it more intuitive and usable (along the lines of not needing steps_per_epoch in the dataset case).
@sb2nov - can you shed more light?
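For reference, the two conventions being contrasted look roughly like this sketch; `features` and `labels` are placeholders, and the first variant is the direction described above rather than guaranteed current behaviour:

```python
import tensorflow as tf

# A finite dataset: ideally fit would treat one full pass as one epoch,
# making steps_per_epoch unnecessary.
dataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(32)
model.fit(dataset, epochs=5)

# An indefinitely repeating dataset: steps_per_epoch is then needed to tell
# fit where each "epoch" ends.
repeated = tf.data.Dataset.from_tensor_slices((features, labels)).batch(32).repeat()
model.fit(repeated, epochs=5, steps_per_epoch=10)
```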

mean_disc_loss = strategy.reduce(per_replica_disc_losses)
mean_gen_loss = strategy.reduce(per_replica_gen_losses)
with tf.Session() as session:

@galeone commented Jan 30, 2019

The examples use tf.Session, which is going to be removed in 2.0: are those examples supposed to work by wrapping the following lines in a function and decorating it with @tf.function?

@capilano commented Aug 6, 2019

Is this a work in progress, or does the distribute strategy now support a clean way to efficiently implement custom training loops? I have seen examples of custom loops in TensorFlow 1.14, but inside a session. Is it possible to use tf.function in 1.14 (without any slowdown), or do we have to move to 2.0 to use the distribute strategy for custom training in eager mode as intended?

@guptapriya (Member, Author) commented Aug 6, 2019

@capilano Most of the APIs mentioned in this RFC have been implemented. You can see tutorials here:
- 2.0 style: https://www.tensorflow.org/beta/tutorials/distribute/training_loops
- 1.x style: https://www.tensorflow.org/tutorials/distribute/training_loops

You should be able to use tf.function in 1.x with eager mode in the same way the 2.0 tutorial above describes. You may need a more recent version of TF 1.x than 1.14, though, as some of the APIs were added later; you can find them in the TF 1.x nightly release.
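In outline, the 2.0-style loop in those tutorials looks roughly like the sketch below; `build_model`, `loss_fn`, and `global_batch_size` are placeholders, and depending on the TF version the replicated call is `strategy.experimental_run_v2` or, in later releases, `strategy.run`:

```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
  model = build_model()
  optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)

@tf.function
def distributed_train_step(dist_inputs):
  def step_fn(inputs):
    features, labels = inputs
    with tf.GradientTape() as tape:
      # Scale the summed per-example loss by the global batch size so the
      # combined update across replicas matches one large batch.
      loss = tf.reduce_sum(loss_fn(model(features), labels)) / global_batch_size
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

  per_replica_losses = strategy.experimental_run_v2(step_fn, args=(dist_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
```

The per-replica inputs would typically come from distributing a dataset with the strategy (e.g. `strategy.experimental_distribute_dataset(dataset)` in 2.0-era APIs) and iterating over it in the outer loop.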

@capilano commented Aug 6, 2019

@guptapriya I have looked at them, and when I tried implementing my model using the TPU distribute strategy about a month ago in 1.14 (the latest stable release), there were errors while distributing the dataset (in eager mode), and it was also slower than using an estimator.
As a side note, the 1.x-style example (the TPU custom training part) does not mention anything about how the losses are reduced over multiple replicas, whereas the GPU mirrored strategy manually sums over all replicas. Does the TPU strategy behave differently here? And what about batch norm means and variances: are they per replica or combined?
Also, is there support for non-Keras models, say built using tf.nn.layers? I am trying to port a GAN model that I currently train using estimators, without much success.
I even tried directly converting the DCGAN Keras example (https://www.tensorflow.org/beta/tutorials/generative/dcgan) by putting everything that needed to be under the strategy scope, but it throws an error in the current stable TensorFlow build as of today. I'll take your suggestion and try nightly or the 2.0 beta. Thanks.
