虽然图像预处理的方法可以减小无关因素对图像识别模型效果的影响，但是这些复杂的预处理流程会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练的瓶颈，TensorFlow提供了一套多线程处理数据的框架。<br/>
## 队列与多线程
在TensorFlow中，队列和变量类似，都是计算图上有状态的节点。其他计算节点可以修改他们的状态。对于变量，可以通过赋值操作修改变量的值。对于队列，修改队列状态的操作有Enqueue、EnqueueMany和Dequeue。<br/>
如下操作队列示例：<br/>

In [1]:
import tensorflow as tf

# 创建一个先进先出队列，队列最多保存两个元素，且类型为整数
q = tf.FIFOQueue(2, "int32")
# 使用enqueue_many来初始化队列元素
# enqueue_many(vals, name): vals：可以是一个tensor、一个tensor的list、
# 一个tensor的tuple
init = q.enqueue_many(([0, 10],))
# 使用dequeue函数将队列中的第一个元素出队
x = q.dequeue()
y = x + 1
# 将元素重新入队
q_inc = q.enqueue([y])
with tf.Session() as sess:
    # 运行初始化队列操作
    init.run()
    for _ in range(5):
        # 运行出队和重新入队操作
        v, _ = sess.run([x, q_inc])
        print(v)

0
10
1
11
2


TensorFlow除了提供FIFOQueue队列，还提供了RandomShuffleQueue。`FIFOQueue`实现的是一个先进先出的队列。`RandomShuffleQueue`会将队列中元素打乱，每次出队列操作得到的是从当前队列所有元素中随机选择一个。<br/>
在TensorFlow中，队列不仅是一种数据结构，还是异步计算张量取值的一个重要机制。比如多个线程可以同时往一个队列中读写元素。<br/>
TensorFlow提供了tf.Coordinator和tf.QueueRunner两个类来完成多线程协同功能。<br/>
`tf.Coordinator`类主要用于保存线程组运行状态的协调器对象，它与TensorFlow的Queue没有必然联系，可以单独与Python的线程使用。该类提供了should_stop、request_stop和join三个函数。<br/>
在线程启动之前，首先声明一个tf.Coordinator类，并将该类传入创建的每个线程中。启动的线程需一直查询函数should_stop是否返回True，当返回True时，则当前线程退出。每一个启动的线程都可以通过调用request_stop函数来通知其他线程退出。只要有任何一个线程调用了Coordinator的request_stop方法，所有的线程都可以通过should_stop方法感知并停止当前线程。<br/>
使用`tf.Coordinator`类示例：

In [4]:
import tensorflow as tf
import numpy as np
import threading
import time

def MyLoop(coord, worker_id):
    while not coord.should_stop():
        if np.random.rand() < 0.1:
            print("Stoping from id:%d\n" % worker_id)
            coord.request_stop()
        else:
            print("Working on id:%d\n" % worker_id)
        time.sleep(1)

coord = tf.train.Coordinator()
# 创建五个线程
threads = [
    threading.Thread(target=MyLoop, args=(coord, i,)) for i in range(5)]
# 启动所有线程
for t in threads: t.start()
# 等待所有线程退出
coord.join(threads)

Working on id:0

Working on id:1
Working on id:2


Working on id:3

Working on id:4

Working on id:0

Working on id:1
Working on id:2


Working on id:4

Working on id:3

Working on id:0
Stoping from id:1




`tf.QueueRunner`主要用于启动多个线程来操作同一个队列，启动的线程可以通过`tf.Coordinator`来统一管理。

In [5]:
import tensorflow as tf

queue = tf.FIFOQueue(100, "float")
# 定义入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])
# 创建多线程运行队列的入队操作
# 第一个参数被操作的队列
# 第二个参数启动5个线程，每个线程运行enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op]*5)
# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合。
# add_queue_runner函数没有指定集合，默认加入tf.GraphKeys.QUEUE_RUNNERS
tf.train.add_queue_runner(qr)
# 定义出队操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator来协同启动线程
    coord = tf.train.Coordinator()
    # 明确调用tf.train.start_queue_runners来启动所有线程
    # tf.train.start_queue_runners函数会默认启动tf.GraphKeys.QUEUE_RUNNERS
    # 集合中所有QueueRunner。
    # 一般来说，tf.train.add_queue_runner函数和tf.train.start_queue_runnners
    # 函数会指向同一个集合
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for _ in range(3): print(sess.run(out_tensor)[0])
    # 使用tf.train.Coordinator停止所有线程
    coord.request_stop()
    coord.join(threads)

0.16973639
-0.4061291
0.42650115


