In [1]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn 
import pandas as pd
import os, gc, sys, time
import tensorflow as tf
from tensorflow import keras

import warnings
warnings.filterwarnings('ignore')

## **3.1 tfrecord文件**

tfrecord是一种文件格式
- 文件内部的数据以`tf.train.Example`的方式存储
    - `Example`数据以`tf.train.Features`保存
        - `Features`数据是k-v格式的，k指的是特征名，v是特征内容
            - `Features`也可以是不同的格式, `tf.train.ByteList  tf.train.FloatList  tf.train.Int64List`

### **3.1.1 Features格式**

**ByteList**

In [2]:
favorite_books = [name.encode('utf-8') for name in ["machine learning", 'Deep Learning']]
favorite_books_bytelist = tf.train.BytesList(value = favorite_books)
print(favorite_books)

[b'machine learning', b'Deep Learning']


**FloatList**

In [3]:
hours_floatlist = tf.train.FloatList(value=[1., 2.3, 5.])
print(hours_floatlist)

value: 1.0
value: 2.299999952316284
value: 5.0



**Int64List**

In [4]:
age_int64list = tf.train.Int64List(value=[22, 23, 21])
print(age_int64list)

value: 22
value: 23
value: 21



**Features**

In [5]:
features = tf.train.Features(
    feature = {
        'favorite_books' : tf.train.Feature(bytes_list = favorite_books_bytelist),
        'hours' : tf.train.Feature(float_list = hours_floatlist),
        'age' : tf.train.Feature(int64_list = age_int64list)
    }
)
print(features)

feature {
  key: "age"
  value {
    int64_list {
      value: 22
      value: 23
      value: 21
    }
  }
}
feature {
  key: "favorite_books"
  value {
    bytes_list {
      value: "machine learning"
      value: "Deep Learning"
    }
  }
}
feature {
  key: "hours"
  value {
    float_list {
      value: 1.0
      value: 2.299999952316284
      value: 5.0
    }
  }
}



**Example**

In [6]:
example = tf.train.Example(features=features)
print(example)

serialized_example = example.SerializeToString() # 序列化从而使得保存占用存储较小
print(serialized_example)

features {
  feature {
    key: "age"
    value {
      int64_list {
        value: 22
        value: 23
        value: 21
      }
    }
  }
  feature {
    key: "favorite_books"
    value {
      bytes_list {
        value: "machine learning"
        value: "Deep Learning"
      }
    }
  }
  feature {
    key: "hours"
    value {
      float_list {
        value: 1.0
        value: 2.299999952316284
        value: 5.0
      }
    }
  }
}

b'\nb\n5\n\x0efavorite_books\x12#\n!\n\x10machine learning\n\rDeep Learning\n\x0e\n\x03age\x12\x07\x1a\x05\n\x03\x16\x17\x15\n\x19\n\x05hours\x12\x10\x12\x0e\n\x0c\x00\x00\x80?33\x13@\x00\x00\xa0@'


### **3.1.2 tfrecord存储**

In [7]:
output_dir = 'tf.record_basic'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
filename = 'test.tfrecord'
filename_fullpath = os.path.join(output_dir, filename)
with tf.io.TFRecordWriter(filename_fullpath) as writer:
    # 打开tfrecord文件
    for i in range(3):
        writer.write(serialized_example)

### 3.1.3 **tfrecord读取**

In [8]:
dataset = tf.data.TFRecordDataset([filename_fullpath])
for  serialized_example_tensor in dataset:
    # 我们得到的是序列化的example
    print(serialized_example_tensor)

tf.Tensor(b'\nb\n5\n\x0efavorite_books\x12#\n!\n\x10machine learning\n\rDeep Learning\n\x0e\n\x03age\x12\x07\x1a\x05\n\x03\x16\x17\x15\n\x19\n\x05hours\x12\x10\x12\x0e\n\x0c\x00\x00\x80?33\x13@\x00\x00\xa0@', shape=(), dtype=string)
tf.Tensor(b'\nb\n5\n\x0efavorite_books\x12#\n!\n\x10machine learning\n\rDeep Learning\n\x0e\n\x03age\x12\x07\x1a\x05\n\x03\x16\x17\x15\n\x19\n\x05hours\x12\x10\x12\x0e\n\x0c\x00\x00\x80?33\x13@\x00\x00\xa0@', shape=(), dtype=string)
tf.Tensor(b'\nb\n5\n\x0efavorite_books\x12#\n!\n\x10machine learning\n\rDeep Learning\n\x0e\n\x03age\x12\x07\x1a\x05\n\x03\x16\x17\x15\n\x19\n\x05hours\x12\x10\x12\x0e\n\x0c\x00\x00\x80?33\x13@\x00\x00\xa0@', shape=(), dtype=string)


