## 7.4 数据集（Dataset）
上一节介绍了通过队列进行多线程输入的方法。除队列以外，TensorFlow还提供了一套更高层的数据处理框架。**在新的框架中，每一个数据来源被抽象成一个“数据集”，开发者可以以数据集为基本对象，方便地进行batching、随机打乱（shuffle）等操作。从1.3版本起，TensorFlow正式推荐使用数据集作为输入数据的首选框架**。从1.4版本起，数据集框架从tf.contrib.data迁移到tf.data，成为TensorFlow的核心组成部件。

### 7.4.1 数据集的基本使用方法
**在数据集框架中，每一个数据集代表一个数据来源：数据可能来自一个张量，一个文本文件，一个TFRecord文件，或者经过sharding的一系列文件，等等。**

由于训练数据通常无法全部写入内存中，从数据集中读取数据时需要使用一个法代器（iterator）按顺序进行读取，这点与队列的dequeue操作和Reader的read操作相似。与队列相似，数据集也是计算图上的一个节点。

**1. 从数组创建数据集**

下面先看一个简单的例子，这个例子从一个张量创建一个数据集，遍历这个数据集，并对每个输入输出$ y = x^2 $的值。

In [1]:
import tensorflow as tf

# 从一个数组创建数据集
input_data = [1, 2, 3, 5, 8]
dataset = tf.data.Dataset.from_tensor_slices(input_data)

# 定义迭代器用于便利数据。因为上面定义的数据集没有用placeholder
# 作为输入参数，所以这里可以使用最简单的one_shot_iterator。
iterator = dataset.make_one_shot_iterator()

# get_next() 返回代表一个输入数据的张量。
x = iterator.get_next()
y = x * x

with tf.Session() as sess:
    for i in range(len(input_data)):
        print(sess.run(y))

1
4
9
25
64


可以看到，利用数据集读取数据有三个基本步骤：
1. **定义数据集的构造方法**，这个例子使用`tf.data.Dataset.from_tensor_slices()`，表明数据集是从一个张量中构建的，如果数据集是文件中构建的，则需要相应调整不同的构造方法。
2. **定义遍历器**，这个例子使用了简单的`one_shot_iterator`来遍历数据集，稍后将介绍更加灵活的`initializable_iterator`。
3. **使用`get_next`方法从遍历器中读取数据张量，作为计算图其他部分的输入。**

**2. 读取文本文件里的数据**

在真实项目中，训练数据通常是保存在硬盘文件上的。比如在自然语言处理的任务中，训练数据通常是以每行一条数据的形式存在文本文件中，这时可以用`TextLineDataset`来更方便地读取数据：

In [2]:
# 创建文本文件作为本例的输入。
with open("./test1.txt", "w") as file:
    file.write("File1, line1.\n") 
    file.write("File1, line2.\n")
with open("./test2.txt", "w") as file:
    file.write("File2, line1.\n") 
    file.write("File2, line2.\n")

# 从文本文件创建数据集。假定每行文字是一个训练例子。这里可以提供多个文件。
input_files = ["./test1.txt", "./test2.txt"]
dataset = tf.data.TextLineDataset(input_files)

# 定义迭代器。
iterator = dataset.make_one_shot_iterator()

# 这里get_next()返回一个字符串类型的张量，代表文件中的一行。
x = iterator.get_next()  
with tf.Session() as sess:
    for i in range(4):
        print(sess.run(x))

b'File1, line1.'
b'File1, line2.'
b'File2, line1.'
b'File2, line2.'


**3. 解析TFRecord文件里的数据**

在图像相关任务中，输入数据通常以TFRecord形式存储，这时可以用`TFRecordDataset`来读取数据。与文本文件不同，每一个TFRecord都有自己不同的feature格式，因此在读取TFRecord时，需要提供一个`parser`函数来解析所读取的TFRecord的数据格式。这里读取文件为本章第一节创建的文件，如下：

