<a href="https://colab.research.google.com/github/mingmingbupt/tensorflow/blob/master/tf_data_generate_tfrecord.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.2.0-rc2
sys.version_info(major=3, minor=6, micro=9, releaselevel='final', serial=0)
matplotlib 3.2.1
numpy 1.18.2
pandas 1.0.3
sklearn 0.22.2.post1
tensorflow 2.2.0-rc2
tensorflow.keras 2.3.0-tf


In [6]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

Downloading Cal. housing from https://ndownloader.figshare.com/files/5976036 to /root/scikit_learn_data


In [8]:
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state = 11)
print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [0]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

In [0]:
#我们把scale后的数据呢，生产csv文件
output_dir = "generate_csv" #定义一个文件夹，把生成的文件放到这个文件夹下
if not os.path.exists(output_dir): #如果不存在文件夹的话，就用os.mkdir去创建这个文件夹
    os.mkdir(output_dir)

def save_to_csv(output_dir, data, name_prefix,
                header=None, n_parts=10): #把一个单独的dataset,train/valid/test,保存到文件当中去
                # output_dir： 输出文件夹，
                # data ： dataset
                # name_prefix： 因为我们要分别给train/valid/test分别生成csv文件，所以需要name_prefix去做区分
                # header： 对于csv文件呢，可能需要一个header,这里默认是None
                # n_parts: 我们的datesets是可以把多个文件去merge起来形成一个数据集的，
                #      所以说呢，为了测试这个场景，我们需要把我们的数据集去切分成多个文件来进行存储，这里默认是10个文件
    path_format = os.path.join(output_dir, "{}_{:02d}.csv") # 生成文件名 第一个{}里面天train,test还是valid. 第二个中括号填的是两位的一个整数
    # 如果是一位的一个整数，就需要补全一位，这里填每个n_parts的具体数字
    filenames = [] #最后会返回所有的文件名
    
    # enumerate就是给每一组标记一个值，也就是说每一组呢，我们可以通过这个row_indices获得，它标记的值呢，就是file_idx
    # 就是一个整数，这个整数可以用来当成一个file_id
    for file_idx, row_indices in enumerate(
        #我用一个np.arange生成了一个和data一样长度的数组，也就是说我data里面有n个元素，那我们这个数组里面也有n个元素
        #它的元素值就是0到n-1，用这个数值当索引。我们再把当索引的这个数值，分成了n_parts个部分
        #有了这n_parts部分以后，用每一组里面的索引去data里面去取元素，从而获得数据
        np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx) #生成对应的子文件名
        filenames.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices: #行索引
                f.write(",".join(
                    [repr(col) for col in data[row_index]])) #用逗号分割，需要做字符串化处理
                f.write('\n')
    return filenames

train_data = np.c_[x_train_scaled, y_train]  # np.c_把数据按行merge
valid_data = np.c_[x_valid_scaled, y_valid]  
test_data = np.c_[x_test_scaled, y_test]
# 这样就把train valid test数据merge起来了
# 然后生成header,从housing数据集获得，因为我们把y也拼接起来了，所以也要加个y的名字MidianHouseValue
header_cols = housing.feature_names + ["MidianHouseValue"]
header_str = ",".join(header_cols) # 用逗号连接起来

train_filenames = save_to_csv(output_dir, train_data, "train",
                              header_str, n_parts=20) 
valid_filenames = save_to_csv(output_dir, valid_data, "valid",
                              header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",
                             header_str, n_parts=10)

In [19]:
#代码从这里开始
source_dir = "./generate_csv/" #csv的文件夹放到这下面
print(os.listdir(source_dir))
# 首先获取train test valid 的文件list
# 定义一个简单的函数来讲这40个文件进行区分，区分就是利用前缀进行划分
def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results

train_filenames = get_filenames_by_prefix(source_dir, "train")
valid_filenames = get_filenames_by_prefix(source_dir, "valid")
test_filenames = get_filenames_by_prefix(source_dir, "test")

import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)


