In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl,np,pd,sklearn,tf,keras:
    print(module.__name__,module.__version__)

2.0.0
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)
matplotlib 3.1.1
numpy 1.16.5
pandas 0.25.2
sklearn 0.21.3
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


In [2]:
source_dir = './generate_csv/'
print(os.listdir(source_dir))

# 读取文件名
def get_filenames_by_prefix(source_dir,prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir,filename))
    return results

train_filenames = get_filenames_by_prefix(source_dir,'train')
valid_filenames = get_filenames_by_prefix(source_dir,'valid')
test_filenames = get_filenames_by_prefix(source_dir,'test')

import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)

['test_00.csv', 'test_01.csv', 'test_02.csv', 'test_03.csv', 'test_04.csv', 'test_05.csv', 'test_06.csv', 'test_07.csv', 'test_08.csv', 'test_09.csv', 'train_00.csv', 'train_01.csv', 'train_02.csv', 'train_03.csv', 'train_04.csv', 'train_05.csv', 'train_06.csv', 'train_07.csv', 'train_08.csv', 'train_09.csv', 'train_10.csv', 'train_11.csv', 'train_12.csv', 'train_13.csv', 'train_14.csv', 'train_15.csv', 'train_16.csv', 'train_17.csv', 'train_18.csv', 'train_19.csv', 'valid_00.csv', 'valid_01.csv', 'valid_02.csv', 'valid_03.csv', 'valid_04.csv', 'valid_05.csv', 'valid_06.csv', 'valid_07.csv', 'valid_08.csv', 'valid_09.csv']
['./generate_csv/train_00.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_08.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_10.csv',
 './generate_csv/

In [None]:
# 以下三步从csv中读取数据

In [3]:
# 解析dataset 中的一行

def parse_csv_line(line,n_fields=9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line,record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    
    return x,y

In [6]:
def csv_reader_dataset(filenames,n_readers=5,batch_size=32,n_parse_threads=5,
                      shuffle_buffer_size=10000):
    
    # 1. filename -> dataset
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() # 重复多少次
    # 文件名转换成文本内容
    dataset = dataset.interleave(
            lambda filename:tf.data.TextLineDataset(filename).skip(1),
            cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    # 解析
    dataset = dataset.map(parse_csv_line,
                         num_parallel_calls=n_parse_threads)
    
    dataset = dataset.batch(batch_size)
    return dataset

In [12]:
batch_size = 32
train_set = csv_reader_dataset(train_filenames,batch_size=batch_size)
valid_set = csv_reader_dataset(valid_filenames,batch_size=batch_size)
test_set = csv_reader_dataset(test_filenames,batch_size=batch_size)

In [25]:
# 将数据写入tf.record

def serialize_example(x,y):
    """ Converts x,y to tf.train.Example eand serialize"""
    input_features = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    # 转成feature
    features = tf.train.Features(
        feature = {
            "input_features":tf.train.Feature(float_list = input_features),
            "label":tf.train.Feature(float_list = label)
        }
    )
    # 转成example
    example = tf.train.Example(features = features)
    return example.SerializeToString() # 序列化

def csv_dataset_to_tfrecords(base_filename,dataset,n_shards,steps_per_shard,compression_type=None):
    options = tf.io.TFRecordOptions(compression_type = compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(base_filename,shard_id,n_shards)
        with tf.io.TFRecordWriter(filename_fullpath,options) as writer:
            for x_batch,y_batch in dataset.take(steps_per_shard):
                for x_example,y_example in zip(x_batch,y_batch):
                    writer.write(serialize_example(x_example,y_example))
        all_filenames.append(filename_fullpath)
        
    return all_filenames

In [26]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir,'train')
valid_basename = os.path.join(output_dir,'valid')
test_basename = os.path.join(output_dir,'test')

train_tfrecord_filenmaes = csv_dataset_to_tfrecords(train_basename,train_set,n_shards,train_steps_per_shard,None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename,valid_set,n_shards,valid_steps_per_shard,None)
test_tfrecord_filenames = csv_dataset_to_tfrecords(test_basename,test_set,n_shards,valid_steps_per_shard,None)

In [27]:
# 生成压缩文件
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir,'train')
valid_basename = os.path.join(output_dir,'valid')
test_basename = os.path.join(output_dir,'test')

train_tfrecord_filenmaes = csv_dataset_to_tfrecords(train_basename,train_set,n_shards,train_steps_per_shard,compression_type="GZIP")
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename,valid_set,n_shards,valid_steps_per_shard,compression_type="GZIP")
test_tfrecord_filenames = csv_dataset_to_tfrecords(test_basename,test_set,n_shards,valid_steps_per_shard,compression_type="GZIP")

In [28]:
# 读取tfrecord文件与tf.keras结合使用
# 打印文件名
pprint.pprint(train_tfrecord_filenmaes)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_filenames)