In [3]:
# 解析一个TFRecord的方法。
def parser(record):
    # 解析读取的一个样例
    features = tf.parse_single_example(
        record,
        features={
            'image_raw':tf.FixedLenFeature([],tf.string),
            'pixels':tf.FixedLenFeature([],tf.int64),
            'label':tf.FixedLenFeature([],tf.int64)
        })
    decoded_images = tf.decode_raw(features['image_raw'],tf.uint8)
    retyped_images = tf.cast(decoded_images, tf.float32)
    images = tf.reshape(retyped_images, [784])
    labels = tf.cast(features['label'],tf.int32)
    #pixels = tf.cast(features['pixels'],tf.int32)
    return images, labels

# 从TFRecord文件创建数据集。这里可以提供多个文件。
input_files = ["output.tfrecords"]
dataset = tf.data.TFRecordDataset(input_files)

# map函数表示对数据集中的每一条数据进行调用解析方法。使用TFRecordDataset读出的
# 是二进制的数据，这里需要通过map来调用parser对二进制数据进行解析。类似地，
# map函数也可以用来完成其他的数据预处理工作。
dataset = dataset.map(parser)

# 定义遍历数据集的迭代器。
iterator = dataset.make_one_shot_iterator()

# 读取数据，可用于进一步计算
image, label = iterator.get_next()

with tf.Session() as sess:
    for i in range(10):
        x, y = sess.run([image, label]) 
        print(y)

7
3
4
6
1
8
1
0
9
8


**4. 使用initializable_iterator来动态初始化数据集**

以上例子使用了最简单的`one_ shot_iterator`来遍历数据集。在使用`one shot_iterator`时，数据集的所有参数必须已经确定，因此`one_shot_iterator`不需要特别的初始化过程。

如果需要用placeholder来初始化数据集，那就需要用到`initializable_iterator`。以下代码给出了用`initializable_iterator`来动态初始化数据集的例子:

In [4]:
# 从TFRecord文件创建数据集，具体文件路径是一个placeholder，稍后再提供具体路径。
input_files = tf.placeholder(tf.string)
dataset = tf.data.TFRecordDataset(input_files)
dataset = dataset.map(parser)

# 定义遍历dataset的initializable_iterator。
iterator = dataset.make_initializable_iterator()
image, label = iterator.get_next()

with tf.Session() as sess:
    # 首先初始化iterator，并给出input_files的值。
    sess.run(iterator.initializer,
             feed_dict={input_files: ["output.tfrecords"]})
    # 遍历所有数据一个epoch。当遍历结束时，程序会抛出OutOfRangeError。
    while True:
        try:
            x, y = sess.run([image, label])
        except tf.errors.OutOfRangeError:
            break 

**在上面的例子中，文件路径使用placeholder和feed_dict的方式传给数据集。使用这种方法，在实际项目中就不需要总是将参数写入计算图的定义，而可以使用程序参数的方式动态指定参数。**

另外注意到，上面例子中的循环体不是指定循环运行10次sess.run，而是使用*while(True) - try - except*的形式来将所有数据遍历一遍（即一个epoch）。这是因为在动态指定输入数据时，不同数据来源的数据量大小难以预知，而这个方法使我们不必提前知道数据量的精确大小。

以上介绍的两种iterator足以满足大多数项目的需求。除这两种以外，TensorFlow还提供了下面两种更加灵活的迭代器，这里不再具体介绍:
- `reinitializable_iterator`可以多次initialize用于遍历不同的数据来源；
- `feedable_iterator`可以用feed_dict的方式动态指定运行哪个iterator。

### 7.4.2 数据集的高层操作
在上一小节中介绍了数据集的基础用法。在这一小节中，将介绍数据集框架提供的一些方便实用的高层API。

**1. dataset.map**

