In [1]:
import matplotlib as mlp
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras 
import warnings

warnings.filterwarnings('ignore')
print(tf.__version__)
for model in sklearn, pd, keras, np ,mlp:
    print(model.__name__, model.__version__)

2.1.0
sklearn 0.20.2
pandas 0.24.2
tensorflow_core.python.keras.api._v2.keras 2.2.4-tf
numpy 1.17.4
matplotlib 2.1.2


In [2]:
source_dir = "./generate_csv"
def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results
train_filenames = get_filenames_by_prefix(source_dir, "train")
valid_filenames = get_filenames_by_prefix(source_dir, "valid")
test_filenames = get_filenames_by_prefix(source_dir, "test")

import pprint
pprint.pprint(train_filenames)
pprint.pprint(test_filenames)

['./generate_csv/train_15.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_00.csv',
 './generate_csv/train_14.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_16.csv',
 './generate_csv/train_17.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_13.csv',
 './generate_csv/train_12.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_10.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_11.csv',
 './generate_csv/train_08.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_19.csv',
 './generate_csv/train_18.csv']
['./generate_csv/test_08.csv',
 './generate_csv/test_09.csv',
 './generate_csv/test_02.csv',
 './generate_csv/test_03.csv',
 './generate_csv/test_01.csv',
 './generate_csv/test_00.csv',
 './generate_csv/test_04.csv',
 './generate_csv/test_05.csv',
 './generate_csv/test_07.csv',
 './generate_csv/test_06.csv']


In [3]:
# 从CSV文件中，先读取出来
# 按行进行解析
def parse_csv_line(line, n_files=9):
    defs = [tf.constant(np.nan)] * n_files
    parsed_files = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_files[0: -1])
    y = tf.stack(parsed_files[-1:])
    return x, y

# 将整个过程整合成一个函数
def csv_reader_dateset(filenames, n_reader=5, batch_size=32, n_parse_threads=5, shuffle_buffer_size=10000):
    # 将文件名字进行合并
    dataset = tf.data.Dataset.list_files(filenames)
    # 对数据集进行无限次重复
    dataset = dataset.repeat()
    # 按行读取每个文件，并跳过第一行，将读取的数据，合并成一个数据集
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_reader
    )
    # shuffle
    dataset.shuffle(shuffle_buffer_size)
    # 将字符串转化成需要的类型（Tensor）,将数据中每行数据传给'parse_csv_line'，并行处理
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    # 按batch 生成数据
    dataset = dataset.batch(batch_size)
    return dataset

batch_size = 32
train_set = csv_reader_dateset(filenames=train_filenames, batch_size=batch_size)
valid_set = csv_reader_dateset(filenames=valid_filenames, batch_size=batch_size)
test_set = csv_reader_dateset(filenames=test_filenames, batch_size=batch_size)

In [4]:
# 将一组数据转化成tfrecord并序列化
def serialize_example(x, y):
    """Converts to x, y to tf.train.Example and Serialize"""
    input_features = tf.train.FloatList(value=x)
    lables = tf.train.FloatList(value=y)
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(float_list=input_features),
            "lables": tf.train.Feature(float_list=lables)
        }
    )
    example = tf.train.Example(features=features)
    return example.SerializeToString()


