# Custom training with tf.distribute.Strategy

## Imports

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import tensorflow_hub as hub

import numpy as np
import os
from tqdm import tqdm

## Download the dataset

In [2]:
import tensorflow_datasets as tfds
tfds.disable_progress_bar()

In [3]:
splits = ['train[:80%]', 'train[80%:90%]', 'train[90%:]']

(train_examples, validation_examples, test_examples), info = tfds.load('oxford_flowers102', with_info=True, as_supervised=True, split = splits, data_dir='data/')

num_examples = info.splits['train'].num_examples
num_classes = info.features['label'].num_classes

## Create a strategy to distribute the variables and the graph

In [4]:
strategy = tf.distribute.MirroredStrategy()





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [5]:
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Number of devices: 1


## Setup input pipeline

In [6]:
BUFFER_SIZE = num_examples
EPOCHS = 10
pixels = 224
MODULE_HANDLE = 'data/resnet_50_feature_vector'
IMAGE_SIZE = (pixels, pixels)
print("Using {} with input size {}".format(MODULE_HANDLE, IMAGE_SIZE))

Using data/resnet_50_feature_vector with input size (224, 224)


In [7]:
def format_image(image, label):
    image = tf.image.resize(image, IMAGE_SIZE) / 255.0
    return  image, label

## Set the global batch size

In [10]:
def set_global_batch_size(batch_size_per_replica, strategy):
    global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync    
    return global_batch_size

In [11]:
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = set_global_batch_size(BATCH_SIZE_PER_REPLICA, strategy)

print(GLOBAL_BATCH_SIZE)

64


In [12]:
train_batches = train_examples.shuffle(num_examples // 4).map(format_image).batch(BATCH_SIZE_PER_REPLICA).prefetch(1)
validation_batches = validation_examples.map(format_image).batch(BATCH_SIZE_PER_REPLICA).prefetch(1)
test_batches = test_examples.map(format_image).batch(1)

## Define the distributed datasets

In [15]:
def distribute_datasets(strategy, train_batches, validation_batches, test_batches):
    train_dist_dataset = strategy.experimental_distribute_dataset(train_batches)
    val_dist_dataset = strategy.experimental_distribute_dataset(validation_batches)
    test_dist_dataset = strategy.experimental_distribute_dataset(test_batches)    
    return train_dist_dataset, val_dist_dataset, test_dist_dataset

In [16]:
train_dist_dataset, val_dist_dataset, test_dist_dataset = distribute_datasets(strategy, train_batches, validation_batches, test_batches)

In [17]:
print(type(train_dist_dataset))
print(type(val_dist_dataset))
print(type(test_dist_dataset))

<class 'tensorflow.python.distribute.input_lib.DistributedDataset'>
<class 'tensorflow.python.distribute.input_lib.DistributedDataset'>
<class 'tensorflow.python.distribute.input_lib.DistributedDataset'>


In [18]:
x = iter(train_dist_dataset).get_next()
    
print(f"x is a tuple that contains {len(x)} values ")
print(f"x[0] contains the features, and has shape {x[0].shape}")
print(f"  so it has {x[0].shape[0]} examples in the batch, each is an image that is {x[0].shape[1:]}")
print(f"x[1] contains the labels, and has shape {x[1].shape}")

Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.


Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.


x is a tuple that contains 2 values 
x[0] contains the features, and has shape (64, 224, 224, 3)
  so it has 64 examples in the batch, each is an image that is (224, 224, 3)
x[1] contains the labels, and has shape (64,)


## Create the model

In [19]:
class ResNetModel(tf.keras.Model):
    def __init__(self, classes):
        super(ResNetModel, self).__init__()
        self._feature_extractor = hub.KerasLayer(MODULE_HANDLE,
                                                 trainable=False) 
        self._classifier = tf.keras.layers.Dense(classes, activation='softmax')

    def call(self, inputs):
        x = self._feature_extractor(inputs)
        x = self._classifier(x)
        return x

In [20]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

## Define the loss function

In [21]:
with strategy.scope():
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
    
    test_loss = tf.keras.metrics.Mean(name='test_loss')

## Define the metrics to track loss and accuracy

In [22]:
with strategy.scope():
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='test_accuracy')

## Instantiate the model, optimizer, and checkpoints

In [23]:
with strategy.scope():
    model = ResNetModel(classes=num_classes)
    optimizer = tf.keras.optimizers.Adam()
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

## Training loop

In [24]:
def train_test_step_fns(strategy, model, compute_loss, optimizer, train_accuracy, loss_object, test_loss, test_accuracy):
    with strategy.scope():
        def train_step(inputs):
            images, labels = inputs

            with tf.GradientTape() as tape:
                predictions = model(images, training=True)
                loss = compute_loss(labels, predictions)

            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            train_accuracy.update_state(labels, predictions)
            return loss 

        def test_step(inputs):
            images, labels = inputs
            
            predictions = model(images, training=False)
            t_loss = compute_loss(labels, predictions)

            test_loss.update_state(t_loss)
            test_accuracy.update_state(labels, predictions)
        
        return train_step, test_step

In [25]:
train_step, test_step = train_test_step_fns(strategy, model, compute_loss, optimizer, train_accuracy, loss_object, test_loss, test_accuracy)

## Distributed training and testing

