In [1]:
import os
import pathlib

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras

os.environ['CUDA_VISIBLE_DEVICES'] = '0'
np.random.seed(42)
tf.random.set_seed(42)

In [2]:
print("TensorFlow version:", tf.__version__)
print("Keras version:", keras.__version__)

TensorFlow version: 2.3.0
Keras version: 2.4.0


<div class="alert alert-block alert-info">
    <b>The whole Data API revolves around the concept of a <em><font color='red' size=2>dataset</font></em>, which represents <i>a sequence of data items.</i></b>
</div>

整个 Data API 的核心概念是 dataset，表示 a sequence of data items。

通常，通过从 disk 逐步读取数据 (gradually read data from disk) 来使用数据。

for simplicity，使用 `tf.data.Dataset.from_tensor_slices()` 创建一个完全在 RAM 中的数据集。

In [3]:
X = tf.constant([1., 3., 5.])
# Creates a Dataset whose elements are slices of the given tensors.
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset shapes: (), types: tf.float32>

In [4]:
X = tf.range(10)  # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

**`tf.data.Dataset.from_tensor_slices()`** 接收一个 tensor 作为参数，然后创建一个 `tf.data.Dataset`——其元素全是 `X` 的 slices（沿着第一个 axis）。因此，这个 `dataset` 含有 10 个元素：tensor 0, tensor 1, tensor 2, ..., tensor 9。

In [5]:
# 可以获取相同的 dataset
dataset = tf.data.Dataset.range(10)
dataset

<RangeDataset shapes: (), types: tf.int64>

In [6]:
# Iterate over dataset's items
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


## 1. Chain transformations

有了 dataset 之后，就可以通过调用 dataset 的 method 来完成各种类型的 transformations。

**<font color='crimson'>dataset 的每一个 method 都会返回一个新的 dataset。</font>**

In [7]:
dataset = tf.data.Dataset.range(10)
dataset = dataset.repeat(2)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [8]:
# Chain transformations
dataset = tf.data.Dataset.range(10)
dataset = dataset.repeat(3).batch(7)

for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


<br>
    
![chaining dataset transformations](chap13_figs/chaining_dataset_transformations.png)
<br>
    
<center><i>Chain dataset transformations</i></center>

- 首先，在原始 `dataset` 上调用 **`repeat()`** 方法，这返回一个新的 dataset——复制原始 dataset 的元素 (items) 3 次。当然，这**不是在内存中复制 3 次 dataset**。如果**调用 `repeat()` 时没有指定参数**，那么返回的新 dataset 会无限复制原始 dataset，此时需要在代码中决定合适停止 iteration。


- 然后，在 `repeat(3)` 返回的新 dataset 上调用 **`batch()`** 方法，这个方法也是返回一个新的 dataset。`batch(7)` 将前一个 dataset 的元素分组，7 个 items 为一个 batch。


- 最后，在最终的 dataset 上进行 iteration。

<br>

`batch(7)` 输出的最后一个 batch 只有 2 个 items，而不是 7 个。可以使用 `drop_remainder=True` 来 drop 最后一个 batch，使得输出的所有 batch 大小相同。

In [9]:
# Chain transformations
dataset = tf.data.Dataset.range(10)
# 当最后一个 batch 的 size 小于 batch_size 的时候，drop 最后一个 batch
dataset = dataset.repeat(3).batch(batch_size=7, drop_remainder=True)

for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)


<div class="alert alert-block alert-info">
    <b><font color='crimson'>dataset 的 method 不会 modify dataset，只会 return 一个新的 dataset，所以确保使用 <code>dataset = ...</code>，否则没有任何结果。</font></b>
</div>

In [10]:
dataset = tf.data.Dataset.range(10)

# Nothing will happen without a reference the new dataset
dataset.repeat(3)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


可以调用 **`map()`** 方法来 transform dataset 中的 items。

这个 method 可用来对 dataset 应用任何预处理手段。有时，应用的时包含的计算非常 intensive，比如 reshape 或者 rotate 一张图片，所以需要使用多线程 (multiple threads) 来加速，指定 **`num_parallel_calls`** 参数。

