In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import sklearn
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

In [2]:
housing = fetch_california_housing()

In [3]:
X, y = housing.data, housing.target
X_train_all, X_test, y_train_all, y_test = train_test_split(X, y, random_state=7)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_all, y_train_all, random_state=11)

In [4]:
stand_scale = StandardScaler()

x_train_scale = stand_scale.fit_transform(X_train)
x_valid_scale = stand_scale.transform(X_valid)
x_test_scale = stand_scale.transform(X_test)

In [5]:
print(x_train_scale.shape, y_train.shape)
print(x_valid_scale.shape, y_test.shape)
print(x_test_scale.shape, y_valid.shape)

(11610, 8) (11610,)
(3870, 8) (5160,)
(5160, 8) (3870,)


In [6]:
len(x_train_scale)

11610

In [7]:
output_dir = "generate_csv"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# output_dir:处理文件路径， data：出力数据源
# header：头文件信息， name_prefix：处理文件命名用
# n_parts：将data拆分成多少份
def save_to_csv(output_dir, data, header, name_prefix, n_parts):
    
    # 定义出力文件格式
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    file_names = []
    
    # 将data中的样本数据拆分成n_parts份
    # file_idx:00, 01, 02, 03.....
    # row_indices = [0, 1, ...19],[20, 21,...]
    for file_idx, row_indices in enumerate(np.array_split(np.arange(len(data)), n_parts)):
        # 生成文件名
        file_name = path_format.format(name_prefix, file_idx)
        file_names.append(file_name)
        
        # 写文件
        with open(file_name, mode="w") as f:
            # 生成头文件
            if (header is not None):
                f.write(header + "\n")
            
            # 生成数据
            for row_indice in row_indices:
                f.write(",".join([repr(col) for col in data[row_indice]]))
                f.write("\n")
    
    return file_names

In [8]:
# 合并数据
train_data = np.c_[x_train_scale, y_train]
valid_data = np.c_[x_valid_scale, y_valid]
test_data = np.c_[x_test_scale, y_test]

housing_header = housing.feature_names
housing_header.append("MiddlePrice")
header_str = ",".join(housing_header)

In [9]:
header_str

'MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MiddlePrice'

In [10]:
# 11610/20=581, 一个csv文件581条数据
train_filenames = save_to_csv(output_dir, train_data, header_str, "train", n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, header_str, "valid", n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, header_str, "test", n_parts=10)

### 读取文件

In [11]:
filename_dataset = tf.data.Dataset.list_files(train_filenames)

In [12]:
for filename in filename_dataset:
    print(filename)

tf.Tensor(b'generate_csv\\train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_07.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_02.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_18.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

In [13]:
dataset = filename_dataset.interleave(
    lambda filename:tf.data.TextLineDataset(filename).skip(1),
    cycle_length=15)

In [14]:
for line in dataset.take(15):
    print(line.numpy())

b'0.801544314532886,0.27216142415910205,-0.11624392696666119,-0.2023115137272354,-0.5430515742518128,-0.021039615516440048,-0.5897620622908205,-0.08241845654707416,3.226'
b'-1.0591781535672364,1.393564736946074,-0.026331968874673636,-0.11006759528831847,-0.6138198966579805,-0.09695934953589447,0.3247131133362288,-0.037477245413977976,0.672'
b'-1.0775077698160966,-0.44874070548966555,-0.5680568205591913,-0.14269262164909954,-0.09666677138213985,0.12326468238687088,-0.3144863716683942,-0.4818958888413162,0.978'
b'-0.32652634129448693,0.43236189741438374,-0.09345459539684739,-0.08402991822890092,0.8460035745154013,-0.0266316482653991,-0.5617679242614233,0.1422875991184281,2.431'
b'0.4853051504718848,-0.8492418886278699,-0.06530126513877861,-0.023379656040017353,1.4974350551260218,-0.07790657783453239,-0.9023632702857819,0.7814514907892068,2.956'
b'0.6363646332204844,-1.0895425985107923,0.09260902815633619,-0.20538124656801682,1.2025670451003232,-0.03630122549633783,-0.6784101660505877,0.1

In [15]:
sample_records = "1,2,3,4,5"
record_defaults = [tf.constant(0)] * 5

# tf.io.decode_csv => tf.Tensor
# record_defaults.size = sample_records.size
tf.io.decode_csv(
    sample_records,
    record_defaults)

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=int32, numpy=2>,
 <tf.Tensor: shape=(), dtype=int32, numpy=3>,
 <tf.Tensor: shape=(), dtype=int32, numpy=4>,
 <tf.Tensor: shape=(), dtype=int32, numpy=5>]

In [16]:
sample_records = "1,2,3,4,5"
# int,float,str,float,float
record_defaults = [
    tf.constant(0),
    tf.constant(1.0),
    "str",
    np.nan,
    1.0
]

tf.io.decode_csv(
    sample_records,
    record_defaults)

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'3'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=4.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [17]:
# 数量必须一致
try:
    tf.io.decode_csv(",,,", record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 4 in record 0 [Op:DecodeCSV]


In [18]:
# 解析单条记录
def parse_to_csv(records, n_records=9):
    dfs = [np.nan] * n_records
    dataset = tf.io.decode_csv(records, dfs)
    x = tf.stack(dataset[0:-1])  # tf.stack=>多个tensor合并为一个
    y = dataset[-1]
    return x, y

In [19]:
parse_to_csv(b'-0.09719300311107498,-1.249743071766074,0.36232962250170797,0.026906080250728295,1.033811814747154,0.045881586971778555,1.3418334617377423,-1.6353869745909178,1.832',
            9)

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([-0.097193  , -1.2497431 ,  0.36232963,  0.02690608,  1.0338118 ,
         0.04588159,  1.3418335 , -1.635387  ], dtype=float32)>,
 <tf.Tensor: shape=(), dtype=float32, numpy=1.832>)