In [26]:
def fun1(args=()):
    print(f"number of arguments passed is {len(args)}")
    
    
list_of_inputs = [1,2]
print("When passing in args=list_of_inputs:")
fun1(args=list_of_inputs)
print()
print("When passing in args=(list_of_inputs)")
fun1(args=(list_of_inputs))
print()
print("When passing in args=(list_of_inputs,)")
fun1(args=(list_of_inputs,))

When passing in args=list_of_inputs:
number of arguments passed is 2

When passing in args=(list_of_inputs)
number of arguments passed is 2

When passing in args=(list_of_inputs,)
number of arguments passed is 1


In [33]:
def distributed_train_test_step_fns(strategy, train_step, test_step, model, compute_loss, optimizer, train_accuracy, loss_object, test_loss, test_accuracy):
    with strategy.scope():
        @tf.function
        def distributed_train_step(dataset_inputs):
            per_replica_losses = strategy.run(train_step,args=(dataset_inputs,))
            return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                                   axis=None)

        @tf.function
        def distributed_test_step(dataset_inputs):
            return strategy.run(test_step, args=(dataset_inputs,))
    
        return distributed_train_step, distributed_test_step

In [34]:
distributed_train_step, distributed_test_step = distributed_train_test_step_fns(strategy, train_step, test_step, model, compute_loss, optimizer, train_accuracy, loss_object, test_loss, test_accuracy)

## Run the distributed training in a loop

In [35]:
with strategy.scope():
    for epoch in range(EPOCHS):
        total_loss = 0.0
        num_batches = 0
        for x in tqdm(train_dist_dataset):
            total_loss += distributed_train_step(x)
            num_batches += 1
        train_loss = total_loss / num_batches

        for x in test_dist_dataset:
            distributed_test_step(x)

        template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
                    "Test Accuracy: {}")
        print (template.format(epoch+1, train_loss,
                               train_accuracy.result()*100, test_loss.result(),
                               test_accuracy.result()*100))

        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()

13it [01:51,  8.54s/it]
0it [00:00, ?it/s]

Epoch 1, Loss: 4.595747947692871, Accuracy: 6.740196228027344, Test Loss: 0.05948949232697487, Test Accuracy: 15.686275482177734


13it [01:42,  7.87s/it]
0it [00:00, ?it/s]

Epoch 2, Loss: 2.53836989402771, Accuracy: 50.98039627075195, Test Loss: 0.04145458713173866, Test Accuracy: 48.039215087890625


13it [01:40,  7.73s/it]
0it [00:00, ?it/s]

Epoch 3, Loss: 1.4094295501708984, Accuracy: 85.6617660522461, Test Loss: 0.032817449420690536, Test Accuracy: 62.74510192871094


13it [01:43,  7.97s/it]
0it [00:00, ?it/s]

Epoch 4, Loss: 0.822101891040802, Accuracy: 94.9754867553711, Test Loss: 0.027745109051465988, Test Accuracy: 62.74510192871094


13it [01:41,  7.77s/it]
0it [00:00, ?it/s]

Epoch 5, Loss: 0.5378504991531372, Accuracy: 97.30392456054688, Test Loss: 0.024688323959708214, Test Accuracy: 64.70588684082031


13it [01:44,  8.05s/it]
0it [00:00, ?it/s]

Epoch 6, Loss: 0.3714151680469513, Accuracy: 98.4068603515625, Test Loss: 0.023377155885100365, Test Accuracy: 65.68627166748047


13it [01:43,  7.93s/it]
0it [00:00, ?it/s]

Epoch 7, Loss: 0.2776481807231903, Accuracy: 99.38725280761719, Test Loss: 0.021725326776504517, Test Accuracy: 66.66667175292969


13it [01:44,  8.04s/it]
0it [00:00, ?it/s]

Epoch 8, Loss: 0.21669873595237732, Accuracy: 99.75489807128906, Test Loss: 0.02106780931353569, Test Accuracy: 69.60784149169922


13it [01:43,  7.96s/it]
0it [00:00, ?it/s]

Epoch 9, Loss: 0.17298102378845215, Accuracy: 99.87745666503906, Test Loss: 0.02041405811905861, Test Accuracy: 71.5686264038086


13it [01:39,  7.65s/it]


Epoch 10, Loss: 0.14275483787059784, Accuracy: 99.87745666503906, Test Loss: 0.019934987649321556, Test Accuracy: 68.62745666503906


# Save the Model

In [36]:
model_save_path = "./tmp/mymodel/1/"
tf.saved_model.save(model, model_save_path)

Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


INFO:tensorflow:Assets written to: ./tmp/mymodel/1/assets


INFO:tensorflow:Assets written to: ./tmp/mymodel/1/assets


## Zip the SavedModel Directory

In [37]:
import os
import zipfile

def zipdir(path, ziph):
    for root, dirs, files in os.walk(path):
        for file in files:
            ziph.write(os.path.join(root, file))

zipf = zipfile.ZipFile('./mymodel.zip', 'w', zipfile.ZIP_DEFLATED)
zipdir('./tmp/mymodel/1/', zipf)
zipf.close()

## References
##### Coursera: Custom and Distributed Training with TensorFlow [course](https://www.coursera.org/learn/custom-distributed-training-with-tensorflow).