In [11]:
# Chain transformations
dataset = tf.data.Dataset.range(10)
dataset = dataset.repeat(3).batch(batch_size=7)

# 对 dataset 中的每一个元素应用 lambda 函数
# 返回一个转换之后的元素组成的新 dataset
dataset = dataset.map(lambda x: x * 2)
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int64)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int64)
tf.Tensor([16 18], shape=(2,), dtype=int64)


In [12]:
# Chain transformations
dataset = tf.data.Dataset.range(10)
dataset = dataset.repeat(3).batch(batch_size=7)

dataset = dataset.map(
    # A func mapping a dataset element to another dataset element
    map_func=lambda x: x * 2,
    # The number of parallel calls is set dynamically based on avaiable CPU
    num_parallel_calls=tf.data.experimental.AUTOTUNE)
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int64)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int64)
tf.Tensor([16 18], shape=(2,), dtype=int64)


In [13]:
# 将 dataset 中所有 item 转换为 ndarray
list(dataset.as_numpy_iterator())

[array([ 0,  2,  4,  6,  8, 10, 12]),
 array([14, 16, 18,  0,  2,  4,  6]),
 array([ 8, 10, 12, 14, 16, 18,  0]),
 array([ 2,  4,  6,  8, 10, 12, 14]),
 array([16, 18])]

**`map()` 方法是对 dataset 中每一个 item 做转换，而 `apply()` 方法是对整个 dataset 做转换。**

In [14]:
# 对应 dataset 应用 unbatch 方法：
# 将 dataset 中每个 item 转换为单整数 tensor，而不是 7 个整数组成的 batch
dataset = dataset.unbatch()
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(12, shape=(), dtype=int64)
tf.Tensor(14, shape=(), dtype=int64)
tf.Tensor(16, shape=(), dtype=int64)
tf.Tensor(18, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(12, shape=(), dtype=int64)
tf.Tensor(14, shape=(), dtype=int64)
tf.Tensor(16, shape=(), dtype=int64)
tf.Tensor(18, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(12, shape=(), dtype=int64)
tf.Tensor(14, sh

使用 **`filter()`** 方法来筛选 dataset：

In [15]:
# Filter the dataset according to `predicate`
dataset = dataset.filter(
    # A function mapping a dataset element to a boolean.
    predicate=lambda x: x < 10
)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)


In [16]:
list(dataset.as_numpy_iterator())

[0, 2, 4, 6, 8, 0, 2, 4, 6, 8, 0, 2, 4, 6, 8]

使用 **`take()`** 方法来查看 dataset 中的 items：

In [17]:
# Generate a dataset with at most `count` elements from this dataset
for item in dataset.take(count=3):
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)


In [18]:
list(dataset.take(3).as_numpy_iterator())

[0, 2, 4]

## 2. Shuffle the data

<font color='crimson'>当训练集中的样本是独立同分布的 (independent and identically distributed, IID) 的时候，梯度下降的效果最好 (gradient descent works best) 。</font>

一个简单的方法是使用 `shuffle()` 方法来打乱 (shuffle) 样本。

**`shuffle()`** 返回一个新的 dataset：

- 首先，使用 source dataset 的前部分数据 (the first items of the source dataset) 来填充 (fill up) 一个 buffer。


- 然后，无论何时 ask for 一个 item 的时候，都会从 buffer 中随机选择一个 pull out，然后用 source dataset 中的下一个 item 来填充 buffer，直到 source dataset 里的数据用完了，这时会继续从 buffer 中随机 pull out 一个 item，直到 buffer 为空。

**<font color='crimson'>必须指定 buffer 的大小 (size)，而且 buffer 应该足够大，否则 shuffle 没啥效果。不要比 RAM 大，而且即使 RAM 充足，也没必要超过 dataset 的大小。</font>**

**<font color='crimson'>[TF docs](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle): For perfect shuffling, a buffer size greater than or equal to the full size of the dataset is required.</font>**

**<font color='blue'>例子：</font>** 数据集有 10000 个样本，buffer 的大小为 1000， buffer 初始时由数据集中的前 1000 个样本填充，即 `shuffle()` 第一次将随机从数据集的前 1000 个元素中随机选择一个。一旦一个元素被选中，它在 buffer 中位置由数据集中的下一个元素（即，第 1001 个元素）来替代，buffer 的大小维持在 1000，直至数据集用完了，这时继续从 buffer 中随机选择一个元素，直至 buffer 为空。

<font color='blue'>每次运行程序时想要相同的随机顺序，应该指定 random seed。</font>

In [19]:
tf.random.set_seed(42)

In [20]:
dataset = tf.data.Dataset.range(10).repeat(3)  # 0 到 9，重复 3 次
print(list(dataset.as_numpy_iterator()))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [21]:
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 6 5 7 3 9], shape=(7,), dtype=int64)
tf.Tensor([8 2 1 0 4 6 4], shape=(7,), dtype=int64)
tf.Tensor([7 2 5 9 2 1 3], shape=(7,), dtype=int64)
tf.Tensor([4 3 8 7 9 5 0], shape=(7,), dtype=int64)
tf.Tensor([8 6], shape=(2,), dtype=int64)