['train_08.csv', 'train_02.csv', 'test_01.csv', 'train_03.csv', 'train_09.csv', 'test_05.csv', 'test_03.csv', 'test_00.csv', 'valid_07.csv', 'train_15.csv', 'valid_03.csv', 'valid_00.csv', 'valid_08.csv', 'valid_09.csv', 'test_06.csv', 'train_16.csv', 'valid_02.csv', 'train_10.csv', 'test_04.csv', 'train_19.csv', 'train_17.csv', 'train_00.csv', 'train_13.csv', 'train_06.csv', 'train_07.csv', 'valid_05.csv', 'train_01.csv', 'train_14.csv', 'valid_06.csv', 'test_07.csv', 'test_02.csv', 'test_08.csv', 'valid_01.csv', 'test_09.csv', 'train_05.csv', 'train_18.csv', 'train_04.csv', 'valid_04.csv', 'train_11.csv', 'train_12.csv']
['./generate_csv/train_08.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_15.csv',
 './generate_csv/train_16.csv',
 './generate_csv/train_10.csv',
 './generate_csv/train_19.csv',
 './generate_csv/train_17.csv',
 './generate_csv/train_00.csv',
 './generate_csv/train_13.csv',
 './generate_csv/

In [0]:
def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)
# 这样就从csv文件中获取了训练集，验证集，测试集
# 接着对这三个文件进行遍历，把这三个文件写到tfrecord中去

In [0]:
#用来将一组样本转成tf.train.Example格式，并将它序列化
def serialize_example(x, y):
    """Converts x, y to tf.train.Example and serialize"""
    input_feautres = tf.train.FloatList(value = x) #x的类型
    label = tf.train.FloatList(value = y) #y的类型
    features = tf.train.Features( #构建features
        feature = { #构建feature, feature是一个字典，key就是变量的名字，value就是tf.train.Feature
            "input_features": tf.train.Feature(
                float_list = input_feautres),
            "label": tf.train.Feature(float_list = label) #同理label也是构建一个tf.train.Feature的
        }
    )
    example = tf.train.Example(features = features) #基于features构建example
    return example.SerializeToString() #构建完example以后，再去将example序列化
    # 这样我们就将x和y转成序列化的example字符串的形式

#将从csv文件中读取出来的dataset给转化成tf.exmaple格式，然后在写到tfrecord文件当中去
def csv_dataset_to_tfrecords(base_filename, dataset, #base_filename 依然是存成多个文件，所以说有个base_filename,后面还要对他编号
                            #dataset 读取出来的dataset
                             n_shards, steps_per_shard, #n_shards就是我存成多少个文件
                             compression_type = None): #steps_per_shard对于每一个小文件，我该在dataset里走多少步
                             #因为我在构建dataset的时候，我们用了repeat,所以dataset里面的遍历是永远也不会结束的
                             #所以需要我们自己去算我们需要遍历多少步
                             #compression_type我是不是使用某些压缩方法，默认为None
    options = tf.io.TFRecordOptions(
        compression_type = compression_type) # 定义TFRecordOopptions
    all_filenames = []
    for shard_id in range(n_shards): #n_shards一共有多少个文件
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename, shard_id, n_shards) #具体要生成的小文件的名字
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer: #遍历每一个要生成的小文件，打开文件
            for x_batch, y_batch in dataset.take(steps_per_shard): #从dataset中取出steps_per_shard个数据，每个数据都是一个batch
                for x_example, y_example in zip(x_batch, y_batch): #batch里面的每一条数据
                    writer.write(
                        serialize_example(x_example, y_example)) #转成tf.exmaple 写到tfrecord文件当中去
        all_filenames.append(filename_fullpath) #把所有的文件名都保存下来
    return all_filenames #返回所有的文件名
    #这个函数生成了一堆小文件，每个小文件都包含了dataset中的一部分，具体的部分，就是从当前的位置取出前steps_per_shard个样本
    #因为这个样本都是一个batch，所以说我们把batch中样本都解开，每个样本都给生成它的字符串，然后写到文件当中去

In [0]:
#看下是如何被调用的
n_shards = 20 #表示需要生成多少个小文件，这里统一把训练集，验证集，测试集都生成20个小文件
train_steps_per_shard = 11610 // batch_size // n_shards #steps_per_shard对训练集 验证集 测试集需要走多少步
valid_steps_per_shard = 3880 // batch_size // n_shards #valid样本个数为3880，因为我每次取都是从dataset中取batchsize个，所以我要先除以batch_size
test_steps_per_shard = 5170 // batch_size // n_shards #因为有n_shards个文件，所以就得到在每个文件上需要多少个batch

