<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#队列与多线程" data-toc-modified-id="队列与多线程-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>队列与多线程</a></span></li><li><span><a href="#Tensorflow-中的Queue" data-toc-modified-id="Tensorflow-中的Queue-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Tensorflow 中的Queue</a></span><ul class="toc-item"><li><span><a href="#Queue" data-toc-modified-id="Queue-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Queue</a></span></li><li><span><a href="#QueueRunner" data-toc-modified-id="QueueRunner-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>QueueRunner</a></span><ul class="toc-item"><li><span><a href="#Coordinator" data-toc-modified-id="Coordinator-2.2.1"><span class="toc-item-num">2.2.1&nbsp;&nbsp;</span>Coordinator</a></span></li></ul></li><li><span><a href="#三合一" data-toc-modified-id="三合一-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>三合一</a></span><ul class="toc-item"><li><span><a href="#显示的创建-QueueRunner，-然后调用它的-create_threads-方法启动线程" data-toc-modified-id="显示的创建-QueueRunner，-然后调用它的-create_threads-方法启动线程-2.3.1"><span class="toc-item-num">2.3.1&nbsp;&nbsp;</span>显示的创建 QueueRunner， 然后调用它的 create_threads 方法启动线程</a></span></li><li><span><a href="#使用全局的start_queue_runners方法启动线程" data-toc-modified-id="使用全局的start_queue_runners方法启动线程-2.3.2"><span class="toc-item-num">2.3.2&nbsp;&nbsp;</span>使用全局的start_queue_runners方法启动线程</a></span></li></ul></li></ul></li><li><span><a href="#Tensorflow-数据读取机制" data-toc-modified-id="Tensorflow-数据读取机制-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Tensorflow 数据读取机制</a></span></li><li><span><a href="#文件队列" data-toc-modified-id="文件队列-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>文件队列</a></span></li><li><span><a href="#组合训练数据-(batching)" data-toc-modified-id="组合训练数据-(batching)-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>组合训练数据 (batching)</a></span></li><li><span><a href="#输入数据处理框架-(示例)" data-toc-modified-id="输入数据处理框架-(示例)-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>输入数据处理框架 (示例)</a></span></li></ul></div>

## 队列与多线程

在Tensorflow中，队列不仅是一种数据结构，还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素，或者同时读取一个队列中的元素。  

<br/>在Tensorflow中，队列和变量类似，都是计算图上`有状态的节点`；其他的计算节点可以修改它们的状态。修改队列状态的操作主要有`Enqueue`, `EnqueueMany` 和 `Dequeue`

## Tensorflow 中的Queue
* Queue 是TF队列和缓存机制的实现
* QueueRunner 是TF中对操作Queue的线程的封装
* Coordinator 是TF中用来协调线程运行的工具

### Queue
根据实现方式的不同，常用的几种类型如下:
* tf.FIFOQueue  按入列顺序出列的队列
```python
__init__(
    capacity,
    dtypes,
    shapes=None,
    names=None,
    shared_name=None,
    name='fifo_queue'
)
```
* tf.RandomShuffleQueue  随机顺序出列的队列
```python
__init__(
    capacity,
    min_after_dequeue,
    dtypes,
    shapes=None,
    names=None,
    seed=None,
    shared_name=None,
    name='random_shuffle_queue'
)
```
* tf.PaddingFIFOQueue  以固定长度批量出列的队列
```python
__init__(
    capacity,
    dtypes,
    shapes,
    names=None,
    shared_name=None,
    name='padding_fifo_queue'
)
```
* tf.PriorityQueue  带优先级出列的队列
```python
__init__(
    capacity,
    types,
    shapes=None,
    names=None,
    shared_name=None,
    name='priority_queue'
)
```

> Queue主要包含`入列(enqueue)`和`出列(dequeue)`两个操作；enqueue操作返回计算图中的一个Operation节点，dequeue操作返回一个Tensor值

> 如果一次性入列的数据超过Queue Size大小，enqueue操作会卡住，直到有数据(被其他线程)从队列取出；对一个已经取空的队列使用dequeue操作也会卡住，直到有新的数据(从其他线程)写入



In [None]:
import tensorflow as tf

# 创建一个先进先出的队列, 指定队列中最多可以保存两个元素, 并且指定类型为整数
q = tf.FIFOQueue(2, "int32")

# 使用enqueue_many一次接收多个元素
init = q.enqueue_many(([0, 10],))

