# Distribution strategies

---

## Mirrored strategy

In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Build a MirroredStrategy with default arguments (TensorFlow chooses the devices).
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [8]:
# Display how many replicas the strategy keeps in sync; this value scales the global batch size.
strategy.num_replicas_in_sync

1

#### data

In [12]:
# Load MNIST as (image, label) pairs together with its metadata object.
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
train = datasets['train']
test = datasets['test']

In [14]:
# Split sizes as reported by the dataset metadata.
num_train = info.splits['train'].num_examples
num_test = info.splits['test'].num_examples

# Input-pipeline settings: the global batch is the per-replica batch times the replica count.
BUFFER_SIZE = 10000
BATCH_PER_REPLICA = 64
BATCH_SIZE = strategy.num_replicas_in_sync * BATCH_PER_REPLICA

In [17]:
def scale(image, label):
    """Convert an image to float32 and divide it by 255; the label passes through.

    Args:
        image: Image tensor (presumably 8-bit pixel values, so the result
            lies in [0, 1] -- confirm against the dataset).
        label: Label associated with the image, returned unchanged.

    Returns:
        A ``(scaled_image, label)`` tuple.
    """
    return tf.cast(image, tf.float32) / 255, label

In [18]:
# Scale once, cache the scaled examples, then shuffle and batch with the global batch size.
train = (
    train
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)
test = (
    test
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

#### model

In [9]:
# Build the model inside the strategy scope so its variables are created under the strategy.
with strategy.scope():
    model = tf.keras.Sequential()
    # Convolutional feature extractor for 28x28 single-channel images.
    model.add(tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)))
    model.add(tf.keras.layers.MaxPooling2D())
    # Dense classifier head ending in a 10-way softmax.
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))

In [10]:
# The network's last layer already applies softmax, so the loss receives
# probabilities. from_logits must therefore be False; with True the loss would
# apply a second softmax on top of the model's output.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'],
)

In [20]:
# Train for five passes over the batched training set.
model.fit(x=train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x250c67d16d0>

#### with no strategy:

In [21]:
# Same architecture as the mirrored run, built outside any strategy scope for comparison.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax'),
])

# The final Dense layer outputs softmax probabilities, so the loss must be told
# it is NOT receiving logits; from_logits=True would apply softmax a second time.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'],
)