**<font color='blue'>如果在一个 shuffle 后的 dataset 上调用 `repeat()` 方法，默认在每次 iteration 的时候都会产生一个新顺序 (new order)。</font>** 这通常来说都是不错的 idea，但是有时（如测试或 debugging）更希望在每次 iteration 后顺序保持不变。

<br>

对一个大 dataset（不能放进内存中）来说，这种简单的 shuffling-buffer 方法不够充分 (sufficient) ，因为相对于 dataset 来说，buffer 比较小。

- **<font color='crimson'>一种解决方法是 shuffle 原始数据</font>**，如在 Linux 中使用 `shuf` 命令来 shuffle 文本文件。这种方法当然 (definitely) 可以大大提升 shuffling。即使原始数据已经 shuffle 了，通常你**还想进一步 shuffle，否则在每个 epoch 使用的数据顺序都相同，导致模型有偏差 (biased)**，如由于因为原始数据顺序的偶然性导致的虚假模式 (spurious patterns)。


- **<font color='crimson'>为了进一步 shuffle 数据，一种通用的方法是将原始数据分成多个文件</font>**，然后在训练的时候随机读取。然而，同一个文件中的样本位置相近。为了避免这种情况，**<font color='crimson'>随机选择多个文件，同时从这些文件中读取数据，并交叉记录 (interleave records)。然后，再使用 `shuffle()` 方法</font>**。

看起来很麻烦，实际使用 Data API 来处理很简单。

## 3. Interleave lines from multiple files

### 3.1 生成数据文件

load、split 并且 scale California housing 数据，最后将 train/validation/test set 分别写进文件中。

In [22]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
x_train, x_test, y_train, y_test = train_test_split(
    housing['data'], housing['target'].reshape(-1, 1), random_state=42)
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, random_state=42)

scaler = StandardScaler()
scaler.fit(x_train)
x_mean = scaler.mean_
x_std = scaler.scale_

In [23]:
x_mean

array([ 3.89175860e+00,  2.86245478e+01,  5.45593655e+00,  1.09963474e+00,
        1.42428122e+03,  2.95886657e+00,  3.56464315e+01, -1.19584363e+02])

In [24]:
x_std

array([1.90927329e+00, 1.26409177e+01, 2.55038070e+00, 4.65460128e-01,
       1.09576000e+03, 2.36138048e+00, 2.13456672e+00, 2.00093304e+00])

In [25]:
y_train.shape
y_train

array([[1.442],
       [1.687],
       [1.621],
       ...,
       [0.68 ],
       [0.613],
       [1.97 ]])

对于一个不能读进内存中的大数据集，首先需要将数据集切分放到多个文件中，然后使用 TensorFlow 来并行 (in parallel) 读取这些数据。

In [26]:
def save_dataset_to_multiple_csv_files(data, name_prefix='train',
                                       header=None, n_parts=10):
    """将数据分成若干份文件。"""
    housing_dir = os.path.join('chap13_dataset', 'housing')
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, 'my_{}_{:02d}.csv')

    filepaths = []
    data_length = len(data)
    # data 被分成 n_parts 份
    data_parts_splited = np.array_split(np.arange(data_length), n_parts)
    for file_idx, row_indices in enumerate(data_parts_splited):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
    
        with open(part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header)
                f.write('\n')
            for row_idx in row_indices:
                f.write(','.join([repr(col) for col in data[row_idx]]))
                f.write('\n')
    return filepaths