# 使用dequeue函数将队列中的第一个元素出列
x = q.dequeue()

y = x + 1
q_inc = q.enqueue([y])

with tf.Session() as sess:
    init.run()
    for _ in range(5):
        v, _ = sess.run([x, q_inc])
        print v

### QueueRunner

Tensorflow的计算主要在使用CPU/GPU和内存，而数据读取涉及磁盘操作，速度远低于前者操作。因此通常会使用多个线程读取数据，然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的

> QueueRunner需要与Queue一起使用（这名字已经注定了它和Queue脱不开干系），但并不一定必须使用Coordinator。

In [None]:
import tensorflow as tf

q = tf.FIFOQueue(10, "float")
counter = tf.Variable(0.0)

increment_op = tf.assign_add(counter, 1.0)
enqueue_op = q.enqueue(counter)

# 创建了4个线程, 两个用于'increment_op', 两个用于'enqueue_op'
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
'''
__init__(
    queue=None,
    enqueue_ops=None,
    close_op=None,
    cancel_op=None,
    queue_closed_exception_types=None,
    queue_runner_def=None,
    import_scope=None
)
'''

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    qr.create_threads(sess, start=True)
    for i in range(20):   # 主线程消费完20个数据后停止, 但其他线程继续运行(导致抛出异常)
        print sess.run(q.dequeue())

#### Coordinator

Coordinator 是用来保存线程组运行状态的协调器对象，它和Tensorflow的Queue没有必然关系，是可以单独和Python线程使用的

Coordinator 可以用来同时停止多个工作线程并且向等待所有工作线程结束的程序报告异常，该线程捕获到这个异常之后就会终止所有线程

In [None]:
import tensorflow as tf
import threading, time

def loop(coord, id):
    t = 0
    while not coord.should_stop():
        print "id: {}, t: {}\n".format(id, t)
        time.sleep(1)
        t += 1
        if (t >= 2 and id == 1):
            # 只要有任意一个线程调用了Coordinator的request_stop方法，
            # 所有的线程都可以通过should_stop方法感知并停止当前线程
            coord.request_stop()
            
# 创建一个线程管理器对象    
coord = tf.train.Coordinator()
# 创建10个线程
threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]

for t in threads:
    t.start()

# 等待线程结束
coord.join(threads)

### 三合一

在Tensorflow中用Queue的经典模式有两种，都是配合了QueueRunner和Coordinator一起使用

#### 显示的创建 QueueRunner， 然后调用它的 create_threads 方法启动线程

In [None]:
import numpy as np
import tensorflow as tf

# 创建一个[1000, 4]二维矩阵, 每个值为 1~10之间的随机数
data = 10 * np.random.randn(1000, 4) + 1
# 创建1000个随机整数, 值为 0 / 1
target = np.random.randint(0, 2, size=1000)

# 创建Queue, 队列中每一项包含一个输入数据(4个元素)和相应的目标值(1个元素)
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])

# 批量入列数据
enqueue_op = queue.enqueue_many([data, target])
# 出列数据
data_sample, label_sample = queue.dequeue()

# 创建包含4个线程的QueueRunner
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # 启动QueueRunner管理的线程
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    # 主线程, 消费100个数据
    for step in range(100):
        if coord.should_stop():
            break
        data_batch, label_batch = sess.run([data_sample, label_sample])
        print 'step: {}, data: {}, label: {}'.format(step, data_batch, label_batch)
    # 主线程计算完成, 停止所有采集数据的线程
    coord.request_stop()
    coord.join(enqueue_threads)

#### 使用全局的start_queue_runners方法启动线程

In [None]:
import tensorflow as tf

# 同时打开多个文件, 显示创建Queue, 同时隐含了QueueRunner的创建
filename_queue = tf.train.string_input_producer(["data1.csv", "data2.csv"])
reader = tf.TextLineReader(skip_header_lines=1)

# Tensorflow的Reader对象可以直接接受一个Queue作为输入
key, value = reader.read(filename_queue)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # 启动计算图中所有的队列线程
    threads = tf.train.start_queue_runners(coord=coord)
    # 主线程, 消费100个数据
    for _ in range(100):
        features, labels = sess.run([data_batch, label_batch])
    # 主线程计算完成, 停止所有采集数据的进程
    coord.request_stop()
    coord.join(threads)
    