In [22]:
# Train the non-distributed model with the same data and epoch count.
model.fit(x=train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x250c68540d0>

---

## Multiple GPU mirrored strategy

In [38]:
# NOTE(review): presumably this only takes effect if set before TensorFlow
# initialises its GPU devices -- confirm the cell order on a fresh kernel.
os.environ['TF_MIN_GPU_MULTIPROCESSOR_COUNT'] = '4'

In [40]:
# Mirrored strategy that uses hierarchical-copy all-reduce for cross-device communication.
strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce(),
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [41]:
# Display the replica count of the newly created strategy.
strategy.num_replicas_in_sync

1

#### data

In [29]:
# Reload MNIST so this section starts from the raw, unbatched splits.
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
train = datasets['train']
test = datasets['test']

In [30]:
# Example counts for each split, taken from the dataset metadata.
num_train = info.splits['train'].num_examples
num_test = info.splits['test'].num_examples

# Recompute the global batch size for the current strategy's replica count.
BUFFER_SIZE = 10000
BATCH_PER_REPLICA = 64
BATCH_SIZE = strategy.num_replicas_in_sync * BATCH_PER_REPLICA

In [31]:
def scale(image, label):
    """Return the image as float32 divided by 255, paired with its untouched label.

    Args:
        image: Image tensor to convert and rescale.
        label: Label for the image; passed through as-is.

    Returns:
        A ``(scaled_image, label)`` tuple.
    """
    scaled = tf.cast(image, tf.float32) / 255
    return scaled, label

In [32]:
# Same preprocessing as before: scale, cache, shuffle, then batch by the global batch size.
train = (
    train
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)
test = (
    test
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [33]:
# Wrap the batched datasets so the strategy hands each replica its share of every batch.
train_dist = strategy.experimental_distribute_dataset(train)
test_dist = strategy.experimental_distribute_dataset(test)

#### model

In [42]:
def create_model():
    """Build the small CNN classifier used in this notebook.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model taking 28x28x1 inputs and
        ending in a 10-unit softmax layer.
    """
    layers = tf.keras.layers
    return tf.keras.Sequential([
        layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D(),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ])

In [45]:
# Create the loss, metrics, optimizer and model under the strategy scope so
# their variables are created under the strategy.
with strategy.scope():
    # create_model() ends in a softmax layer, so predictions are probabilities:
    # from_logits must be False (True would apply softmax twice).
    # Reduction is NONE because the per-example losses are averaged manually
    # against the *global* batch size in compute_loss.
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=False,
        reduction=tf.keras.losses.Reduction.NONE,
    )

    def compute_loss(y, y_pred):
        """Return this replica's loss contribution, scaled by the global batch size.

        Args:
            y: Ground-truth labels for the replica's batch.
            y_pred: Model predictions (softmax probabilities) for the same batch.

        Returns:
            Sum of the per-example losses divided by ``BATCH_SIZE``, so that
            summing the result over replicas gives the global-batch mean.
        """
        example_loss = loss_object(y, y_pred)
        return tf.nn.compute_average_loss(example_loss, global_batch_size=BATCH_SIZE)

    test_loss = tf.keras.metrics.Mean(name='test_loss')
    train_acc = tf.keras.metrics.SparseCategoricalAccuracy(name='train_acc')
    test_acc = tf.keras.metrics.SparseCategoricalAccuracy(name='test_acc')

    optimizer = tf.keras.optimizers.Adam()
    model = create_model()

In [35]:
def train_step(inputs):
    """Run one optimisation step on a single replica's batch.

    Args:
        inputs: An ``(images, labels)`` pair for this replica.

    Returns:
        The replica's loss as produced by ``compute_loss``.
    """
    features, targets = inputs
    with tf.GradientTape() as tape:
        outputs = model(features, training=True)
        step_loss = compute_loss(targets, outputs)

    # Back-propagate and apply the update to every trainable variable.
    weights = model.trainable_variables
    grads = tape.gradient(step_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    train_acc.update_state(targets, outputs)
    return step_loss

In [34]:
@tf.function
def ditributed_train_step(data):
    """Run ``train_step`` on every replica and return the summed loss.

    Args:
        data: One element of the distributed training dataset.

    Returns:
        The per-replica losses reduced with ``ReduceOp.SUM``. Because each
        replica's loss is already divided by the global batch size, the sum is
        the mean loss over the global batch.
    """
    per_replica_loss = strategy.run(train_step, args=(data,))
    # The leftover debug tf.print calls were removed: they fired on every step
    # and flooded the notebook output.
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_loss, axis=None)

In [46]:
def test_step(inputs):
    """Evaluate one replica's batch and update the test metrics.

    Args:
        inputs: An ``(images, labels)`` pair for this replica.

    Returns:
        The replica's loss as produced by ``compute_loss``.
    """
    images, labels = inputs
    # Evaluation must run the model in inference mode and needs no gradient
    # tape: nothing is differentiated here.
    preds = model(images, training=False)
    loss = compute_loss(labels, preds)
    # NOTE(review): compute_loss divides by the global batch size, so with more
    # than one replica this per-replica value is only a fraction of the batch
    # mean -- confirm that this is the quantity test_loss should track.
    test_loss.update_state(loss)
    test_acc.update_state(labels, preds)
    return loss

In [47]:
@tf.function
def ditributed_test_step(data):
    """Run ``test_step`` on every replica.

    Args:
        data: One element of the distributed test dataset.

    Returns:
        The per-replica losses returned by ``strategy.run`` (not reduced).
    """
    # Dispatch test_step, not train_step: the previous call trained the model
    # on the test data and never touched the test metrics.
    per_replica_loss = strategy.run(test_step, args=(data,))
    return per_replica_loss

In [None]:
epochs = 5
for epoch in range(epochs):
    # Training pass: accumulate the globally reduced loss of every step.
    loss = 0.0
    num_batches = 0
    for batch in train_dist:
        loss += ditributed_train_step(batch)
        num_batches += 1
    train_loss = loss / num_batches

    # Evaluation pass over the distributed test set.
    for batch in test_dist:
        ditributed_test_step(batch)

    # Report the epoch; previously train_loss was computed and then discarded.
    print(
        f"Epoch {epoch + 1}/{epochs} - "
        f"loss: {float(train_loss):.4f} - "
        f"acc: {float(train_acc.result()):.4f} - "
        f"test_loss: {float(test_loss.result()):.4f} - "
        f"test_acc: {float(test_acc.result()):.4f}"
    )

    # Keras metrics keep accumulating until reset; clear them so the next
    # epoch's numbers are not blended with this one's.
    test_loss.reset_state()
    train_acc.reset_state()
    test_acc.reset_state()

---