在7.4.1小节中介绍过map方法来对TFRecord进行解析操作：`dataset = dataset.map(parser)`。map是在数据集上进行操作的最常用的方法之一。在这里，map(parser）方法表示对数据集中的每一条数据调用参数中指定的parser方法。**对每一条数据进行处理后，map将处理后的数据包装成一个新的数据集返回。map 函数非常灵活，可以用于对数据的任何预处理操作**。例如在前面小节中曾使用如下方法来对数据进行预处理：

`distorted_image = preprocess_for_train(decoded_image, image_size, image_size, None)`

而在数据集框架中，可以通过map来对每条数据调用`preprocess_for_train`方法：

`dataset = dataset.map(lambda x:preprocess_for_train(x, image_size, image_size, None))`

在上面的代码中，**lambda表达式的作用是将原来有4个参数的函数转化为只有1个参数的函数。**`preprocess_for_train`函数的第一个参数decoded_image变成了lambda表达式中的x，这个参数就是原来函数中的参数decoded_image.preprocess_for_train函数中后3个参数都被换成了具体的数值。注意这里的image_size是一个变量，有具体取值，该值需要在程序的上文中给出。

**从表面上看，新的代码在长度上似乎并没有缩短，然而由于map方法返回的是一个新的数据集，可以直接继续调用其他高层操作。在上一节介绍的队列框架中，预处理、shuffle、batch等操作有的在队列上进行，有的在图片张量上进行，整个处理流程在处理队列和张量的代码片段中来回切换。而在数据集操作中，所有操作都在数据集上进行，这样的代码结构将非常的干净、简洁。**

**2. dataset.batch & dataset.shuffle**

类似于队列框架中的`tf.train.batch`和`tf.train.shuffle_batch`，这两个操作在数据集框架中的实现如下：

- `dataset = dataset.batch(batch_size)`，**将数据组合成batch，参数batch_size代表要输出的每个batch由多少条数据组成。如果数据集中包含多个张量，那么batch操作将对每一个张量分开进行**。举例而言，如果数据集中的每一个数据（即iterator.get_next()的返回值）是image、label两个张量，其中image的维度是[300, 300]，label的维度是[]，batch_size是128，那么经过batch 操作后的数据集的每一个输出将包含两个维度分别是[128, 300, 300]和[128]的张量。

- `dataset = dataset.shuffle(buffer_size)`，随机打乱顺序，参数buffer_size等效于`tf.train.shuffle_batch`中的参数min_after_dequeue。shuffle算法在内部使用一个缓冲区中保存buffer_size条数据，每读入一条新数据时，从这个缓冲区中随机选择一条数据进行输出。缓冲区的大小越大，随机的性能越好，但占用的内存也越多。

**3. dataset.repeat**

repeat是另一个常用的操作方法，它将数据集中的数据复制多份，其中每一份被称为一个epoch。

`dataset = dataset.repeat(N)`

上面代码将数据重复N份。需要指出的是，**如果数据集在repeat前己经进行了shuffle操作，输出的每个epoch中随机shuffle的结果并不会相同。**例如，如果输入数据是[1, 2, 3]，shuffle后输出的第一个epoch是[2, 1, 3]，而第二个epoch则有可能是[3, 2, 1]。repeat和map、shuffle、batch等操作一样，都只是计算图中的一个计算节点。repeat只代表重复相同的处理过程，并不会记录前一个epoch的处理结果。

**4. 其他数据集操作**

除这些方法以外，数据集还提供了其他多种操作。例如:
- concatenate()将两个数据集顺序连接起来;
- take(N)从数据集中读取前N项数据;
- skip(N)在数据集中跳过前N项数据;
- flap_map()从多个数据集中轮流读取数据;

等等，这里不再一一介绍，有需要的读者可以查询TensorFlow相关文档。

以下例子将这些方法组合起来，使用数据集实现了7.3.4小节中的数据输入流程。与7.3.4小节中介绍的类似，该例子从文件中读取原始数据，进行预处理、shuffle、batching等操作，并通过repeat方法训练多个epoch。不同的是，以下例子在训练数据集之外，还另外读取了测试数据集，并对测试集和数据集进行了略微不同的预处理。在训练时，调用7.2.2小节中的`preprocess_for_train`方法对图像进行随机反转等预处理操作；而在测试时，测试数据以原本的样子直接输入测试。

In [1]:
import tensorflow as tf

# 1. 列举输入文件。训练和测试使用不同的数据
train_files = tf.train.match_filenames_once("output.tfrecords")
test_files = tf.train.match_filenames_once("output_test.tfrecords")


# 2. 解析一个TFRecord的方法。
def parser(record):
    features = tf.parse_single_example(
        record,
        features={
            'image_raw':tf.FixedLenFeature([],tf.string),
            'pixels':tf.FixedLenFeature([],tf.int64),
            'label':tf.FixedLenFeature([],tf.int64)
        })
    decoded_images = tf.decode_raw(features['image_raw'],tf.uint8)
    retyped_images = tf.cast(decoded_images, tf.float32)
    images = tf.reshape(retyped_images, [784])
    labels = tf.cast(features['label'],tf.int32)
    #pixels = tf.cast(features['pixels'],tf.int32)
    return images, labels


# 3. 定义训练数据集
image_size = 299          # 定义神经网络输入层图片的大小。
batch_size = 100          # 定义组合数据batch的大小。
shuffle_buffer = 10000   # 定义随机打乱数据时buffer的大小。

# 定义读取训练数据的数据集。
dataset = tf.data.TFRecordDataset(train_files)
dataset = dataset.map(parser)

# 对数据集进行预处理，这里略去
# dataset = dataset.map(lambda image, label:(
#                          preprocess_for_train(image, image_size, image_size, None), label)

# 对数据进行shuffle和batching操作。这里省略了对图像做随机调整的预处理步骤。
dataset = dataset.shuffle(shuffle_buffer).batch(batch_size)

# 重复NUM_EPOCHS个epoch。间接指定训练的轮数
NUM_EPOCHS = 10
dataset = dataset.repeat(NUM_EPOCHS)

# 定义数据集迭代器。虽然定义数据集时没有直接使用placeholder来提供文件地址，但是
# tf.train.match_filenames_once方法得到的结果和与placeholder的机制类似，也需要
# 初始化，所以这里使用的是initializable_iterator。
iterator = dataset.make_initializable_iterator()
image_batch, label_batch = iterator.get_next()


# 4. 定义神经网络的结构个优化过程。这里与7.3.4小节相同。
def inference(input_tensor, weights1, biases1, weights2, biases2):
    layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)
    return tf.matmul(layer1, weights2) + biases2

