# Distributed training with Horovod
This notebook shows how to train and host a Keras Sequential model on SageMaker. The model used for this notebook is a simple deep CNN that was extracted from [the Keras examples](https://github.com/keras-team/keras/blob/master/examples/cifar10_cnn.py).

## The dataset
The [CIFAR-10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html) is one of the most popular machine learning datasets. It consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). Here are the classes in the dataset, as well as 10 random images from each:

![cifar10](https://maet3608.github.io/nuts-ml/_images/cifar10.png)

In this tutorial, we will train a deep CNN to recognize these images.

We'll compare trainig with file mode, pipe mode datasets and distributed training with Horovod

## Set up the environment

In [28]:
import os
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()

In [29]:
bucket = '<BUCKET NAME>'
prefix = 'sagemaker/script-mode-distributed'

## Download the CIFAR-10 dataset
Downloading the test and training data takes around 5 minutes.

In [30]:
#!pip install wget
# import wget # for TF2

In [31]:
!python generate_cifar10_tfrecords_v2.py --data-dir data/

Download from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and extract.
data/
100% [..................................................] 170498071 / 170498071Generating data//train/train.tfrecords
Generating data//validation/validation.tfrecords
Generating data//eval/eval.tfrecords
Done!


### Uploading the data to s3

In [32]:
dataset_location = sagemaker_session.upload_data(bucket=bucket, path='data', key_prefix=prefix)
display(dataset_location)

's3://demo-saeed/sagemaker/script-mode-distributed'

### Configuring metrics from the job logs
SageMaker can get training metrics directly from the logs and send them to CloudWatch metrics.

In [33]:
keras_metric_definition = [
    {'Name': 'train:loss', 'Regex': '.*loss: ([0-9\\.]+) - acc: [0-9\\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': '.*loss: [0-9\\.]+ - acc: ([0-9\\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': '.*step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+).*'},
    {'Name': 'validation:loss', 'Regex': '.*step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+.*'},
    {'Name': 'sec/steps', 'Regex': '.* - \d+s (\d+)[mu]s/step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_acc: [0-9\\.]+'}
]

### Train image classification based on the cifar10 dataset

In [34]:
hyperparameters = {'epochs': 10, 'batch-size' : 256}

## Distributed training with horovod
Horovod is a distributed training framework based on MPI. Horovod is only available with TensorFlow version 1.12 or newer. You can find more details at Horovod README.

To enable Horovod, we need to add the following code to our script:
```python
import horovod.keras as hvd
hvd.init()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))
```

Add the following callbacks:
```python
hvd.callbacks.BroadcastGlobalVariablesCallback(0)
hvd.callbacks.MetricAverageCallback()
```

Configure the optimizer:
```python
opt = Adam(lr=learning_rate * size, decay=weight_decay)
opt = hvd.DistributedOptimizer(opt)
```
Choose to save checkpoints and send TensorBoard logs only from the ```python hvd.rank() == 0``` instance

To start a distributed training job with Horovod, configure the job distribution:
```python
distributions = {'mpi': {
                    'enabled': True,
                    'processes_per_host': # Number of Horovod processes per host
                        }
                }
```

In [35]:
from sagemaker.tensorflow import TensorFlow

distributions = {'mpi': {
                    'enabled': True,
                    'processes_per_host': 1
                        }
                }
hyperparameters = {'epochs': 10, 'batch-size' : 256}

source_dir = os.path.join(os.getcwd(), 'source_dir')
estimator_dist = TensorFlow(base_job_name='dist-cifar10-tf',
                       entry_point='cifar10_keras_main.py',
                       source_dir=source_dir,
                       role=role,
                       framework_version='1.12.0',
                       py_version='py3',
                       hyperparameters=hyperparameters,
                       train_instance_count=2, train_instance_type='ml.p3.2xlarge',
                       tags = [{'Key' : 'Project', 'Value' : 'cifar10'},{'Key' : 'TensorBoard', 'Value' : 'dist'}],
                       metric_definitions=keras_metric_definition,
                       distributions=distributions)

In [36]:
remote_inputs = {'train' : dataset_location+'/train', 'validation' : dataset_location+'/validation', 'eval' : dataset_location+'/eval'}
estimator_dist.fit(remote_inputs)

ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateTrainingJob operation: The account-level service limit 'ml.p3.2xlarge for training job usage' is 4 Instances, with current utilization of 4 Instances and a request delta of 2 Instances. Please contact AWS support to request an increase for this limit.

### Local TensorBoard command
Using TensorBoard we can compare the jobs we ran. The following command prints the TensorBoard command.
Run it in any environment where you have TensorBoard installed.  

In [24]:
!pwd

/root/MLAI/script-mode/keras_cifar10


In [25]:
!python source_dir/generate_tensorboard_command.py

AWS_REGION=us-east-1 tensorboard --logdir dist:"s3://sagemaker-us-east-1-079329190341/dist-cifar10-tf-2020-06-23-21-14-29-072/model",dist:"s3://sagemaker-us-east-1-079329190341/dist-cifar10-tf-2020-06-23-21-14-08-840/model"


You can install [TensorBoard](https://github.com/tensorflow/tensorboard) locally using `pip install tensorboard`.  
To access an S3 log directory, configure the TensorBoard default region. You can do this by configuring an environment variable named AWS_REGION, and setting the value of the environment variable to the AWS region your training jobs run in.  
For example, `export AWS_REGION = 'us-east-1'`

You can access TensorBoard locally at http://localhost:6006

Based on the TensorBoard metrics, we can see that:
1. All jobs run for 10 epochs (0 - 9).
2. File mode and Pipe mode runs for ~1 minute - Pipemode doesn't effect training performance.
3. Distributed mode runs for 45 seconds.
4. All of the training jobs resulted in similar validation accuracy.

This example uses a small dataset (179 MB). For larger datasets, pipemode can significantly reduce training time because it does not copy the entire dataset into local memory.

## Deploy the trained model

The deploy() method creates an endpoint that serves prediction requests in real-time.
The model saves keras artifacts, to use TensorFlow serving for deployment, you'll need to save the artifacts in SavedModel format.

We are using the solutions from the [deploy trained keras or tensorflow models using amazon sagemaker](https://aws.amazon.com/blogs/machine-learning/deploy-trained-keras-or-tensorflow-models-using-amazon-sagemaker/) blog post.

In [26]:
predictor = estimator_dist.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

ValueError: Estimator is not associated with a training job

## Make some predictions
To verify the that the endpoint functions properly, we generate random data in the correct shape and get a prediction.

In [None]:
# Creating fake prediction data
import numpy as np
data = np.random.randn(1, 32, 32, 3)
print("Predicted class is {}".format(np.argmax(predictor.predict(data)['predictions'])))

### Calculating accuracy and create a confusion matrix based on the test dataset

Our endpoint works as expected, we'll now use the test dataset for predictions and calculate our model accuracy

In [None]:
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import confusion_matrix
datagen = ImageDataGenerator()

(x_train, y_train), (x_test, y_test) = cifar10.load_data()

def predict(data):
    predictions = predictor.predict(data)['predictions']
    return predictions

In [None]:
batch_size = 128
predicted = []
actual = []
batches = 0
for data in datagen.flow(x_test,y_test,batch_size=batch_size):
    for i,prediction in enumerate(predict(data[0])):
        predicted.append(np.argmax(prediction))
        actual.append(data[1][i][0])
    batches += 1
    if batches >= len(x_test) / batch_size:
        break

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix

accuracy = accuracy_score(y_pred=predicted,y_true=actual)
display('Average accuracy: {}%'.format(round(accuracy*100,2)))

In [None]:
%matplotlib inline
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt

cm = confusion_matrix(y_pred=predicted,y_true=actual)
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
sn.set(rc={'figure.figsize':(11.7,8.27)})
sn.set(font_scale=1.4)#for label size
sn.heatmap(cm, annot=True,annot_kws={"size": 10})# font size

Using this heatmap we can calculate the accuracy of each one of the labels

# Cleaning up
To avoid incurring charges to your AWS account for the resources used in this tutorial you need to delete the SageMaker Endpoint:

In [None]:
sagemaker_session.delete_endpoint(predictor.endpoint)