Public API to preempt tf.train.Server #21492

Closed
superbobry opened this issue Aug 8, 2018 · 17 comments

Comments

@superbobry
Member

commented Aug 8, 2018

System information

  • Have I written custom code (as opposed to using a stock example script provided in TensorFlow): N/A
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): N/A
  • Mobile device (e.g. iPhone 8, Pixel 2, Samsung Galaxy) if the issue happens on mobile device: N/A
  • TensorFlow installed from (source or binary): binary
  • TensorFlow version (use command below): v1.9.0-0-g25c197e023 1.9.0
  • Python version: N/A
  • Bazel version (if compiling from source): N/A
  • GCC/Compiler version (if compiling from source):
  • CUDA/cuDNN version: N/A
  • GPU model and memory: N/A
  • Exact command to reproduce: N/A

Describe the problem

tf.estimator.train_and_evaluate uses time.sleep to (optimistically) synchronize the startup of chief/worker nodes in distributed mode (see estimator/training.py). The implicit assumption in this logic is that by the time a worker is spawned, the chief has already started its tf.train.Server, i.e. the scheduler should be aware of the assumption and should schedule and initialize the chief first. This might not be easily achievable on general-purpose systems like YARN.

One possible solution to this is to synchronize the chief/worker tasks on a barrier, and then preemptively start the server right after the barrier, but prior to calling train_and_evaluate. This does not eliminate the race condition entirely but makes it much less likely in practice. The only problem here is that train_and_evaluate does not provide a documented way to account for preempted servers. The undocumented way is:

    if not _is_google_env():
        self._start_std_server(config)

I was wondering if it would be possible to make this part of the train_and_evaluate contract public or, alternatively, address the issue in another way?

@karmel

Member

commented Aug 9, 2018

@xiejw @ispirmustafa -- what is the recommended way to handle pre-empted Servers for systems like YARN?

@xiejw

Contributor

commented Aug 10, 2018

That is not the right way to read the sleep logic in the code.

  1. The std server starts before the sleep. All workers (including the chief) will try to find each other first.
  2. The time.sleep after that is "delay starting". It is part of the machine learning procedure, not related to distributed cluster management. It is a best practice for between-graph execution. In a multi-worker training scenario, it is recommended to have only a few workers sending gradients to update the initial variables. More workers should join later. This produces better model quality.

@superbobry

Member Author

commented Aug 10, 2018

@xiejw thank you for the clarification, and apologies for poor wording on my side. The code fragment I've linked to gives the "delay starting" rationale in the comment.

All workers (including chief) will try to find each other first

Would it be right to say that in order for a task to start, it should successfully connect to all other tasks; and that if the connection is unsuccessful, the task will retry indefinitely/"long enough"? I would very much appreciate it if you could reference relevant code in the reply.

@xiejw

Contributor

commented Aug 10, 2018

Assume device_filter is not set. The precise sequence can be described as follows:

  1. start_std_server. This opens the port and listens for gRPC requests in a background thread. All workers must do this first. Otherwise it might delay other workers from training the model.

  2. (optional) time.sleep for delay starting. Note that at this time the gRPC port is already open.

  3. Estimator.train. This creates the graph and launches a new tf.Session to train the model. The session will try to talk to every other device (in this case, the other workers) and then run the TF graph. This is the step that blocks until all devices are found. The code is here.

Say there is a worker in time.sleep for 100000 secs (basically forever). As its port was opened in step 1, other workers can start training (step 3) without waiting.

With a device filter, only step 3 is slightly different: each worker only tries to find the devices in the list rather than all devices.
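
The ordering argument above can be illustrated with plain TCP sockets. This is only an analogy under stated assumptions: it uses Python's standard socket module rather than gRPC or tf.train.Server, and the names start_listener and can_connect are hypothetical. Once a process has bound its port and started listening, peers can connect even if that process then idles without accepting anything.

```python
import socket


def start_listener():
    """Open a port first (the analogue of step 1) and return the listening socket."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: the OS picks a free port
    listener.listen(8)
    return listener


def can_connect(port, timeout=2.0):
    """Return True if a TCP connection to the local port succeeds."""
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=timeout):
            return True
    except OSError:
        return False


listener = start_listener()
port = listener.getsockname()[1]

# The listener has not called accept() yet, as if its owner were still in
# time.sleep. The kernel still completes the handshake for queued peers.
reachable_while_idle = can_connect(port)

listener.close()
```

The point of the sketch is that reachability depends on the port being open, not on what the owning process does afterwards.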

@superbobry

Member Author

commented Aug 10, 2018

Thank you very much for a detailed reply. Let me do some experiments and get back to you.

@superbobry

Member Author

commented Aug 11, 2018

This is indeed a non-issue, thanks again @xiejw!

@superbobry superbobry closed this Aug 11, 2018

@superbobry

Member Author

commented Aug 13, 2018

@xiejw I thought about this more over the weekend and came up with another use-case for preemption: port reservation.

YARN does not manage the network on the containers, so in order to run TF on YARN one has to manually reserve a port on each of the allocated containers, communicate it to all other tasks to assemble a ClusterSpec and only then start the server. IIUC with the current API there is no race-free way of doing this in TF. Could you advise?

@superbobry superbobry reopened this Aug 13, 2018

@xiejw

Contributor

commented Aug 14, 2018

@karmel do we have any expert for YARN? I do not understand that environment, so it is very hard for me to give the best suggestions here.

@superbobry As I mentioned above, I do not understand YARN. So, my suggestions could be wrong or sub-optimal. The minimal knowledge each training job needs is

  1. The port it uses itself
  2. The ports used by the PS tasks

The ports of other workers are optional. You still need to fill in TF_CONFIG, but the values for the other workers do not matter.

If the conditions above are met in YARN, then you can construct a device_filter (see example code). The session will try to find only the devices in that list, even if other workers are online (or even if the other workers' addresses are fake).
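
A minimal sketch of how such a filter list could be derived from TF_CONFIG. The helper device_filters_for is hypothetical and is not part of TensorFlow; the filter strings follow the pattern commonly used with tf.ConfigProto(device_filters=...) in TF 1.x, where a worker sees only the parameter servers and itself. The addresses are made up.

```python
import json


def device_filters_for(tf_config_json):
    """Return device filters so a task only looks for the PS job and itself."""
    tf_config = json.loads(tf_config_json)
    task = tf_config["task"]
    if task["type"] == "ps":
        # Assumption: parameter servers need no filter in this sketch.
        return []
    return ["/job:ps", "/job:%s/task:%d" % (task["type"], task["index"])]


# Hypothetical cluster; the address of worker 1 is deliberately fake.
example_tf_config = json.dumps({
    "cluster": {
        "ps": ["host-a:2222"],
        "worker": ["host-b:2222", "unknown:0"],
    },
    "task": {"type": "worker", "index": 0},
})

filters = device_filters_for(example_tf_config)
# In TF 1.x the list would then be passed along these lines (not run here):
#   session_config = tf.ConfigProto(device_filters=filters)
#   run_config = tf.estimator.RunConfig(session_config=session_config)
```

With this filter, worker 0 never tries to reach worker 1, so the placeholder address for worker 1 is harmless.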

@superbobry

Member Author

commented Aug 21, 2018

@xiejw the problem with YARN is that it does not manage the network, i.e. there is no way to reserve a port prior to submitting an application. One possible solution to this is to

  1. Spawn a Python process in each YARN container.
  2. Bind to port 0.
  3. Communicate the port assigned by the OS to all other containers and aggregate a cluster spec
  4. Export the spec in TF_CONFIG and start training.

which roughly translates to

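import socket
import tensorflow as tf

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.bind(("", 0))  # port 0: let the OS pick a free port
    # Binding to "" makes getsockname() report 0.0.0.0, so a real
    # implementation must advertise a routable address instead.
    ipaddr, port = sock.getsockname()
    # broadcast_addr_and_aggregate_spec is a placeholder for the exchange step.
    cluster_spec = broadcast_addr_and_aggregate_spec(f"{ipaddr}:{port}")

# export TF_CONFIG
tf.estimator.train_and_evaluate(...)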

Note that this implementation has a race condition between closing sock and binding a tf.train.Server to the reserved port. This means that a task from another distributed TF training running on the same YARN node could hijack the port, and effectively talk to a completely unrelated task. The window of opportunity for the race condition to occur is undefined and depends on the implementation of train_and_evaluate.
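
The race can be reproduced with the standard library alone. The sketch below is an illustration under assumptions, not TensorFlow code: reserve_port is a hypothetical helper standing in for the reservation step, and the second socket plays the role of an unrelated task on the same node. Once the reserving socket is closed, nothing prevents another process from binding the same port before the intended server does.

```python
import socket


def reserve_port():
    """Bind to port 0, record the port the OS assigned, then release it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        # Leaving the with-block closes the socket, so the "reservation"
        # is no longer enforced once this function returns.
        return sock.getsockname()[1]


port = reserve_port()

# An unrelated task grabs the port in the window before the server starts.
intruder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
intruder.bind(("127.0.0.1", port))
intruder.listen(1)

# The intended owner now fails to bind the port it advertised to its peers.
owner = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    owner.bind(("127.0.0.1", port))
    owner_bound = True
except OSError:
    owner_bound = False
finally:
    owner.close()
    intruder.close()
```

In the distributed setting the consequence is worse than a failed bind: peers that were told about the port would connect to whatever process holds it.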

I see multiple potential ways of getting rid of the race condition by slightly modifying the existing Python/C++ API of tf.train.Server:

  • Add an API to spawn a server using an existing FD. The gRPC backend seems to already support this (see grpc/grpc#6610).
  • Allow SO_REUSEPORT when creating a server. Currently the option is explicitly disabled.
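
To illustrate the second option at the socket level: with SO_REUSEPORT set on both sockets, the reserving process can keep its socket open while a second socket binds the same port, so the port is never released. This is a sketch assuming Linux semantics for SO_REUSEPORT (the option is not available on every platform); it uses plain sockets rather than tf.train.Server, and bind_reusable is a hypothetical helper.

```python
import socket


def bind_reusable(port=0):
    """Bind a TCP socket with SO_REUSEPORT set before bind()."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(("127.0.0.1", port))
    return sock


both_bound = None  # stays None on platforms without SO_REUSEPORT
if hasattr(socket, "SO_REUSEPORT"):
    reservation = bind_reusable()            # reservation step, kept open
    port = reservation.getsockname()[1]
    server = bind_reusable(port)             # stands in for the real server
    both_bound = server.getsockname()[1] == port
    server.listen(8)
    reservation.close()                      # released only after the handover
    server.close()
```

Note that on Linux any process running under the same user ID that also sets the option can bind the port, so this narrows the window rather than providing isolation between jobs of the same user.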

superbobry referenced this issue Aug 21, 2018

Don't use SO_REUSEPORT in TensorFlow servers.
Most TensorFlow programs contain a single server, but this addresses
test flakiness where we create multiple servers in the same process.
Change: 145293417

@superbobry

Member Author

commented Aug 28, 2018

@mrry could you comment on the flakiness caused by SO_REUSEPORT?

superbobry added a commit to criteo/tf-yarn that referenced this issue Sep 6, 2018

Removed the {"environment": "google"} hack
It is not needed (and in fact never was), see tensorflow/tensorflow#21492
as it does not remove the race condition, and in the general case does
not make it any less probable.

@karmel

Member

commented Sep 14, 2018

@jhseu -- I hear you might be able to advise on YARN?

@jhseu

Member

commented Oct 2, 2018

@superbobry

Member Author

commented Oct 3, 2018

Thanks @jhseu, I had a quick look at the code, and I think they have exactly the same issue.

I'd like to point out that nothing above is specific to YARN, nor does it require any familiarity with the Hadoop ecosystem. The gist of the issue is that tf.train.Server does not provide a way to bind to an existing socket, either by reusing the fd or via SO_REUSEPORT.
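
What "reusing the fd" means can be sketched with the standard library: the socket that reserved the port stays open and its descriptor is handed to the code that serves on it, so the port is never released. This is an illustration only, assuming plain TCP sockets on a POSIX system; tf.train.Server exposes no such parameter, and serve_on_fd is a hypothetical helper.

```python
import os
import socket


def serve_on_fd(fd):
    """Wrap an already-bound descriptor and start listening on it."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
    server.listen(8)
    return server


# Reservation step: bind to port 0 and keep the socket open.
reservation = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
reservation.bind(("127.0.0.1", 0))
reserved_port = reservation.getsockname()[1]

# Duplicate the descriptor so both objects can be closed independently.
server = serve_on_fd(os.dup(reservation.fileno()))
reservation.close()

# The server listens on exactly the port that was advertised.
served_port = server.getsockname()[1]
with socket.create_connection(("127.0.0.1", served_port), timeout=2.0):
    connected = True
server.close()
```

Because the underlying socket is never closed between reservation and serving, there is no window in which another process could bind the port.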

@tensorflowbutler

Member

commented Nov 20, 2018

Nagging Assignee @karmel: It has been 47 days with no activity and this issue has an assignee. Please update the label and/or status accordingly.

@karmel

Member

commented Dec 3, 2018

DistributionStrategies is the new API for handling distribution and synchronization, and that API will be preferred in the coming months to the tf.train.Server API. That said, we have now started a Special Interest Group (SIG) for networking issues like these, so while I am closing this issue as it pertains to tf.train.Server, I would encourage you to join and participate in the Networking SIG to ensure these questions are addressed for YARN and other systems.

@karmel karmel closed this Dec 3, 2018

@cndaimin


commented Dec 13, 2018

Hi. We have faced exactly the same problem as you when training TensorFlow on YARN. Has there been any progress on it?

@superbobry

Member Author

commented Dec 14, 2018

@karmel as far as I can tell the old distributed TF corresponds to ParameterServerStrategy which is configured via TF_CONFIG environment variable. This is exactly the setup described in the issue, meaning that the issue of port reservation is just as relevant in the context of distribution strategies.
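
For reference, a sketch of the TF_CONFIG value this refers to. Every address must be known before any task starts, which is why the ports have to be reserved up front. The host names, ports, and the helper make_tf_config are hypothetical.

```python
import json
import os


def make_tf_config(cluster, task_type, task_index):
    """Serialize a cluster spec and this task's identity as a TF_CONFIG string."""
    return json.dumps({
        "cluster": cluster,
        "task": {"type": task_type, "index": task_index},
    })


# Hypothetical addresses gathered from all containers.
cluster = {
    "chief": ["node-1:40001"],
    "worker": ["node-2:40017", "node-3:40005"],
    "ps": ["node-4:40123"],
}

# Each task exports the same cluster but its own type and index.
os.environ["TF_CONFIG"] = make_tf_config(cluster, "worker", 1)
parsed = json.loads(os.environ["TF_CONFIG"])
```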
