In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

# Report the TensorFlow version, the Python interpreter version, and then the
# name and version of every major library this notebook relies on.
print(tf.__version__)
print(sys.version_info)
for lib in (mpl, np, pd, sklearn, tf, keras):
    print(lib.__name__, lib.__version__)

2.0.0
sys.version_info(major=3, minor=6, micro=5, releaselevel='final', serial=0)
matplotlib 2.2.2
numpy 1.19.2
pandas 0.23.0
sklearn 0.23.2
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


In [2]:
from sklearn.datasets import fetch_california_housing

# Load the California housing dataset through scikit-learn.
housing = fetch_california_housing()

# Show the shape of the feature matrix, then of the target vector.
for array in (housing.data, housing.target):
    print(array.shape)

(20640, 8)
(20640,)


In [3]:
from sklearn.model_selection import train_test_split

# Split into training, validation and test sets: first hold out the test set,
# then carve the validation set out of the remaining samples. Fixed
# random_state values keep both splits reproducible.
x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state=7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state=11)

# Print (features, target) shapes for train, valid and test, in that order.
for features, targets in ((x_train, y_train),
                          (x_valid, y_valid),
                          (x_test, y_test)):
    print(features.shape, targets.shape)


(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [4]:
from sklearn.preprocessing import StandardScaler
 
# Standardize the features. The scaler is fitted on the training split only;
# the validation and test splits are transformed with those same statistics.
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled, x_test_scaled = (
    scaler.transform(split) for split in (x_valid, x_test)
)

tf.data生成csv文件

In [5]:
# Directory that receives the generated CSV shards.
output_dir = "generate_csv"
# makedirs with exist_ok=True is safe to re-run, has no check-then-create race,
# and also works if output_dir is later changed to a nested path.
os.makedirs(output_dir, exist_ok=True)

    
def save_to_csv(output_dir,
                data,
                name_prefix,
                header=None,
                n_parts = 10):
    """Split ``data`` row-wise into ``n_parts`` CSV files and return their paths.

    Args:
        output_dir: Directory to write into; it is created if it does not
            exist. An empty string writes into the current directory.
        data: Sequence of rows (e.g. a 2-D NumPy array). Each row is written
            as one comma-separated line.
        name_prefix: File name prefix; files are named
            ``<name_prefix>_<two-digit index>.csv``.
        header: Optional header line (without trailing newline) written at
            the top of every file.
        n_parts: Number of files to split the rows into.

    Returns:
        List of the written file paths, in part order.
    """
    # Make the function usable on its own, without relying on the caller
    # having created the directory first.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []

    # array_split tolerates row counts that are not a multiple of n_parts:
    # it yields n_parts index arrays whose lengths differ by at most one.
    index_chunks = np.array_split(np.arange(len(data)), n_parts)
    for file_idx, row_indices in enumerate(index_chunks):
        path_csv = path_format.format(name_prefix, file_idx)
        filenames.append(path_csv)

        with open(path_csv, "wt", encoding = "utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                # repr() of a NumPy scalar is "np.float64(...)" on NumPy >= 2,
                # which would corrupt the CSV; str() yields the plain number.
                # Non-NumPy values keep their repr() form as before.
                fields = [str(col) if isinstance(col, np.generic) else repr(col)
                          for col in data[row_index]]
                f.write(",".join(fields))
                f.write("\n")
    return filenames

# CSV header: the dataset's feature names followed by the target column name.
header_cols = [*housing.feature_names, "MidianHouseValue"]
header_str = ",".join(header_cols)

# np.c_ joins arrays side by side (row counts must match), so each row becomes
# the scaled features followed by the target value as the last column.
train_data = np.c_[x_train_scaled, y_train]
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]

# Write each split as several CSV shards and keep the resulting file paths.
train_filenames = save_to_csv(
    output_dir, train_data, "train", header=header_str, n_parts=20)
valid_filenames = save_to_csv(
    output_dir, valid_data, "valid", header=header_str, n_parts=10)
test_filenames = save_to_csv(
    output_dir, test_data, "test", header=header_str, n_parts=10)

In [6]:
# Inspect the combined training array: scaled features with the target as the last column.
print(train_data) 

[[ 0.80154431  0.27216142 -0.11624393 ... -0.58976206 -0.08241846
   3.226     ]
 [-0.29807281  0.35226166 -0.10920508 ...  1.08055484 -1.06113817
   1.514     ]
 [-0.03058829 -0.92934213  0.25962148 ...  1.59844639 -1.81515182
   1.598     ]
 ...
 [-1.11006415 -1.40994355 -0.57897311 ...  1.76174553 -2.13473376
   1.5       ]
 [ 0.32465459  0.27216142 -0.10777932 ... -0.65508172  0.64662786
   2.636     ]
 [-0.10982126 -0.52884094  0.25735571 ... -1.14497913  1.17094199
   1.925     ]]


In [7]:
# Show the paths of the training CSV shards returned by save_to_csv.
print(train_filenames)

['generate_csv\\train_00.csv', 'generate_csv\\train_01.csv', 'generate_csv\\train_02.csv', 'generate_csv\\train_03.csv', 'generate_csv\\train_04.csv', 'generate_csv\\train_05.csv', 'generate_csv\\train_06.csv', 'generate_csv\\train_07.csv', 'generate_csv\\train_08.csv', 'generate_csv\\train_09.csv', 'generate_csv\\train_10.csv', 'generate_csv\\train_11.csv', 'generate_csv\\train_12.csv', 'generate_csv\\train_13.csv', 'generate_csv\\train_14.csv', 'generate_csv\\train_15.csv', 'generate_csv\\train_16.csv', 'generate_csv\\train_17.csv', 'generate_csv\\train_18.csv', 'generate_csv\\train_19.csv']


读取output_dir所有文件生成一个dataset

In [8]:
import pprint
# Pretty-print the shard paths of each split under its own caption.
for caption, paths in (("train filenames:", train_filenames),
                       ("valid filenames:", valid_filenames),
                       ("test filenames:", test_filenames)):
    print(caption)
    pprint.pprint(paths)

train filenames:
['generate_csv\\train_00.csv',
 'generate_csv\\train_01.csv',
 'generate_csv\\train_02.csv',
 'generate_csv\\train_03.csv',
 'generate_csv\\train_04.csv',
 'generate_csv\\train_05.csv',
 'generate_csv\\train_06.csv',
 'generate_csv\\train_07.csv',
 'generate_csv\\train_08.csv',
 'generate_csv\\train_09.csv',
 'generate_csv\\train_10.csv',
 'generate_csv\\train_11.csv',
 'generate_csv\\train_12.csv',
 'generate_csv\\train_13.csv',
 'generate_csv\\train_14.csv',
 'generate_csv\\train_15.csv',
 'generate_csv\\train_16.csv',
 'generate_csv\\train_17.csv',
 'generate_csv\\train_18.csv',
 'generate_csv\\train_19.csv']
valid filenames:
['generate_csv\\valid_00.csv',
 'generate_csv\\valid_01.csv',
 'generate_csv\\valid_02.csv',
 'generate_csv\\valid_03.csv',
 'generate_csv\\valid_04.csv',
 'generate_csv\\valid_05.csv',
 'generate_csv\\valid_06.csv',
 'generate_csv\\valid_07.csv',
 'generate_csv\\valid_08.csv',
 'generate_csv\\valid_09.csv']
test filenames:
['generate_csv\\test

In [9]:
# Build a dataset whose elements are the training shard paths; every element
# is a scalar string tensor holding one file name.
filename_dataset = tf.data.Dataset.list_files(train_filenames)

# Iterate once to show the file names in the order the dataset yields them.
for path_tensor in filename_dataset:
    print(path_tensor)

tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_09.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

interleave()是Dataset的实例方法（不是类方法），所以interleave是作用在一个已有的Dataset对象上的

    interleave(
        map_func,
        cycle_length=AUTOTUNE,
        block_length=1,
        num_parallel_calls=None
    )

解释：

    假定我们现在有一个Dataset——A
    从该A中取出cycle_length个element，然后对这些element apply map_func,得到cycle_length个新的Dataset对象。
    然后从这些新生成的Dataset对象中取数据，取数逻辑为轮流从每个对象里面取数据，每次取block_length个数据
    当这些新生成的某个Dataset对象取尽时，从原Dataset中再取下一个element，对它apply map_func得到一个新的Dataset来替补被取尽的那个，以此类推，直到原Dataset和所有新生成的Dataset都被取尽。

In [10]:
# interleave: turn every file name into a dataset of its text lines and merge
# those line datasets into one. skip(1) drops the header row that
# save_to_csv writes at the top of each shard.
n_readers = 5  # number of files read from in turn
dataset = filename_dataset.interleave(
    lambda path: tf.data.TextLineDataset(path).skip(1),
    cycle_length=n_readers,
)

# Peek at the first 15 merged lines.
for record in dataset.take(15):
    print(record.numpy())

b'-0.32652634129448693,0.43236189741438374,-0.09345459539684739,-0.08402991822890092,0.8460035745154013,-0.0266316482653991,-0.5617679242614233,0.1422875991184281,2.431'
b'-0.09719300311107498,-1.249743071766074,0.36232962250170797,0.026906080250728295,1.033811814747154,0.045881586971778555,1.3418334617377423,-1.6353869745909178,1.832'
b'0.04971034572063198,-0.8492418886278699,-0.06214699417830008,0.17878747064657746,-0.8025354230744277,0.0005066066922077538,0.6466457006743215,-1.1060793768010604,2.286'
b'0.09734603446040174,0.7527628439249472,-0.20218964416999152,-0.1954700015215477,-0.4060513603629498,0.006785531677655949,-0.813715166526018,0.656614793197258,1.119'
b'0.15782311132800697,0.43236189741438374,0.3379948076652917,-0.015880306122244434,-0.3733890577139493,-0.05305245634489608,0.8006134598360177,-1.2359095422966828,3.169'
b'2.2754266257529974,-1.249743071766074,1.0294788075585177,-0.17124431895714504,-0.45413752815175606,0.10527151658164971,-0.9023632702857819,0.90129472047

读取生成的csv文件内容

用法：

    tf.io.decode_csv(records, record_defaults)
    
    records 待解析的CSV字符串（每个字符串是一条记录）
    
    record_defaults 每一列的默认值组成的列表；默认值的类型同时决定该列解析结果的类型，默认值为空张量表示该列必填

作用：将CSV记录转换为张量。每列映射到一个张量。


In [11]:
# Minimal decode_csv demo: five int32 defaults, so every field of the record
# is parsed into an int32 scalar tensor.
sample_str = "1,2,3,4,5"
record_defaults = [tf.constant(0, dtype=tf.int32)] * 5
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: id=96, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=97, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=98, shape=(), dtype=int32, numpy=3>, <tf.Tensor: id=99, shape=(), dtype=int32, numpy=4>, <tf.Tensor: id=100, shape=(), dtype=int32, numpy=5>]


In [12]:
# Mixed defaults: the type of each default decides the dtype of the parsed
# column (see the printed tensors below).
sample_str = "1,2,3,4,5"
record_defaults = [tf.constant(0, dtype=tf.int32),  # explicit int32 tensor
                  0,  # Python int -> parsed as int32
                  np.nan,  # float default -> parsed as float32
                  "hello",  # string default -> parsed as string
                  tf.constant([])]  # empty default -> float32, and the field is required

parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: id=107, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=108, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=109, shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: id=110, shape=(), dtype=string, numpy=b'4'>, <tf.Tensor: id=111, shape=(), dtype=float32, numpy=5.0>]