'''
tf.train.string_input_producer 会将一个隐含的QueueRunner添加到全局图中；
由于没有显式地返回 QueueRunner，所以无法使用create_threads 启动线程；
这时可以使用 tf.train.start_queue_runners 方法直接启动
tf.GraphKeys.QUEUE_RUNNERS 集合中的所有队列线程
'''    

## Tensorflow 数据读取机制

Tensorflow 使用"文件名队列　＋　内存队列"双队列的形式读入文件(这样可以很好地管理`epoch(运行一个epoch就是将这个数据集中的数据全部计算一遍)`)
![tensorflow数据读取机制](illustration/tensorflow数据读取机制.png)

<br/>**_如何在Tensorflow中创建上述两个队列呢？_**
* 对于`文件名队列`，使用`tf.train.string_input_producer`函数；这个函数需要传入一个文件名list，系统会自动将它转为一个文件名队列
```python
tf.train.string_input_producer(
    string_tensor,
    num_epochs=None,
    shuffle=True,
    seed=None,
    capacity=32,
    shared_name=None,
    name=None,
    cancel_op=None
)
```
* 对于`内存队列`，在Tensorflow中不需要自己建立，只需要使用`reader`对象从文件名队列中读取数据就可以了
```python
filename_queue = tf.train.string_input_producer(["d1.csv", "d2.csv"])
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
```

<br/>**补充: tf.train.start_queue_runners的作用**  
* 在使用`tf.train.string_input_producer`创建文件名队列后，整个系统流程其实还处于"停滞状态"，也就是，文件名并没有真正被加入队列中；所有如果此时开始计算，因为内存队列中什么也没有，计算单元就会一直等待，导致整个系统被阻塞  
* 而使用`tf.train.start_queue_runners`之后，才会启动填充队列的线程，这时系统就不再"停滞"
```python
tf.train.start_queue_runners(
    sess=None,
    coord=None,
    daemon=True,
    start=True,
    collection=tf.GraphKeys.QUEUE_RUNNERS
)
```

In [2]:
import tensorflow as tf

with tf.Session() as sess:
    filename = ['A.jpg', 'B.jpg', 'C.jpg']
    # 通过 string_input_producer 函数产生一个文件名队列
    filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)
    reader = tf.WholeFileReader()
    key, value = reader.read(filename_queue)
    # 由于 string_input_producer 函数中定义了一个 epoch 变量，需要对它进行初始化
    tf.local_variables_initializer().run()
    # 只有调用 start_queue_runners 之后，才会开始填充队列
    threads = tf.train.start_queue_runners(sess=sess)
    i = 0
    while True:   # 程序最后会抛出 OutOfRangeError异常
        i += 1
        image_data = sess.run(value)
        with open("read/test_%d.jpg" % i, "wb") as f:
            f.write(image_data)

## 文件队列

虽然一个TFRecord文件中可以存储多个训练样例，但是当训练数据量较大时，可以将数据分成多个TFRecord文件来提供处理效率  

Tensorflow 提供了`tf.train.match_filenames_once`函数来获取符合一个正则表达式的所有文件，得到的文件列表可以通过`tf.train.string_input_producer`函数进行有效管理  

`tf.train.string_input_producer`函数会使用初始化时提供的文件列表创建一个输入队列，输入队列中原始的元素为文件列表中的所有文件; 创建好的输入队列可以作为文件读取函数的参数, 每次调用文件读取函数时, 该函数会先判断当前是否已有打开的文件可读, 如果没有或者打开的文件已经读完, 这个函数会从输入队列中出队一个文件并从这个文件中读取数据  

通过设置`shuffle`参数, `tf.train.string_input_producer`函数支持随机打乱文件列表中文件出队的顺序。`tf.train.string_input_producer`生成的输入队列可以同时被多个文件读取线程操作，而且输入队列会将队列中的文件均匀地分给不同的线程


In [None]:
import tensorflow as tf

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 定义了写入多少个文件
num_shards = 2  
# 定义了每个文件中有多少个数据
instances_per_shard = 2

# 将数据存储为TFRecord格式
for i in range(num_shards):
    filename = '/path/to/data.tfrecords-%.5d-of-%.5d' % (i+1, num_shards)
    writer = tf.python_io.TFRecordWriter(filename)
    for j in range(instances_per_shard):
        example = tf.train.Example(features=tf.train.Feature(feature={
            "i" : _int64_feature(i),
            "j" : _int64_feature(j)
        }))
        writer.write(example.SerializeToString())
    writer.close()
    