In [27]:
train_data = np.c_[x_train, y_train]
val_data = np.c_[x_val, y_val]
test_data = np.c_[x_test, y_test]

header_cols = housing.feature_names + ['MedianHouseValue']
header = ','.join(header_cols)

In [28]:
header

'MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue'

In [29]:
header_cols

['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude',
 'MedianHouseValue']

In [30]:
# 数据存在多个文件中
train_filepaths = save_dataset_to_multiple_csv_files(
    train_data, 'train', header, n_parts=20)
val_filepaths = save_dataset_to_multiple_csv_files(
    val_data, 'val', header, n_parts=20)
test_filepaths = save_dataset_to_multiple_csv_files(
    test_data, 'test', header, n_parts=20)

In [31]:
train_filepaths

['chap13_dataset/housing/my_train_00.csv',
 'chap13_dataset/housing/my_train_01.csv',
 'chap13_dataset/housing/my_train_02.csv',
 'chap13_dataset/housing/my_train_03.csv',
 'chap13_dataset/housing/my_train_04.csv',
 'chap13_dataset/housing/my_train_05.csv',
 'chap13_dataset/housing/my_train_06.csv',
 'chap13_dataset/housing/my_train_07.csv',
 'chap13_dataset/housing/my_train_08.csv',
 'chap13_dataset/housing/my_train_09.csv',
 'chap13_dataset/housing/my_train_10.csv',
 'chap13_dataset/housing/my_train_11.csv',
 'chap13_dataset/housing/my_train_12.csv',
 'chap13_dataset/housing/my_train_13.csv',
 'chap13_dataset/housing/my_train_14.csv',
 'chap13_dataset/housing/my_train_15.csv',
 'chap13_dataset/housing/my_train_16.csv',
 'chap13_dataset/housing/my_train_17.csv',
 'chap13_dataset/housing/my_train_18.csv',
 'chap13_dataset/housing/my_train_19.csv']

In [32]:
pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [33]:
!head -n 5 chap13_dataset/housing/my_train_00.csv

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


In [34]:
with open(train_filepaths[0]) as f:
    for i in range(5):
        print(f.readline(), end='')

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


### 3.2 构建一个输入 pipeline

In [35]:
# 创建一个只包含这些文件路径的 dataset
filepath_dataset = tf.data.Dataset.from_tensor_slices(train_filepaths)
for item in filepath_dataset:
    print(item)

tf.Tensor(b'chap13_dataset/housing/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_04.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_06.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_11.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_12.csv', shape=(), dtype=string)

In [36]:
train_filepath_pattern = 'chap13_dataset/housing/my_train_*.csv'

# A dataset of all files matching one or more glob patterns
filepath_dataset = tf.data.Dataset.list_files(
    file_pattern=train_filepath_pattern, seed=42)
for item in filepath_dataset:
    print(item)

