加载和预处理数据是机器学习/深度学习项目的重要组成部分。使用Pandas加载和探索数据集（存储在CSV文件中），并应用Scikit-Learn的转换程序进行预处理。这些工具非常方便，可能会经常使用它们，尤其是在使用数据进行探索和实验时。

但是，在大型数据集上训练TensorFlow模型时，更适合使用TensorFlow自己的数据加载和预处理API，它称为tf.data。它能够非常高效地加载和预处理数据，使用多线程和队列并行读取多个文件，混淆和批处理样本，等等。

此外，它可以即时完成所有这些操作——它跨多个CPU内核加载和预处理下一批数据，而GPU或TPU则忙于训练当前批次的数据。tf.data API可以让你处理内存中放不下的数据集，并让你充分利用硬件资源，从而加快训练速度。

现成的tf.data API可以从文本文件（例如CSV文件）、具有固定大小记录的二进制文件以及使用TensorFlow TFRecord格式（支持不同大小记录）的二进制文件中读取数据。TFRecord是一种灵活高效的二进制格式，通常包含协议缓冲区（一种开源二进制格式）。tf.data API还支持从SQL数据库读取数据。

Keras还带有强大但易于使用的预处理层，可以将其嵌入模型中：这样，当你将模型部署到生产环境时，它将能够直接摄取原始数据，而无须添加任何额外的预处理代码。这消除了训练期间使用的预处理代码与生产环境中使用的预处理代码不匹配的风险。如果在使用不同编程语言编码的多个应用程序中部署模型，则不必多次重新实现相同的预处理代码，这也降低了不匹配的风险。这两个API可以联合使用——例如同时受益于tf.data数据加载的高效性和Keras预处理层的便利性。

首先介绍tf.data API和TFRecord格式。然后，探索Keras预处理层以及如何将它们与tf.data API结合使用。最后，将快速浏览一些对加载和预处理数据有用的相关库，例如TensorFlow Datasets和TensorFlow Hub。

## tf.data API

In [1]:
import tensorflow as tf

In [2]:
# tf.data API围绕 tf.data.Dataset
X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X) # from_tensor_slices: 从数据张量创建一个数据集，
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

from_tensor_slices()函数接受一个张量并创建一个tf.data.Dataset,其元素是X沿第一个维度的所有切片

tf.data.API是一种流式API：可以非常高效地遍历数据集的元素，但该API不是为索引或切片而设计的

In [3]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [27]:
d = tf.data.Dataset.range(10)
for item in d:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


数据集还可能包含张量元组或名称/张量对字典，甚至是嵌套的元组和张量字典。当对元组、字典或嵌套结构进行切片时，数据集将仅对其包含的张量进行切片，同时保留元组/字典结构。

In [28]:
X_nested = {"a": ([1,2,3], [4,5,6],[2,3,0]), "b": [7,8,9]}
dataset = tf.data.Dataset.from_tensor_slices(X_nested)
for item in dataset:
    print(item)

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>, <tf.Tensor: shape=(), dtype=int32, numpy=2>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>, <tf.Tensor: shape=(), dtype=int32, numpy=3>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>, <tf.Tensor: shape=(), dtype=int32, numpy=0>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


In [5]:
dataset = tf.data.Dataset.from_tensor_slices({
    "feature": tf.constant([1, 2, 3]),
    "label": tf.constant([10, 20, 30])
})

for element in dataset:
    print(element)

{'feature': <tf.Tensor: shape=(), dtype=int32, numpy=1>, 'label': <tf.Tensor: shape=(), dtype=int32, numpy=10>}
{'feature': <tf.Tensor: shape=(), dtype=int32, numpy=2>, 'label': <tf.Tensor: shape=(), dtype=int32, numpy=20>}
{'feature': <tf.Tensor: shape=(), dtype=int32, numpy=3>, 'label': <tf.Tensor: shape=(), dtype=int32, numpy=30>}


### 链式转换

有了数据集后，就可以通过调用其转换方法对其进行各种转换。每个方法都返回一个新的数据集，因此可以进行链式转换

数据集方法不会修改数据集，而是创建新数据集，因此请保留对这些新数据集的引用（例如使用dataset=...），否则将不会发生任何事情。