# 获取文件列表
files = tf.train.match_filenames_once("/path/to/data.tfrecords-*")
'''
tf.train.match_filenames_once(pattern, name=None)
Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).
Returns:
    A variable that is initialized to the list of files matching the pattern(s).    
'''

# 创建输入队列, 并且打乱读文件的顺序
filename_queue = tf.train.string_input_producer(files, shuffle=True)
'''
tf.train.string_input_producer(
    string_tensor,
    num_epochs=None,
    shuffle=True,
    seed=None,
    capacity=32,
    shared_name=None,
    name=None,
    cancel_op=None
)
Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, string_input_producer produces each string from string_tensor num_epochs times before generating an OutOfRange error. If not specified, string_input_producer can cycle through the strings in string_tensor an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given name across multiple sessions. All sessions open to the device which has this queue will be able to access it via the shared_name. Using this in a distributed setting means each name will only be seen by one of the sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).
'''

# 读取并解析一个样本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example, features={
    "i" : tf.FixedLenFeature([], tf.int64),
    "j" : tf.FixedLenFeature([], tf.int64),
})

with tf.Session() as sess:
    # 使用tf.train.match_filenames_once函数时需要初始化一些变量
    sess.run(tf.global_variables_initializer())
    
    # 声明tf.train.Coordinator 类来协同不同线程, 并启动线程
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 多次执行获取数据的操作
    for i in range(6):
        print sess.run([features["i"], features["j"]])
    
    coord.request_stop()
    coord.join(threads)
    
'''
脚本会依次读出样例数据中的每一个样例，而且当所有样例都被读完之后，程序会自动从头开始；
但如果函数`tf.train.string_input_producer`的参数`num_epochs`限制为`1`，
那么程序将会报错
'''

## 组合训练数据 (batching)

Tensorflow 提供了`tf.train.batch`和`tf.train.shuffle_batch`函数来将单个的样例组织成`batch`的形式输出；这两个函数都会生成一个队列，每次出队得到的是`一个batch的样例`

In [None]:
import tensorflow as tf

# 延续使用上一步解析得到的样例; 假设'i'表示一个样例的特征向量, 'j'是对应的标签
example, label = features['i'], features['j']

# 一个batch中样例的个数
batch_size = 3
# 队列中最多可以存储的样例个数
capacity = 1000 + 3 * batch_size
# 使用 tf.train.batch 函数来组合样例, [example, label]参数给出了需要组合的元素
# 一般 example 和 label 分别代表训练样本和这个样本对应的正确标签
# batch_size 参数给出了每个 batch 中样本的个数
example_batch, label_batch = tf.train.batch(
    [example, label],  
    batch_size = batch_size,  
    capacity = capacity 
)
'''
tf.train.batch(
    tensors,      # [example, label]
    batch_size,
    num_threads=1,
    capacity=32,
    enqueue_many=False,
    shapes=None,
    dynamic_pad=False,
    allow_smaller_final_batch=False,
    shared_name=None,
    name=None
)
'''

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 获取并打印组合之后的样例 (这个输出一般会作为神经网络的输入)
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print cur_example_batch, cur_label_batch
        
    coord.request_stop()
    coord.join(threads)
    
    
# tf.train.shuffle_batch 函数与 tf.train.batch 类似，多了一个 min_after_dequeue 参数,
# min_after_dequeue 参数限制了出队时刻队列中元素的最少个数; 当队列中元素太少时, 随机打乱样例
# 顺序的作用就不大了；故：当出队函数被调用但是队列中的元素不够时, 出队操作阻塞，待更多的元素入队
# 之后才会完成出队操作

# 通过设置 tf.train.shuffle_batch 函数中的 num_threads 参数，可以指定多个线程同时执行入队操作；
# tf.train.shuffle_batch 函数的入队操作就是数据读取以及预处理的过程；当参数大于 1 时，
# 多个线程会同时读取一个文件中的不同样例并进行预处理

## 输入数据处理框架 (示例)

对于输入数据的处理，大体上流程都差不多，可以归结如下
1. 将数据转为 TFRecord 格式的多个文件
2. 用 tf.train.match_filenames_once() 创建文件列表
3. 用 tf.train.string_input_producer() 创建输入文件队列，可以将输入文件顺序随机打乱
4. 用 tf.TFRecordReader() 读取文件中的数据
5. 用 tf.parse_single_example() 解析数据
6. 对数据进行解码及预处理
7. 用 tf.train.shuffle_batch() 将数据组合成 batch
8. 将 batch 用于训练  


