# tf.data API使用

In [1]:
# 导入
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl,np,pd,sklearn,tf,keras:
    print(module.__name__,module.__version__)

2.0.0
sys.version_info(major=3, minor=6, micro=10, releaselevel='final', serial=0)
matplotlib 3.1.2
numpy 1.18.1
pandas 0.25.3
sklearn 0.22.1
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


### 利用内存中的数据构建Dataset

#### numpy生成数据集

In [2]:
# 从内存中构建数据集
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
print(dataset)

<TensorSliceDataset shapes: (), types: tf.int32>


In [3]:
# 对dataset遍历
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


1.repeat操作，每一个epoch就是遍历一遍数据

2.get batch获取batch size个数据

In [4]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


3.interleave：对现有的dataset中的每个数据做处理，处理后产生新的结果，interleave将结果合并起来形成新的数据集

常见的case：文件dataset -> 具体数据集

现有的dataset存储的是一系列文件的文件名，使用interleave做变化，遍历文件名数据集中的所有元素--文件名，把文件名对应的文件内容读取出来，这样的文件名就形成了新的数据集，interleave再把新的数据集合并起来，成为总的大数据集

In [5]:
dataset2 = dataset.interleave(
    # map_fn 做什么样的变化
    # cycle_length 并行的处理多少个元素
    # block_length 从上面的变化中每次取多少个元素出来
    lambda v: tf.data.Dataset.from_tensor_slices(v),
    cycle_length=5,
    block_length=5, # 达到均匀混合的效果
)
for item in dataset2:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype

#### python原生的数据类型生成数据集

In [6]:
x = np.array([[1, 2], [3, 4], [5 ,6]])
y = np.array(['cat', 'dog', 'fox'])
dataset3 = tf.data.Dataset.from_tensor_slices((x,y))
print(dataset3)

for item_x,item_y in dataset3:
    print(item_x.numpy(), item_y.numpy())

<TensorSliceDataset shapes: ((2,), ()), types: (tf.int32, tf.string)>
[1 2] b'cat'
[3 4] b'dog'
[5 6] b'fox'


In [7]:
dataset4 = tf.data.Dataset.from_tensor_slices({"feature": x, "label": y})
for item in dataset4:
    print(item)
    print(item['feature'].numpy(), item['label'].numpy())

{'feature': <tf.Tensor: id=98, shape=(2,), dtype=int32, numpy=array([1, 2])>, 'label': <tf.Tensor: id=99, shape=(), dtype=string, numpy=b'cat'>}
[1 2] b'cat'
{'feature': <tf.Tensor: id=100, shape=(2,), dtype=int32, numpy=array([3, 4])>, 'label': <tf.Tensor: id=101, shape=(), dtype=string, numpy=b'dog'>}
[3 4] b'dog'
{'feature': <tf.Tensor: id=102, shape=(2,), dtype=int32, numpy=array([5, 6])>, 'label': <tf.Tensor: id=103, shape=(), dtype=string, numpy=b'fox'>}
[5 6] b'fox'


### 从CSV文件中读取数据生成数据集
在文件中读取文件内容，构成Dataset，进行使用，以及在keras中进行使用
#### 生成csv文件

In [8]:
# 导入数据集 房价预测
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

In [9]:
# 切分数据集
from sklearn.model_selection import train_test_split

x_train_all,x_test,y_train_all,y_test = train_test_split(
    housing.data,housing.target,random_state=7)
x_train,x_valid,y_train,y_valid = train_test_split(
    x_train_all,y_train_all,random_state=11)

print(x_train.shape,y_train.shape)
print(x_valid.shape,y_valid.shape)
print(x_test.shape,y_test.shape)

(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [10]:
# 对数据进行归一化
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

In [11]:
import shutil
# 定义生成csv文件的文件夹
output_dir = os.path.join('generate_csv')
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
else:
    shutil.rmtree(output_dir)
    os.mkdir(output_dir)
    
# 将x和y按行进行merge
train_data = np.c_[x_train_scaled, y_train]
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]

# 定义header信息
header_cols = housing.feature_names + ['MidianHouseValue']
header_str = ','.join(header_cols)

