This tutorial shows how to use the `tf.distribute.Strategy` APIs to train a TF2 `tf.keras` model across multiple workers.

References:
* Multi-worker training with Keras: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
* Distributed Training in TensorFlow: https://www.tensorflow.org/guide/distributed_training

# Environment Settings

In [None]:
# !apt-get update && apt-get install -y iputils-ping net-tools

In [None]:
!ifconfig

In [None]:
# !pip install -q tf-nightly
# !pip install -q tensorflow_datasets

In [12]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import json

tfds.disable_progress_bar()

print("Tensorflow Version: {}".format(tf.__version__))
print("Eager Mode: {}".format(tf.executing_eagerly()))
print("GPU {} available.".format("is" if tf.config.experimental.list_physical_devices("GPU") else "not"))

Tensorflow Version: 2.1.0
Eager Mode: True
GPU not available.


# Multi-Worker Configuration

In TensorFlow, the `TF_CONFIG` environment variable is required for training on multiple machines, each of which may have a different role. It specifies the cluster configuration on each worker that is part of the cluster.

`TF_CONFIG` is a JSON string with two required components: `cluster` and `task`. `cluster` describes the whole training cluster. It is a dictionary that maps job types, such as `worker`, to lists of network addresses. In general, a worker does the training computation. One worker has extra duties: it saves checkpoints and writes the information used to monitor progress in TensorBoard. This worker is called the chief worker, and by default it is the worker at index 0. `task` describes the current node: its job type and its index in that job's address list.

Each worker needs its own `TF_CONFIG`: the `cluster` part is the same on every worker, while the `task` part differs.

A simple example is below.

```python
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
```
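Because only `task` differs between workers, it can help to generate each worker's `TF_CONFIG` from one shared address list. The following is a minimal sketch; `make_tf_config` is a hypothetical helper, not a TensorFlow API, and the addresses are the same placeholders as above.

```python
import json


def make_tf_config(worker_addresses, index):
    """Return the TF_CONFIG JSON string for the worker at `index`.

    Hypothetical helper: the `cluster` part is shared by every worker,
    only `task.index` changes from one worker to the next.
    """
    if not 0 <= index < len(worker_addresses):
        raise ValueError("index {} is out of range for {} workers".format(
            index, len(worker_addresses)))
    return json.dumps({
        'cluster': {'worker': list(worker_addresses)},
        'task': {'type': 'worker', 'index': index},
    })


WORKERS = ["localhost:12345", "localhost:23456"]

# Print the configuration each machine would set as TF_CONFIG.
for i in range(len(WORKERS)):
    print(make_tf_config(WORKERS, i))
```

On each machine you would then assign the matching string to `os.environ['TF_CONFIG']` before creating the strategy.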

The following example simulates a multi-worker environment with Docker containers.

```bash
docker pull tensorflow/tensorflow:latest-py3-jupyter

# worker 1
# ports:
# |- 8888: Jupyter Notebook
# |- 6006: TensorBoard
# |- 12345: communication with other workers
docker run --net=bridge --name worker1 -p 8889:8888 -p 6007:6006 -p 12345:12345 tensorflow/tensorflow:latest-py3-jupyter

# worker 2 (container port 12345 is published on host port 12346)
docker run --net=bridge --name worker2 -p 8890:8888 -p 12346:12345 tensorflow/tensorflow:latest-py3-jupyter
```

You can also define both containers in a Docker Compose file (`docker-compose.yml`).

```yml
version: "3"
services:
  worker1:
    image: tensorflow/tensorflow:latest-py3-jupyter
    container_name: worker1
    ports: 
      - "8889:8888"
      - "6007:6006"
      - "12345:12345"
    volumes: 
      - "/Users/jiankaiwang/devops/distributed_training:/tf/distributed_training"
  worker2:
    image: tensorflow/tensorflow:latest-py3-jupyter
    container_name: worker2
    ports:
      - "8890:8888"
      - "12346:12345"
    volumes: 
      - "/Users/jiankaiwang/devops/distributed_training:/tf/distributed_training"
```

Then start the containers with Docker Compose.

```sh
# simple usage
docker-compose -f docker-compose.yml up
docker-compose -f docker-compose.yml ps
```

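The following is example `ifconfig` output from the two containers. Use the `inet` addresses from your own containers when you set `TF_CONFIG`. They depend on the Docker network the containers are attached to (Docker Compose, for example, creates its own network), so they may differ from the ones shown here and in the cell below.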

```sh
# worker 1
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.17.0.2  netmask 255.255.0.0  broadcast 172.17.255.255
        ether 02:42:ac:11:00:02  txqueuelen 0  (Ethernet)
        RX packets 3773  bytes 4867971 (4.8 MB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2233  bytes 6349433 (6.3 MB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

# worker 2
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.17.0.3  netmask 255.255.0.0  broadcast 172.17.255.255
        ether 02:42:ac:11:00:03  txqueuelen 0  (Ethernet)
        RX packets 3704  bytes 4865869 (4.8 MB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2043  bytes 6327067 (6.3 MB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
```

In [None]:
# Use the container addresses reported by ifconfig.
# On worker 2, set 'index' to 1.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["172.19.0.2:12345", "172.19.0.3:12345"]
    },
    'task': {'type': 'worker', 'index': 0}
})
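A quick way to confirm which role the current container plays is to read `TF_CONFIG` back. The sketch below assumes the convention described earlier: when the cluster has no separate `chief` job, the worker at index 0 acts as the chief. `is_chief` is a hypothetical helper, not part of TensorFlow.

```python
import json
import os


def is_chief(tf_config):
    """Return True if this task acts as the chief (hypothetical helper).

    A task is the chief if its type is 'chief', or if it is worker 0 in a
    cluster that defines no separate 'chief' job.
    """
    task = tf_config.get('task', {})
    task_type = task.get('type')
    if task_type == 'chief':
        return True
    has_chief_job = 'chief' in tf_config.get('cluster', {})
    return task_type == 'worker' and task.get('index') == 0 and not has_chief_job


# An unset TF_CONFIG is treated as an empty configuration here.
config = json.loads(os.environ.get('TF_CONFIG', '{}'))
print("Chief worker" if is_chief(config) else "Non-chief worker")
```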

# Choose the Right Strategy

In TensorFlow, multi-worker distributed training comes in two types: synchronous and asynchronous. In synchronous training, all workers and replicas train on different slices of the input data in lockstep and aggregate their gradients at every step. In asynchronous training, workers train independently and update the variables without waiting for each other at every step.
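To make "synchronous" concrete, here is a toy, framework-free sketch of synchronous data-parallel training: each worker computes a gradient on its own slice of the batch, the gradients are averaged (an all-reduce), and every worker applies the same update, so all copies of the model stay identical. The one-parameter model `y = w * x`, the data, and the learning rate are made up for illustration; `MultiWorkerMirroredStrategy` performs the equivalent aggregation with collective ops across machines.

```python
def local_gradient(w, xs, ys):
    """Gradient of the mean squared error of y = w * x on one worker's shard."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def synchronous_step(w, shards, learning_rate):
    """One synchronous step: every worker's gradient is needed before the update."""
    grads = [local_gradient(w, xs, ys) for xs, ys in shards]  # one per worker
    mean_grad = sum(grads) / len(grads)  # all-reduce: average across workers
    return w - learning_rate * mean_grad  # every replica applies the same update


# A global batch of 4 examples from y = 3x, split evenly between 2 workers.
shards = [([1.0, 2.0], [3.0, 6.0]),   # worker 0
          ([3.0, 4.0], [9.0, 12.0])]  # worker 1

w = 0.0
for step in range(50):
    w = synchronous_step(w, shards, learning_rate=0.02)
print(round(w, 4))
```

With equal-sized shards, the averaged gradient equals the gradient over the whole global batch, which is why synchronous training behaves like single-device training with a larger batch.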

The `tf.distribute.experimental.MultiWorkerMirroredStrategy` API is the recommended way to do synchronous multi-worker training.

In [None]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

**If you see `RuntimeError: Collective ops must be configured at program startup`, create the `MultiWorkerMirroredStrategy` instance at the beginning of the program, before any code that may create ops.** (For example, run the cell above right after importing TensorFlow and setting `TF_CONFIG`.)

# Preparing Datasets

In this tutorial, you build a CNN model on the Fashion-MNIST dataset.

In [None]:
BUFFER_SIZE = int(1e6)  # larger than the 60,000 training images, so the shuffle is complete
BATCH_SIZE = 64

def make_datasets_unbatched(data_label='train', repeat=None):
  """Load a Fashion-MNIST split, scale pixels to [0, 1], and shuffle it.

  If `repeat` is an integer, the dataset is repeated that many times.
  """
  def normalize(image, label):
    img = tf.cast(image, tf.float32)
    img /= 255.0
    return img, label

  datasets, info = tfds.load(name='fashion_mnist', with_info=True, as_supervised=True)
  # Normalize and cache the split once, then repeat the cached data.
  dataset = datasets[data_label].map(normalize).cache()
  if repeat:
    dataset = dataset.repeat(repeat)
  return dataset.shuffle(BUFFER_SIZE)

train_datasets = make_datasets_unbatched('train').batch(BATCH_SIZE)

# Build a tf.keras Model

In [None]:
def build_model():
  def _model(inputs):
    x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation='elu')(inputs)
    x = tf.keras.layers.MaxPool2D()(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(units=64, activation='elu')(x)
    outputs = tf.keras.layers.Dense(units=10, activation='softmax', name='class_result')(x)
    return outputs

  inputs = tf.keras.Input(shape=(28, 28, 1))
  outputs = _model(inputs)
  model = tf.keras.Model(inputs, outputs)

  model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy, 
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

  return model

Let's try to train a model on a single device first.

In [None]:
single_worker_model = build_model()
single_worker_model.fit(train_datasets, epochs=3, steps_per_epoch=5)

# Train the Model with MultiWorkerMirroredStrategy

To use a `tf.keras` model with `tf.distribute.Strategy`, the only change is to build and compile the model (`model.compile()`) inside the strategy's scope (`strategy.scope()`).

Because `MultiWorkerMirroredStrategy` does not support last partial batch handling, pass the `steps_per_epoch` argument to `model.fit()`.
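If you want `steps_per_epoch` to correspond to one full pass over the data while leaving out the last partial batch, divide the number of training examples by the global batch size and round down. A minimal sketch, assuming the 60,000-image Fashion-MNIST training split and the global batch size of 64 used in the training cell (which itself uses `steps_per_epoch=5` to keep the demo short):

```python
NUM_TRAIN_EXAMPLES = 60000  # size of the Fashion-MNIST training split
NUM_WORKERS = 2
GLOBAL_BATCH_SIZE = 32 * NUM_WORKERS  # 64, as in the training cell

# Integer division leaves out the final partial batch.
steps_per_epoch = NUM_TRAIN_EXAMPLES // GLOBAL_BATCH_SIZE
dropped = NUM_TRAIN_EXAMPLES - steps_per_epoch * GLOBAL_BATCH_SIZE
print("steps_per_epoch = {}, examples left out per epoch = {}".format(
    steps_per_epoch, dropped))
```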

**Here you have to set the data sharding policy to `DATA` or `OFF`. With `DATA`, each worker reads the whole dataset and discards the elements that are not assigned to it. With `OFF`, no sharding happens and each worker processes the whole dataset. With the default `AUTO` policy, training failed in this setup: `AUTO` first tries to shard by input files (the `FILE` policy), which requires enough input files for every worker to get at least one; otherwise an error is raised.**
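The difference between the `DATA` and `OFF` policies can be pictured with a plain-Python stand-in. This is a conceptual illustration, not how `tf.data` is implemented: under `DATA`, every worker reads all elements and keeps every `num_workers`-th one starting at its own index (the same selection `tf.data.Dataset.shard(num_shards, index)` makes), while under `OFF` every worker keeps everything. The function `elements_seen_by_worker` is hypothetical.

```python
def elements_seen_by_worker(elements, num_workers, worker_index, policy):
    """Return the elements one worker trains on (illustrative stand-in)."""
    if policy == 'OFF':
        # No sharding: every worker processes the full dataset.
        return list(elements)
    if policy == 'DATA':
        # Every worker reads everything but keeps only its own elements.
        return [e for i, e in enumerate(elements) if i % num_workers == worker_index]
    raise ValueError("unsupported policy in this sketch: {}".format(policy))


data = list(range(10))
for worker in range(2):
    print(worker, elements_seen_by_worker(data, 2, worker, 'DATA'))
```

Under `DATA` the workers see disjoint slices that together cover the dataset; under `OFF` they all see the same data, so each example is processed once per worker per epoch.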

In [None]:
NUM_WORKERS = 2
# tf.data.Dataset.batch expects the global batch size: 32 examples per worker.
GLOBAL_BATCH_SIZE = 32 * NUM_WORKERS
EPOCHS = 3

# Shard by data elements instead of the default AUTO (file-based) policy.
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
train_datasets = make_datasets_unbatched('train', EPOCHS).batch(GLOBAL_BATCH_SIZE)
train_datasets_no_auto_shard = train_datasets.with_options(options)

with strategy.scope():
  # Build and compile the model inside the strategy scope so that its
  # variables are created as mirrored variables on every worker.
  multi_worker_model = build_model()

multi_worker_model.fit(train_datasets_no_auto_shard, epochs=EPOCHS, steps_per_epoch=5)

## Dataset Sharding and Batch Size

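In multi-worker training, `tf.distribute.Strategy` shards the dataset across workers automatically when you call `model.fit()`. Batch the dataset with the global batch size (`GLOBAL_BATCH_SIZE` above), since that is what `tf.data.Dataset.batch` expects here.

You can also control sharding yourself by setting the auto-shard policy through `tf.data.Options`.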

```python
options = tf.data.Options()

# you can also set the policy to one of `OFF`, `AUTO`, `DATA`, `FILE`
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

train_datasets_no_auto_shard = train_datasets.with_options(options)
```
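In the training cell, `GLOBAL_BATCH_SIZE = 32 * NUM_WORKERS`: the per-worker batch size stays at 32 and the global batch grows with the cluster, and in this setup each step's global batch is split evenly across the workers. The sketch below captures that arithmetic; `to_global_batch` and `to_per_worker_batch` are hypothetical helpers, not TensorFlow APIs.

```python
def to_global_batch(per_worker_batch, num_workers):
    """Global batch size to pass to tf.data.Dataset.batch (hypothetical helper)."""
    return per_worker_batch * num_workers


def to_per_worker_batch(global_batch, num_workers):
    """Examples each worker processes per step when the global batch is split evenly."""
    if global_batch % num_workers:
        raise ValueError("the global batch size should be divisible by the number of workers")
    return global_batch // num_workers


for workers in (1, 2, 4):
    global_batch = to_global_batch(32, workers)
    print(workers, global_batch, to_per_worker_batch(global_batch, workers))
```

Keeping the per-worker batch fixed keeps the memory use on each worker roughly constant as workers are added; the larger global batch may call for retuning the learning rate.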

# Fault Tolerance

Keras with `tf.distribute.Strategy` supports fault tolerance by preserving the training state in checkpoints, which you enable with the `ModelCheckpoint` callback. If a worker fails or becomes unstable, the training state is restored once that worker restarts. This relies on all workers staying synchronized on the training step: the remaining workers do not continue on their own, and training resumes only after the failed worker has recovered.
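The recovery idea can be pictured with a toy, framework-free loop: after each epoch the state is written to disk, and a restarted run reads it back and continues from the next epoch instead of starting over. This is only an analogy; the file name, the JSON format, and the `train` function are made up for illustration, and in Keras the `ModelCheckpoint` callback does the real work.

```python
import json
import os
import tempfile


def train(num_epochs, state_path, fail_at_epoch=None):
    """Toy training loop that saves state after every epoch and resumes from it."""
    state = {'epoch': 0, 'weight': 0.0}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)  # resume from the last saved epoch
    for epoch in range(state['epoch'], num_epochs):
        if epoch == fail_at_epoch:
            raise RuntimeError("simulated worker failure at epoch {}".format(epoch))
        state['weight'] += 1.0  # stand-in for one epoch of training
        state['epoch'] = epoch + 1
        with open(state_path, 'w') as f:
            json.dump(state, f)  # save the state once the epoch completes
    return state


path = os.path.join(tempfile.mkdtemp(), 'toy_state.json')
try:
    train(3, path, fail_at_epoch=2)  # epochs 0 and 1 finish, then it fails
except RuntimeError as err:
    print(err)
final_state = train(3, path)  # the restart runs only epoch 2
print(final_state)
```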

## ModelCheckpoint Callback

In [None]:
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')]

with strategy.scope():
  multi_worker_ckpt_model = build_model()

multi_worker_ckpt_model.fit(train_datasets_no_auto_shard, 
                            epochs=3, 
                            steps_per_epoch=5, 
                            callbacks=callbacks)

If a worker is preempted, the whole training pauses until that worker restarts, and training then resumes from the last checkpoint. If you inspect the checkpoint files, you can see that the chief worker is responsible for saving the model, while the other workers keep only temporary training state.
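A related pattern, used in later versions of the official multi-worker tutorial when calling `model.save`, is to let only the chief write to the real destination and send every other worker to a throwaway directory. The sketch below, adapted from that pattern, only computes the paths; `write_filepath` and the `workertemp_` prefix are illustrative names, and worker 0 is treated as the chief as described earlier.

```python
import os


def _is_chief(task_type, task_id):
    # Assumes no separate 'chief' job: worker 0 is the chief.
    return task_type == 'worker' and task_id == 0


def write_filepath(filepath, task_type, task_id):
    """Where this worker should write `filepath` (chief: real path; others: temp dir)."""
    dirpath, base = os.path.split(filepath)
    if not _is_chief(task_type, task_id):
        dirpath = os.path.join(dirpath, 'workertemp_' + str(task_id))
    return os.path.join(dirpath, base)


for task_id in range(2):
    print(task_id, write_filepath('/tmp/keras-model', 'worker', task_id))
```

In that tutorial, the non-chief workers delete their temporary directories after saving, so only the chief's copy remains.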