<br/>![Tensorflow输入数据处理流程示意图](illustration/Tensorflow输入数据处理流程示意图.gif)

In [None]:
import tensorflow as tf

files = tf.train.match_filenames_once("/path/to/file_pattern-*")
filename_queue = tf.train.string_input_producer(files, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example, features={
    "image" : tf.FixedLenFeature([], tf.string),
    "label" : tf.FixedLenFeature([], tf.int64),
    "height" : tf.FixedLenFeature([], tf.int64),
    "width" : tf.FixedLenFeature([], tf.int64),
    "channels" : tf.FixedLenFeature([], tf.int64),
})

image, label = features['image'], features['label']
height, width, channels = features['height'], features['width'], features['channels']

# 从原始图像数据解析出像素矩阵, 并根据图像尺寸还原图像
decoded_image = tf.decode_raw(image, tf.uint8)
decoded_image.set_shape([height, width, channels])

# 定义神经网络输入层图片的大小
image_size = 299
# 图像预处理
distorted_image = preprocess_for_train(decoded_image, image_size, image_size, None)

# 将处理后的图像和标签数据通过tf.train.shuffle_batch整理成神经网络训练时需要的batch
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size
image_batch, label_batch = tf.train.shuffle_batch(
    [distorted_image, label],
    batch_size = batch_size,
    capacity = capacity,
    min_after_dequeue = min_after_dequeue,
)

# 定义神经网络结果以及优化过程
logit = inference(image_batch)
loss = calc_loss(logit, label_batch)
train_step = tf.train.GradientDescentOptimizer(0.001).minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # 开始训练神经网络
    for i in range(1000):
        sess.run(train_step)
        
    # 停止所有线程
    coord.request_stop()
    coord.join(threads)

In [None]:
import numpy as np
import tensorflow as tf

# 给定一张图像, 随机调整图像的色彩。因为调整亮度, 对比度, 饱和度和色相的顺序会影响最后得到的结果
# 所以可以定义多种不同的顺序
def distort_color(image, color_ordering=0):
    if color_ordering == 0:
        image = tf.image.random_brightness(image, max_delta=32. / 255.)
        image = tf.image.random_saturation(image, lower=0.5, upper=1.5)
        image = tf.image.random_hue(image, max_delta=0.2)
        image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
    elif color_ordering == 1:
        image = tf.image.random_saturation(image, lower=0.5, upper=1.5) 
        image = tf.image.random_brightness(image, max_delta=32. / 255.)        
        image = tf.image.random_contrast(image, lower=0.5, upper=1.5)   
        image = tf.image.random_hue(image, max_delta=0.2)
    elif color_ordering == 2:
        ...
        
    return tf.clip_by_value(image, 0.0, 1.0)


'''
    该函数对图像进行预处理后, 输出的就是神经网络模型的输入层
    @image: 解码后的图像
    @height: 图像处理之后的高度值
    @width: 图像处理之后的宽度值
    @bbox: 图像上的标注框
'''
def preprocess_for_train(image, height, width, bbox=None):
    # 如果没有提供标注框, 则认为整个图像就是需要关注的部分
    if bbox is None:
        bbox = tf.constant([0.0, 0.0, 1.0, 1.0],
                          dtype=tf.float32,
                          shape=[1, 1, 4])
        
    # 转换图像张量的数据类型
    if image.dtype != tf.float32:
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        
    # 随机截取图像, 减少需要关注的物体大小对图像识别算法的影响
    bbox_begin, bbox_size, _ = tf.image.sample_distorted_bounding_box(
        tf.shape(image), bounding_boxes = bbox)
    distorted_image = tf.slice(image, bbox_begin, bbox_size)
    
    # 将随机截取的图像调整为神经网络输入层的大小 (大小调整的算法是随机选择的)
    distorted_image = tf.image.resize_images(
        distorted_image, height, width, method=np.random.randint(4))
    
    # 随机左右翻转图像
    distorted_image = tf.image.random_flip_left_right(distorted_image)
    
    # 使用一种随机的顺序调整图像色彩
    distorted_image = distort_color(distorted_image, np.random.randint(2))
    
    return distorted_image