In [30]:
# 链式转换
dataset = tf.data.Dataset.from_tensor_slices(tf.range(10))
dataset = dataset.repeat(3).batch(7)

for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


![数据链式转换](./images/tensorflow/p6.png)

在此示例中，首先在原始数据集上调用repeat()方法，它返回一个新数据集，该数据集将重复原始数据集的元素3次。当然，这不会将内存中的所有数据复制三遍

如果不带任何参数调用此方法，则新数据集将永远重复源数据集，因此遍历该数据集的代码必须决定何时停止。

然后，在此新数据集上调用batch()方法，并再次创建一个新的数据集。这把先前数据集的元素以7个元素为一个批次分组。

最后，遍历此最终数据集的元素。batch()方法最后输出一个大小为2而不是7的最终批次，但是如果希望它删除最终批次，可以使用drop_remainder=True调用它，使所有批次具有完全相同的大小。

In [31]:
# 使用map转换元素
dataset = dataset.map(lambda x: x * 2)  # x是一个批次
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


此map()方法是将调用的对数据应用预处理的方法。有时，这可能包括非常密集的计算，例如改变图像形状或旋转图像，

因此通常需要生成多个线程来加快速度。这可以通过将num_parallel_calls参数设置为要运行的线程数或tf.data.AUTOTUNE来完成（（根据可用的CPU动态选择正确的线程数）。注意，传递给map()方法的函数必须可以转换为TF函数

In [32]:
# 过滤数据集
dataset = dataset.filter(lambda x: tf.reduce_sum(x) > 50)
for item in dataset:
    print(item)

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)


In [33]:
# 查看数据集中的几个元素
for item in dataset.take(2):
    print(item)

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)


### 乱序数据

当训练集中的实例独立同分布(IID)时，梯度下降效果最佳。

确保这一点的一种简单方法是使用shuffle()方法对实例进行乱序处理。它会创建一个新的数据集，该数据集首先将源数据集的第一项元素填充到缓冲区中。然后，无论何时要求提供一个元素，它都会从缓冲区中随机取出一个元素，并用源数据集中的新元素替换它，直到完全遍历完源数据集为止。它将继续从缓冲区中随机抽取元素直到其为空。必须指定缓冲区的大小，重要的是要使其足够大，否则乱序处理不会非常有效。

不要超出所拥有的RAM的数量，即使有足够的RAM，也不要超出数据集的大小。如果每次运行程序都需要相同的随机顺序，那么可以提供随机种子

In [34]:
# 乱序数据
dataset = tf.data.Dataset.range(10).repeat(2)
dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([1 4 2 3 5 0 6], shape=(7,), dtype=int64)
tf.Tensor([9 8 2 0 3 1 4], shape=(7,), dtype=int64)
tf.Tensor([5 7 9 6 7 8], shape=(6,), dtype=int64)


如果在经过乱序处理的数据集上调用repeat()，则默认情况下它在每次迭代时生成一个新次序数据集。，

但如果希望在每次迭代中重用相同的顺序（例如用于测试或调试），则可以在调用shuffle()时设置reshuffle_each_iteration=False。

In [42]:
dataset_test = tf.data.Dataset.range(10).shuffle(buffer_size=4, seed=42).repeat(2).batch(10)
for item in dataset_test:
    print(item)

tf.Tensor([1 4 2 3 5 0 6 8 7 9], shape=(10,), dtype=int64)
tf.Tensor([3 4 2 0 1 8 5 7 9 6], shape=(10,), dtype=int64)


对于内存放不下的大型数据集，这种简单的缓冲区乱序方法可能不够用，因为与数据集相比，缓冲区很小。一种解决方法是对源数据本身进行乱序处理（例如在Linux上，可以使用shuf命令打乱文本文件）。这会大大改善乱序效果。

即使源数据已经乱序了，通常也希望对其进行更彻底的乱序处理，否则在每个轮次都有重复的相同顺序，该模型最终可能会产生偏差（例如，由于源数据顺序中偶然出现了一些虚假模式）。为了进一步打乱实例，一种常见的方法是将源数据拆分为多个文件，然后在训练过程中以随机顺序读取它们。