## 输入文件队列
虽然TensorFlow中一个TFRecord文件中可以存储多个训练样本，但是当训练数据量较大时，可以将数据分成多个TFRecord文件来提供处理效率。TensorFlow提供`tf.train.math_filenames_once`函数来获取符合正则表达式的所有文件，得到的文件列表可以通过`tf.train.string_input_producer`函数进行管理。<br/>
`tf.train.string_input_producer`函数会使用初始化时提供的文件列表创建一个输入队列，输入队列中原始的元素为文件列表中的所有文件。通过设置`shuffle`参数，tf.train.string_input_producer函数支持随机打乱文件列表中文件出队顺序。当shuffle参数设置为True时，文件在加入队列之前会被打乱顺序，所有出队顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会单独跑在一个单独的线程上，这样不会影响获取文件的速度。tf.train.string_input_producer生成的输入队列可以同时被多个文件读取线程操作，而且输入队列会将队列中的文件均匀地分配给不同线程，不会出现有些文件处理多次而有些文件没有被处理过。<br/>
当一个输入队列中的所有文件被处理完后，它会将初始化时提供的文件列表中的文件全部重新加入队列。`tf.train.string_input_producer`函数可以设置`num_epochs`参数来限制初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后，如果继续尝试读取新的文件，输入队列会报OutOfRange错误。<br/>
生成样例数据的示例：<br/>

In [6]:
import tensorflow as tf

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

num_shards = 2
instances_per_shard = 3
for i in range(num_shards):
    filename = ('./Data/data.tfrecord-%.5d-of-%.5d' % (i, num_shards))
    writer = tf.python_io.TFRecordWriter(filename)
    for j in range(instances_per_shard):
        example = tf.train.Example(features=tf.train.Features(feature={
            'i': _int64_feature(i),
            'j': _int64_feature(j)
        }))
        writer.write(example.SerializeToString())
    writer.close()

程序运行会后，在'./Data/data.tfrecord-00000-of-00002'和'./Data/data.tfrecord-00001-of-00002'。每个文件中存储了两个样例。以下代码展示tf.train.match_filenames_once函数和tf.train.string_input_producer函数的使用。

In [21]:
import tensorflow as tf

# 获取文件列表
files = tf.train.match_filenames_once('./Data/data.tfrecord-*')
# 创建输入队列
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# 读取并解析一个样本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example, 
    features = {
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    })

with tf.Session() as sess:
    # 使用tf.train.match_filenames_once函数时需要初始化一些变量
    tf.local_variables_initializer().run()
    print(sess.run(files))
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    for i in range(6):
        print(sess.run([features['i'], features['j']]))
    
    coord.request_stop()
    coord.join(threads)

[b'./Data/data.tfrecord-00000-of-00002'
 b'./Data/data.tfrecord-00001-of-00002']
[0, 0]
[0, 1]
[0, 2]
[1, 0]
[1, 1]
[1, 2]


**tf.global_variables_initializer()与tf.local_variables_initializer()的区别**<br/>
- `tf.global_variables_initializer()`添加节点用于初始化所有的变量(GraphKeys.VARIABLES)。返回一个初始化所有全局变量的操作。在构建完整个模型并在会话中加载模型后，运行这个节点。能够将所有的变量一步到位的初始化，非常的方便。通过feed_dict, 也可以将指定的列表传递给它，只初始化列表中的变量。
- `tf.local_variables_initializer()`返回一个初始化所有局部变量的操作（Op）。初始化局部变量（GraphKeys.LOCAL_VARIABLE）。GraphKeys.LOCAL_VARIABLE中的变量指的是被添加入图中，但是未被储存的变量。match_filenames_once函数最后一行可以看到使用了local Variable，所以session里面要对local变量初始化，对全局变量初始化没用。

## 组合训练数据(batching)
TensorFlow提供了`tf.train.batch`和`tf.train.shuffle_batch`函数来将单个样本组织成batch的形式输出的。这两个函数都会生成一个队列，队列的入队操作是生成单个样例的方法，而每次出队得到的是一个batch的样例。它们唯一的区别在于是否将数据顺序打乱。<br/>
tf.train.batch函数的使用方法：

In [22]:
import tensorflow as tf

example, label = features['i'], features['j']
# batch大小
batch_size = 3
# 组合样例队列中最多可以存储样例个数
capacity = 1000 + 3 * batch_size
# 使用tf.train.batch来组合样例。其中capacity给出了队列的最大容量
example_batch, label_batch = tf.train.batch([example, label], 
                                            batch_size = batch_size,
                                            capacity = capacity)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, 
                                                       label_batch])
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

INFO:tensorflow:Error reported to Coordinator: <class 'tensorflow.python.framework.errors_impl.FailedPreconditionError'>, Attempting to use uninitialized value matching_filenames
	 [[Node: matching_filenames/read = Identity[T=DT_STRING, _class=["loc:@input_producer/ArithmeticOptimizer/RemoveIdempotent_Identity"], _device="/job:localhost/replica:0/task:0/device:CPU:0"](matching_filenames)]]


