# Distributed Training <a id="分布式训练"></a>

https://www.tensorflow.org/guide/distributed_training

https://www.tensorflow.org/tutorials/distribute

- [1. Strategy types](#1.策略类型介绍)
- [2. Distributed training with Keras](#2.使用Keras进行分布式训练)
- [3. Distributed training with a custom training loop](#3.使用自定义训练循环进行分布式训练)
- [4. Multi-worker training with Estimator](#4.使用Estimator进行多工作器训练)
- [5. Other examples](#5.其他)

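Because this notebook touches on Estimator, you may also want to read [A08 Estimators](A08Estimator.ipynb). `tf.estimator` contains many ready-made machine-learning algorithms and is used in a way similar to scikit-learn. `tf.keras.estimator` provides the function `model_to_estimator`, which converts a Keras model into an Estimator.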


```python
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
tf.keras.backend.clear_session()
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
```

## 1. Strategy types <a id="1.策略类型介绍"></a>

Support for each training API, as of TensorFlow 2.0:

Training API | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy | OneDeviceStrategy
---|---|---|---|---|---|---
Keras API | Supported | Experimental support | Experimental support | Experimental support | Support planned post 2.0 | Supported
Custom training loop | Experimental support | Experimental support | Support planned post 2.0 | Support planned post 2.0 | No support yet | Supported
Estimator API | Limited Support | Not supported | Limited Support | Limited Support | Limited Support | Limited Support

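- **1.1 MirroredStrategy**: synchronous training on multiple GPUs of a single machine ★★★

**It performs in-graph replication for synchronous training on multiple GPUs of one machine. In effect, it copies all of the model's variables to every device. It then uses all-reduce to combine the gradients from all devices and applies the combined result to every replica.**

The most common distributed setup is synchronous training on multiple GPUs of a single machine, and `tf.distribute.MirroredStrategy` supports it directly. The strategy creates one model replica on each GPU. Every model variable is mirrored across all replicas. Together, these copies form one conceptual variable called a `MirroredVariable`. The copies stay in sync because identical updates are applied on every device.

Under the hood, communication uses all-reduce. All-reduce aggregates tensors from all devices and makes the result available on each of them. It is an efficient communication pattern with several implementations; by default, NVIDIA NCCL's all-reduce is used.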
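The following pure-Python simulation shows one synchronous step of the kind described above. It does not use TensorFlow, and the helpers `all_reduce_sum` and `sync_step` are hypothetical names chosen for illustration.

In each step:
- every replica computes a gradient on its own shard of the global batch;
- an all-reduce sums these gradients;
- every replica applies the same update, so the copies of the variable never diverge.

The gradients are divided by the global batch size, which matches the loss scaling used in section 3.

```python
# Conceptual simulation in pure Python (not the TensorFlow API) of one
# synchronous data-parallel step in the MirroredStrategy style.

def all_reduce_sum(values):
    """Naive all-reduce: every replica ends up with the sum of all replicas' values."""
    total = sum(values)
    return [total for _ in values]


def sync_step(replica_weights, shards, lr):
    """Each replica holds its own copy of a scalar weight w for the model y = w * x (MSE loss)."""
    global_batch_size = sum(len(shard) for shard in shards)
    local_grads = []
    for w, shard in zip(replica_weights, shards):
        # Gradient of sum((w*x - y)^2) over the local shard, divided by the global batch size.
        grad = sum(2 * (w * x - y) * x for x, y in shard) / global_batch_size
        local_grads.append(grad)
    reduced = all_reduce_sum(local_grads)
    # Every replica applies the same reduced gradient, so the copies stay identical.
    return [w - lr * g for w, g in zip(replica_weights, reduced)]


weights = [0.0, 0.0]                 # two replicas start from identical copies
shards = [[(1.0, 2.0), (2.0, 4.0)],  # replica 0's half of the global batch
          [(3.0, 6.0), (4.0, 8.0)]]  # replica 1's half (the data follow y = 2x)
for _ in range(50):
    weights = sync_step(weights, shards, lr=0.05)
print(weights)  # both copies are equal and close to 2.0
```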

```python
mirrored_strategy = tf.distribute.MirroredStrategy()   # uses all GPUs visible to TensorFlow, with NCCL for cross-device communication
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])  # synchronous training on GPU 0 and GPU 1 only
```

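To **override the cross-device communication**, pass an instance of `tf.distribute.CrossDeviceOps` as the `cross_device_ops` argument.

`cross_device_ops` defaults to `tf.distribute.NcclAllReduce()`. NCCL is not used when the strategy includes non-GPU devices; the warning in the section 2 output ("not using nccl allreduce") shows this.

The other supported implementations are `tf.distribute.HierarchicalCopyAllReduce` and `tf.distribute.ReductionToOneDevice`.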
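Here is a minimal sketch of overriding the default. It assumes a machine on which `MirroredStrategy` can see its devices, and the variable names are illustrative. Which implementation is fastest depends on the hardware, so treat this as a starting point for benchmarking rather than a recommendation.

```python
import tensorflow as tf

# Use hierarchical copy instead of the default NCCL all-reduce.
hierarchical_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

# Reduce all values on one device (the CPU here), then broadcast the result back.
one_device_reduce_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.ReductionToOneDevice(reduce_to_device="/cpu:0"))

print(hierarchical_strategy.num_replicas_in_sync,
      one_device_reduce_strategy.num_replicas_in_sync)
```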


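- **1.2 CentralStorageStrategy**: synchronous training on multiple GPUs of a single machine (still experimental)

**Variables are not mirrored. Instead, they are placed on the CPU, and operations are replicated across all local GPUs. If there is only one GPU, all variables and operations are placed on that GPU.**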

```python
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
```

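This creates a `CentralStorageStrategy` instance that uses all visible GPUs and the CPU. Updates to the variables made on the replicas are aggregated before being applied to the variables.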
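The devices can also be chosen explicitly through the constructor's `compute_devices` and `parameter_device` arguments. This minimal sketch assumes the variables should live on the CPU. It uses all logical GPUs as compute devices and falls back to the CPU when no GPU is present.

```python
import tensorflow as tf

# Compute on all logical GPUs if there are any, otherwise fall back to the CPU.
gpus = [d.name for d in tf.config.experimental.list_logical_devices("GPU")]
compute_devices = gpus or ["/cpu:0"]

# Variables are kept on parameter_device; computation is replicated on compute_devices.
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy(
    compute_devices=compute_devices, parameter_device="/cpu:0")

with central_storage_strategy.scope():
    v = tf.Variable(1.0)  # created once, on the parameter device

print(central_storage_strategy.num_replicas_in_sync)
```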


- **1.3 MultiWorkerMirroredStrategy**: synchronous training on multiple GPUs across multiple machines (still experimental)

It implements synchronous distributed training across multiple workers, each of which may have multiple GPUs. Like `MirroredStrategy`, it creates copies of all variables in the model on each device across all workers.

```python
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
```
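`MultiWorkerMirroredStrategy` learns the cluster layout from the `TF_CONFIG` environment variable, as the Estimator example in section 4 does. Every worker gets the same `cluster` entry but its own `task` index, and `TF_CONFIG` has to be set before the strategy is created. The helper `make_tf_config` below is hypothetical; it only shows how the per-worker values relate to each other.

```python
import json
import os


def make_tf_config(worker_addresses, index):
    """Return the TF_CONFIG JSON string for worker `index` in a worker-only cluster."""
    return json.dumps({
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": index},
    })


workers = ["localhost:12345", "localhost:23456"]
configs = [make_tf_config(workers, i) for i in range(len(workers))]

# In the process that runs worker 0 (before creating the strategy):
os.environ["TF_CONFIG"] = configs[0]
print(configs[1])
```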

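**Collective communication implementations**:

`CollectiveCommunication.RING` implements ring-based collectives using gRPC as the communication layer.

`CollectiveCommunication.NCCL` uses NVIDIA's NCCL to implement collectives.

`CollectiveCommunication.AUTO` defers the choice to the runtime. The best choice of collective implementation depends on the number and kind of GPUs and on the network interconnect in the cluster.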

```python
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    communication=tf.distribute.experimental.CollectiveCommunication.NCCL)  # force NCCL instead of AUTO
```
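The ring idea behind ring-based collectives such as `CollectiveCommunication.RING` can be illustrated with a pure-Python simulation. This sketch has no gRPC and no real concurrency, and `ring_all_reduce` is a hypothetical helper.

Each of the n workers sends and receives 2(n - 1) chunks, each 1/n of the vector. Per-worker traffic therefore stays roughly constant as workers are added, which is why ring all-reduce uses bandwidth well.

```python
def ring_all_reduce(data):
    """Simulate a ring all-reduce (sum) over n workers in pure Python.

    data[i] is worker i's vector; its length must be divisible by n so it splits
    into n equal chunks. Returns every worker's buffer after the algorithm.
    """
    n = len(data)
    size = len(data[0])
    assert size % n == 0, "this sketch assumes n equal-sized chunks"
    c = size // n
    buf = [list(vec) for vec in data]

    def chunk(k):
        return range(k * c, (k + 1) * c)

    # Phase 1, reduce-scatter: in step s, worker i sends chunk (i - s) % n to its
    # ring neighbour i + 1, which adds it to its own copy of that chunk.
    for s in range(n - 1):
        sends = [(i, (i - s) % n) for i in range(n)]
        payloads = [[buf[i][j] for j in chunk(k)] for i, k in sends]  # snapshot before updating
        for (i, k), payload in zip(sends, payloads):
            for j, val in zip(chunk(k), payload):
                buf[(i + 1) % n][j] += val
    # Now worker i holds the complete sum of chunk (i + 1) % n.

    # Phase 2, all-gather: in step s, worker i forwards chunk (i + 1 - s) % n,
    # and the neighbour overwrites its copy with the completed values.
    for s in range(n - 1):
        sends = [(i, (i + 1 - s) % n) for i in range(n)]
        payloads = [[buf[i][j] for j in chunk(k)] for i, k in sends]
        for (i, k), payload in zip(sends, payloads):
            for j, val in zip(chunk(k), payload):
                buf[(i + 1) % n][j] = val
    return buf


grads = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0], [100.0, 200.0, 300.0]]
print(ring_all_reduce(grads))  # every worker ends with [111.0, 222.0, 333.0]
```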


[Back to contents](#分布式训练)

- **1.4 TPUStrategy**: training on TPUs (still experimental)

```python
# tpu_address is a placeholder: set it to your TPU's name or gRPC address.
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)
```


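- **1.5 ParameterServerStrategy**: parameter-server training across multiple machines

Supports parameter-server training on multiple machines. In this setup, some machines are designated as workers and others as parameter servers. Each variable of the model is placed on one parameter server, and computation is replicated across all GPUs of all workers.
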
```python
ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
```
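The rule that each variable lives on exactly one parameter server can be sketched in pure Python. This sketch assumes round-robin assignment; check the documentation of your TensorFlow version for the actual placement policy. The helper `place_variables` and the variable names are hypothetical. The `/job:ps/task:N` strings follow TensorFlow's device-name format.

```python
def place_variables(var_names, num_ps):
    """Assign each variable to exactly one parameter-server task, round-robin."""
    return {name: "/job:ps/task:{}".format(i % num_ps) for i, name in enumerate(var_names)}


placement = place_variables(
    ["conv/kernel", "conv/bias", "dense/kernel", "dense/bias", "output/kernel"], num_ps=2)
for name, device in placement.items():
    print(name, "->", device)
```

Workers read these variables from the parameter servers, compute gradients on their own data, and send the updates back to the server that owns each variable.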


- **1.6 OneDeviceStrategy**: training on a single device

```python
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
```
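`OneDeviceStrategy` places all variables and computation on one device. It is mainly useful for testing code written against the `tf.distribute.Strategy` API before switching to a strategy that actually distributes. The minimal sketch below assumes a CPU-only setup (`/cpu:0`).

```python
import tensorflow as tf

# Everything created in this scope is placed on the single device.
strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
with strategy.scope():
    v = tf.Variable(3.0)

print(strategy.num_replicas_in_sync)  # 1
print(v.device)
```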


[Back to contents](#分布式训练)

## 2. Distributed training with Keras <a id="2.使用Keras进行分布式训练"></a>

Step 1: create a `tf.distribute.Strategy`.

Step 2: create and compile the model inside `strategy.scope()`. You can then call `fit` and `evaluate` outside the scope.

```python
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  model.compile(loss='mse', optimizer='sgd')
    
# Input as a tf.data.Dataset
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

# Input as NumPy arrays
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

```

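**When training a model on multiple GPUs, you can make effective use of the extra compute by increasing the batch size.**

**In general, use the largest batch size that fits in GPU memory and adjust the learning rate accordingly.**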

```python
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA * mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

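# Learning rate looked up by global batch size (5 for one replica, 10 for two);
# any other number of replicas raises a KeyError.
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]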
```
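The lookup table above only covers one or two replicas. A common alternative is the linear scaling heuristic: multiply the learning rate by the same factor as the global batch size. This heuristic comes from large-batch training practice and is not part of `tf.distribute`. It often needs a warm-up period and some tuning; for example, it gives 0.2 for a batch of 10, where the hand-picked table uses 0.15. The helper `scaled_learning_rate` is hypothetical.

```python
def scaled_learning_rate(base_lr, base_batch_size, global_batch_size):
    """Linear scaling heuristic: scale the learning rate with the global batch size."""
    return base_lr * global_batch_size / base_batch_size


BATCH_SIZE_PER_REPLICA = 5
for num_replicas in (1, 2, 4, 8):
    global_batch_size = BATCH_SIZE_PER_REPLICA * num_replicas
    lr = scaled_learning_rate(base_lr=0.1, base_batch_size=5,
                              global_batch_size=global_batch_size)
    print(num_replicas, global_batch_size, lr)
```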

[Back to contents](#分布式训练)

```python
from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow and TensorFlow Datasets (this notebook used: pip3 install tensorflow==2.0.0-beta1)
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os

# Load MNIST as tf.data datasets.
# with_info=True also returns metadata for the whole dataset in `info`,
# including the number of training and test examples.
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

# MirroredStrategy provides a context manager (strategy.scope()) in which the model is built.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
```

```
W0109 16:21:59.019071 139963976697664 cross_device_ops.py:1168] There is non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.

Number of devices: 1
```


```python
# Prepare the data.
# info.splits.total_num_examples gives the total number of examples across all splits.
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync  # global batch size

def scale(image, label):
  # Convert pixel values from integers in [0, 255] to floats in [0, 1].
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label

# Cache the preprocessed training data in memory to improve performance.
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
```

```python
# Build and compile the model inside the strategy scope.
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])


# Define the callbacks.
# Directory in which the checkpoints are stored.
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Learning-rate decay function (epochs are 0-indexed).
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch >= 3 and epoch < 7:
    return 1e-4
  else:
    return 1e-5

# Callback that prints the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()))

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]
```

```python
# Train the model.
model.fit(train_dataset, epochs=12, callbacks=callbacks)
```

```
Train on None steps
Epoch 1/12
    938/Unknown - 24s 26ms/step - loss: 0.2113 - accuracy: 0.9398
Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12
Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12
Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12
Learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12
Learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12
Learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12
Learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12
Learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12
Learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12
Learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12
Learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12
Learning rate for epoch 12 is 9.999999747378752e-06

<tensorflow.python.keras.callbacks.History at 0x7f4b7bf97748>
```

```python
!ls {checkpoint_dir}
```

```
checkpoint		     ckpt_4.data-00000-of-00001
ckpt_10.data-00000-of-00001  ckpt_4.index
ckpt_10.index		     ckpt_5.data-00000-of-00001
ckpt_11.data-00000-of-00001  ckpt_5.index
ckpt_11.index		     ckpt_6.data-00000-of-00001
ckpt_12.data-00000-of-00001  ckpt_6.index
ckpt_12.index		     ckpt_7.data-00000-of-00001
ckpt_1.data-00000-of-00001   ckpt_7.index
ckpt_1.index		     ckpt_8.data-00000-of-00001
ckpt_2.data-00000-of-00001   ckpt_8.index
ckpt_2.index		     ckpt_9.data-00000-of-00001
ckpt_3.data-00000-of-00001   ckpt_9.index
ckpt_3.index
```


```python
# Evaluate the model, starting from the latest checkpoint.
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

# To view the logs in TensorBoard, run in a shell:
# tensorboard --logdir=./logs
```

```
    157/Unknown - 3s 18ms/step - loss: 0.0405 - accuracy: 0.9865
Eval loss: 0.040537810085080325, Eval Accuracy: 0.9865000247955322
```


```python
# Export to SavedModel.
# Export the graph and variables to the platform-independent SavedModel format.
# After saving, the model can be loaded with or without a strategy scope.
# Note: tf.keras.experimental.export_saved_model / load_from_saved_model were deprecated in later
# TF 2.x releases in favor of model.save(path, save_format='tf') and tf.keras.models.load_model(path).

path = 'saved_model/'
tf.keras.experimental.export_saved_model(model, path)

# ******** Load without strategy.scope ************
unreplicated_model = tf.keras.experimental.load_from_saved_model(path)

unreplicated_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

# ******** Load inside strategy.scope ************
with strategy.scope():
  replicated_model = tf.keras.experimental.load_from_saved_model(path)
  replicated_model.compile(loss='sparse_categorical_crossentropy',
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])

  eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
  print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
```

```
W0109 16:32:05.694178 139963976697664 deprecation.py:323] From /usr/local/python3/lib/python3.6/site-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:253: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
W0109 16:32:05.696126 139963976697664 export_utils.py:182] Export includes no default signature!
W0109 16:32:06.089147 139963976697664 export_utils.py:182] Export includes no default signature!

    157/Unknown - 2s 14ms/step - loss: 0.0405 - accuracy: 0.9865
Eval loss: 0.040538180162714925, Eval Accuracy: 0.9865000247955322
    157/Unknown - 2s 13ms/step - loss: 0.0405 - accuracy: 0.9865
Eval loss: 0.040537810085080325, Eval Accuracy: 0.9865000247955322
```


## 3. Distributed training with a custom training loop <a id="3.使用自定义训练循环进行分布式训练"></a>

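How does `tf.distribute.MirroredStrategy` work?

- All variables and the model graph are replicated on every replica.
- The input is split evenly across the replicas.
- Each replica computes the loss and gradients for the input it receives.
- The gradients are synchronized across all replicas by summing them.
- After synchronization, the same update is applied to the copies of the variables on every replica.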

[Back to contents](#分布式训练)

```python
from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)


fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Add a channel dimension to the arrays -> new shape == (28, 28, 1).
# The first layer of the model is a convolution, which needs 4-D input
# (batch size, height, width, channels); the batch dimension is added later by batching.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Scale the images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
```

```
2.0.0-beta1
```


```python
# Set up the strategy and the data.
strategy = tf.distribute.MirroredStrategy()
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

# Distribute the datasets: each global batch is split across the replicas.
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
```

```
W0110 09:21:26.471938 140554294826816 cross_device_ops.py:1168] There is non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.

Number of devices: 1
```


```python
def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

  return model

# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
```

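[Define the loss function](https://www.tensorflow.org/tutorials/distribute/custom_training#%E5%AE%9A%E4%B9%89%E6%8D%9F%E5%A4%B1%E5%87%BD%E6%95%B0)

Normally, on a single machine with one GPU or CPU, the loss is divided by the number of examples in the input batch. With a `tf.distribute.Strategy`, each replica sees only its share of the global batch, and the gradients from all replicas are summed. Each replica should therefore sum its per-example losses and divide by the **global** batch size (`GLOBAL_BATCH_SIZE`), not by its own local batch size. `tf.nn.compute_average_loss` does this when given `global_batch_size`.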

```python
with strategy.scope():
  # Set reduction to NONE so that we can do the reduction ourselves afterwards
  # and divide by the global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
  # Alternatively: loss_fn = tf.keras.losses.sparse_categorical_crossentropy
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    # Sum of the per-example losses divided by GLOBAL_BATCH_SIZE.
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
```
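The following check uses no TensorFlow and made-up loss values. It shows why the divisor must be the global batch size. If each replica divides its local sum by the global batch size, summing across replicas gives exactly the mean over the whole batch. If each replica instead averages over its local batch, the sum is `num_replicas` times too large. Gradients are linear in the loss, so the same factor would inflate the gradients.

```python
# Conceptual check in pure Python (no TensorFlow): why divide by the GLOBAL batch size.
per_example_losses = [0.5, 1.5, 2.0, 4.0, 1.0, 3.0, 0.0, 2.0]  # made-up values
num_replicas = 2
global_batch_size = len(per_example_losses)
local = global_batch_size // num_replicas
shards = [per_example_losses[i * local:(i + 1) * local] for i in range(num_replicas)]

# What compute_average_loss does on each replica: sum(local losses) / global batch size.
scaled = [sum(s) / global_batch_size for s in shards]
# Wrong alternative: average over the local batch only.
local_mean = [sum(s) / len(s) for s in shards]

true_mean = sum(per_example_losses) / global_batch_size
print(sum(scaled), true_mean)       # -> 1.75 1.75 (summing across replicas recovers the mean)
print(sum(local_mean) / true_mean)  # -> 2.0 (summing local means overshoots by num_replicas)
```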

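**Define metrics to track loss and accuracy**

These metrics track the test loss and the training and test accuracy. Call `.result()` at any time to get the accumulated statistics.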

```python
with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
```
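These metrics are stateful:
- `update_state` accumulates values;
- `result()` reads the running value;
- `reset_states()` clears it, which is why the training loop resets the metrics at the end of every epoch.

Below is a minimal pure-Python sketch of that behaviour for a mean metric. `StreamingMean` is a hypothetical class, not the Keras implementation. The real metric also handles sample weights and tensors, and under a strategy it aggregates across replicas.

```python
class StreamingMean:
    """Hypothetical pure-Python stand-in for tf.keras.metrics.Mean."""

    def __init__(self):
        self.reset_states()

    def update_state(self, values):
        # Accumulate a running sum and count over every value seen so far.
        self.total += sum(values)
        self.count += len(values)

    def result(self):
        return self.total / self.count if self.count else 0.0

    def reset_states(self):
        self.total = 0.0
        self.count = 0


metric = StreamingMean()
metric.update_state([0.5, 1.5])  # per-example losses of batch 1
metric.update_state([2.5])       # per-example losses of batch 2
print(metric.result())           # mean over all 3 examples: 1.5
after_epoch = metric.result()
metric.reset_states()            # what the loop does at the end of each epoch
```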

```python
# The model and the optimizer must be created under strategy.scope().
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

with strategy.scope():
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss

  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)


with strategy.scope():
  # experimental_run_v2 replicates the given computation and runs it on the distributed input
  # (it was renamed to strategy.run in TF 2.2).
  @tf.function
  def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.experimental_run_v2(train_step, args=(dataset_inputs,))
    # Sum the per-replica (already globally scaled) losses into one scalar.
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

  @tf.function
  def distributed_test_step(dataset_inputs):
    return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))

  for epoch in range(EPOCHS):
    # Training loop
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
      total_loss += distributed_train_step(x)
      num_batches += 1
    train_loss = total_loss / num_batches

    # Test loop
    for x in test_dist_dataset:
      distributed_test_step(x)

    # Save a checkpoint every two epochs.
    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
                "Test Accuracy: {}")
    print (template.format(epoch+1, train_loss,
                           train_accuracy.result()*100, test_loss.result(),
                           test_accuracy.result()*100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()
```

```
Epoch 1, Loss: 0.5166115164756775, Accuracy: 81.48666381835938, Test Loss: 0.43088409304618835, Test Accuracy: 85.05999755859375
Epoch 2, Loss: 0.3407503068447113, Accuracy: 87.63166809082031, Test Loss: 0.35095396637916565, Test Accuracy: 87.25
Epoch 3, Loss: 0.29075008630752563, Accuracy: 89.33499908447266, Test Loss: 0.31871673464775085, Test Accuracy: 88.41999816894531
Epoch 4, Loss: 0.25969064235687256, Accuracy: 90.49500274658203, Test Loss: 0.2988489270210266, Test Accuracy: 89.26000213623047
Epoch 5, Loss: 0.23618030548095703, Accuracy: 91.2933349609375, Test Loss: 0.2877892255783081, Test Accuracy: 89.56999969482422
Epoch 6, Loss: 0.2153063714504242, Accuracy: 92.12833404541016, Test Loss: 0.2837090492248535, Test Accuracy: 89.94000244140625
Epoch 7, Loss: 0.1971343457698822, Accuracy: 92.81999969482422, Test Loss: 0.28046807646751404, Test Accuracy: 90.1500015258789
Epoch 8, Loss: 0.18028132617473602, Accuracy: 93.47166442871094, Test Loss: 0.274112731218338, Test Accuracy: 9
```

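Things to note in the example above:

We iterate over `train_dist_dataset` and `test_dist_dataset` with a `for x in ...` construct.

The scaled loss is the return value of `distributed_train_step`. It is aggregated across replicas with `tf.distribute.Strategy.reduce`. It is then aggregated across batches by summing the values returned by the `reduce` calls.

The `tf.keras.Metrics` objects should be updated inside `train_step` and `test_step`, the functions run by `tf.distribute.Strategy.experimental_run_v2`.

`tf.distribute.Strategy.experimental_run_v2` returns the results from every local replica of the strategy, and there are several ways to consume them. Call `tf.distribute.Strategy.reduce` to get an aggregated value. Alternatively, call `tf.distribute.Strategy.experimental_local_results` to get the list of values contained in the result, one per local replica.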
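A small sketch of the two ways to consume per-replica results. It assumes eager execution with `MirroredStrategy`, which works on a CPU-only machine with a single replica. `replica_fn` is a hypothetical function: each replica returns its replica id plus one. With n replicas, `SUM` therefore gives n(n + 1)/2 and `MEAN` gives (n + 1)/2. `experimental_run_v2` was renamed to `run` in TF 2.2, so the sketch picks whichever method exists.

```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
# experimental_run_v2 was renamed to run in TF 2.2; use whichever this version provides.
run = getattr(strategy, "run", None) or strategy.experimental_run_v2


def replica_fn():
    # Each replica returns its replica id + 1 as a float32 scalar.
    replica_id = tf.distribute.get_replica_context().replica_id_in_sync_group
    return tf.cast(replica_id + 1, tf.float32)


per_replica = run(replica_fn)
# One value per local replica.
local_values = strategy.experimental_local_results(per_replica)
# Values aggregated across replicas.
total = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica, axis=None)
mean = strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica, axis=None)

n = strategy.num_replicas_in_sync
print(n, [float(v) for v in local_values], float(total), float(mean))
```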

**Restore the latest checkpoint and test**

A checkpoint saved from a model trained with a `tf.distribute.Strategy` can be restored with or without a strategy.

```python
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)

checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(eval_accuracy.result()*100))
```

```
Accuracy after restoring the saved model without strategy: 90.58999633789062
```


[Alternative ways of iterating over a dataset](https://www.tensorflow.org/tutorials/distribute/custom_training#%E8%BF%AD%E4%BB%A3%E4%B8%80%E4%B8%AA%E6%95%B0%E6%8D%AE%E9%9B%86%E7%9A%84%E6%9B%BF%E4%BB%A3%E6%96%B9%E6%B3%95)

- Using iterators
- Iterating inside a `tf.function`
- [Tracking the training loss across replicas](https://www.tensorflow.org/tutorials/distribute/custom_training#%E8%B7%9F%E8%B8%AA%E5%89%AF%E6%9C%AC%E4%B8%AD%E7%9A%84%E8%AE%AD%E7%BB%83%E7%9A%84%E6%8D%9F%E5%A4%B1)

```python
"""
To run a given number of steps rather than a full pass over the dataset, create an
iterator with `iter` and call `next` on it explicitly.
You can iterate over the dataset both inside and outside a tf.function; this snippet
iterates outside a tf.function using an iterator.
"""
with strategy.scope():
  # The loop variable must be `epoch`: with `for _ in range(EPOCHS)`, the print below reuses the
  # stale `epoch` from the previous cell, which is why the recorded output reads "Epoch 10" on every line.
  for epoch in range(EPOCHS):
    total_loss = 0.0
    num_batches = 0
    train_iter = iter(train_dist_dataset)

    # Run 10 steps per epoch instead of a full pass over the dataset.
    for _ in range(10):
      total_loss += distributed_train_step(next(train_iter))
      num_batches += 1
    average_train_loss = total_loss / num_batches

    template = ("Epoch {}, Loss: {}, Accuracy: {}")
    print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
    train_accuracy.reset_states()
```

```
Epoch 10, Loss: 0.11914627254009247, Accuracy: 96.09375
Epoch 10, Loss: 0.0768473818898201, Accuracy: 97.65625
Epoch 10, Loss: 0.048872582614421844, Accuracy: 99.0625
Epoch 10, Loss: 0.03392244130373001, Accuracy: 99.6875
Epoch 10, Loss: 0.025061551481485367, Accuracy: 100.0
Epoch 10, Loss: 0.020258614793419838, Accuracy: 100.0
Epoch 10, Loss: 0.016797250136733055, Accuracy: 100.0
Epoch 10, Loss: 0.01446988433599472, Accuracy: 100.0
Epoch 10, Loss: 0.012694062665104866, Accuracy: 100.0
Epoch 10, Loss: 0.011253146454691887, Accuracy: 100.0
```


```python
"""
You can also iterate over the entire input train_dist_dataset inside a tf.function,
either with the `for x in ...` construct or by creating iterators as above.
The example below wraps one epoch of training in a tf.function and iterates over
train_dist_dataset inside the function.
"""
with strategy.scope():
  @tf.function
  def distributed_train_epoch(dataset):
    total_loss = 0.0
    num_batches = 0
    for x in dataset:
      per_replica_losses = strategy.experimental_run_v2(train_step, args=(x,))
      total_loss += strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
      num_batches += 1
    return total_loss / tf.cast(num_batches, dtype=tf.float32)

  for epoch in range(EPOCHS):
    train_loss = distributed_train_epoch(train_dist_dataset)

    template = ("Epoch {}, Loss: {}, Accuracy: {}")
    print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

    train_accuracy.reset_states()
```

```
Epoch 1, Loss: 0.13831807672977448, Accuracy: 95.05332946777344
Epoch 2, Loss: 0.14503231644630432, Accuracy: 94.60832977294922
Epoch 3, Loss: 0.12937283515930176, Accuracy: 95.13833618164062
Epoch 4, Loss: 0.11664383113384247, Accuracy: 95.71333312988281
Epoch 5, Loss: 0.10745175182819366, Accuracy: 95.99833679199219
Epoch 6, Loss: 0.0979103222489357, Accuracy: 96.32499694824219
Epoch 7, Loss: 0.09036801010370255, Accuracy: 96.62332916259766
Epoch 8, Loss: 0.08281902223825455, Accuracy: 96.87999725341797
Epoch 9, Loss: 0.07657422125339508, Accuracy: 97.18833923339844
Epoch 10, Loss: 0.06852100044488907, Accuracy: 97.46333312988281
```


## 4. Multi-worker training with Estimator <a id="4.使用Estimator进行多工作器训练"></a>

[Back to contents](#分布式训练)

```python
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

"""
Multi-worker configuration.
Set TF_CONFIG before creating the strategy (and the RunConfig), because both read it
when they are constructed. Every worker gets the same 'cluster' entry but its own
'task' index. This process is worker 0; a second process with 'index': 1 must be
running on localhost:23456, otherwise worker 0 blocks waiting for it.
"""
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

"""
MultiWorkerMirroredStrategy creates copies of all variables in the model's layers on
each device across all workers. It uses CollectiveOps, TensorFlow ops for collective
communication, to aggregate gradients and keep the variables in sync.
"""
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()


BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
  # Training split in TRAIN mode, test split otherwise.
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    # Give each input pipeline a disjoint subset of the examples.
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
  return mnist_dataset.map(scale).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
```

```
W0110 10:41:40.797106 139944007546688 cross_device_ops.py:1168] There is non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
```
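In `input_fn`, `input_context.num_input_pipelines` and `input_context.input_pipeline_id` play the roles of `num_shards` and `index` in `Dataset.shard`. The pure-Python sketch below shows the selection rule; the `shard` helper is hypothetical. Each pipeline keeps every `num_shards`-th element starting at its own index, so the workers see disjoint, equally sized parts of the data. Sharding before `shuffle`, as `input_fn` does, keeps the split deterministic.

```python
def shard(elements, num_shards, index):
    """Mimic tf.data.Dataset.shard: keep the elements whose position % num_shards == index."""
    return [e for i, e in enumerate(elements) if i % num_shards == index]


examples = list(range(10))
per_worker = [shard(examples, num_shards=2, index=w) for w in range(2)]
print(per_worker)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```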


```python
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
    model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      # Output raw logits: the loss below is computed with from_logits=True.
      tf.keras.layers.Dense(10)
    ])
    logits = model(features, training=(mode == tf.estimator.ModeKeys.TRAIN))

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'logits': logits}
        # EstimatorSpec needs `mode`; it has no `labels` argument.
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=LEARNING_RATE)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
    # Sum the per-example losses and divide by the batch size.
    loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)

    return tf.estimator.EstimatorSpec(mode=mode, loss=loss,
          train_op=optimizer.minimize(loss, tf.compat.v1.train.get_or_create_global_step()))
```

```python
"""
Specify the Estimator's distribution strategy in RunConfig, then train and evaluate the
model by calling tf.estimator.train_and_evaluate.
Here only training is distributed, via train_distribute; evaluation can be distributed
as well by also passing eval_distribute.
"""
config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(model_fn=model_fn, model_dir='./multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
```

```
W0110 10:41:41.039667 139944007546688 distribute_coordinator.py:829] `eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
W0110 10:41:41.044196 139944007546688 cross_device_ops.py:1164] Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:worker/replica:0/task:0/device:CPU:0
W0110 10:41:41.054024 139944007546688 cross_device_ops.py:1164] Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:worker/replica:0/task:0/device:CPU:0
W0110 10:41:42.008032 139940201412352 deprecation.py:323] From /usr/local/python3/lib/python3.6/site-packages/tensorflow/python/ops/array_ops.py:1340: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
W0110 10:41:42.017798 139944007546688 monitored_session.py:347] Collective ops may deadloc
```

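**Optimizing multi-worker training performance**

**Increase the batch size:** the batch size here is the per-GPU batch size. In general, use the largest batch size that fits in GPU memory.

**Cast variables:** cast variables to `tf.float` where possible. The official ResNet model includes an example of how to do this.

**Use collective communication:** `MultiWorkerMirroredStrategy` provides several collective communication implementations:
- RING implements ring-based collectives, using gRPC as the cross-host communication layer.
- NCCL uses NVIDIA's NCCL to implement collectives.
- AUTO defers the choice to the runtime.

The best collective implementation depends not only on the number and kind of GPUs but also on the network interconnect in the cluster. **To override the automatic choice, set the `communication` argument of the `MultiWorkerMirroredStrategy` constructor**, for example `communication=tf.distribute.experimental.CollectiveCommunication.NCCL`.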

## 5. Other examples <a id="5.其他"></a>

[Back to contents](#分布式训练)

★ Examples that use distribution strategies with Keras `compile`/`fit`:

Training [Transformer](https://github.com/tensorflow/models/blob/master/official/transformer/v2/transformer_main.py) with `tf.distribute.MirroredStrategy`.

Training [NCF](https://github.com/tensorflow/models/blob/master/official/recommendation/ncf_keras_main.py) with `tf.distribute.MirroredStrategy`.

Training [MNIST](https://www.tensorflow.org/tutorials/distribute/keras) with `tf.distribute.MirroredStrategy`.

Training [ResNet50](https://github.com/tensorflow/models/blob/master/official/vision/image_classification/resnet_imagenet_main.py) with `tf.distribute.MirroredStrategy`.

Training [MNIST](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) with ❤`tf.distribute.MultiWorkerMirroredStrategy`❤.

Training [ResNet50](https://github.com/tensorflow/tpu/blob/master/models/experimental/resnet50_keras/resnet50_tf2.py) with `tf.distribute.TPUStrategy`.


★ Examples that use distribution strategies with custom training loops:

Training [DenseNet](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/densenet/distributed_train.py) with `tf.distribute.MirroredStrategy`.

Training [NMT](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/nmt_with_attention/distributed_train.py) with `tf.distribute.MirroredStrategy`.

[BERT](https://github.com/tensorflow/models/blob/master/official/bert/run_classifier.py) trained with MirroredStrategy and TPUStrategy. This example is particularly helpful for understanding how to load from a checkpoint and how to save checkpoints periodically during distributed training.

[NCF](https://github.com/tensorflow/models/blob/master/official/recommendation/ncf_keras_main.py) trained with MirroredStrategy, which can be enabled with the `keras_use_ctl` flag.