In [20]:
# 1.filename => dataset
# 2.read_file => dataset => datasets => merge
# 3.parse csv

def csv_reader_dataset(filenames, batch_size):
    # 读入文件名
    dataset = tf.data.Dataset.list_files(filenames)
    print("1:")
    print(dataset)
    
    # 读入文件中的数据
    dataset = dataset.interleave(
        lambda filename:tf.data.TextLineDataset(filename).skip(1),
        cycle_length = 5)
    print("2:")
    print(dataset)
    
    # 对文件中的数据转换
    dataset = dataset.map(parse_to_csv)
    print("3:")
    print(dataset)
    
    # 生成batch_size的文件
    dataset = dataset.batch(batch_size)
    print("4:")
    print(dataset)
    
    return dataset

In [21]:
tf.data.Dataset.list_files(train_filenames)

<ShuffleDataset shapes: (), types: tf.string>

In [22]:
import pprint
pprint.pprint(train_filenames)

['generate_csv\\train_00.csv',
 'generate_csv\\train_01.csv',
 'generate_csv\\train_02.csv',
 'generate_csv\\train_03.csv',
 'generate_csv\\train_04.csv',
 'generate_csv\\train_05.csv',
 'generate_csv\\train_06.csv',
 'generate_csv\\train_07.csv',
 'generate_csv\\train_08.csv',
 'generate_csv\\train_09.csv',
 'generate_csv\\train_10.csv',
 'generate_csv\\train_11.csv',
 'generate_csv\\train_12.csv',
 'generate_csv\\train_13.csv',
 'generate_csv\\train_14.csv',
 'generate_csv\\train_15.csv',
 'generate_csv\\train_16.csv',
 'generate_csv\\train_17.csv',
 'generate_csv\\train_18.csv',
 'generate_csv\\train_19.csv']


In [26]:
train_set = csv_reader_dataset(train_filenames, batch_size=3)
print(train_set)

# 取两组数据，一组有三个
for x_train, y_train in train_set.take(2):
    print(x_train)
    print(y_train)

1:
<ShuffleDataset shapes: (), types: tf.string>
2:
<InterleaveDataset shapes: (), types: tf.string>
3:
<MapDataset shapes: ((8,), ()), types: (tf.float32, tf.float32)>
4:
<BatchDataset shapes: ((None, 8), (None,)), types: (tf.float32, tf.float32)>
<BatchDataset shapes: ((None, 8), (None,)), types: (tf.float32, tf.float32)>
tf.Tensor(
[[ 0.81150836 -0.04823952  0.5187339  -0.0293864  -0.03406402 -0.05081595
  -0.7157357   0.91627514]
 [-1.1157656   0.99306357 -0.334192   -0.06535219 -0.32893205  0.04343066
  -0.12785879  0.30707204]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]], shape=(3, 8), dtype=float32)
tf.Tensor([2.147 0.524 2.419], shape=(3,), dtype=float32)
tf.Tensor(
[[ 0.48530516 -0.8492419  -0.06530126 -0.02337966  1.4974351  -0.07790658
  -0.90236324  0.78145146]
 [-1.119975   -1.3298433   0.14190045  0.4658137  -0.10301778 -0.10744184
  -0.7950524   1.5304717 ]
 [-0.69061434 -0.12833975  7.020181    5.6242876  -0.26632

In [24]:
train_set = csv_reader_dataset(train_filenames, batch_size=32)
valid_set = csv_reader_dataset(valid_filenames, batch_size=32)
test_set = csv_reader_dataset(test_filenames, batch_size=32)

1:
<ShuffleDataset shapes: (), types: tf.string>
2:
<InterleaveDataset shapes: (), types: tf.string>
3:
<MapDataset shapes: ((8,), ()), types: (tf.float32, tf.float32)>
4:
<BatchDataset shapes: ((None, 8), (None,)), types: (tf.float32, tf.float32)>
1:
<ShuffleDataset shapes: (), types: tf.string>
2:
<InterleaveDataset shapes: (), types: tf.string>
3:
<MapDataset shapes: ((8,), ()), types: (tf.float32, tf.float32)>
4:
<BatchDataset shapes: ((None, 8), (None,)), types: (tf.float32, tf.float32)>
1:
<ShuffleDataset shapes: (), types: tf.string>
2:
<InterleaveDataset shapes: (), types: tf.string>
3:
<MapDataset shapes: ((8,), ()), types: (tf.float32, tf.float32)>
4:
<BatchDataset shapes: ((None, 8), (None,)), types: (tf.float32, tf.float32)>


In [25]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[8]),
    keras.layers.Dense(1),
])

model.compile(loss="mean_squared_error", optimizer=tf.keras.optimizers.SGD(0.001))
callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-2)]

batch_size= 32
history = model.fit(train_set.repeat(),   # 加上repeat
                    validation_data = valid_set,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
