In [1]:
import tensorflow as tf

tf.train.Coordinator: helps multiple threads stop together and reports exceptions to a program that waits for them to stop.

tf.train.QueueRunner: creates a number of threads that cooperate to **enqueue** tensors in the **same** queue.

## Coordinator

### Key method

tf.train.Coordinator.should_stop

Returns `True` if the threads should stop. Each thread calls this to find out whether it should stop.

tf.train.Coordinator.request_stop

Requests that the threads stop. After this is called, calls to `should_stop` return `True`.

tf.train.Coordinator.join

This call blocks until a set of threads have terminated.

note: the documentation for `join` mentions **exc_info**. Check it; I think this is an opportunity to study how to handle exceptions in threads.
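
The exception path mentioned in the note can be sketched without TensorFlow. The class below is a hypothetical stand-in: `MiniCoordinator`, `failing_worker` and `patient_worker` are names invented for this illustration, not TensorFlow code. It mimics the three key methods plus `wait_for_stop`, and assumes that `join` re-raises the first exception passed to `request_stop` in the waiting thread, which is how the real coordinator is documented to behave.

```python
import threading


class MiniCoordinator:
    """Illustrative stand-in for tf.train.Coordinator (not TensorFlow's code)."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        self._exception = None  # first exception reported by any thread

    def should_stop(self):
        return self._stop_event.is_set()

    def request_stop(self, ex=None):
        with self._lock:
            # Keep only the first reported exception.
            if ex is not None and self._exception is None:
                self._exception = ex
            self._stop_event.set()

    def wait_for_stop(self, timeout=None):
        # Sleep until a stop is requested or the timeout expires.
        return self._stop_event.wait(timeout)

    def join(self, threads):
        for t in threads:
            t.join()
        # Surface the worker's exception in the thread that waits.
        if self._exception is not None:
            raise self._exception


def failing_worker(coord):
    try:
        raise ValueError("bad input")
    except Exception as ex:
        coord.request_stop(ex)  # report instead of dying silently


def patient_worker(coord):
    while not coord.should_stop():
        coord.wait_for_stop(timeout=0.01)


coord = MiniCoordinator()
threads = [
    threading.Thread(target=failing_worker, args=(coord,)),
    threading.Thread(target=patient_worker, args=(coord,)),
]
for t in threads:
    t.start()

caught = None
try:
    coord.join(threads)
except ValueError as ex:
    caught = ex
    print("main thread caught:", ex)
```

The point of the design is that a worker never lets an exception escape its own thread. It hands the exception to the coordinator, which both stops the other workers and lets the main thread see the failure.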

### Simple example

In [5]:
import time
import threading
import random


def worker(coord):
    # Look up the name once so it is defined even if the loop body never runs.
    thread_id = threading.current_thread().name
    while not coord.should_stop():
        time.sleep(10)
        print("worker %s running" % (thread_id,))
        rand_int = random.randint(0, 10)
        if rand_int > 5:
            print("worker %s request stop" % (thread_id,))
            coord.request_stop()
    print("worker %s stopped" % (thread_id,))

coord = tf.train.Coordinator()

threads = [threading.Thread(target=worker, args=(coord,), name=str(i)) for i in range(3)]

for t in threads:
    t.start()

coord.join(threads)


worker 1 running
worker 0 running
worker 2 running
worker 0 running
worker 0 request stop
worker 0 stopped
worker 1 running
worker 1 stopped
worker 2 running
worker 2 request stop
worker 2 stopped


## Queue

### RandomShuffleQueue

A queue is a TensorFlow data structure that stores tensors across multiple steps and exposes operations that enqueue and dequeue tensors.

The classic usage of a queue is:

* Multiple threads prepare training examples and enqueue them.
* A training thread executes a training op that dequeues mini-batches from the queue.
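
The producer/consumer pattern in the list above can be sketched with Python's standard `queue.Queue` standing in for a TensorFlow queue. This is only an analogy: the names `example_queue`, `producer` and `dequeue_batch` and all the sizes are assumptions made for the illustration, and the real queues hold tensors and are driven by graph ops.

```python
import queue
import threading

NUM_PRODUCERS = 3
EXAMPLES_PER_PRODUCER = 10
BATCH_SIZE = 5

# A bounded queue: producers block when it is full, like a fixed capacity.
example_queue = queue.Queue(maxsize=20)


def producer(producer_id):
    """Prepare examples and enqueue them one by one."""
    for i in range(EXAMPLES_PER_PRODUCER):
        example_queue.put((producer_id, i))


def dequeue_batch(batch_size):
    """Block until batch_size examples are available and return them."""
    return [example_queue.get() for _ in range(batch_size)]


producers = [
    threading.Thread(target=producer, args=(p,)) for p in range(NUM_PRODUCERS)
]
for t in producers:
    t.start()

# The "training thread" (here the main thread) consumes mini-batches.
num_batches = NUM_PRODUCERS * EXAMPLES_PER_PRODUCER // BATCH_SIZE
batches = [dequeue_batch(BATCH_SIZE) for _ in range(num_batches)]

for t in producers:
    t.join()
print(len(batches), "batches of", BATCH_SIZE)
```

Because the producers run concurrently, the order of examples inside the batches differs from run to run. Only the total set of examples is fixed.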

## QueueRunner

The `QueueRunner` class creates a number of threads that repeatedly run an `enqueue` op. These threads can use a coordinator to stop together. In addition, a queue runner runs a closer thread that automatically closes the queue if an exception is reported to the coordinator.
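
As a rough mental model of the paragraph above, here is a plain-Python sketch of a queue runner. Everything in it is hypothetical: `MiniQueueRunner`, its `create_threads(stop_event)` signature, the `CLOSED` sentinel and `enqueue_one` are invented for the illustration. A `threading.Event` plays the role of the coordinator, and the sketch closes the queue on any stop request rather than only on a reported exception.

```python
import queue
import threading

CLOSED = object()  # sentinel that marks the queue as closed (illustrative)


class MiniQueueRunner:
    """Illustrative stand-in for tf.train.QueueRunner (not TensorFlow's code)."""

    def __init__(self, q, enqueue_fns):
        self._queue = q
        self._enqueue_fns = enqueue_fns

    def _run(self, enqueue_fn, stop_event):
        # Repeatedly run the enqueue function until a stop is requested.
        while not stop_event.is_set():
            try:
                enqueue_fn()
            except queue.Full:
                continue  # timed out; loop again to re-check the stop flag

    def _close_on_stop(self, stop_event, workers):
        # Closer thread: wait for the stop, let workers finish, then close.
        stop_event.wait()
        for t in workers:
            t.join()
        self._queue.put(CLOSED)

    def create_threads(self, stop_event):
        workers = [
            threading.Thread(target=self._run, args=(fn, stop_event),
                             name="enqueue-%d" % i)
            for i, fn in enumerate(self._enqueue_fns)
        ]
        closer = threading.Thread(target=self._close_on_stop,
                                  args=(stop_event, workers), name="closer")
        for t in workers + [closer]:
            t.start()
        return workers + [closer]


q = queue.Queue(maxsize=8)
stop_event = threading.Event()


def enqueue_one():
    # Enqueue the producing thread's name; raises queue.Full on timeout.
    q.put(threading.current_thread().name, timeout=0.05)


runner = MiniQueueRunner(q, [enqueue_one] * 4)
threads = runner.create_threads(stop_event)

first_items = [q.get() for _ in range(20)]
stop_event.set()  # all enqueue threads stop together

# Drain whatever is left until the closer marks the queue as closed.
leftover = 0
while q.get() is not CLOSED:
    leftover += 1
for t in threads:
    t.join()
print("consumed", len(first_items), "items, drained", leftover, "more")
```

Passing the same function four times mirrors `[enqueue] * num_threads`: one thread is created per entry in the list, and all of them feed the same queue.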

## Put it together

In [8]:
def simple_shuffle_batch(source, capacity, batch_size=10):
    queue = tf.RandomShuffleQueue(capacity=capacity, min_after_dequeue=int(0.9*capacity),
                                 shapes=source.shape, dtypes=source.dtype)
    enqueue = queue.enqueue(source)
    num_threads = 4
    qr = tf.train.QueueRunner(queue,[enqueue]*num_threads)
    
    tf.train.add_queue_runner(qr)
    return queue.dequeue_many(batch_size)

`simple_shuffle_batch` uses a `QueueRunner` to execute the `enqueue` ops, but the queue runner has not started yet. Now we need to start the queue runner and have the main thread dequeue elements from the queue.

In [13]:
input = tf.constant(list(range(1, 100)))
input = tf.data.Dataset.from_tensor_slices(input)
input = input.make_one_shot_iterator().get_next()

get_batch = simple_shuffle_batch(input, capacity=20)

# start queue runner directly


sess = tf.Session()

with sess.as_default() as sess:
    tf.train.start_queue_runners()
    while True:
        try:
            print(sess.run(get_batch))
        except tf.errors.OutOfRangeError:
            print("queue is empty")
            break

[ 6 13  2 21 23  7  5 18 19 20]
[ 8 24 31  4 25 26  3 17 27 22]
[40  9 14 38 15 37  1 41 44 39]
[43 28 50 12 49 32 47 45 42 56]
[53 54 34 58 52 33 10 63 35 64]
[57 29 51 72 36 16 71 70 67 78]
[59 62 30 80 69 55 73 82 77 87]
[88 84 79 65 90 61 11 60 92 96]
[85 83 86 68 66 99 97 93 75 76]
queue is empty


Or, we can start queue runners indirectly with `tf.train.MonitoredSession`.

In [14]:
input = tf.constant(list(range(1, 100)))
input = tf.data.Dataset.from_tensor_slices(input)
input = input.make_one_shot_iterator().get_next()

get_batch = simple_shuffle_batch(input, capacity=20)

# start queue runners indirectly: MonitoredSession starts them when the session is created


with tf.train.MonitoredSession() as sess:
    while not sess.should_stop():
        print(sess.run(get_batch))

[15  9  5  4  2 19 10 24 25  3]
[28 14  7 12 30 11 22 36 34 26]
[40 20 18 35 44 43 41  8  6 17]
[32 37 27 49 47 29 55 23 31 52]
[61 60 53 59 56 33 50 51 21 64]
[13 16 38 67 46 68 54 74  1 45]
[75 48 62 82 84 58 39 79 83 65]
[89 87 90 63 76 91 95 73 86 77]
[92 78 93 99 71 69 98 66 57 70]


## To-Do list

`tf.train.shuffle_batch`.