In [1]:
# tf.distribute.Strategy
## High level APIs
## Custom training loops
## Works in TensorFlow 2 eager mode & graph mode
## Supported on multiple configurations
## Convenient to use with little to no code changes

In [2]:
# Commonly used terms
## Device : CPU, GPU, TPU
## Replica : a copy of the model's variables; there is one replica on each of several devices
## Worker : software running on a device that's dedicated to training the replica
## Mirrored variable : a variable that is kept in sync across all devices.
#                       The variables within these models that we want to keep in sync across all of the devices are called mirrored variables

In [3]:
# Classifying strategies
## Hardware platforms : single-machine multi-device, multi-machine
## Training : 
# ### Synchronous (all-reduce) : all workers train over different slices of the input data in sync with each other;
#                                they aggregate gradients at each step using an all-reduce algorithm
# ### Asynchronous (parameter server) : all workers train independently over the input data
#                                       and update the variables asynchronously

In [4]:
# MirroredStrategy : single machine, multi-GPU; creates one replica per GPU; each variable is mirrored; all-reduce across devices
# TPUStrategy : same as MirroredStrategy, but all-reduce runs across TPU cores
# MultiWorkerMirroredStrategy : multi-machine, multi-GPU; replicates variables per device across workers; the all-reduce implementation is chosen based on hardware, network topology and tensor sizes
# CentralStorageStrategy : variables are not mirrored; instead they are kept in memory on a single device (the CPU), while computation is replicated across local GPUs
# ParameterServerStrategy : some machines are designated as workers, others as parameter servers
# DefaultStrategy : simple pass-through (no distribution)
# OneDeviceStrategy : places all variables and computation on a single device

In [5]:
# Mirrored Strategy (Code Changes)
## Model declaration
## Data Preprocessing

In [15]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os 



In [16]:
# Load MNIST as (image, label) pairs (as_supervised=True), together with the
# dataset metadata object `info` (its split sizes are read in a later cell).
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train = datasets['train']
mnist_test = datasets['test']


In [7]:
# model = tf.keras.Sequential([ 
#     tf.keras.layers.Conv2D(32, 3, activation = 'relu', input_shape = (28, 28, 1)), 
#     tf.keras.layers.MaxPooling2D(), 
#     tf.keras.layers.Flatten(), 
#     tf.keras.layers.Dense(64, activation = 'relu'), 
#     tf.keras.layers.Dense(10)
# ])

# model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True), 
# optimizer = tf.keras.optimizers.Adam(), 
# metrics = ['accuracy'])

2022-01-07 11:57:26.541045: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2022-01-07 11:57:26.541217: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-01-07 11:57:26.542138: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


In [None]:
# def scale(image, label): 
#     image = tf.cast(image, tf.float32)
#     image /= 255.0 
#     return image

# num_train_examples = info.splits['train'].num_examples
# num_test_examples = info.splits['test'].num_examples

# BUFFER_SIZE = 10000
# BATCH_SIZE = 64

# train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
# eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

In [None]:
# Distributed Training : Mirrored

In [17]:
# Create the MirroredStrategy (one replica per device it picks up) and report
# how many replicas will train in sync.
strategy = tf.distribute.MirroredStrategy()
n_replicas = strategy.num_replicas_in_sync
print(n_replicas)





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


1


In [26]:
def scale(image, label):
    """Cast an image to float32 and divide its values by 255.

    Intended as a ``tf.data`` map function over (image, label) pairs.
    Presumably the MNIST pixels are integers in [0, 255], so the result lies
    in [0, 1]. TODO confirm against the dataset's features.

    Args:
        image: image tensor from the supervised MNIST dataset.
        label: the matching label; returned unchanged.

    Returns:
        Tuple ``(scaled_image, label)``.
    """
    image_f32 = tf.cast(image, tf.float32)
    return image_f32 / 255, label

# Number of examples in each split, read from the dataset metadata.
num_train_examples, num_test_examples = (
    info.splits[split_name].num_examples for split_name in ('train', 'test')
)

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
# Global batch size: the per-replica batch multiplied by the number of
# replicas the strategy keeps in sync.
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

# Training pipeline: scale -> cache the scaled examples -> shuffle -> batch.
train_dataset = (
    mnist_train
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)
# Evaluation pipeline: scale and batch only (no shuffling).
eval_dataset = (
    mnist_test
    .map(scale)
    .batch(BATCH_SIZE)
)

In [27]:
with strategy.scope():
    # Build the model inside the strategy scope so its variables are created
    # by the MirroredStrategy (i.e. as mirrored variables).
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        # No softmax here: the last layer emits raw logits, which is what
        # SparseCategoricalCrossentropy(from_logits=True) below expects.
        # A softmax activation combined with from_logits=True would apply
        # softmax twice and distort the loss and its gradients.
        tf.keras.layers.Dense(10),
    ])

    # Compile inside the scope as well (as the tf.distribute Keras guide
    # recommends), so optimizer and metric state are created under the same
    # strategy as the model variables.
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )


In [28]:
# Train on the batched MNIST training pipeline for 12 epochs; the returned
# History object is the cell's displayed output.
model.fit(x=train_dataset, epochs=12)

2022-01-07 12:36:59.576722: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:454] The `assert_cardinality` transformation is currently not handled by the auto-shard rewrite and will be removed.


Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
195/938 [=====>........................] - ETA: 42s - loss: 0.0300 - accuracy: 0.9915

In [None]:
# MirroredStrategy: training across local GPUs
## Each variable in the model is mirrored across all replicas
## Variables are represented as MirroredVariable objects
## Synchronization (all-reduce) uses NVIDIA NCCL by default