In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.2.0
sys.version_info(major=3, minor=8, micro=2, releaselevel='final', serial=0)
matplotlib 3.2.1
numpy 1.18.5
pandas 1.0.4
sklearn 0.23.1
tensorflow 2.2.0
tensorflow.keras 2.3.0-tf


In [2]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

In [4]:
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state = 668)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state = 168)

In [5]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

In [8]:
# 指定生成文件的目录
output_dir = 'generate_csv'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

# 定义函数实现将数据保存到对应csv文件中，传入文件目录，数据，文件前缀，
# 默认头部为None，文件分割数为10
def save_to_csv(output_dir, data, name_prefix, header=None, n_parts=10):
    
    # 生成文件名
    path_format = os.path.join(output_dir, '{}_{:02d}.csv')
    filenames = []  # 用以存储子文件名
    
    # 分割数据
    # len()得到数据长度，np.arange()生成对应长度的数组，n_parts为默认的分割数
    # np.array_split()将生成数组按分割数进行分割，返回分割后的子数组对象
    # enumerate()返回对象下标和对象，
    for file_idx, row_indices in enumerate(np.array_split(np.arange(len(data)), n_parts)):
        
        # 生成子文件名，拼接前缀和文件名
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        
        with open(part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header + '\n')
            
            # 拿到数据切片中的各行
            for row_index in row_indices:
                # 遍历行中的数据转化为字符串后作为字符串列表
                # 用逗号拼接内容，使用\n换到下一行
                f.write(','.join([repr(col) for col in data[row_index]]))
                f.write('\n')
         
    return filenames

# 使用np.c_将特征值和目标值合并起来
train_data = np.c_[x_train_scaled, y_train]
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]

# 创建头部
header_cols = housing.feature_names + ['MidianHouseValue']
header_str = ','.join(header_cols)

train_filenames = save_to_csv(output_dir, train_data, 'train', header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, 'valid', header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, 'train', header_str, n_parts=10)

In [9]:
import pprint
print("train filenames:")
pprint.pprint(train_filenames)
print("valid filenames:")
pprint.pprint(valid_filenames)
print("test filenames:")
pprint.pprint(test_filenames)

train filenames:
['generate_csv/train_00.csv',
 'generate_csv/train_01.csv',
 'generate_csv/train_02.csv',
 'generate_csv/train_03.csv',
 'generate_csv/train_04.csv',
 'generate_csv/train_05.csv',
 'generate_csv/train_06.csv',
 'generate_csv/train_07.csv',
 'generate_csv/train_08.csv',
 'generate_csv/train_09.csv',
 'generate_csv/train_10.csv',
 'generate_csv/train_11.csv',
 'generate_csv/train_12.csv',
 'generate_csv/train_13.csv',
 'generate_csv/train_14.csv',
 'generate_csv/train_15.csv',
 'generate_csv/train_16.csv',
 'generate_csv/train_17.csv',
 'generate_csv/train_18.csv',
 'generate_csv/train_19.csv']
valid filenames:
['generate_csv/valid_00.csv',
 'generate_csv/valid_01.csv',
 'generate_csv/valid_02.csv',
 'generate_csv/valid_03.csv',
 'generate_csv/valid_04.csv',
 'generate_csv/valid_05.csv',
 'generate_csv/valid_06.csv',
 'generate_csv/valid_07.csv',
 'generate_csv/valid_08.csv',
 'generate_csv/valid_09.csv']