OutOfRangeError: FIFOQueue '_165_input_producer_2' is closed and has insufficient elements (requested 1, current size 0)
	 [[Node: ReaderReadV2_2 = ReaderReadV2[_device="/job:localhost/replica:0/task:0/device:CPU:0"](TFRecordReaderV2_2, input_producer_2)]]

Caused by op 'ReaderReadV2_2', defined at:
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.6/site-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python3.6/site-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/usr/local/lib/python3.6/site-packages/ipykernel/kernelapp.py", line 486, in start
    self.io_loop.start()
  File "/usr/local/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 132, in start
    self.asyncio_loop.run_forever()
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 422, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 1432, in _run_once
    handle._run()
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/usr/local/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/usr/local/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 450, in _handle_events
    self._handle_recv()
  File "/usr/local/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 480, in _handle_recv
    self._run_callback(callback, msg)
  File "/usr/local/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 432, in _run_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/usr/local/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 233, in dispatch_shell
    handler(stream, idents, msg)
  File "/usr/local/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "/usr/local/lib/python3.6/site-packages/ipykernel/ipkernel.py", line 208, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "/usr/local/lib/python3.6/site-packages/ipykernel/zmqshell.py", line 537, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2662, in run_cell
    raw_cell, store_history, silent, shell_futures)
  File "/usr/local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2785, in _run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/usr/local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2903, in run_ast_nodes
    if self.run_code(code, result):
  File "/usr/local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2963, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-21-226cca117585>", line 9, in <module>
    _, serialized_example = reader.read(filename_queue)
  File "/usr/local/lib/python3.6/site-packages/tensorflow/python/ops/io_ops.py", line 164, in read
    return gen_io_ops.reader_read_v2(self._reader_ref, queue_ref, name=name)
  File "/usr/local/lib/python3.6/site-packages/tensorflow/python/ops/gen_io_ops.py", line 941, in reader_read_v2
    queue_handle=queue_handle, name=name)
  File "/usr/local/lib/python3.6/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/usr/local/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3414, in create_op
    op_def=op_def)
  File "/usr/local/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 1740, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

OutOfRangeError (see above for traceback): FIFOQueue '_165_input_producer_2' is closed and has insufficient elements (requested 1, current size 0)
	 [[Node: ReaderReadV2_2 = ReaderReadV2[_device="/job:localhost/replica:0/task:0/device:CPU:0"](TFRecordReaderV2_2, input_producer_2)]]


tf.train.shuffle_batch()函数使用：

In [None]:
example, label = features['i'], features['j']

# min_after_dequeue参数限制了出队时队列中元素的最少个数
example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=batch_size, capacity=capacity, 
    min_after_dequeue=30)

tf.train.batch和tf.train.shuffle_batch函数除了可以将单个训练数据整理成输入batch，也提供并行处理输入数据的方法。通过设置`num_threads`参数，可以指定多个线程同时执行入队操作。`tf.train.shuffle_batch`函数的入队操作就是数据读取和预处理的过程。当num_threads参数大于1时，多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时，则可以使用`tf.train.shuffle_batch_join`函数。<br/>
tf.train.shuffle_batch函数和 tf.train.shuffle_batch_join函数都可以完成多线程并行的方式来进行数据预处理，但它们各有优劣。对于tf.train.shuffle_batch函数，不同线程会读取同一个文件。如果一个文件中的样例比较相似(比如都属于同一个类别)，那么神经网络的 训练效果有可能会受到影响。所以在使用tf.train.shuffie_batch函数时，需要尽量将同一个TFRecord文件中的样例随机打乱。而使用tf.train.shuffle_batch_join函数时，不同线程会读取不同文件.如果读取数据的线程数比总文件数还大，那么多个线程可能会读取同一个文件中相近部分的数据。而且多个线程读取多个文件可能导致过多的硬盘寻址，从而使得读取效率降低.不同的井行化方式各有所长，具体采用哪一种方法需要根据具体情况来确定。

## 输入数据处理框架
<img src="attachment:image.png" width="550">
tf.train.string_input_producer函数会生成并维护一个输入文件队列，不同线程中的文件读取函数可以共享这个输入文件队列。在读取样例数据之后，需要将图像进行预处理。图像预 处理的过程也会通过tf.train.shuffle_batch提供的机制井行地跑在多个线程中。输入数据处理流程的最后通过tf.train.shuffle_batch函数将处理好的单个输入样例整理成batch提供给神经网络的输入层。通过这种方式，可以有效地提高数据预处理的效率，避免数据预处理成为神经网络模型训练过程中的性能瓶颈。