In [36]:
import tensorflow as tf
import time

#### 1. 生成文件存储样例数据。

In [50]:
# 将int64转换成tf.train.Feature格式
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 定义将数据写入多少个文件
num_shards = 2 

# 定义每个文件写入多少个样本
instances_per_shard = 2 

for i in range(num_shards):
    filename = ('../../datasets/data.tfrecords-%.5d-of-%.5d' % (i, num_shards)) 
    
    # 将Example结构写入TFRecord文件。
    writer = tf.python_io.TFRecordWriter(filename)
    
    for j in range(instances_per_shard):
        
        # Example结构仅包含当前样例属于第几个文件以及是当前文件的第几个样本。
        example = tf.train.Example(features=tf.train.Features(feature={
            'i': _int64_feature(i),
            'j': _int64_feature(j)}))
        writer.write(example.SerializeToString())
    writer.close()  

#### 2. 读取文件。

In [59]:
# 使用tf.train.match_filenames_once函数获取一个符合正则表达式的所有文件
files = tf.train.match_filenames_once("../../datasets/data.tfrecords-*")

# 使用tf.string_input_producer 产生文件名队列
filename_queue = tf.train.string_input_producer(files, shuffle=False, num_epochs = None) 

# 使用tf.TFRecordReader去读文件名队列
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

# 解析一个样例
features = tf.parse_single_example(
      serialized_example,
      features={
          'i': tf.FixedLenFeature([], tf.int64),
          'j': tf.FixedLenFeature([], tf.int64),
      })

with tf.Session() as sess:
    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    print(sess.run(files))
    
    # 使用tf.train.Coordinator和tf.train.start_queue_runners管理和启动线程
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for i in range(6):
        print(sess.run([features['i'], features['j']]))
    # 使用coord.request_stop停止所有线程
    coord.request_stop()
    coord.join(threads)

[b'..\\..\\datasets\\data.tfrecords-00000-of-00002'
 b'..\\..\\datasets\\data.tfrecords-00001-of-00002']
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]


#### 3. 组合训练数据（Batching）

In [60]:
example, label = features['i'], features['j']

# 一个batch中包含的样例数目
batch_size = 3

# 组合样例的队列中最多可以存储的样例个数
capacity = 1000 + 3 * batch_size

# 使用tf.train.batch函数组合样例
example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print(cur_example_batch, cur_label_batch)
        
    coord.request_stop()
    coord.join(threads)


[0 0 1] [0 1 0]
[1 0 0] [1 0 1]


example: 0, lable: 0

example: 0, lable: 1

example: 1, lable: 0

example: 1, lable: 1

#### 4. 使用tr.train.shuffle_batch

In [63]:
# 使用tf.train.match_filenames_once函数获取一个符合正则表达式的所有文件
files = tf.train.match_filenames_once("../../datasets/data.tfrecords-*")

# 使用tf.string_input_producer 产生文件名队列
filename_queue = tf.train.string_input_producer(files, shuffle=False, num_epochs = 1) 

# 使用tf.TFRecordReader去读文件名队列
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

# 解析一个样例
features = tf.parse_single_example(
      serialized_example,
      features={
          'i': tf.FixedLenFeature([], tf.int64),
          'j': tf.FixedLenFeature([], tf.int64),
      })

###############################################组合batch#########################
example, label = features['i'], features['j']

# 一个batch中包含的样例数目
batch_size = 4

# 组合样例的队列中最多可以存储的样例个数
capacity = 1000 + 3 * batch_size

# 使用tf.train.shuffle_batch函数组合样例,min_after_dequeue参数限制了出队时队列中元素的最少个数
example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=batch_size, capacity=capacity, 
                                                   min_after_dequeue = 30)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        
    for i in range(1):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print(cur_example_batch, cur_label_batch)
        
    coord.request_stop()
    coord.join(threads)

[0 1 0 1] [1 0 0 1]
INFO:tensorflow:Error reported to Coordinator: <class 'tensorflow.python.framework.errors_impl.CancelledError'>, Run call was cancelled


OutOfRangeError: RandomShuffleQueue '_1630_shuffle_batch_3/random_shuffle_queue' is closed and has insufficient elements (requested 4, current size 0)
	 [[Node: shuffle_batch_3 = QueueDequeueManyV2[component_types=[DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_3/random_shuffle_queue, shuffle_batch_3/n)]]

Caused by op 'shuffle_batch_3', defined at:
  File "D:\software\anaconda\envs\tensorflow\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "D:\software\anaconda\envs\tensorflow\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\traitlets\config\application.py", line 658, in launch_instance
    app.start()
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\kernelapp.py", line 486, in start
    self.io_loop.start()
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tornado\platform\asyncio.py", line 127, in start
    self.asyncio_loop.run_forever()
  File "D:\software\anaconda\envs\tensorflow\lib\asyncio\base_events.py", line 422, in run_forever
    self._run_once()
  File "D:\software\anaconda\envs\tensorflow\lib\asyncio\base_events.py", line 1432, in _run_once
    handle._run()
  File "D:\software\anaconda\envs\tensorflow\lib\asyncio\events.py", line 145, in _run
    self._callback(*self._args)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tornado\platform\asyncio.py", line 117, in _handle_events
    handler_func(fileobj, events)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\zmq\eventloop\zmqstream.py", line 450, in _handle_events
    self._handle_recv()
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\zmq\eventloop\zmqstream.py", line 480, in _handle_recv
    self._run_callback(callback, msg)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\zmq\eventloop\zmqstream.py", line 432, in _run_callback
    callback(*args, **kwargs)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\kernelbase.py", line 233, in dispatch_shell
    handler(stream, idents, msg)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\ipkernel.py", line 208, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\ipykernel\zmqshell.py", line 537, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\IPython\core\interactiveshell.py", line 2662, in run_cell
    raw_cell, store_history, silent, shell_futures)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\IPython\core\interactiveshell.py", line 2785, in _run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\IPython\core\interactiveshell.py", line 2903, in run_ast_nodes
    if self.run_code(code, result):
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\IPython\core\interactiveshell.py", line 2963, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-63-ab19250023a0>", line 30, in <module>
    min_after_dequeue = 30)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\training\input.py", line 1301, in shuffle_batch
    name=name)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\training\input.py", line 847, in _shuffle_batch
    dequeued = queue.dequeue_many(batch_size, name=name)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\ops\data_flow_ops.py", line 483, in dequeue_many
    self._queue_ref, n=n, component_types=self._dtypes, name=name)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\ops\gen_data_flow_ops.py", line 3795, in queue_dequeue_many_v2
    component_types=component_types, timeout_ms=timeout_ms, name=name)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\framework\op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\framework\ops.py", line 3290, in create_op
    op_def=op_def)
  File "D:\software\anaconda\envs\tensorflow\lib\site-packages\tensorflow\python\framework\ops.py", line 1654, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

OutOfRangeError (see above for traceback): RandomShuffleQueue '_1630_shuffle_batch_3/random_shuffle_queue' is closed and has insufficient elements (requested 4, current size 0)
	 [[Node: shuffle_batch_3 = QueueDequeueManyV2[component_types=[DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](shuffle_batch_3/random_shuffle_queue, shuffle_batch_3/n)]]