In [13]:
# Error demo: every field of the record is empty. The last column's default is
# an empty tensor (tf.constant([])), which makes that field required, so
# decoding raises InvalidArgumentError; the message is printed instead of
# letting the cell fail.
try:
    parsed_files = tf.io.decode_csv(',,,,',  record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [14]:
# Error demo: the record has 7 fields but record_defaults describes only 5
# columns, so decoding raises InvalidArgumentError; the message is printed.
try:
    parsed_files = tf.io.decode_csv('1,2,3,4,5,6,7',  record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [15]:
# Parse one CSV record into feature and label tensors.
def parse_csv_line(line, n_fields=9):
    """Decode a single CSV text line into an ``(x, y)`` pair of tensors.

    Args:
        line: One CSV record (string or scalar string tensor) holding
            ``n_fields`` comma-separated numeric values.
        n_fields: Total number of columns in the record, including the
            trailing target column.

    Returns:
        A tuple ``(x, y)``. ``x`` stacks every column except the last into a
        1-D tensor; ``y`` is a 1-element tensor holding the last column.
    """
    # One NaN float default per column, so every field is parsed as a float.
    column_defaults = [tf.constant(np.nan)] * n_fields
    columns = tf.io.decode_csv(line, record_defaults=column_defaults)

    features = tf.stack(columns[:-1])
    # Slice with [-1:] rather than index with [-1] so the label keeps a
    # length-1 dimension.
    label = tf.stack(columns[-1:])
    return features, label

# Smoke test: parse one sample record (8 feature values followed by the target).
parse_csv_line(b'0.6363646332204844,-1.0895425985107923,0.09260902815633619,-0.20538124656801682,1.2025670451003232,-0.03630122549633783,-0.6784101660505877,0.182235342347858,2.429',
               n_fields=9)

(<tf.Tensor: id=131, shape=(8,), dtype=float32, numpy=
 array([ 0.63636464, -1.0895426 ,  0.09260903, -0.20538124,  1.2025671 ,
        -0.03630123, -0.6784102 ,  0.18223535], dtype=float32)>,
 <tf.Tensor: id=132, shape=(1,), dtype=float32, numpy=array([2.429], dtype=float32)>)

读取output_dir所有文件生成一个dataset

https://blog.csdn.net/anshuai_aw1/article/details/105094548

In [16]:
# Pipeline overview:
# 1. file names -> dataset of file names
# 2. each file name -> dataset of its text lines; interleave them into one dataset
# 3. parse every CSV line into (features, label) tensors
def csv_reader_dataset(filenames, 
                       n_readers=5,
                       batch_size=32, 
                       n_parse_threads=5,
                       shuffle_buffer_size=10000):
    """Build a shuffled, batched, endlessly repeating dataset from CSV shards.

    Args:
        filenames: CSV file paths (or patterns) to read. Each file is expected
            to start with one header row, which is skipped.
        n_readers: Number of files read from in turn (interleave cycle length).
        batch_size: Number of parsed records per batch.
        n_parse_threads: Parallelism of the CSV parsing ``map`` step.
        shuffle_buffer_size: Size of the shuffle buffer applied to the lines.

    Returns:
        A ``tf.data.Dataset`` yielding batches of the ``(x, y)`` pairs
        produced by ``parse_csv_line``. Because the file list is repeated
        indefinitely, callers must bound iteration themselves (e.g. ``take``
        or the ``steps_per_epoch`` / ``steps`` arguments of Keras).
    """
    # Dataset of file names.
    dataset = tf.data.Dataset.list_files(filenames)
    # Repeat without a count so the dataset never runs out of elements.
    dataset = dataset.repeat()
    # Read the lines of n_readers files in turn, skipping each header row.
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers 
    )

    # Dataset transformations return a NEW dataset instead of modifying the
    # receiver, so the result must be assigned back; the original code
    # discarded it and therefore never shuffled the data.
    dataset = dataset.shuffle(shuffle_buffer_size)
    # Parse every text line into (features, label) tensors.
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    # Group batch_size parsed records into one batch.
    dataset = dataset.batch(batch_size)
    return dataset

# Preview: build a tiny-batch training dataset and print its first two batches.
train_set = csv_reader_dataset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(2):
    for caption, batch in (("x:", x_batch), ("y:", y_batch)):
        print(caption)
        pprint.pprint(batch)

x:
<tf.Tensor: id=216, shape=(3, 8), dtype=float32, numpy=
array([[ 8.1150836e-01, -4.8239522e-02,  5.1873392e-01, -2.9386396e-02,
        -3.4064025e-02, -5.0815947e-02, -7.1573567e-01,  9.1627514e-01],
       [ 6.3034356e-01,  1.8741661e+00, -6.7132145e-02, -1.2543367e-01,
        -1.9737554e-01, -2.2722632e-02, -6.9240725e-01,  7.2652334e-01],
       [ 4.9710345e-02, -8.4924191e-01, -6.2146995e-02,  1.7878747e-01,
        -8.0253541e-01,  5.0660671e-04,  6.4664572e-01, -1.1060793e+00]],
      dtype=float32)>
y:
<tf.Tensor: id=217, shape=(3, 1), dtype=float32, numpy=
array([[2.147],
       [2.419],
       [2.286]], dtype=float32)>
x:
<tf.Tensor: id=218, shape=(3, 8), dtype=float32, numpy=
array([[ 0.63636464, -1.0895426 ,  0.09260903, -0.20538124,  1.2025671 ,
        -0.03630123, -0.6784102 ,  0.18223535],
       [-1.1157656 ,  0.99306357, -0.334192  , -0.06535219, -0.32893205,
         0.04343066, -0.12785879,  0.30707204],
       [-0.69061434, -0.12833975,  7.020181  ,  5.6242876 

In [17]:
# Build the train / validation / test datasets from their CSV shards, all
# with the same batch size.
batch_size = 32
train_set, valid_set, test_set = (
    csv_reader_dataset(shard_paths, batch_size=batch_size)
    for shard_paths in (train_filenames, valid_filenames, test_filenames)
)

tf.data读取csv文件并与tf.keras结合使用

In [18]:
# Small regression network: one hidden ReLU layer with 30 units on the
# 8 input features, followed by a single linear output unit.
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation='relu', input_shape=[8]))
model.add(keras.layers.Dense(1))
model.compile(loss="mean_squared_error", optimizer="sgd")

# Early stopping configured with patience=5 and min_delta=1e-2.
early_stopping = keras.callbacks.EarlyStopping(patience=5, min_delta=1e-2)
callbacks = [early_stopping]

# The datasets repeat indefinitely, so the number of steps per epoch and per
# validation pass is given explicitly (about one pass over each split).
history = model.fit(
    train_set,
    validation_data=valid_set,
    steps_per_epoch=len(x_train) // batch_size,
    validation_steps=len(x_valid) // batch_size,
    epochs=100,
    callbacks=callbacks,
)

Train for 362 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100


In [19]:
# Evaluate on the test set; `steps` bounds the evaluation because test_set
# repeats indefinitely (about one pass over the test split).
model.evaluate(test_set, steps = len(x_test) // batch_size)



0.45776501066566255

In [22]:
# interleave on a toy dataset: every element is expanded into a dataset that
# repeats it 6 times; 2 of these datasets are read in turn (cycle_length=2),
# 4 items at a time (block_length=4).
a = tf.data.Dataset.range(1, 6)  # ==> [ 1, 2, 3, 4, 5 ]
b = a.interleave(
    lambda value: tf.data.Dataset.from_tensors(value).repeat(6),
    cycle_length=2,
    block_length=4,
)

# Print the interleaved elements in the order they are produced.
for element in b:
    print(element)

tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype