在tf中，队列不仅是一种数据结构，它更提供了多线程机制。队列也是多线程输入数据处理框架的基础。

队列和变量类似，都是计算图上有状态的节点。

* 变量状态修改方法：赋值操作
* 队列状态修改方法：Enqueue、EnqueueMany和Eequeue

TensorFlow有两种队列：
* tf.FIFOQueue：先进先出队列
* tf.RandomShuffleQueue：随机调度队列

In [1]:
import tensorflow as tf

#### 1. 创建队列，并操作里面的元素。

In [2]:
# 创建一个FIFO队列，队列中最多2个元素，类型为int32
q = tf.FIFOQueue(2, "int32")
# enqueue_many初始化队列中的元素
init = q.enqueue_many(([0, 10], ))
# dequeue把第一个元素0出队列，并将其值保存到变量x
x = q.dequeue()
y = x + 1
# enqueue把1插入到队列
q_inc = q.enqueue([y])
with tf.Session() as sess:
    # 运行初始化队列
    init.run()
    for _ in range(5):
        v, _ = sess.run([x, q_inc])
        print(v)

0
10
1
11
2


tf提供了 tf.train.Coordinator 和 tf.train.QueueRunner两个类来完成多线程协同的功能。

tf.train.Coordinator 主要用于协同多个线程一起停止，并提供了三个函数：should_stop、request_stop和join
* 在启动线程之前，需要声明一个Coordinator类，并将该类传入每一个创建的线程中。
* 启动的线程需要一直查询Coordinator类中提供的should_stop函数，当该函数返回true时，则当前线程也需要退出。
* 当某一个线程调用request_stop函数之后，should_stop函数的返回值将被设置为True，这样其他的线程就可以同时中止了。

In [3]:
import numpy as np
import threading
import time

#### 2. 这个程序每隔1秒判断是否需要停止并打印自己的ID。

In [4]:
# 每隔1秒判断是否需要停止并打印自己的ID
def MyLoop(coord, worker_id):
    # 判断当前线程是否需要停止
    while not coord.should_stop():
        # 随机停止所有的线程
        if np.random.rand()<0.1:
            print("Stoping from id: %d\n" % worker_id)
            # 调用request_stop来通知其他线程停止
            coord.request_stop()
        else:
            print("Working on id: %d\n" % worker_id)
        time.sleep(1)

#### 3. 创建、启动并退出线程。

In [5]:
# 声明一个Coordinator类来协同多个线程
coord = tf.train.Coordinator()
# 声明创建5个线程
threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)]
# 启动所有线程
for t in threads:t.start()
# 等待所有线程退出
coord.join(threads)

Working on id: 1

Working on id: 0

Working on id: 2

Stoping from id: 3