In [9]:
excepted_features = {
    'favorite_books' : tf.io.VarLenFeature(dtype=tf.string), # 变长的Features
    'hours' : tf.io.VarLenFeature(dtype=tf.float32), 
    'age' : tf.io.VarLenFeature(dtype=tf.int64) # 定长的Feature是FixedLenFeature  
}
dataset = tf.data.TFRecordDataset([filename_fullpath])
for  serialized_example_tensor in dataset:
    # 我们得到的是序列化的example
    print(tf.io.parse_single_example(serialized_example_tensor, excepted_features))

{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211E00CF2E8>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211AAB9E048>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7668>}
{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211AAB9E748>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211E00B7E80>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C75C0>}
{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211E00CF358>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211AAB9E748>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7860>}


### **3.1.4 存储tfrecord为压缩文件**

In [10]:
output_dir = 'tf.record_basic'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
filename_fullpath = os.path.join(output_dir, filename) + '.zip'
options = tf.io.TFRecordOptions(compression_type='GZIP')
with tf.io.TFRecordWriter(filename_fullpath, options=options) as writer:
    # 打开tfrecord文件
    for i in range(3):
        writer.write(serialized_example)

### **3.1.5读取压缩文件**

In [11]:
excepted_features = {
    'favorite_books' : tf.io.VarLenFeature(dtype=tf.string), # 变长的Features
    'hours' : tf.io.VarLenFeature(dtype=tf.float32), 
    'age' : tf.io.VarLenFeature(dtype=tf.int64) # 定长的Feature是FixedLenFeature  
}
dataset = tf.data.TFRecordDataset([filename_fullpath], compression_type='GZIP')
for  serialized_example_tensor in dataset:
    # 我们得到的是序列化的example
    print(tf.io.parse_single_example(serialized_example_tensor, excepted_features))

{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7470>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7518>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836CEE48>}
{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C74A8>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7160>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836CE438>}
{'age': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C7358>, 'favorite_books': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836C76D8>, 'hours': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x00000211836CE0B8>}


## **3.2 使用csv生成tf.record**文件

### **3.2.1读取csv为dataset**

In [2]:
source_dir = './generate_csv'
print(os.listdir(source_dir))

def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results

['.ipynb_checkpoints', 'train_00.csv', 'train_01.csv', 'train_02.csv', 'train_03.csv', 'train_04.csv', 'train_05.csv', 'train_06.csv', 'train_07.csv', 'train_08.csv', 'train_09.csv', 'train_10.csv', 'train_11.csv', 'train_12.csv', 'train_13.csv', 'train_14.csv', 'train_15.csv', 'train_16.csv', 'train_17.csv', 'train_18.csv', 'train_19.csv', 'valid_00.csv', 'valid_01.csv', 'valid_02.csv', 'valid_03.csv', 'valid_04.csv', 'valid_05.csv', 'valid_06.csv', 'valid_07.csv', 'valid_08.csv', 'valid_09.csv']


In [4]:
train_filenames = get_filenames_by_prefix(source_dir, 'train')
valid_filenames = get_filenames_by_prefix(source_dir, 'valid')
print(train_filenames, valid_filenames)

['./generate_csv\\train_00.csv', './generate_csv\\train_01.csv', './generate_csv\\train_02.csv', './generate_csv\\train_03.csv', './generate_csv\\train_04.csv', './generate_csv\\train_05.csv', './generate_csv\\train_06.csv', './generate_csv\\train_07.csv', './generate_csv\\train_08.csv', './generate_csv\\train_09.csv', './generate_csv\\train_10.csv', './generate_csv\\train_11.csv', './generate_csv\\train_12.csv', './generate_csv\\train_13.csv', './generate_csv\\train_14.csv', './generate_csv\\train_15.csv', './generate_csv\\train_16.csv', './generate_csv\\train_17.csv', './generate_csv\\train_18.csv', './generate_csv\\train_19.csv'] ['./generate_csv\\valid_00.csv', './generate_csv\\valid_01.csv', './generate_csv\\valid_02.csv', './generate_csv\\valid_03.csv', './generate_csv\\valid_04.csv', './generate_csv\\valid_05.csv', './generate_csv\\valid_06.csv', './generate_csv\\valid_07.csv', './generate_csv\\valid_08.csv', './generate_csv\\valid_09.csv']


In [5]:
def parse_csv_line(line, n_fields = 9):
    # 解析单行数据
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def read_csv_to_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5,
                        shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename : tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads) # 注意interleave(1->n)和map(1->1)的区别
    dataset = dataset.batch(batch_size)
    return dataset

train_set = read_csv_to_dataset(train_filenames, batch_size=32)
valid_set = read_csv_to_dataset(valid_filenames, batch_size=32)

### **3.2.2把dataset转变为tfrecord**

In [6]:
# 遍历dataset写入tfrecord
def serialize_example(x, y):
    """把x，y转换称为tf.train.Example然后序列化"""
    input_features = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value=y)
    features = tf.train.Features(
        feature = {
            'input_features': tf.train.Feature(float_list = input_features), 
            'label': tf.train.Feature(float_list = label)
        }
    )
    example = tf.train.Example(features=features)
    return example.SerializePartialToString()

