<a href="https://colab.research.google.com/github/sourcecode369/TensorFlow-2.0/blob/master/tensorflow_2.0_docs/TensorFlow%20Core/Guide/Accelerators/Distributed%20Training/TensorFlow_2_0_Distributed_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Overview 

> `tf.distribute.Strategy` is a TensorFlow API to distribute training across multiple GPUs, multiple machines, or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.

`tf.distribute.Strategy` has been designed with these key goals in mind:

* Easy to use, with support for multiple user segments, including researchers, ML engineers, etc.
* Provide good performance out of the box.
* Easy switching between strategies.


> `tf.distribute.Strategy` can be used with a high-level API like Keras, and can also be used to distribute custom training loops (and, in general, any computation using TensorFlow).


> In TensorFlow 2.0, you can execute your programs eagerly, or in a graph using `tf.function`. `tf.distribute.Strategy` intends to support both these modes of execution. Although we discuss training most of the time in this guide, this API can also be used for distributing evaluation and prediction on different platforms.

> You can use `tf.distribute.Strategy` with very few changes to your code, because we have changed the underlying components of TensorFlow to become strategy-aware. This includes variables, layers, models, optimizers, metrics, summaries, and checkpoints.

In [1]:
!pip install --upgrade tensorflow

Collecting tensorflow
  Downloading https://files.pythonhosted.org/packages/46/0f/7bd55361168bb32796b360ad15a25de6966c9c1beb58a8e30c01c8279862/tensorflow-2.0.0-cp36-cp36m-manylinux2010_x86_64.whl (86.3MB)
     |████████████████████████████████| 86.3MB 36.3MB/s
Collecting tensorflow-estimator<2.1.0,>=2.0.0 (from tensorflow)
  Downloading https://files.pythonhosted.org/packages/95/00/5e6cdf86190a70d7382d320b2b04e4ff0f8191a37d90a422a2f8ff0705bb/tensorflow_estimator-2.0.0-py2.py3-none-any.whl (449kB)
     |████████████████████████████████| 450kB 39.4MB/s
Collecting tensorboard<2.1.0,>=2.0.0 (from tensorflow)
  Downloading https://files.pythonhosted.org/packages/9b/a6/e8ffa4e2ddb216449d34cfcb825ebb38206bee5c4553d69e7bc8bc2c5d64/tensorboard-2.0.0-py3-none-any.whl (3.8MB)
     |████████████████████████████████| 3.8MB 32.4MB/s
Installing collected packages: tensorflow-estimator, tensorboard, tensorflow
  Found existing installation: tensorflow-estimator 1.15.1

In [0]:
from __future__ import absolute_import, print_function, division, unicode_literals
import tensorflow as tf

#### Types of Strategies

> `tf.distribute.Strategy` intends to cover a number of use cases along different axes. Some of these combinations are currently supported and others will be added in the future. Some of these axes are:

* **Synchronous vs asynchronous training:** These are two common ways of distributing training with data parallelism. In sync training, all workers train over different slices of input data in sync and aggregate gradients at each step. In async training, all workers train independently over the input data and update variables asynchronously. Typically, sync training is supported via all-reduce and async training through a parameter server architecture.


* **Hardware platform:** You may want to scale your training onto multiple GPUs on one machine, or multiple machines in a network (with zero or more GPUs each), or on Cloud TPUs.

![img](https://images.idgesg.net/images/article/2019/06/tensorflow-2-figure-5-100800680-orig.jpg)
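
The synchronous case described above can be sketched without TensorFlow. The snippet below is a toy illustration only: the one-parameter model, the data, the worker count, and the helper names `mse_gradient` and `all_reduce_mean` are all assumptions made for this example, not part of the `tf.distribute` API. It shows that when each worker computes a gradient on an equally sized slice of the batch and the results are averaged (the role an all-reduce plays), every worker ends up with the same gradient a single machine would have computed on the full batch.

```python
# Toy illustration of synchronous data-parallel training (pure Python).
# Model: y_hat = w * x, loss = mean squared error. All names are hypothetical.

def mse_gradient(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w over one data slice."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

def all_reduce_mean(values):
    """Stand-in for an all-reduce: every worker receives the same averaged value."""
    return sum(values) / len(values)

xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0 * x for x in xs]   # targets generated by the "true" weight 2.0
w = 0.5                      # current weight, identical on every worker
num_workers = 4              # assumed worker count for this sketch

# Each worker sees a different, equally sized slice of the global batch.
shard = len(xs) // num_workers
worker_grads = [
    mse_gradient(w, xs[i * shard:(i + 1) * shard], ys[i * shard:(i + 1) * shard])
    for i in range(num_workers)
]

# Sync step: aggregate gradients, then every worker applies the same update.
sync_grad = all_reduce_mean(worker_grads)
full_grad = mse_gradient(w, xs, ys)   # what one machine would compute alone
w_next = w - 0.01 * sync_grad

print(sync_grad, full_grad, w_next)
```

In async training there is no such aggregation barrier: each worker would apply its own entry of `worker_grads` to shared variables as soon as it is ready.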

#### Mirrored Strategy

In [3]:
mirrored_strategy = tf.distribute.MirroredStrategy()



In [4]:
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])



In [0]:
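# Override the cross-device communication used to reduce gradients.
mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops = tf.distribute.HierarchicalCopyAllReduce()
)

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops = tf.distribute.NcclAllReduce()
)

# tf.distribute.CrossDeviceOps is only the abstract base class, so pass a
# concrete implementation such as ReductionToOneDevice instead.
mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops = tf.distribute.ReductionToOneDevice()
)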

#### Central Storage Strategy

In [6]:
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

INFO:tensorflow:ParameterServerStrategy with compute_devices = ('/device:CPU:0',), variable_device = '/device:CPU:0'


#### Multi Worker Mirrored Strategy 

In [7]:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.AUTO


In [8]:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL
)

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.RING
)

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.AUTO
)


INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.NCCL
INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.RING
INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.AUTO


#### TPU Strategy

In [0]:
# cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
# tf.config.experimental_connect_to_cluster(cluster_spec_or_resolver=cluster_resolver)
# tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
# tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)

#### Parameter Server Strategy

In [0]:
# ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

#### One Device Strategy

In [0]:
one_device = tf.distribute.OneDeviceStrategy(device="/gpu:0")

### Using tf.distribute.Strategy with tf.keras

In [12]:
!pip install tensorflow==2.0.0



In [13]:
import tensorflow as tf
print(tf.__version__)

2.0.0


In [0]:
dataset = tf.data.Dataset.from_tensor_slices(([1.], [1.])).repeat(100).batch(10)

In [15]:
next(iter(dataset))

(<tf.Tensor: id=14, shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>,
 <tf.Tensor: id=15, shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>)

In [16]:
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
    model = tf.keras.Sequential([
                                 tf.keras.layers.Dense(1, input_shape=(1,))
    ])
    model.compile(loss="mse", optimizer="sgd")



In [17]:
%%time
model.fit(dataset, epochs=5,verbose=2)
model.evaluate(dataset,verbose=0)

Epoch 1/5
10/10 - 2s - loss: 4.6459
Epoch 2/5
10/10 - 0s - loss: 2.1205
Epoch 3/5
10/10 - 0s - loss: 0.9372
Epoch 4/5
10/10 - 0s - loss: 0.4143
Epoch 5/5
10/10 - 0s - loss: 0.1831
CPU times: user 820 ms, sys: 33.5 ms, total: 854 ms
Wall time: 2.84 s


In [0]:
model = tf.keras.Sequential([
                            tf.keras.layers.Dense(1, input_shape=(1,))
    ])
model.compile(loss="mse", optimizer="sgd")

In [19]:
%%time
model.fit(dataset, epochs=5,verbose=2)
model.evaluate(dataset,verbose=0)

Epoch 1/5
10/10 - 0s - loss: 4.0343
Epoch 2/5
10/10 - 0s - loss: 1.8413
Epoch 3/5
10/10 - 0s - loss: 0.8139
Epoch 4/5
10/10 - 0s - loss: 0.3597
Epoch 5/5
10/10 - 0s - loss: 0.1590
CPU times: user 370 ms, sys: 16 ms, total: 386 ms
Wall time: 394 ms


In [0]:
import numpy as np
inputs, targets = np.ones((100,1)), np.ones((100,1))

In [21]:
%%time
model.fit(inputs, targets,epochs=5,batch_size=10)

Train on 100 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
CPU times: user 231 ms, sys: 11.9 ms, total: 243 ms
Wall time: 250 ms


<tensorflow.python.keras.callbacks.History at 0x7f1c54529048>

In [0]:
# compute global batch size using number of replicas

In [0]:
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA * mirrored_strategy.num_replicas_in_sync)

dataset = tf.data.Dataset.from_tensor_slices(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

In [0]:
# Map each expected global batch size to its learning rate.
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.01, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
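
The lookup above raises a `KeyError` as soon as the number of replicas produces a global batch size that is not in the table (for example 4 replicas giving 20). A minimal sketch of one way to handle that, assuming you are willing to fall back to the common linear-scaling heuristic, is shown below. The helper names `global_batch_size_for` and `pick_learning_rate`, and the base values passed to them, are hypothetical and chosen only for illustration; whether linear scaling suits a given model is something to verify empirically.

```python
def global_batch_size_for(per_replica_batch_size, num_replicas):
    """Global batch size = per-replica batch size * number of replicas in sync."""
    return per_replica_batch_size * num_replicas

def pick_learning_rate(global_batch_size, table, base_batch_size, base_lr):
    """Use the tuned value when one exists, otherwise scale base_lr linearly."""
    if global_batch_size in table:
        return table[global_batch_size]
    # Assumption: learning rate grows proportionally with the global batch size.
    return base_lr * global_batch_size / base_batch_size

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.01, 10: 0.15}

for num_replicas in (1, 2, 4):   # stand-ins for strategy.num_replicas_in_sync
    gbs = global_batch_size_for(5, num_replicas)
    lr = pick_learning_rate(
        gbs, LEARNING_RATES_BY_BATCH_SIZE, base_batch_size=5, base_lr=0.01
    )
    print(num_replicas, gbs, lr)
```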

#### Current `tf.distribute.Strategy` support for Keras

* **Mirrored Strategy** -> *Supported*

* **Central Storage Strategy** -> *Experimental support*

* **Multi Worker Mirrored Strategy** -> *Experimental support*

* **TPU Strategy** -> *Experimental support*

* **Parameter Server Strategy** -> *Support planned post 2.0*

* **One Device Strategy** -> *Supported*

### Using tf.distribute.Strategy with custom training loops

In [46]:
!pip install --upgrade tensorflow

Requirement already up-to-date: tensorflow in /usr/local/lib/python3.6/dist-packages (2.0.0)


In [0]:
with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

global_batch_size = 10 * mirrored_strategy.num_replicas_in_sync

def input_fn():
    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
    dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
    return dist_dataset

dataset = input_fn()

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        features, labels = inputs
        with tf.GradientTape() as tape:
            logits = model(features)
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
                logits=logits, labels=labels
            )
            loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
        return cross_entropy

    per_example_losses = mirrored_strategy.experimental_run_v2(
        step_fn, args=(dist_inputs,)
    )
    mean_loss = mirrored_strategy.reduce(
        tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0
    )    
    return mean_loss

# with mirrored_strategy.scope():
#     for inputs in dataset:
#         train_step(inputs)

In [0]:
with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    optimizer = tf.keras.optimizers.SGD()


In [0]:
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

In [0]:
@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        features, labels = inputs
        with tf.GradientTape() as tape:
            logits = model(features)
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
                logits=logits, labels=labels
            )
            loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
        return cross_entropy

    per_example_losses = mirrored_strategy.experimental_run_v2(
        step_fn, args=(dist_inputs,)
    )
    mean_loss = mirrored_strategy.reduce(
        tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0
    )
    return mean_loss

In [0]:
# with mirrored_strategy.scope():
#     for inputs in dist_dataset:
#         print(train_step(inputs))

In [0]:
# with mirrored_strategy.scope():
#     iterator = iter(dist_dataset)
#     for _ in range(10):
#         print(train_step(next(iterator)))
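
The `1.0 / global_batch_size` factor inside `step_fn` can look surprising, since each replica only sees part of the batch. The arithmetic below is a small worked example in plain Python, with made-up loss values and an assumed two-replica setup. It relies on the gradients from all replicas being summed before they are applied: if each replica divides its summed loss by the global batch size, the combined value is the mean over the whole batch, whereas dividing by the local batch size would overcount by the number of replicas.

```python
# Per-example losses as each replica might see them (made-up numbers).
replica_losses = [
    [0.2, 0.4, 0.6, 0.8],   # replica 0
    [1.0, 1.2, 1.4, 1.6],   # replica 1
]
num_replicas = len(replica_losses)
global_batch_size = sum(len(r) for r in replica_losses)

# What step_fn computes on each replica: local sum scaled by the GLOBAL batch size.
scaled_per_replica = [sum(r) * (1.0 / global_batch_size) for r in replica_losses]

# Summing across replicas recovers the mean over the whole global batch.
combined = sum(scaled_per_replica)
flat = [loss for r in replica_losses for loss in r]
true_mean = sum(flat) / len(flat)

# Dividing by the local batch size instead overcounts by num_replicas.
local_means_summed = sum(sum(r) / len(r) for r in replica_losses)

print(combined, true_mean, local_means_summed)
```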

### Using tf.distribute.Strategy with Estimator

In [30]:
mirrored_strategy = tf.distribute.MirroredStrategy(cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
config = tf.estimator.RunConfig(train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer = "SGD",
    config=config
)

INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpu8equ7hc', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f1c52b48e48>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7f1c52b48e48>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object 

In [0]:
# def input_fn():
#   dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
#   return dataset.repeat(1000).batch(10)
# regressor.train(input_fn=input_fn, steps=10)
# regressor.evaluate(input_fn=input_fn, steps=10)

### Setting up the TF_CONFIG environment variable

In [0]:
import os
import json
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
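
In a multi-worker job, every task uses the same `"cluster"` dictionary and differs only in its `"task"` entry. The sketch below generates the `TF_CONFIG` string for each task from one cluster definition, using only the standard library. The helper name `make_tf_config` is hypothetical, and the `host:port` strings are the same placeholders used above, not real addresses. Each process would then assign its own string to `os.environ["TF_CONFIG"]` before creating a strategy.

```python
import json

def make_tf_config(cluster, task_type, task_index):
    """Return the JSON string one task would place in TF_CONFIG."""
    if task_type not in cluster:
        raise ValueError("unknown task type: %r" % (task_type,))
    if not 0 <= task_index < len(cluster[task_type]):
        raise ValueError("task index out of range: %r" % (task_index,))
    return json.dumps({
        "cluster": cluster,
        "task": {"type": task_type, "index": task_index},
    })

# Same placeholder cluster as above; replace with real host:port pairs.
cluster = {
    "worker": ["host1:port", "host2:port", "host3:port"],
    "ps": ["host4:port", "host5:port"],
}

# One config per task: identical "cluster", different "task".
configs = {
    (task_type, index): make_tf_config(cluster, task_type, index)
    for task_type, hosts in cluster.items()
    for index in range(len(hosts))
}

# Reading a config back, as the task at worker index 1 would see it.
parsed = json.loads(configs[("worker", 1)])
own_address = parsed["cluster"][parsed["task"]["type"]][parsed["task"]["index"]]
print(parsed["task"], own_address)
```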