output_dir = "generate_tfrecords" #定义输出文件夹
if not os.path.exists(output_dir): #验证它是不是存在，不存在创建一个
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train") #定义basename，就是加个train
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords( #调用上面函数
    train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, None)
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, None)


In [25]:
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_fielnames)

['generate_tfrecords/train_00000-of-00020',
 'generate_tfrecords/train_00001-of-00020',
 'generate_tfrecords/train_00002-of-00020',
 'generate_tfrecords/train_00003-of-00020',
 'generate_tfrecords/train_00004-of-00020',
 'generate_tfrecords/train_00005-of-00020',
 'generate_tfrecords/train_00006-of-00020',
 'generate_tfrecords/train_00007-of-00020',
 'generate_tfrecords/train_00008-of-00020',
 'generate_tfrecords/train_00009-of-00020',
 'generate_tfrecords/train_00010-of-00020',
 'generate_tfrecords/train_00011-of-00020',
 'generate_tfrecords/train_00012-of-00020',
 'generate_tfrecords/train_00013-of-00020',
 'generate_tfrecords/train_00014-of-00020',
 'generate_tfrecords/train_00015-of-00020',
 'generate_tfrecords/train_00016-of-00020',
 'generate_tfrecords/train_00017-of-00020',
 'generate_tfrecords/train_00018-of-00020',
 'generate_tfrecords/train_00019-of-00020']
['generate_tfrecords/valid_00000-of-00020',
 'generate_tfrecords/valid_00001-of-00020',
 'generate_tfrecords/valid_00002

In [0]:
#生成压缩后的tfrecord文件
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard,
    compression_type = "GZIP")
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard,
    compression_type = "GZIP")
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard,
    compression_type = "GZIP")

In [27]:
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_fielnames)

['generate_tfrecords_zip/train_00000-of-00020',
 'generate_tfrecords_zip/train_00001-of-00020',
 'generate_tfrecords_zip/train_00002-of-00020',
 'generate_tfrecords_zip/train_00003-of-00020',
 'generate_tfrecords_zip/train_00004-of-00020',
 'generate_tfrecords_zip/train_00005-of-00020',
 'generate_tfrecords_zip/train_00006-of-00020',
 'generate_tfrecords_zip/train_00007-of-00020',
 'generate_tfrecords_zip/train_00008-of-00020',
 'generate_tfrecords_zip/train_00009-of-00020',
 'generate_tfrecords_zip/train_00010-of-00020',
 'generate_tfrecords_zip/train_00011-of-00020',
 'generate_tfrecords_zip/train_00012-of-00020',
 'generate_tfrecords_zip/train_00013-of-00020',
 'generate_tfrecords_zip/train_00014-of-00020',
 'generate_tfrecords_zip/train_00015-of-00020',
 'generate_tfrecords_zip/train_00016-of-00020',
 'generate_tfrecords_zip/train_00017-of-00020',
 'generate_tfrecords_zip/train_00018-of-00020',
 'generate_tfrecords_zip/train_00019-of-00020']
['generate_tfrecords_zip/valid_00000-of-

In [0]:
expected_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,
                                         expected_features)
    return example["input_features"], example["label"]

def tfrecords_reader_dataset(filenames, n_readers=5,
                             batch_size=32, n_parse_threads=5,
                             shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(
            filename, compression_type = "GZIP"),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filenames,
                                           batch_size = 3)
for x_batch, y_batch in tfrecords_train.take(2):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]
 [ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]
 [ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[5.00001]
 [5.00001]
 [5.00001]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]
 [ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]
 [ 0.8015443   0.27216142 -0.11624393 -0.20231152 -0.5430516  -0.02103962
  -0.5897621  -0.08241846]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[5.00001]
 [5.00001]
 [3.226  ]], shape=(3, 1), dtype=float32)


In [0]:
batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(
    train_tfrecord_filenames, batch_size = batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(
    valid_tfrecord_filenames, batch_size = batch_size)
tfrecords_test_set = tfrecords_reader_dataset(
    test_tfrecord_fielnames, batch_size = batch_size)


In [0]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(tfrecords_train_set,
                    validation_data = tfrecords_valid_set,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100


In [0]:
model.evaluate(tfrecords_test_set, steps = 5160 // batch_size)



0.4428980045066857