但是，位于同一文件中的实例仍然相互接近。为避免这种情况，可以随机选择多个文件并同时读取它们，并且将它们的记录穿插在一起。还可以使用shuffle()方法添加一个乱序缓冲区。 tf.data API只需几行代码

In [35]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

In [36]:
# 把训练数据集分成20份，保存成csv文件
# 对于内存中无法容纳的非常大的数据集，通常需要先将其拆分为多个文件，然后让 TensorFlow 并行读取这些文件。
import numpy as np
from pathlib import Path

def save_to_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = Path() / "datasets" / "housing"
    housing_dir.mkdir(parents=True, exist_ok=True)
    filename_format = "my_{}_{:02d}.csv"

    filepaths = []
    m = len(data)
    chunks = np.array_split(np.arange(m), n_parts)
    for file_idx, row_indices in enumerate(chunks):
        part_csv = housing_dir / filename_format.format(name_prefix, file_idx)
        filepaths.append(str(part_csv))
        with open(part_csv, "w") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([str(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_csv_files(test_data, "test", header, n_parts=10)

In [19]:
train_filepaths

['datasets\\housing\\my_train_00.csv',
 'datasets\\housing\\my_train_01.csv',
 'datasets\\housing\\my_train_02.csv',
 'datasets\\housing\\my_train_03.csv',
 'datasets\\housing\\my_train_04.csv',
 'datasets\\housing\\my_train_05.csv',
 'datasets\\housing\\my_train_06.csv',
 'datasets\\housing\\my_train_07.csv',
 'datasets\\housing\\my_train_08.csv',
 'datasets\\housing\\my_train_09.csv',
 'datasets\\housing\\my_train_10.csv',
 'datasets\\housing\\my_train_11.csv',
 'datasets\\housing\\my_train_12.csv',
 'datasets\\housing\\my_train_13.csv',
 'datasets\\housing\\my_train_14.csv',
 'datasets\\housing\\my_train_15.csv',
 'datasets\\housing\\my_train_16.csv',
 'datasets\\housing\\my_train_17.csv',
 'datasets\\housing\\my_train_18.csv',
 'datasets\\housing\\my_train_19.csv']

In [20]:
print("".join(open(train_filepaths[0]).readlines()[:4]))  # 展示文件前几行

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621



In [21]:
# 创建一个仅包含以下文件路径的数据集
# list_files()函数返回一个乱序的文件路径的数据集，不希望打乱可以设置shuffle=False
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

In [22]:
# filepath_dataset被打成乱序
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'datasets\\housing\\my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_15.csv', sh

In [24]:
# 调用interleave()方法一次读取5个文件并使它们的行交织，使用skip()方法跳过每个文件的第一行
n_readers = 5
dataset = filepath_dataset.interleave(lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
                                      cycle_length=n_readers)

interleave()方法将创建一个数据集，该数据集将从filepath_dataset中拉出5个文件路径，对于每个路径，它将调用提供的函数（在此示例中为lambda）来创建新的数据集（TextLineDataset）。

在此阶段总共有7个数据集：文件路径数据集、交织数据集和由交织数据集在内部创建的5个TextLineDataset。当遍历交织数据集时，它将循环遍历这5个TextLineDataset，每次读取一行，直到所有数据集都读出为止。

然后，它将从filepath_dataset获取下五个文件路径，并以相同的方式交织它们，以此类推，直到读完文件路径。为了使交织工作更好地进行，最好使文件具有相同的大小，否则最大的文件将无法参与交织。

默认情况下，interleave()不使用并行机制，它只是依次从每个文件中一次读取一行。如果想让它真正地并行读取文件，则可以将interleave()方法的num_parallel_calls参数设置为想要的线程数（map()方法也有这个参数）。也可以将其设置为tf.data.AUTOTUNE，以使TensorFlow根据可用的CPU动态选择正确的线程数

In [25]:
# 打印随机选择的5个csv文件的第一行
for line in dataset.take(5):
    print(line)

tf.Tensor(b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418', shape=(), dtype=string)
tf.Tensor(b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0', shape=(), dtype=string)
tf.Tensor(b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67', shape=(), dtype=string)
tf.Tensor(b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205', shape=(), dtype=string)
tf.Tensor(b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215', shape=(), dtype=string)