tf.Tensor(b'chap13_dataset/housing/my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'chap13_dataset/housing/my_train_12.csv', shape=(), dtype=string)

这个方法默认返回随机 shuffle 顺序的文件名，`shuffle=False` 或指定 `seed` 可以得到顺序确定 (deterministic order) 的结果。

`train_filepath_pattern` 参数应该是少量的 (a small number of) glob patterns。如果文件名称已经 glob 了，使用 `tf.data.Dataset.from_tensor_slices(filenames)`，因为用 `list_files` re-globbing 每个文件名可能会导致远程存储系统 (remote storage system) 的 poor performance。

<br>

接下来，使用 **`interleave()`** 方法同时从 5 个文件中读取数据，并且 interleave 它们的行。使用 **`skip()`** 方法来跳过文件的第一行。

In [37]:
n_readers = 5
interleave_dataset = filepath_dataset.interleave(
    map_func=lambda file_path: tf.data.TextLineDataset(file_path).skip(count=1),
    cycle_length=n_readers)

`interleave()` 会创建一个新的 dataset —— `interleave_dataset`。

**`interleave()`** 从 `filepath_dataset` 中选择 5 个文件路径，在每一个文件路径上调用 `map_func` 函数来创建一个新的 dataset （这里是 `TextLineDataset`）。准确地来说，在这个阶段有 7 个 dataset：文件路径 dataset（`filepath_dataset`）、interleave dataset （`interleave_dataset`）以及在 interleave dataset 内部创建的 5 个 `TextLineDataset`。

**在 iterate `interleave_dataset` 的时候，将循环遍历 (cycle through) 这 5 个 `TextLineDataset`，每次从一个 `TextLineDataset` 中读取一行，直至所有的 `TextLineDataset` 为空。然后，从 `filepath_dataset` 中选择接下来的 5 个文件路径，以同样的方式进行 interleave，直到用完所有的文件路径。**

<div class="alert alert-block alert-info">
    <b>为了使 interleave 的效果最佳 (work best)，所有的文件长度应该相同 (identical length)。否则，最长文件的尾部数据不会被 interleave。</b>
</div>

默认，`interleave()` 不会使用 parallelism。它每次只会顺序地从每个文件读取一行。

将 `num_parallel_calls` 设置为你想的 threads，可以并行地 (in parallel) 读取文件。**`num_parallel_calls=tf.data.experiemntal.AUTOTUNE` 可以使 TensorFlow 根据可用的 (available) CPU 来动态地选择合适数量的 threads。**

In [38]:
for line in interleave_dataset.take(5):
    print(line)

tf.Tensor(b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504', shape=(), dtype=string)
tf.Tensor(b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159', shape=(), dtype=string)
tf.Tensor(b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598', shape=(), dtype=string)
tf.Tensor(b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526', shape=(), dtype=string)
tf.Tensor(b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625', shape=(), dtype=string)


以上，是随机选择的 5 个 CSV 文件的第一行。

这些数据需要进一步处理。

### 3.3 数据预处理

In [39]:
x_mean

array([ 3.89175860e+00,  2.86245478e+01,  5.45593655e+00,  1.09963474e+00,
        1.42428122e+03,  2.95886657e+00,  3.56464315e+01, -1.19584363e+02])

In [40]:
x_std

array([1.90927329e+00, 1.26409177e+01, 2.55038070e+00, 4.65460128e-01,
       1.09576000e+03, 2.36138048e+00, 2.13456672e+00, 2.00093304e+00])

使用 **`tf.io.decode_csv()`** 可以将 CSV records 转换为 tensors。每一个 column 对应一个 
tensor。

该函数至少接收 2 个参数——第一个参数是需要解析的行，第二个参数是 CSV 文件每一个 column 的默认值组成的数组。这个默认值数组不仅指定了每个 column 的默认值，还指定了 column 的数量和它们的类型。

**输出：** `tf.Tensor` 组成的 list。tensor 类型与 `record_defaults` 对应，每个 tensor 的 shape 相同。

In [41]:
record_defaults = [0, np.nan, tf.constant(np.nan, dtype=tf.float64), 'hello',
                  tf.constant([])]
record_parsed = tf.io.decode_csv(
    # string 类型的 Tensor
    # 每个 string 是一个 row/record，所有的 records 应该具有相同的 format
    records='1,2,3,4,5',
    # 指定类型 tensor 组成的 list
    record_defaults=record_defaults,
    # recorde 的分隔符
    field_delim=',')
record_parsed

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [42]:
# column 为空的用 record_defaults 的对应值来替代
record_parsed = tf.io.decode_csv(',,,,5', record_defaults)
record_parsed

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

**第 5 个 column 是强制的 (compulsory)，因为指定的默认值是 `tf.constant([])`：**

In [43]:
try:
    record_parsed = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [44]:
# 如果最后一个 tf.constant(1)，就不会报错了
record_defaults_2 = [0, np.nan, tf.constant(np.nan, dtype=tf.float64), 'hello',
                     tf.constant(1)]
tf.io.decode_csv(',,,,', record_defaults_2)

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'hello'>,
 <tf.Tensor: shape=(), dtype=int32, numpy=1>]

In [45]:
# records 的长度应该与 record_defaults 的一致
try:
    tf.io.decode_csv('1,2,3,4,5,6', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 6 in record 0 [Op:DecodeCSV]


In [46]:
def process(line, n_features=8, mean=x_mean, std=x_std):
    record_defaults = [0.] * n_features + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=record_defaults)

    # Stack tensors into a 1D array
    x = tf.stack(
        # shape 和 dtype 均相同的 tensor 组成的 list
        values=fields[:-1],
        axis=0)
    # Generate a 1D array with a single value, rather than a scalar tensor
    y = tf.stack(fields[-1:])
    return (x - mean) / std, y

看看代码吧：

- 首先，假设每个特征的平均值 (mean) 和标准差 (standard deviation) 已经计算好了。`mean` 和 `std` 都是 包含 8 个浮点数的 1D tensor （或者 NumPy ndarray），每个特征对应一个值。


- `process` 函数接收 CSV 中一行，然后 parse 它。为此，使用 `tf.io.decode_csv()` 函数，该函数接收 2 个参数——第一个参数是需要解析的行，第二个参数是 CSV 文件每一个 column 的默认值组成的数组。这个默认值数组不仅指定了每个 column 的默认值，还指定了 column 的数量和它们的类型。这里，所有的特征/column 都是 float，缺失值置为 0，最后一个 column 为 target，其默认值为 `tf.float32` 类型的空数组。这告诉 TensorFlow 这一个 column 包含 float，但是没有默认值，所以遇到缺失值的时候应该引发异常 (raise an exception)。


- `tf.io.decode_csv` 函数返回一个由 scalar tensor （每个 column 一个）组成的 list。但是，需要的是 1D tensor 数组。所以，使用 `tf.stack()`


- 最后，scale 特征，返回一个 features 和 target 组成 tuple。

In [47]:
process(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579159,  1.216324  , -0.05204564, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

## 4. Put everything together

In [48]:
def csv_reader_dataset(filepaths,
                       repeat=1,
                       n_readers=5,
                       n_read_threads=None,
                       shuffle_buffer_size=10000,
                       n_parse_threads=5,
                       batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = dataset.repeat(repeat)
    dataset = dataset.interleave(
        map_func=lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads
    )
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(
        map_func=process,
        num_parallel_calls=n_parse_threads
    )
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(1)
    return dataset

<Br>
    
![load and process data from multiple CSV files](chap13_figs/load_and_preprocess_data_from_multiple_CSV_file.png)
<br>

<center><i>load and process data from multiple CSV files</i></center>

In [49]:
tf.random.set_seed(42)

train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for x_batch, y_batch in train_set.take(3):
    print("X =", x_batch)
    print("y =", y_batch)
    print()

X = tf.Tensor(
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[1.752]
 [1.313]
 [1.535]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.919]
 [1.028]
 [2.182]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-1.0344558   1.0581076  -0.8869343  -0.08743899  0.6157541   0.4368748
  -0.75726473  0.64688075]
 [ 0.4675818   0.6625668   0.03120424 -0.

## 5. `prefetch()`

在最后调用 **`prefetch(1)`**，也会创建一个新的 dataset，这个 dataset 总是尽可能地提前一个 batch (do its best to always be one batch ahead)。即，在一个 batch 上训练的时候，dataset 会并行地准备下一个 batch（从 disk 中读取并处理）。

<font color='blue'>这可以大大提升性能</font>。

**如果能确保加载和预处理数据是线程的 (multithreaded)**（如，在 `interleave()` 和 `map()` 中设置 `num_parallel_calls`），**可以在 CPU 上利用多个核 (core)。如果准备一个 batch 的时间短于在一个 batch 在 GPU 上的训练时间（一个 training step），那么 GPU 的利用率几乎是 100%（不考虑数据从 CPU 到 GPU 的传输时间），训练将会非常快。**

<br>

![load and process data from multiple CSV files](chap13_figs/CPU_and_GPU_work_in_parallel.png)

<div class="alert alert-block alert-success">
    <b>如果打算买显卡，显卡的<font color='crimson'>处理能力 (processing power) </font>和<font color='crimson'>内存大小 (memory size) </font>是非常重要的。同样重要的还有<font color='crimson'>内存带宽 (memory bandwith) </font>——每秒从 RAM 读取多少 gigabytes 的数据或将多少 gigabytes 的数据存到 RAM 中。</b>
</div>

## 6. `cache()`

如果**数据集可以 fit in memory**，那么调用 `cache()` 可以**大大地加速训练过程**。

`cache()` 将数据内容 cache 进 RAM。

**<font color='crimson'>通常在 loading 和 preprocessing 之后，但在 shuffling、batching 和 prefetching 之前调用。</font>** 这样，每个样本只会被读取和预处理一次，而不是每个 epoch 一次。但是，数据在每个 epoch 都会进行不同的 shuffle，而且，下一个 batch 会被提前准备。

## 7. 在 `tf.keras` 中使用 dataset

In [50]:
# 不需要 repeat，tf.keras 会完成
train_set = csv_reader_dataset(train_filepaths, repeat=None)
val_set = csv_reader_dataset(val_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [51]:
tf.keras.backend.clear_session()

model = keras.Sequential([
    keras.Input(shape=x_train.shape[1:]),
    keras.layers.Dense(30, activation='relu'),
    keras.layers.Dense(1)
])

model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=1e-3),
    loss='mse'
)

model.fit(
    train_set,
    epochs=10,
    steps_per_epoch=len(x_train) // 32,
    validation_data=val_set
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f0a98976390>

In [52]:
model.evaluate(test_set)



0.46504756808280945

In [53]:
# 不包含 labels，如果包含， keras 会忽略
new_set = test_set.map(lambda x, y: x)

model.predict(new_set)

array([[2.131139 ],
       [1.9502337],
       [1.3629727],
       ...,
       [1.9515147],
       [5.962754 ],
       [3.373827 ]], dtype=float32)

如果需要自定义 training loop，只要 iterate 训练集就好了。

In [54]:
optimizer = keras.optimizers.SGD(lr=0.01)
loss_fn = keras.losses.mean_squared_error

n_epochs = 5
batch_size = 32
n_step_per_epoch = len(x_train) // batch_size
total_steps = n_epochs * n_step_per_epoch
global_step = 0

for x_batch, y_batch in train_set.take(total_steps):
    global_step += 1
    print("\rGlobal step {}/{}.".format(global_step, total_steps), end="")
    
    with tf.GradientTape() as tape:
        y_pred = model(x_batch)
        main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
        loss = tf.add_n([main_loss] + model.losses)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

Global step 1810/1810.

甚至，可以创建一个 TF 函数来完成整个 training step。

In [55]:
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error


@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5,
          shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(
        train_filepaths, repeat=n_epochs, n_readers=n_readers,
        n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
        n_parse_threads=n_parse_threads, batch_size=batch_size)

    for x_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(x_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [56]:
train(model, 5)

In [57]:
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error


@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5,
          shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(
        train_filepaths, repeat=n_epochs, n_readers=n_readers,
        n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
        n_parse_threads=n_parse_threads, batch_size=batch_size)

    n_steps_per_epoch = len(x_train) // batch_size
    total_steps = n_epochs * n_steps_per_epoch
    global_step = 0

    for x_batch, y_batch in train_set:
        global_step += 1
        if tf.equal(global_step % 100, 0):
            tf.print("\rGlobal step", global_step, "/", total_steps)

        with tf.GradientTape() as tape:
            y_pred = model(x_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))


train(model, 5)

Global step 100 / 1810
Global step 200 / 1810
Global step 300 / 1810
Global step 400 / 1810
Global step 500 / 1810
Global step 600 / 1810
Global step 700 / 1810
Global step 800 / 1810
Global step 900 / 1810
Global step 1000 / 1810
Global step 1100 / 1810
Global step 1200 / 1810
Global step 1300 / 1810
Global step 1400 / 1810
Global step 1500 / 1810
Global step 1600 / 1810
Global step 1700 / 1810
Global step 1800 / 1810


到这，学会使用 Data API 了。

到目前为止，使用的都是 CSV 文件，虽然这很常用、简单和方便，但是效率并不高，而且不能很好地支持大的或者复杂的数据结构（如 images 或 audio）。这时就需要 TFRecords 了。

当然，也不一定非要使用 TFRecords。当训练期间的主要 bottleneck 是加载和解析数据的时候，TFRecords 就很有用啦。