Distributed FIFOQueue with shared_name is not shared #17050

Closed
illeatmyhat opened this issue Feb 15, 2018 · 3 comments
Labels
stat:awaiting response Status - Awaiting response from author

Comments

illeatmyhat commented Feb 15, 2018

System information

  • Environment:
    Shared Cluster
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04):
    RHEL Server 7.2
  • TensorFlow installed from (source or binary):
    pip install tensorflow-gpu
  • TensorFlow version (use command below):
    v1.5.0-0-g37aa430d84 1.5.0
  • Python version:
    3.6
  • CUDA/cuDNN version:
    9.0/7.0
  • GPU model and memory:
    N/A -- GPU not allocated

I am attempting to use a FIFOQueue to signal the parameter servers to shut down on a multi-machine shared cluster, based on this example. After some testing, shared_name doesn't seem to do anything: even after removing the dequeue() operations, the number of elements in the FIFOQueue doesn't correlate with the number of workers.

Minimal reproducible code

# for example
cluster = tf.train.ClusterSpec({
    'ps': ['192.168.1.1:36598'],
    'worker': ['192.168.1.2:40596', '192.168.1.3:47324', '192.168.1.4:38923']
})
# ... #
server = tf.train.Server(cluster, job_name=job_name, task_index=task)

# using server.join() causes cluster management headaches
# use a FIFOQueue to tell the parameter server to shutdown
if job_name == 'ps':
    with tf.device('/job:ps/task:%d' % task):
        queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue')
    with tf.Session(server.target) as sess:
        sess.run(queue.dequeue())
        print('ps %d: quitting' % task)

# MonitoredTrainingSession with FinalOpsHook not shown
elif job_name == 'worker':
    with tf.device('/job:worker/task:%d' % task):
        with tf.name_scope('done_queue'):
            queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue')
    with tf.Session(server.target) as sess:
        _, size = sess.run([queue.enqueue(1), queue.size()])
        print('Worker:%d sending done to ps:%d [elements=%d]' % (task, 0, size))

illeatmyhat commented Feb 15, 2018

Whew. How silly.
The problem is that the queues are all assigned to different devices--one on /job:ps and the others on their individual /job:worker tasks.
I've produced a minimal working example:

import tensorflow as tf
import threading

def main(job_name, task):
    cluster = tf.train.ClusterSpec({
        'ps': ['localhost:22222', 'localhost:22223'],
        'worker': ['localhost:22224', 'localhost:22225', 'localhost:22226']
    })

    # Create and start a server for the local task
    server = tf.train.Server(cluster, job_name=job_name, task_index=task)

    if job_name == 'ps':
        with tf.device('/job:ps/task:%d' % task):
            queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task)
        with tf.Session(server.target) as sess:
            for i in range(cluster.num_tasks('worker')):
                sess.run(queue.dequeue())
                print('ps:%d received done from worker:%d' % (task, i))
            print('ps:%d quitting' % task)
    elif job_name == 'worker':
        # queue needs to be visible to /job:ps
        queues = []
        for i in range(cluster.num_tasks('ps')):
            with tf.device('/job:ps/task:%d' % i):
                queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i))
        with tf.Session(server.target) as sess:
            for i in range(cluster.num_tasks('ps')):
                _, size = sess.run([queues[i].enqueue(task), queues[i].size()])
                print('Worker:%d sending done to ps:%d [elements=%d]' % (task, i, size))

if __name__ == '__main__':
    threads = [
        threading.Thread(target=main, args=('ps', 0)),
        threading.Thread(target=main, args=('ps', 1)),
        threading.Thread(target=main, args=('worker', 0)),
        threading.Thread(target=main, args=('worker', 1)),
        threading.Thread(target=main, args=('worker', 2))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

It's simple to adapt this to work with MonitoredTrainingSession using FinalOpsHook, although it certainly isn't the prettiest way to go about it.
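
For illustration, here is a rough sketch of that worker-side change (not from the original post): it attaches the done-queue enqueues to a MonitoredTrainingSession via tf.train.FinalOpsHook, so they run once when the session closes. train_op and the is_chief handling are placeholders.

# Build the enqueue ops on the parameter servers' devices, as in the example above
done_ops = []
for i in range(cluster.num_tasks('ps')):
    with tf.device('/job:ps/task:%d' % i):
        queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32,
                             shared_name='done_queue%d' % i)
        done_ops.append(queue.enqueue(task))

# FinalOpsHook runs these ops right before the session closes,
# so each parameter server receives one "done" token per worker
hooks = [tf.train.FinalOpsHook(final_ops=done_ops)]

with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=(task == 0),
                                       hooks=hooks) as sess:
    while not sess.should_stop():
        sess.run(train_op)  # placeholder: the real training step goes here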

cy89 commented Feb 16, 2018

@illeatmyhat (excellent username, BTW) have you resolved your own bug, or is there a remaining issue that you need help with?

cy89 added the stat:awaiting response label Feb 16, 2018

illeatmyhat (Author) commented

Yes, the issue is resolved. It can be closed now.
Thanks

cy89 closed this as completed Feb 20, 2018