INPUT_NODE = 784
OUTPUT_NODE = 10
LAYER1_NODE = 500
REGULARAZTION_RATE = 0.0001   
TRAINING_STEPS = 5000        

weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))
biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))

weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))
biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))

y = inference(image_batch, weights1, biases1, weights2, biases2)
    
# 计算交叉熵及其平均值
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=label_batch)
cross_entropy_mean = tf.reduce_mean(cross_entropy)
    
# 损失函数的计算
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
regularaztion = regularizer(weights1) + regularizer(weights2)
loss = cross_entropy_mean + regularaztion

# 优化损失函数
train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)


# 5. 定义测试用的数据集及相关
# 与训练时不同，测试数据的Dataset不需要经过随机翻转等预处理操作，
# 也不需要打乱顺序和重复多个epoch，这里使用与训练数据相同的parser
# 进行解析，调整分辨率到网络输入层大小，然后直接进行batching操作。
# 定义测试用的Dataset。
test_dataset = tf.data.TFRecordDataset(test_files)
test_dataset = test_dataset.map(parser)
test_dataset = test_dataset.batch(batch_size)

# 定义测试数据上的迭代器。
test_iterator = test_dataset.make_initializable_iterator()
test_image_batch, test_label_batch = test_iterator.get_next()

# 定义测试数据上的预测结果为logits值最大的分类。
test_logit = inference(test_image_batch, weights1, biases1, weights2, biases2)
predictions = tf.argmax(test_logit, axis=-1, output_type=tf.int32)


# 6. 声明会话并运行神经网络的优化过程。
with tf.Session() as sess:  
    # 初始化变量。
    sess.run((tf.global_variables_initializer(),
              tf.local_variables_initializer()))
    
    # 初始化训练数据的迭代器。
    sess.run(iterator.initializer)
    
    # 循环进行训练，直到数据集完成输入、抛出OutOfRangeError错误。
    while True:
        try:
            sess.run(train_step)
        except tf.errors.OutOfRangeError:
            break

    # 初始化测试数据的迭代器。
    sess.run(test_iterator.initializer)
    # 获取预测结果。
    test_results = []
    test_labels = []
    while True:
        try:
            pred, label = sess.run([predictions, test_label_batch])
            test_results.extend(pred)
            test_labels.extend(label)
        except tf.errors.OutOfRangeError:
            break

# 计算准确率
correct = [float(y == y_) for (y, y_) in zip (test_results, test_labels)]
accuracy = sum(correct) / len(correct)
print("Test accuracy is:", accuracy) 

Test accuracy is: 0.8962