def csv_data_to_tfrecord(base_filename, dataset, n_shards, steps_per_shard, compression_type=None):
    """
    将从CSV读取的数据保存成tfrecord文件
    :param base_filename: 基础文件名
    :param dataset: 数据集
    :param n_shards: 将数据存成多少个文件
    :param steps_per_shard: 每个文件中要存储多少数据
    :param compression_type: 文件压缩类型
    :return: 所有文件的名字
    """
    options = tf.io.TFRecordOptions(compression_type=compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = "{}_{:05d}-of-{:05d}".format(base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(path=filename_fullpath, options=options) as writer:
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
        
    return all_filenames
                    

In [5]:
# 调用函数进行存储
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards
output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")
train_record_filenames = csv_data_to_tfrecord(train_basename, train_set, n_shards,
                                              steps_per_shard=train_steps_per_shard,compression_type=None)
valid_record_filenames = csv_data_to_tfrecord(valid_basename, valid_set, n_shards,
                                              steps_per_shard=valid_steps_per_shard)
test_record_filenames = csv_data_to_tfrecord(test_basename, test_set, n_shards,
                                             steps_per_shard=test_steps_per_shard)

In [None]:
# 调用函数进行存储
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards
output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")
train_record_filenames = csv_data_to_tfrecord(train_basename, train_set, n_shards,
                                              steps_per_shard=train_steps_per_shard, compression_type='GZIP')
valid_record_filenames = csv_data_to_tfrecord(valid_basename, valid_set, n_shards,
                                              steps_per_shard=valid_steps_per_shard, compression_type='GZIP')
test_record_filenames = csv_data_to_tfrecord(test_basename, test_set, n_shards,
                                             steps_per_shard=test_steps_per_shard, compression_type='GZIP')

In [6]:
print(train_record_filenames)

['generate_tfrecords/train_00000-of-00020', 'generate_tfrecords/train_00001-of-00020', 'generate_tfrecords/train_00002-of-00020', 'generate_tfrecords/train_00003-of-00020', 'generate_tfrecords/train_00004-of-00020', 'generate_tfrecords/train_00005-of-00020', 'generate_tfrecords/train_00006-of-00020', 'generate_tfrecords/train_00007-of-00020', 'generate_tfrecords/train_00008-of-00020', 'generate_tfrecords/train_00009-of-00020', 'generate_tfrecords/train_00010-of-00020', 'generate_tfrecords/train_00011-of-00020', 'generate_tfrecords/train_00012-of-00020', 'generate_tfrecords/train_00013-of-00020', 'generate_tfrecords/train_00014-of-00020', 'generate_tfrecords/train_00015-of-00020', 'generate_tfrecords/train_00016-of-00020', 'generate_tfrecords/train_00017-of-00020', 'generate_tfrecords/train_00018-of-00020', 'generate_tfrecords/train_00019-of-00020']


In [7]:
# 文件读取
expect_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}
def parse_example(serialize_example):
    exmple = tf.io.parse_single_example(serialize_example, expect_features)
    return exmple["input_features"], exmple["label"]

def tfrecords_reader_dateset(filenames, n_reader=5, batch_size=32, n_parse_threads=5, shuffle_buffer_size=10000):
    # 将文件名字进行合并
    dataset = tf.data.Dataset.list_files(filenames)
    # 对数据集进行无限次重复
    dataset = dataset.repeat()
    # 按行读取每个文件，并跳过第一行，将读取的数据，合并成一个数据集
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename),
        cycle_length=n_reader
    )
    # shuffle
    dataset.shuffle(shuffle_buffer_size)
    # 将字符串转化成需要的类型（Tensor）,将数据中每行数据传给'parse_csv_line'，并行处理
    dataset = dataset.map(parse_example, num_parallel_calls=n_parse_threads)
    # 按batch 生成数据
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecords_reader_dateset(train_record_filenames,batch_size=30)


In [8]:
batch_size = 32
tfrecords_train = tfrecords_reader_dateset(train_record_filenames, batch_size=batch_size)
tfrecords_valid = tfrecords_reader_dateset(valid_record_filenames, batch_size=batch_size)
tfrecords_test = tfrecords_reader_dateset(test_record_filenames, batch_size=batch_size)

In [10]:
# tf.keras.models.Sequential()
# version dnn, 多层神经网络，循环添加层次。
model = tf.keras.models.Sequential([
    keras.layers.Dense(30,activation='relu',input_shape=[8]),
    keras.layers.Dense(1),
])

# sparse_categorical_crossentropy:  如果y是一个数，会把它转化成一个向量，如 y->index., y->one hot->[]
model.compile(loss='mean_squared_error',
              optimizer='adam')

callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)]
history = model.fit(train_set, 
                    epochs=100,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    validation_data=valid_set,
                    callbacks=callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