['generate_tfrecords_zip\\train_00000-of-00020',
 'generate_tfrecords_zip\\train_00001-of-00020',
 'generate_tfrecords_zip\\train_00002-of-00020',
 'generate_tfrecords_zip\\train_00003-of-00020',
 'generate_tfrecords_zip\\train_00004-of-00020',
 'generate_tfrecords_zip\\train_00005-of-00020',
 'generate_tfrecords_zip\\train_00006-of-00020',
 'generate_tfrecords_zip\\train_00007-of-00020',
 'generate_tfrecords_zip\\train_00008-of-00020',
 'generate_tfrecords_zip\\train_00009-of-00020',
 'generate_tfrecords_zip\\train_00010-of-00020',
 'generate_tfrecords_zip\\train_00011-of-00020',
 'generate_tfrecords_zip\\train_00012-of-00020',
 'generate_tfrecords_zip\\train_00013-of-00020',
 'generate_tfrecords_zip\\train_00014-of-00020',
 'generate_tfrecords_zip\\train_00015-of-00020',
 'generate_tfrecords_zip\\train_00016-of-00020',
 'generate_tfrecords_zip\\train_00017-of-00020',
 'generate_tfrecords_zip\\train_00018-of-00020',
 'generate_tfrecords_zip\\train_00019-of-00020']
['generate_tfrecords

In [32]:
expected_features = {
    "input_features":tf.io.FixedLenFeature([8],dtype=tf.float32),
    "label":tf.io.FixedLenFeature([1],dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,expected_features)
    return example["input_features"],example["label"]


def tfrecords_reader_dataset(filenames,n_readers=5,batch_size=32,n_parse_threads=5,shuffle_buffer_size=10000):
    
    # 1. filename -> dataset
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() # 重复多少次
    # 文件名转换成文本内容
    dataset = dataset.interleave(
            lambda filename:tf.data.TFRecordDataset(filename,compression_type="GZIP"),
            cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    # 解析
    dataset = dataset.map(parse_example,
                         num_parallel_calls=n_parse_threads)
    
    dataset = dataset.batch(batch_size)
    return dataset

In [33]:
# 测试
tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filenmaes,batch_size=3)
for x_batch,y_batch in tfrecords_train.take(2):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[-0.097193   -1.2497431   0.36232963  0.02690608  1.0338118   0.04588159
   1.3418335  -1.635387  ]
 [-0.097193   -1.2497431   0.36232963  0.02690608  1.0338118   0.04588159
   1.3418335  -1.635387  ]
 [ 0.48530516 -0.8492419  -0.06530126 -0.02337966  1.4974351  -0.07790658
  -0.90236324  0.78145146]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.832]
 [1.832]
 [2.956]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [-1.119975   -1.3298433   0.14190045  0.4658137  -0.10301778 -0.10744184
  -0.7950524   1.5304717 ]
 [ 0.04326301 -1.0895426  -0.38878718 -0.10789865 -0.68186635 -0.0723871
  -0.8883662   0.8213992 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.59 ]
 [0.66 ]
 [1.426]], shape=(3, 1), dtype=float32)


In [34]:
batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(train_tfrecord_filenmaes,batch_size=batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(valid_tfrecord_filenames,batch_size=batch_size)
tfrecords_test_set = tfrecords_reader_dataset(test_tfrecord_filenames,batch_size=batch_size)

In [35]:
# 搭建模型
model = keras.models.Sequential([
    keras.layers.Dense(30,activation='relu',input_shape=[8]),
    keras.layers.Dense(1),
    
])

# 打印model信息
model.summary()
# 编译
model.compile(loss='mean_squared_error',optimizer="sgd")
# 回调函数
callbacks = [keras.callbacks.EarlyStopping(patience=5,min_delta=1e-3)]

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 30)                270       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 31        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________


In [36]:
#训练
history = model.fit(tfrecords_train_set,validation_data=tfrecords_valid_set,
                    steps_per_epoch=11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs=100,callbacks=callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100


In [38]:
model.evaluate(tfrecords_test_set,steps = 5160 // batch_size)



0.3182034741462388