In [12]:
# 将数据保存到csv文件
def save_to_csv(output_dir, data, name_prefix, header=None, n_parts=10):
    path_format = os.path.join(output_dir, '{}_{:02d}.csv') # 定义文件名格式
    filenames = []
    
    # 将数据分成n_parts个部分，获得每组索引和值，遍历
    for file_idx, row_indices in enumerate(
        np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open(part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header + '\n')
            # 对每一组进行遍历，将取到的col字符串化，再用逗号连接
            for row_index in row_indices:
                f.write(','.join([repr(col) for col in data[row_index]]))
                f.write('\n')
    # 返回所有的文件
    return filenames

train_filenames = save_to_csv(output_dir,train_data,'train',header_str,n_parts=20)
valid_filenames = save_to_csv(output_dir,valid_data,'valid',header_str,n_parts=10)
test_filenames = save_to_csv(output_dir,test_data,'test',header_str,n_parts=10)

#### 解析csv文件

In [13]:
# 打印文件名
import pprint
print('train filenames:')
pprint.pprint(train_filenames)
print('valid filenames:')
pprint.pprint(valid_filenames)
print('test filenames:')
pprint.pprint(test_filenames)

train filenames:
['generate_csv\\train_00.csv',
 'generate_csv\\train_01.csv',
 'generate_csv\\train_02.csv',
 'generate_csv\\train_03.csv',
 'generate_csv\\train_04.csv',
 'generate_csv\\train_05.csv',
 'generate_csv\\train_06.csv',
 'generate_csv\\train_07.csv',
 'generate_csv\\train_08.csv',
 'generate_csv\\train_09.csv',
 'generate_csv\\train_10.csv',
 'generate_csv\\train_11.csv',
 'generate_csv\\train_12.csv',
 'generate_csv\\train_13.csv',
 'generate_csv\\train_14.csv',
 'generate_csv\\train_15.csv',
 'generate_csv\\train_16.csv',
 'generate_csv\\train_17.csv',
 'generate_csv\\train_18.csv',
 'generate_csv\\train_19.csv']
valid filenames:
['generate_csv\\valid_00.csv',
 'generate_csv\\valid_01.csv',
 'generate_csv\\valid_02.csv',
 'generate_csv\\valid_03.csv',
 'generate_csv\\valid_04.csv',
 'generate_csv\\valid_05.csv',
 'generate_csv\\valid_06.csv',
 'generate_csv\\valid_07.csv',
 'generate_csv\\valid_08.csv',
 'generate_csv\\valid_09.csv']
test filenames:
['generate_csv\\test

解析csv步骤：
1. filename -> dataset
2. read file -> dataset -> datasets -> merge
3. parse csv

In [14]:
# 1.将文件名构成数据集
# list_files专门处理文件名，会把文件名生成Dataset
filename_dataset = tf.data.Dataset.list_files(train_filenames) # 将训练集所有文件构成dataset
for filename in filename_dataset:
    print(filename)

tf.Tensor(b'generate_csv\\train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_07.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_17.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_09.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

In [15]:
# 2.读取文件里的内容，构成新的dataset，再合并
n_readers = 5
dataset = filename_dataset.interleave(
    lambda filename: tf.data.TextLineDataset(filename).skip(1),# 按行读取文件内容，跳过第一行header
    cycle_length=n_readers
)
# 读取15条数据
for line in dataset.take(15):
    print(line.numpy())

b'0.6303435674178064,1.874166156711919,-0.06713214279531016,-0.12543366804152128,-0.19737553788322462,-0.022722631725889016,-0.692407235065288,0.7265233438487496,2.419'
b'-1.0591781535672364,1.393564736946074,-0.026331968874673636,-0.11006759528831847,-0.6138198966579805,-0.09695934953589447,0.3247131133362288,-0.037477245413977976,0.672'
b'-0.8219588176953616,1.874166156711919,0.18212349433218608,-0.03170019246279883,-0.6011178900722581,-0.14337494105109344,1.0852205298015787,-0.8613994495208361,1.054'
b'0.6363646332204844,-1.0895425985107923,0.09260902815633619,-0.20538124656801682,1.2025670451003232,-0.03630122549633783,-0.6784101660505877,0.182235342347858,2.429'
b'-1.1157655153587753,0.9930635538078697,-0.33419201318312125,-0.0653521844775239,-0.3289320346639209,0.04343065774347637,-0.12785878480573185,0.30707203993980686,0.524'
b'1.6312258686346301,0.3522616607867429,0.04080576110152256,-0.1408895163348976,-0.4632103899987006,-0.06751623819156843,-0.8277122355407183,0.59669317835

In [16]:
# 3.解析csv
# tf.io.decode_csv(str,record_defaults)

sample_str = '1,2,3,4,5'
# 设置记录的默认值和数据类型
record_defaults = [
    tf.constant(0, dtype=tf.int32),
    0,
    np.nan,
    'hello',
    tf.constant([])
]
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: id=203, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=204, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=205, shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: id=206, shape=(), dtype=string, numpy=b'4'>, <tf.Tensor: id=207, shape=(), dtype=float32, numpy=5.0>]


In [17]:
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [18]:
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


#### 读取csv文件

In [21]:
# 将csv中的一行转化
def parse_csv_line(line, n_fields=9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1]) # 将前n-1条数据转化为x向量
    y = tf.stack(parsed_fields[-1:]) # 将最后一条转化为y向量
    return x, y

parse_csv_line(b'0.6289049056773436,-0.44874070548966555,0.011390452394941722,-0.21388453904713714,0.13196934716086342,-0.08002252121823207,-0.883700511599516,0.8813208488627673,2.522',
               n_fields=9)

(<tf.Tensor: id=296, shape=(8,), dtype=float32, numpy=
 array([ 0.6289049 , -0.4487407 ,  0.01139045, -0.21388453,  0.13196935,
        -0.08002252, -0.8837005 ,  0.88132083], dtype=float32)>,
 <tf.Tensor: id=297, shape=(1,), dtype=float32, numpy=array([2.522], dtype=float32)>)

完整流程实现csv读取、解析

In [22]:
def csv_reader_dateset(filenames, n_readers=5, batch_size=32,
                       n_parse_threads=5, shuffle_buffer_size=10000):
    # 1.filename -> filename dataset
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() # 将数据重复无限次
    # 2.filename dataset -> text dataset 一对多的关系
    dataset = dataset.interleave(
        lambda filename : tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    dataset.shuffle(shuffle_buffer_size) # 将数据进行混排
    # 3.parse csv 一对一关系
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

# test
train_set = csv_reader_dateset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(2):
    print('x:')
    pprint.pprint(x_batch)
    print('y:')
    pprint.pprint(y_batch)

x:
<tf.Tensor: id=381, shape=(3, 8), dtype=float32, numpy=
array([[ 0.63636464, -1.0895426 ,  0.09260903, -0.20538124,  1.2025671 ,
        -0.03630123, -0.6784102 ,  0.18223535],
       [ 0.63034356,  1.8741661 , -0.06713215, -0.12543367, -0.19737554,
        -0.02272263, -0.69240725,  0.72652334],
       [-1.0591781 ,  1.3935647 , -0.02633197, -0.1100676 , -0.6138199 ,
        -0.09695935,  0.3247131 , -0.03747724]], dtype=float32)>
y:
<tf.Tensor: id=382, shape=(3, 1), dtype=float32, numpy=
array([[2.429],
       [2.419],
       [0.672]], dtype=float32)>
x:
<tf.Tensor: id=383, shape=(3, 8), dtype=float32, numpy=
array([[ 0.48530516, -0.8492419 , -0.06530126, -0.02337966,  1.4974351 ,
        -0.07790658, -0.90236324,  0.78145146],
       [ 0.40127665, -0.92934215, -0.0533305 , -0.18659453,  0.65456617,
         0.02643447,  0.9312528 , -1.4406418 ],
       [-1.0635474 ,  1.8741661 , -0.49344894, -0.06962613, -0.27358758,
        -0.13419515,  1.033898  , -1.3457658 ]], dtype=float32)

In [23]:
batch_size = 32
train_set = csv_reader_dateset(train_filenames, batch_size=batch_size)
valid_set = csv_reader_dateset(valid_filenames, batch_size=batch_size)
test_set = csv_reader_dateset(test_filenames, batch_size=batch_size)

#### 与keras集成

In [24]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',input_shape=[8]),
    keras.layers.Dense(1)
])
model.summary()

model.compile(loss='mean_squared_error',optimizer='adam')

callbacks = [keras.callbacks.EarlyStopping(patience=5,min_delta=1e-2)]
history = model.fit(train_set,
                    validation_data = valid_set,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 30)                270       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 31        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________
Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100


In [25]:
model.evaluate(test_set, steps = 5160 // batch_size)



0.3523958790931642