def csv_dataset_to_tfrecords(base_filename, dataset, n_shards, steps_per_shard, compression_type=None):
    """
    n_shards: 将文件存为多少片
    steps_per_shard: 因为dataset进行了repeat操作，需要告诉函数多少step是一个文件
    """
    options = tf.io.TFRecordOptions(compression_type=compression_type)
    all_filenames = []
    
    
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            for x_batch, y_batch in dataset.skip(shard_id * steps_per_shard).take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
                    
                    
        all_filenames.append(filename_fullpath)
    return all_filenames

In [7]:
n_shards = 20
batch_size = 32
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards

output_dir = 'generate_tfrecords_zip'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
train_basename = os.path.join(output_dir, 'train')
valid_basename = os.path.join(output_dir, 'valid')
train_tfrecord_filenames = csv_dataset_to_tfrecords(train_basename, train_set, n_shards, train_steps_per_shard, 'GZIP')
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename, valid_set, n_shards, valid_steps_per_shard, 'GZIP')

 ## **3.3 读取tfrecord文件**

In [8]:
print(train_tfrecord_filenames)
print(valid_tfrecord_filenames)

['generate_tfrecords_zip\\train_00000-of-00020', 'generate_tfrecords_zip\\train_00001-of-00020', 'generate_tfrecords_zip\\train_00002-of-00020', 'generate_tfrecords_zip\\train_00003-of-00020', 'generate_tfrecords_zip\\train_00004-of-00020', 'generate_tfrecords_zip\\train_00005-of-00020', 'generate_tfrecords_zip\\train_00006-of-00020', 'generate_tfrecords_zip\\train_00007-of-00020', 'generate_tfrecords_zip\\train_00008-of-00020', 'generate_tfrecords_zip\\train_00009-of-00020', 'generate_tfrecords_zip\\train_00010-of-00020', 'generate_tfrecords_zip\\train_00011-of-00020', 'generate_tfrecords_zip\\train_00012-of-00020', 'generate_tfrecords_zip\\train_00013-of-00020', 'generate_tfrecords_zip\\train_00014-of-00020', 'generate_tfrecords_zip\\train_00015-of-00020', 'generate_tfrecords_zip\\train_00016-of-00020', 'generate_tfrecords_zip\\train_00017-of-00020', 'generate_tfrecords_zip\\train_00018-of-00020', 'generate_tfrecords_zip\\train_00019-of-00020']
['generate_tfrecords_zip\\valid_00000-o

In [10]:
expected_features = {
    'input_features': tf.io.FixedLenFeature([8], dtype=tf.float32),
    'label': tf.io.FixedLenFeature([1], dtype=tf.float32)
}

def parse_example(emp):
    print(emp)
    emp = tf.io.parse_single_example(emp, expected_features)
    #print(example)
    return emp['input_features'], emp['label']

def read_tfrecords_to_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5,
                        shuffle_buffer_size=10000):
    """把tfrecord转换成为dataset"""
    dataset_t = tf.data.Dataset.list_files(filenames)
    dataset_t = dataset_t.repeat()
    dataset_t = dataset_t.interleave(
        lambda filename : tf.data.TFRecordDataset(filename, compression_type='GZIP'),
        cycle_length = n_readers
    )
    dataset_t.shuffle(shuffle_buffer_size)
    dataset_t = dataset_t.map(parse_example, num_parallel_calls=n_parse_threads) # 注意interleave(1->n)和map(1->1)的区别
    dataset_t = dataset_t.batch(batch_size)
    return dataset_t


tf_record_train = read_tfrecords_to_dataset(train_tfrecord_filenames, batch_size=3)
for x_batch, y_batch in tf_record_train.take(3):
    print(x_batch, y_batch)

Tensor("args_0:0", shape=(), dtype=string)
tf.Tensor(
[[ 1.5891896   0.9090429  -0.2174371  -0.14523432 -0.90626943 -0.06638897
  -0.7274083   0.48690954]
 [ 0.5387951   0.6698735  -0.28103194 -0.25065798 -0.6060004  -0.00485437
  -0.8164591   0.7159614 ]
 [-0.11477906  0.59015036 -0.1510448  -0.05572433 -0.395548   -0.09082542
  -0.74615586  0.5466622 ]], shape=(3, 8), dtype=float32) tf.Tensor(
[[3.8    ]
 [1.796  ]
 [5.00001]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.01726831 -0.28680417  0.02679312  0.18271473 -0.46511176 -0.04461862
   0.9504959  -1.0965359 ]
 [-1.0123403   1.4671049   0.19829997  0.34417567 -0.91947776  0.10741415
  -0.7367821   0.9051781 ]
 [ 0.05155662 -0.60569674 -0.47390437  0.0880845   0.33002868  0.04120428
  -0.6477313   0.5616004 ]], shape=(3, 8), dtype=float32) tf.Tensor(
[[1.93 ]
 [0.926]
 [1.625]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.07316887  0.35098094 -0.10012846 -0.13981359  1.3479494   0.09893484
  -0.7602165   0.800611  ]
 [ 0.121905

**训练过程和3.2的一致**