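**Background:**  
  
This part moves beyond the model architecture covered in Parts 1 and 2 and covers the following topics:
1. Data loading in practice
2. Callbacks and TensorBoard
3. Custom training loops
4. Distributed training
5. Saving and loading models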

## [TFRecord](https://www.tensorflow.org/tutorials/load_data/tfrecord)

**A few notes on TFRecord and tf.train.Example:**
1. The TFRecord format is a simple format for storing a sequence of binary records.
2. The tf.train.Example is a method of serializing dictionaries to byte-strings. 
3. There is no need to convert existing code to use TFRecords, unless you are using tf.data and reading data is still the bottleneck to training. 
4. There is no requirement to use tf.train.Example in TFRecord files.
5. [Official example](https://github.com/tensorflow/models/blob/8367cf6dabe11adf7628541706b660821f397dce/research/slim/datasets/download_and_convert_flowers.py): downloads the Flowers data and converts it to TFRecords of TF-Example protos.
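
Notes 1 and 4 can be illustrated with a minimal sketch that stores plain byte strings in a TFRecord file, with no tf.train.Example involved. The payloads and the temporary file name are made up for illustration.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical payloads: any byte strings work, not only serialized protos.
records = [b"first record", b"second record", b"\x00\x01\x02"]

path = os.path.join(tempfile.mkdtemp(), "raw_bytes.tfrecord")

# tf.io.TFRecordWriter stores each byte string as one record in the file.
with tf.io.TFRecordWriter(path) as writer:
    for record in records:
        writer.write(record)

# Reading yields scalar tf.string tensors holding the original bytes, in order.
restored = [r.numpy() for r in tf.data.TFRecordDataset(path)]
print(restored)
```

The reader gets back exactly the bytes that were written, so decoding them is up to the caller. tf.train.Example is one convenient encoding, not a requirement.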

The excerpts below cover the parts that are relevant to tf.data:

### Converting standard TensorFlow types to tf.train.Example-compatible types

In [1]:
import tensorflow as tf

import numpy as np
import IPython.display as display

In [2]:
# The following functions can be used to convert a value to a type compatible with tf.train.Example.

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
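
As a quick check of what the three list types accept, the sketch below builds one feature of each kind directly with tf.train.Feature instead of going through the helper functions above. The sample values are arbitrary.

```python
import tensorflow as tf

# Int64List accepts bool / enum / int / uint values; True is stored as 1.
int_feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[True]))

# FloatList stores 32-bit floats, so a Python float loses a little precision.
float_feature = tf.train.Feature(float_list=tf.train.FloatList(value=[0.9876]))

# BytesList needs bytes, so a Python str has to be encoded first.
bytes_feature = tf.train.Feature(
    bytes_list=tf.train.BytesList(value=["goat".encode("utf-8")])
)

print(int_feature.int64_list.value[0])
print(float_feature.float_list.value[0])
print(bytes_feature.bytes_list.value[0])
```

Every feature holds a list, which is why the helpers wrap a single value in `[value]`.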

### Creating a tf.train.Example message

In [3]:
def serialize_example(feature0, feature1, feature2, feature3):
    """
    Creates a tf.train.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.train.Example-compatible
    # data type.
    feature = {
        'feature0': _int64_feature(feature0),
        'feature1': _int64_feature(feature1),
        'feature2': _bytes_feature(feature2),
        'feature3': _float_feature(feature3),
    }
  
    # Create a Features message using tf.train.Example.
  
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString() # proto messages serialized to a binary-string

In [4]:
serialized_example = serialize_example(False, 4, b'goat', 0.9876)
serialized_example

b'\nR\n\x11\n\x08feature0\x12\x05\x1a\x03\n\x01\x00\n\x14\n\x08feature3\x12\x08\x12\x06\n\x04[\xd3|?\n\x14\n\x08feature2\x12\x08\n\x06\n\x04goat\n\x11\n\x08feature1\x12\x05\x1a\x03\n\x01\x04'

In [5]:
# To deserialize the message:
example_proto = tf.train.Example.FromString(serialized_example)
example_proto

features {
  feature {
    key: "feature0"
    value {
      int64_list {
        value: 0
      }
    }
  }
  feature {
    key: "feature1"
    value {
      int64_list {
        value: 4
      }
    }
  }
  feature {
    key: "feature2"
    value {
      bytes_list {
        value: "goat"
      }
    }
  }
  feature {
    key: "feature3"
    value {
      float_list {
        value: 0.9876000285148621
      }
    }
  }
}

### **Writing** TFRecord files with **tf.data**

In [6]:
# Create a dataset.

# The number of observations in the dataset.
n_observations = int(100)

# Boolean feature, encoded as False or True.
feature0 = np.random.choice([False, True], n_observations)

# Integer feature, random from 0 to 4.
feature1 = np.random.randint(0, 5, n_observations)

# String feature.
strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat'])
feature2 = strings[feature1]

# Float feature, from a standard normal distribution.
feature3 = np.random.randn(n_observations)

features_dataset = tf.data.Dataset.from_tensor_slices((feature0, feature1, feature2, feature3))
features_dataset

<TensorSliceDataset shapes: ((), (), (), ()), types: (tf.bool, tf.int64, tf.string, tf.float64)>

tf.data.Dataset.map applies a function to each element of a dataset. The mapped function must operate in TensorFlow graph mode: it must operate on and return tf.Tensors. **A non-tensor function, like serialize_example, can be wrapped with tf.py_function to make it compatible.**

In [7]:
def tf_serialize_example(f0,f1,f2,f3):
    tf_string = tf.py_function(
        serialize_example,
        (f0, f1, f2, f3),  # Pass these args to the above function.
        tf.string)      # The return type is `tf.string`.
    return tf.reshape(tf_string, ()) # The result is a scalar.

In [8]:
serialized_features_dataset = features_dataset.map(tf_serialize_example)
serialized_features_dataset

<MapDataset shapes: (), types: tf.string>

In [9]:
filename = 'data/test.tfrecord'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(serialized_features_dataset)
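
In newer TensorFlow releases tf.data.experimental.TFRecordWriter is marked as deprecated, and its documentation points to tf.io.TFRecordWriter instead. The sketch below shows that route under simplified assumptions: `serialize_row` and its two features are hypothetical stand-ins for serialize_example, and the file goes to a temporary directory.

```python
import os
import tempfile

import numpy as np
import tensorflow as tf


def serialize_row(flag, count):
    """Hypothetical two-feature variant of serialize_example."""
    feature = {
        "flag": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(flag)])),
        "count": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(count)])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


flags = np.array([True, False, True])
counts = np.array([3, 1, 4])

path = os.path.join(tempfile.mkdtemp(), "rows.tfrecord")

# Plain Python loop: no tf.py_function wrapper is needed on this path.
with tf.io.TFRecordWriter(path) as writer:
    for flag, count in zip(flags, counts):
        writer.write(serialize_row(flag, count))

n_written = sum(1 for _ in tf.data.TFRecordDataset(path))
print(n_written)
```

Because the loop runs eagerly in Python, the serialization function does not need the tf.py_function wrapper. The trade-off is that writing is not part of a tf.data pipeline.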


### **Reading** TFRecord files with **tf.data**

In [10]:
filename = 'data/test.tfrecord'
filenames = [filename]
raw_dataset = tf.data.TFRecordDataset(filenames)
raw_dataset

<TFRecordDatasetV2 shapes: (), types: tf.string>

In [11]:
# At this point the dataset contains serialized tf.train.Example messages.
for raw_record in raw_dataset.take(1):
    print(repr(raw_record))

<tf.Tensor: shape=(), dtype=string, numpy=b'\nS\n\x14\n\x08feature3\x12\x08\x12\x06\n\x04y\xf7\x91>\n\x15\n\x08feature2\x12\t\n\x07\n\x05horse\n\x11\n\x08feature0\x12\x05\x1a\x03\n\x01\x01\n\x11\n\x08feature1\x12\x05\x1a\x03\n\x01\x03'>


Parse the tf.train.Example messages into standard tensors:

In [12]:
# Create a description of the features.
feature_description = {
    'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}

def _parse_function(example_proto):
    # Parse the input `tf.train.Example` proto using the dictionary above.
    return tf.io.parse_single_example(example_proto, feature_description)

parsed_dataset = raw_dataset.map(_parse_function)
for record in parsed_dataset.take(1):
    print(repr(record))

{'feature0': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'feature1': <tf.Tensor: shape=(), dtype=int64, numpy=3>, 'feature2': <tf.Tensor: shape=(), dtype=string, numpy=b'horse'>, 'feature3': <tf.Tensor: shape=(), dtype=float32, numpy=0.2850912>}
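
Parsing can also be done one batch at a time with tf.io.parse_example, which is often cheaper than calling tf.io.parse_single_example per record. The sketch below uses hypothetical `label` and `score` features and an in-memory dataset rather than the file written above.

```python
import tensorflow as tf


def make_example(label, score):
    """Builds one serialized tf.train.Example with two hypothetical features."""
    feature = {
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        "score": tf.train.Feature(float_list=tf.train.FloatList(value=[score])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


serialized = [make_example(i, 0.5 * i) for i in range(4)]
raw = tf.data.Dataset.from_tensor_slices(serialized)

description = {
    "label": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "score": tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}

# Batch first, then parse the whole batch with a single parse_example call.
parsed = raw.batch(2).map(lambda batch: tf.io.parse_example(batch, description))

batches = list(parsed.as_numpy_iterator())
print(batches[0]["label"], batches[0]["score"])
```

Each parsed feature gains a leading batch dimension, so a scalar feature described with shape `[]` comes back with shape `(batch_size,)`.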


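### Multi-dimensional arrays

All of the code above handles scalars. For multi-dimensional tensors, the following two functions handle serialization and parsing:  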
- [tf.io.serialize_tensor](https://www.tensorflow.org/api_docs/python/tf/io/serialize_tensor)
- [tf.io.parse_tensor](https://www.tensorflow.org/api_docs/python/tf/io/parse_tensor)

In [13]:
t = tf.constant([1,2])
serialized_t = tf.io.serialize_tensor(t)
serialized_t

<tf.Tensor: shape=(), dtype=string, numpy=b'\x08\x03\x12\x04\x12\x02\x08\x02"\x08\x01\x00\x00\x00\x02\x00\x00\x00'>

In [14]:
# _bytes_feature(t) raises an error, because t is a multi-dimensional tensor that has not been serialized.
_bytes_feature(serialized_t)

bytes_list {
  value: "\010\003\022\004\022\002\010\002\"\010\001\000\000\000\002\000\000\000"
}

In [15]:
tf.io.parse_tensor(
    serialized_t, out_type=tf.int32 
)

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 2], dtype=int32)>
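
Putting the pieces together, a full round trip for a multi-dimensional tensor might look like the sketch below: serialize the tensor, store the bytes in a tf.train.Example, parse the example, then decode the tensor. The feature name `matrix` is an arbitrary choice.

```python
import tensorflow as tf

matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])

# Serialize the tensor to a byte string and store it in a bytes feature.
feature = {
    "matrix": tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(matrix).numpy()])
    ),
}
example = tf.train.Example(features=tf.train.Features(feature=feature))
serialized = example.SerializeToString()

# Parse in two steps: recover the byte string, then decode it into a tensor.
parsed = tf.io.parse_single_example(
    serialized, {"matrix": tf.io.FixedLenFeature([], tf.string)}
)
restored = tf.io.parse_tensor(parsed["matrix"], out_type=tf.float32)
print(restored.numpy())
```

The out_type passed to tf.io.parse_tensor has to match the dtype of the tensor that was serialized. Inside Dataset.map the decoded tensor has no static shape, so a call such as tf.ensure_shape may be needed afterwards if later layers require one.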

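## Loading data from HDFS

This section assumes the TFRecords are already stored on the HDFS data nodes. **For testing, the TFRecord generated earlier is written out a second time as test2.tfrecord.**  
This [article](https://medium.com/@matthewyeung/hadoop-file-system-with-tensorflow-dataset-api-13ce9aeaa107) is a useful reference.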

In [16]:
filename = 'data/test2.tfrecord'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(serialized_features_dataset)

### Finding all data files by glob pattern with [tf.data.Dataset.list_files](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#list_files)

In [17]:
#dataset_files=hdfs://NamenodeIP:Port/path/to/tfrecord_dir/*.tfrecord 
dataset_files="data/*.tfrecord" 
dataset = tf.data.Dataset.list_files(dataset_files)
print("file match cnt:", len(dataset))

file match cnt: 2
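
By default list_files returns the matched file names in a shuffled order. A small local sketch of the glob behavior follows, using empty placeholder files in a temporary directory (the file names are invented) and `shuffle=False` to switch the shuffling off.

```python
import os
import tempfile

import tensorflow as tf

tmp_dir = tempfile.mkdtemp()

# Create empty placeholder files; only two of them match the glob below.
for name in ["b.tfrecord", "a.tfrecord", "notes.txt"]:
    open(os.path.join(tmp_dir, name), "wb").close()

# shuffle=False turns off the default random ordering of file names.
files = tf.data.Dataset.list_files(os.path.join(tmp_dir, "*.tfrecord"), shuffle=False)

names = [os.path.basename(f.decode("utf-8")) for f in files.as_numpy_iterator()]
print(names)
```

For training, the default shuffled order is usually the desired one. Turning it off is mostly useful for debugging or reproducible evaluation.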


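### Interleaving records with [tf.data.Dataset.interleave](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#interleave)
Interleaving reads from several files at once, so records are mixed across files and the shuffle is more thorough.  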
*Note: While large buffer_sizes shuffle more thoroughly, they can take a lot of memory, and significant time to fill. Consider using Dataset.interleave across files if this becomes a problem.*

In [18]:
dataset = dataset.interleave(
    lambda x: tf.data.TFRecordDataset(x),
    cycle_length=4, num_parallel_calls=tf.data.AUTOTUNE,
    deterministic=False
)
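
To see what interleaving does to element order, here is a small deterministic sketch that replaces the TFRecord files with hypothetical in-memory datasets, each repeating its own id three times.

```python
import tensorflow as tf

# Two stand-in "files", identified by the ids 1 and 2.
files = tf.data.Dataset.range(1, 3)

interleaved = files.interleave(
    lambda file_id: tf.data.Dataset.from_tensors(file_id).repeat(3),
    cycle_length=2,  # read from two sources at a time
    block_length=1,  # take one element from each source per turn
)

order = [int(x) for x in interleaved.as_numpy_iterator()]
print(order)  # [1, 2, 1, 2, 1, 2]
```

Elements from the two sources alternate instead of arriving source by source. With `num_parallel_calls` set and `deterministic=False`, as in the cell above, the exact order is no longer guaranteed, which is normally acceptable for training data.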

### Parsing the tf.Example messages

In [19]:
dataset = dataset.map(_parse_function)
for record in dataset.take(1):
    print(repr(record))

{'feature0': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'feature1': <tf.Tensor: shape=(), dtype=int64, numpy=3>, 'feature2': <tf.Tensor: shape=(), dtype=string, numpy=b'horse'>, 'feature3': <tf.Tensor: shape=(), dtype=float32, numpy=0.2850912>}


### batch + prefetch
Batches can also be [padded](https://www.tensorflow.org/guide/data#simple_batching). Adding prefetch at the end improves latency and throughput, at the cost of extra memory.

In [20]:
batched_dataset = dataset.batch(64, drop_remainder=True).prefetch(2)
batched_dataset 

<PrefetchDataset shapes: {feature0: (64,), feature1: (64,), feature2: (64,), feature3: (64,)}, types: {feature0: tf.int64, feature1: tf.int64, feature2: tf.string, feature3: tf.float32}>
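
For the padding case, a minimal sketch with padded_batch might look like this. The variable-length sequences are synthetic, and tf.data.AUTOTUNE is used for the prefetch buffer instead of a fixed size.

```python
import tensorflow as tf

# Synthetic variable-length sequences: element x is the value x repeated x times.
sequences = tf.data.Dataset.range(1, 4).map(
    lambda x: tf.fill([tf.cast(x, tf.int32)], x)
)

# padded_batch pads each element to the longest one in its batch (with 0 by default).
padded = sequences.padded_batch(3).prefetch(tf.data.AUTOTUNE)

batch = next(iter(padded)).numpy()
print(batch)
```

The three sequences have lengths 1, 2 and 3, so the batch has shape (3, 3) with zeros filling the shorter rows.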