test filenames:
['generate_csv/train_00.csv',
 'generate_csv/train

In [11]:
# 数据分割后，得到存储数据切片的不同文件
# 从文件中获取数据

# 使用.list_files()将文件名合并为一个文件名数据集dataset
filename_dataset = tf.data.Dataset.list_files(train_filenames)
for filename in filename_dataset:
    print(filename)

tf.Tensor(b'generate_csv/train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_07.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_18.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_17.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv/train_02.csv', 

In [12]:
# 指定读取数目
n_readers = 5

# tf.data.TextLineDataset()，从另一个数据集中获取数据组成新的数据集
dataset = filename_dataset.interleave(
    lambda filename: tf.data.TextLineDataset(filename).skip(1),
    cycle_length = n_readers
)

# .take()指定数据获取量
for line in dataset.take(15):
    print(line.numpy())
    

b'-0.3281387370556396,-1.0044649142435087,-0.17411778092728217,-0.05180209490726485,-0.17902174411178695,-0.07903731619060342,-1.306450703555216,1.3694277715423138,2.021'
b'-0.19443506348560768,0.344485803429447,-0.2538104784225457,-0.22113402704060262,0.326656892234898,-0.04924278272520294,-1.3626632513495633,1.2498801887923283,1.265'
b'-0.3930596464705963,-1.4012151253237897,1.2666081079428901,1.4297680870288512,-0.4375973818946468,-0.15971991501483038,-1.1331286811893087,1.21999329310483,2.813'
b'-0.7283103876938144,0.5031858878615595,-0.31689488668120985,-0.15763087136164158,0.28606139961369814,0.07427588109429509,-0.13535595783961848,0.31342412391742397,0.54'
b'-1.1295779700825632,1.1379862255900093,-0.6165760998210988,-0.21693328826697142,-0.545263688411742,-0.08322788611673292,-0.763062741543177,0.6122930807923913,1.897'
b'-0.7762704485879827,1.3760363522381778,-0.46059818054710583,0.10980547641775501,-0.10400833383348292,0.03301655309248693,-0.7677471205260418,0.721878364979879

In [28]:
# 使用tf.io.decode_csv()对字符串内容进行解析

sample_str = '1, 2, 3, 4'

# 指定解析内容类型，一一对应解析内容
record_defaults = [
    tf.constant(1, dtype=tf.int32),
    'hello',
    np.nan,
    'hello',
#     tf.constant([]),  # 默认为float32
#     'hello',
]


parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=string, numpy=b' 2'>, <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: shape=(), dtype=string, numpy=b' 4'>]


In [29]:
# 无法解析空字符串
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 4 fields but have 5 in record 0 [Op:DecodeCSV]


In [31]:
def parse_csv_line(line, n_fields=9):
    
    # 根据长度定义默认类型均为tf.constant(np.nan)
    defs = [tf.constant(np.nan)] * n_fields
    
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    
    return x, y

parse_csv_line(b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138',
               n_fields=9)
    

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([-0.9868721 ,  0.8328631 , -0.18684709, -0.1488895 , -0.45323023,
        -0.11504996,  1.6730974 , -0.74654967], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.138], dtype=float32)>)

In [34]:
# 读取数据的完整流程

# 传入的参数：文件名，读取数，每批次数据量，进行解析字段数，混洗缓存的大小
def csv_read_dataset(filenames, n_readers=5,
                    batch_size=32, n_parse_threads=5,
                    shuffle_buffer_size=10000):
    
    # 将文件名组成一个新的数据集
    dataset = tf.data.Dataset.list_files(filenames)
    
    # 设置数据集可重复获取
    dataset = dataset.repeat()
    
    # 提取文件中的内容作为新的数据集
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    
    # .map()对数据集进行映射，应用函数
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    
    # 使用.batch()对数据集进行分批
    dataset = dataset.batch(batch_size)
    
    return dataset
    
train_set = csv_read_dataset(train_filenames, batch_size=3)

for x_batch, y_batch in train_set.take(2):
    print('x:')
    pprint.pprint(x_batch)
    print('y:')
    pprint.pprint(y_batch)

x:
<tf.Tensor: shape=(3, 8), dtype=float32, numpy=
array([[-0.39305964, -1.4012151 ,  1.2666081 ,  1.4297681 , -0.4375974 ,
        -0.15971991, -1.1331286 ,  1.2199932 ],
       [-0.32813874, -1.0044649 , -0.17411777, -0.0518021 , -0.17902175,
        -0.07903732, -1.3064507 ,  1.3694278 ],
       [-0.15503371,  0.26513577,  0.18460795, -0.15213478, -0.49319556,
        -0.07645705,  2.4082618 , -2.26183   ]], dtype=float32)>
y:
<tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[2.813],
       [2.021],
       [0.928]], dtype=float32)>
x:
<tf.Tensor: shape=(3, 8), dtype=float32, numpy=
array([[-0.19443506,  0.3444858 , -0.25381047, -0.22113402,  0.32665688,
        -0.04924278, -1.3626633 ,  1.2498802 ],
       [ 0.46197587, -0.29031453,  0.19074658, -0.20333153, -0.29021809,
         0.01428765, -0.92233163,  0.81153905],
       [ 0.41469425, -0.13161445, -0.24779166, -0.11114632, -0.27609792,
        -0.09067344, -0.8333284 ,  0.6023308 ]], dtype=float32)>
y:
<tf.Tensor: shape=(

In [35]:
batch_size = 32
train_set = csv_read_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_read_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_read_dataset(test_filenames,
                              batch_size = batch_size)

In [36]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(train_set,
                    validation_data = valid_set,
                    steps_per_epoch = 11160 // batch_size, #每步训练集数目
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100


In [37]:
model.evaluate(test_set, steps = 5160 // batch_size)



0.4229345917701721