# [Module 4] Distributed training with Horovod
Horovod is a distributed training framework based on MPI (Message Passing Interface). Horovod can only be used with TensorFlow 1.12 or later. For details, see the [Horovod README](https://github.com/uber/horovod).

Enabling Horovod requires a few small changes to the training script. In this lab, you will make those changes yourself.

## Create a training script that supports Horovod distributed training


----
### TODO 1. Start Horovod
To enable Horovod, add the following code to the `main()` function.

```python
    import horovod.keras as hvd
    hvd.init()
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())
    K.set_session(tf.Session(config=config))
```
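
To make the role of `hvd.local_rank()` more concrete, here is a minimal sketch in plain Python (no Horovod required) of how a global rank could map to a host and a local GPU index. It assumes ranks are assigned host by host in contiguous blocks, which is a common MPI layout but is not guaranteed. `describe_rank` is a hypothetical helper for illustration, not part of Horovod.

```python
def describe_rank(rank, processes_per_host):
    """Return (host_index, local_rank) for a global rank.

    Assumption: ranks are laid out host by host in contiguous blocks.
    """
    if rank < 0 or processes_per_host <= 0:
        raise ValueError("rank must be >= 0 and processes_per_host must be > 0")
    return divmod(rank, processes_per_host)


# Two hosts with 4 GPUs each give global ranks 0 through 7.
layout = [describe_rank(r, processes_per_host=4) for r in range(8)]

for rank, (host, local_rank) in enumerate(layout):
    # Each process exposes only the GPU whose index equals its local rank.
    print(f"rank={rank} host={host} visible_device_list='{local_rank}'")
```

Pinning each process to the GPU that matches its local rank is what keeps several processes on one host from competing for the same device.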

----
### TODO 2. Configure callbacks
Add the following callbacks in the `main()` function.

```python
    callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))
    callbacks.append(hvd.callbacks.MetricAverageCallback())
    callbacks.append(hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1))
```

Change the checkpoint and TensorBoard callbacks so that they run only when `hvd.rank() == 0`, so that multiple workers do not write the same files.
```python
    if hvd.rank() == 0:
        callbacks.append(ModelCheckpoint(args.output_dir + '/checkpoint-{epoch}.h5'))
```
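
The warmup callback in this step gradually raises the learning rate during the first epochs. The sketch below shows the general idea as a simple per-epoch linear ramp from `target_lr / num_workers` up to `target_lr`. It is an illustration under that assumption only: `warmup_lr` is a hypothetical function, and the exact schedule Horovod applies (for example, per-batch adjustments) may differ.

```python
def warmup_lr(epoch, target_lr, num_workers, warmup_epochs=5):
    """Linearly ramp from target_lr / num_workers to target_lr.

    Assumption: a per-epoch linear ramp; this is not Horovod's code.
    """
    if epoch >= warmup_epochs:
        return target_lr
    start_lr = target_lr / num_workers
    return start_lr + (target_lr - start_lr) * epoch / warmup_epochs


# Learning rate for the first 7 epochs with 4 workers.
schedule = [warmup_lr(e, target_lr=0.4, num_workers=4) for e in range(7)]
print([round(lr, 3) for lr in schedule])
```

Starting low and ramping up is a common way to keep training stable when the effective batch size grows with the number of workers.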

----
### TODO 3. Configure the optimizer
To enable Horovod, complete the following steps.


1) Add an `hvd` argument to the `keras_model_fn` function.
```python
# Add hvd to the function signature, and pass it in the function call as well.
def keras_model_fn(learning_rate, weight_decay, optimizer, momentum, hvd):
```

2) Change `size=1` to `size=hvd.size()`.

3) Locate the following `model.compile` call:

```python
model.compile(loss='categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])
```
Immediately before it, add this line:
```python
opt = hvd.DistributedOptimizer(opt)
```

4) In the `main()` function, pass `hvd` as an argument when you create the model instance.

```python
model = keras_model_fn(args.learning_rate, args.weight_decay, args.optimizer, args.momentum, hvd)
```
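
Conceptually, wrapping the optimizer in `hvd.DistributedOptimizer` means each worker computes gradients on its own batch, the gradients are averaged across workers, and every worker applies the same averaged update. The following plain-Python sketch illustrates only that idea. `allreduce_average` and `sgd_step` are hypothetical helpers, and the real implementation uses collective communication (such as NCCL or MPI allreduce) rather than Python lists.

```python
def allreduce_average(per_worker_grads):
    """Average gradients element-wise across workers (conceptual stand-in)."""
    num_workers = len(per_worker_grads)
    return [sum(values) / num_workers for values in zip(*per_worker_grads)]


def sgd_step(weights, grads, lr):
    """Apply one plain SGD update."""
    return [w - lr * g for w, g in zip(weights, grads)]


# Gradients computed independently by 4 workers for a 2-parameter model.
worker_grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]

avg_grads = allreduce_average(worker_grads)
# Every worker applies the same averaged gradient, so weights stay in sync.
new_weights = sgd_step([0.0, 0.0], avg_grads, lr=0.5)
print(avg_grads, new_weights)
```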


In [2]:
import os
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()

In [3]:
from sagemaker.tensorflow import TensorFlow

train_instance_type='ml.p3.8xlarge'
train_instance_count = 1
gpus_per_host = 4

num_of_shards = gpus_per_host * train_instance_count

In [4]:
sagemaker.__version__

'1.72.1'

### Data split for Horovod and upload to S3

For Horovod, we need a dedicated input channel for each Horovod worker. In this example, we use an instance with 4 GPUs (**ml.p3.8xlarge**), so we shard the training data into four TFRecord files as shown below.
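
The internals of `do_shard` are not shown in this notebook. As a rough sketch of what sharding could look like, the example below distributes records round-robin across shards and builds one output path per shard, following the `train/<idx>/train_<idx>.tfrecords` layout that appears in the cell output. `shard_round_robin` and `shard_path` are hypothetical helpers, and round-robin assignment is an assumption.

```python
import posixpath


def shard_round_robin(records, num_shards):
    """Distribute records across num_shards lists in round-robin order."""
    shards = [[] for _ in range(num_shards)]
    for i, record in enumerate(records):
        shards[i % num_shards].append(record)
    return shards


def shard_path(base_dir, idx):
    """Build the per-shard output path, one directory per shard."""
    return posixpath.join(base_dir, "train", str(idx), f"train_{idx}.tfrecords")


records = list(range(10))          # stand-in for serialized examples
shards = shard_round_robin(records, num_shards=4)

for idx, shard in enumerate(shards):
    print(shard_path("./data", idx), shard)
```

Keeping each shard in its own directory lets every shard be uploaded under its own S3 prefix and exposed as a separate input channel.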

In [7]:
%load_ext autoreload
#%reload_ext autoreload
%autoreload 2

from shard import do_shard
from sagemaker.session import s3_input
#from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
def shard_data_and_upload(local_data_dir, num_of_shards):
    do_shard(local_data_dir, num_of_shards)
    dataset_location = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10-tf')
    display(dataset_location)

    shuffle_config = sagemaker.session.ShuffleConfig(234)
    #shuffle_config = sagemaker.inputs.ShuffleConfig(234)
    
    print("shuffle_config:",shuffle_config)
    train_s3_uri_prefix = dataset_location

    remote_inputs = {}

    for idx in range(num_of_shards):
        train_s3_uri = f'{train_s3_uri_prefix}/train/{idx}/'
        train_s3_input = s3_input(train_s3_uri, shuffle_config=shuffle_config)
        #train_s3_input=TrainingInput(train_s3_uri, shuffle_config=shuffle_config)
        
        remote_inputs[f'train_{idx}'] = train_s3_input
        
        remote_inputs['validation_{}'.format(idx)] = '{}/validation'.format(dataset_location)

    remote_inputs['validation'] = '{}/validation'.format(dataset_location)
    remote_inputs['eval'] = '{}/eval'.format(dataset_location)
    
    return remote_inputs

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [8]:
remote_inputs = shard_data_and_upload('./data', num_of_shards)

Generating ./data/train/0/train_0.tfrecords
Generating ./data/train/1/train_1.tfrecords
Generating ./data/train/2/train_2.tfrecords
Generating ./data/train/3/train_3.tfrecords


's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf'

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


shuffle_config: <sagemaker.session.ShuffleConfig object at 0x7ff8c00a0710>


In [9]:
remote_inputs

{'train_0': <sagemaker.inputs.s3_input at 0x7ff8c00a0a20>,
 'validation_0': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_1': <sagemaker.inputs.s3_input at 0x7ff8c00a0a58>,
 'validation_1': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_2': <sagemaker.inputs.s3_input at 0x7ff8b87c3c18>,
 'validation_2': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_3': <sagemaker.inputs.s3_input at 0x7ff8b87c3ba8>,
 'validation_3': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'validation': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'eval': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/eval'}
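
Inside the training container, each channel name is typically exposed through an environment variable of the form `SM_CHANNEL_<NAME>` that points to a directory under `/opt/ml/input/data/`. The sketch below shows how a worker might resolve its own training channel. The rule that worker `i` reads channel `train_i` is an assumption for illustration, and `channel_dir` and `train_channel_for` are hypothetical helpers rather than functions from the training script.

```python
def channel_dir(channel_name, env):
    """Resolve a channel's local directory from SageMaker-style env vars."""
    key = "SM_CHANNEL_" + channel_name.upper()
    # Fall back to the conventional input path if the variable is missing.
    return env.get(key, f"/opt/ml/input/data/{channel_name}")


def train_channel_for(local_rank):
    """Assumption: worker i reads the channel named train_i."""
    return f"train_{local_rank}"


# Stand-in for os.environ inside a training container.
fake_env = {
    "SM_CHANNEL_TRAIN_0": "/opt/ml/input/data/train_0",
    "SM_CHANNEL_TRAIN_1": "/opt/ml/input/data/train_1",
}

for local_rank in range(4):
    name = train_channel_for(local_rank)
    print(local_rank, name, channel_dir(name, fake_env))
```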

## Run Distributed training with File mode
You can configure Horovod distributed training by passing the following setting to the Estimator object.
```python
distributions = {'mpi': {
                    'enabled': True,
                    'processes_per_host': gpus_per_host  # Number of Horovod processes per host
                        }
                }
```
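
As a small sanity check on this setting, the total number of Horovod processes is the number of instances multiplied by `processes_per_host`. The sketch below builds the dictionary and computes that total. `mpi_distribution` and `total_processes` are hypothetical helpers introduced here for illustration.

```python
def mpi_distribution(processes_per_host):
    """Build the MPI distribution dictionary in the shape used above."""
    if processes_per_host < 1:
        raise ValueError("processes_per_host must be at least 1")
    return {"mpi": {"enabled": True, "processes_per_host": processes_per_host}}


def total_processes(instance_count, distribution):
    """Total Horovod workers = instances x processes per host."""
    return instance_count * distribution["mpi"]["processes_per_host"]


single_host = mpi_distribution(processes_per_host=4)
print(total_processes(1, single_host))   # one 4-GPU instance
print(total_processes(2, single_host))   # two 4-GPU instances
```

Setting `processes_per_host` to the number of GPUs on the instance gives one Horovod process per GPU.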

In [10]:
distributions = {'mpi': {
                    'enabled': True,
                    'processes_per_host': gpus_per_host
                        }
                }

metric_definitions = [
    {'Name': 'train:loss', 'Regex': '.*loss: ([0-9\\.]+) - accuracy: [0-9\\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': '.*loss: [0-9\\.]+ - accuracy: ([0-9\\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': '.* - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_accuracy: ([0-9\\.]+).*'},
    {'Name': 'validation:loss', 'Regex': '.* - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: ([0-9\\.]+) - val_accuracy: [0-9\\.]+.*'},
    {'Name': 'sec/steps', 'Regex': '.* ([0-9]+)ms/step - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_accuracy: [0-9\\.]+'}
]
hyperparameters = {'epochs': 10, 'batch-size' : 256}

input_mode = 'File' 
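
To see how the metric regular expressions behave, the sketch below applies four of them to a sample log line in the format that Keras prints during this job. The `sec/steps` pattern is left out because it expects an `ms/step` token that this sample line does not contain. `extract_metrics` is a hypothetical helper, and using `re.search` on each log line is an assumption about how the patterns are applied.

```python
import re

# Same patterns as in metric_definitions, written as raw strings.
METRIC_PATTERNS = {
    "train:loss": r".*loss: ([0-9\.]+) - accuracy: [0-9\.]+.*",
    "train:accuracy": r".*loss: [0-9\.]+ - accuracy: ([0-9\.]+).*",
    "validation:loss": r".* - loss: [0-9\.]+ - accuracy: [0-9\.]+ - val_loss: ([0-9\.]+) - val_accuracy: [0-9\.]+.*",
    "validation:accuracy": r".* - loss: [0-9\.]+ - accuracy: [0-9\.]+ - val_loss: [0-9\.]+ - val_accuracy: ([0-9\.]+).*",
}


def extract_metrics(log_line):
    """Return every metric whose pattern matches the given log line."""
    found = {}
    for name, pattern in METRIC_PATTERNS.items():
        match = re.search(pattern, log_line)
        if match:
            found[name] = float(match.group(1))
    return found


line = ("[1,0]<stdout>:39/39 - 13s - loss: 2.1935 - accuracy: 0.2336"
        " - val_loss: 2.2584 - val_accuracy: 0.1783")
metrics = extract_metrics(line)
print(metrics)
```

Testing the patterns against a real log line before launching a job is a cheap way to catch metrics that would otherwise never appear in CloudWatch.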

In [13]:
source_dir = os.path.join(os.getcwd(), 'training_script')
estimator_dist = TensorFlow(base_job_name='horovod-cifar10-tf',
                       entry_point='cifar10_keras_dist_tf2.py',
                       source_dir=source_dir,
                       role=role,
                       framework_version='2.1.0',
                       py_version='py3',
                       hyperparameters=hyperparameters,
                       train_instance_count=train_instance_count,
                       #instance_count=train_instance_count,
                       train_instance_type=train_instance_type,
                       #instance_type=train_instance_type,
                       tags = [{'Key' : 'Project', 'Value' : 'cifar10'},{'Key' : 'TensorBoard', 'Value' : 'horovod-file'}],
                       metric_definitions=metric_definitions,
                       distributions=distributions,
                       input_mode=input_mode)

Parameter distribution will be renamed to {'mpi': {'enabled': True, 'processes_per_host': 4}} in SageMaker Python SDK v2.
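
The deprecation warnings and the commented-out lines in this notebook point to parameter renames in SageMaker Python SDK v2. The sketch below maps the v1 keyword names used here to their v2 equivalents. `to_v2_kwargs` is a hypothetical helper, the mapping covers only the parameters that appear in this notebook, and it does not call the SDK.

```python
# v1 keyword -> v2 keyword, limited to parameters used in this notebook.
V1_TO_V2_KWARGS = {
    "train_instance_count": "instance_count",
    "train_instance_type": "instance_type",
    "distributions": "distribution",
}


def to_v2_kwargs(v1_kwargs):
    """Rename v1-style estimator keyword arguments to v2-style names."""
    return {V1_TO_V2_KWARGS.get(key, key): value for key, value in v1_kwargs.items()}


v1_kwargs = {
    "train_instance_count": 1,
    "train_instance_type": "ml.p3.8xlarge",
    "distributions": {"mpi": {"enabled": True, "processes_per_host": 4}},
    "input_mode": "File",
}
v2_kwargs = to_v2_kwargs(v1_kwargs)
print(sorted(v2_kwargs))
```

In the same way, the `s3_input` class used for the training channels is named `TrainingInput` in SDK v2, as the warnings state.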


In [14]:
#estimator_dist.fit(remote_inputs, wait=False)
estimator_dist.fit(remote_inputs, wait=True)

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
'create_image_uri' will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.


2020-12-07 15:21:34 Starting - Starting the training job...
2020-12-07 15:21:38 Starting - Launching requested ML instances.........
2020-12-07 15:23:30 Starting - Preparing the instances for training......
2020-12-07 15:24:25 Downloading - Downloading input data
2020-12-07 15:24:25 Training - Downloading the training image........
2020-12-07 15:25:41,082 sagemaker-containers INFO     Imported framework sagemaker_tensorflow_container.training
2020-12-07 15:25:41,498 sagemaker-containers INFO     Starting MPI run as worker node.
2020-12-07 15:25:41,498 sagemaker-containers INFO     Creating SSH daemon.
2020-12-07 15:25:41,504 sagemaker-containers INFO     Waiting for MPI workers to establish their SSH connections
2020-12-07 15:25:41,504 sagemaker-containers INFO     Env Hosts: ['algo-1'] Hosts: ['algo-1:4'] process_per_hosts: 4 num_processes: 4
2020-12-07 15:25:41,506 sagemaker-containers INFO     Network interface name: eth0
202


2020-12-07 15:25:36 Training - Training image download completed. Training in progress.
[1,2]<stdout>:[2020-12-07 15:25:51.275 algo-1:60 INFO hook.py:364] Monitoring the collections: losses, sm_metrics, metrics
[1,1]<stdout>:[2020-12-07 15:25:51.320 algo-1:59 INFO hook.py:364] Monitoring the collections: metrics, losses, sm_metrics
[1,3]<stdout>:[2020-12-07 15:25:51.381 algo-1:61 INFO hook.py:364] Monitoring the collections: losses, sm_metrics, metrics
[1,0]<stdout>:[2020-12-07 15:25:51.819 algo-1:58 INFO hook.py:364] Monitoring the collections: metrics, losses, sm_metrics
[1,0]<stdout>:algo-1:58:218 [0] NCCL INFO NET/Socket : Using [0]eth0:10.0.86.31<0>
[1,0]<stdout>:algo-1:58:218 [0] NCCL INFO NET/Plugin : No plugin found (libnccl-net.so).
[1,0]<stdout>:
[1,0]<stdout>:algo-1:58:218 [0] misc/ibvwrap.cc:63 NCCL WARN Failed to open libibverbs.so[.1]
[1,0]<stdout>:NCCL version 2.4.7+cuda10.1
[1,3]<stdou

[1,0]<stdout>:39/39 - 13s - loss: 2.1935 - accuracy: 0.2336 - val_loss: 2.2584 - val_accuracy: 0.1783
[1,0]<stdout>:Epoch 2/10
[1,1]<stdout>:Epoch 1/10
[1,0]<stdout>:Epoch 1/10
[1,3]<stdout>:Epoch 1/10
[1,2]<stdout>:Epoch 1/10
[1,0]<stdout>:
[1,0]<stdout>:Epoch 00002: saving model to /opt/ml/output/checkpoint-2.ckpt
[1,1]<stdout>:39/39 - 2s - loss: 1.6824 - accuracy: 0.3677 - val_loss: 2.0392 - val_accuracy: 0.2536
[1,3]<stdout>:39/39 - 2s - loss: 1.7080 - accuracy: 0.3677 - val_loss: 2.0392 - val_accuracy: 0.2536
[1,2]<stdout>:39/39 - 2s - loss: 1.6566 - accuracy: 0.3677 - val_loss: 2.0392 - val_accuracy: 0.2536
[1,1]<stdout>:Epoch 3/10
[1,3]<stdout>:Epoch 3/10
[1,2]<stdout>:Epoch 3/10
[1,0]<stdout>:39/39 - 2s - loss: 1.6965 - accuracy: 0.3677 - val_loss: 2.0392 - val_accuracy: 0.2536
[1,0]<stdout>:Epoch 3/10
[1,1]<stdout>:Epoch 1/10



2020-12-07 15:26:35 Uploading - Uploading generated training model
2020-12-07 15:26:35 Completed - Training job completed
[1,0]<stderr>:Instructions for updating:
[1,0]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,0]<stderr>:Instructions for updating:
[1,0]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,0]<stderr>:INFO:tensorflow:Assets written to: /opt/ml/model/assets
[1,0]<stderr>:INFO:tensorflow:Assets written to: /opt/ml/model/assets
[1,0]<stdout>:[2020-12-07 15:26:22.678 algo-1:58 INFO utils.py:25] The end of training job file will not be written for jobs running under SageMaker.
2020-12-07 15:26:23,678 sagemaker-containers INFO     Reporting training SUCCESS
Training seconds: 142
Billable seconds: 142


This example sets `wait=True` so that the training logs stream into the notebook. To submit the job without blocking the notebook, set `wait=False` instead.

## Distributed training with Horovod and Pipe Mode input
Distributed training with Horovod can also use SageMaker Pipe mode.

With Amazon SageMaker, you can create training jobs that use Pipe input mode. **In Pipe input mode, the training dataset in S3 is streamed directly to the training instance instead of first being downloaded to the instance's local disk.** As a result, training jobs start sooner, finish faster, and need less disk space.

SageMaker TensorFlow provides an implementation of `tf.data.Dataset` that makes it easy to use Pipe input mode on SageMaker. By replacing your `tf.data.Dataset` with `sagemaker_tensorflow.PipeModeDataset`, you can read TFRecords as they are streamed to the training instance.

In your entry_point script, you can use `PipeModeDataset` just like a `Dataset`. The following example creates a `PipeModeDataset` that reads TFRecords from the 'training' channel.

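```python
import tensorflow as tf  # TensorFlow 1.x API, matching framework_version='1.14.0' below
from sagemaker_tensorflow import PipeModeDataset

# Schema of each serialized tf.Example in the TFRecord stream.
features = {
    'data': tf.FixedLenFeature([], tf.string),
    'labels': tf.FixedLenFeature([], tf.int64),
}

def parse(record):
    # Decode one serialized example into a (features, label) pair.
    parsed = tf.parse_single_example(record, features)
    return ({
        'data': tf.decode_raw(parsed['data'], tf.float64)
    }, parsed['labels'])

def train_input_fn(training_dir, hyperparameters):
    # Stream TFRecords from the 'training' channel instead of reading local files.
    ds = PipeModeDataset(channel='training', record_format='TFRecord')
    ds = ds.repeat(20)
    ds = ds.prefetch(10)
    ds = ds.map(parse, num_parallel_calls=10)
    ds = ds.batch(64)
    return ds
```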
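
To illustrate what reading records from a sequential stream involves, independent of TensorFlow, the sketch below frames and reads records using the TFRecord layout: an 8-byte little-endian length, a 4-byte length checksum, the payload, and a 4-byte payload checksum. For brevity it writes zero placeholders instead of real checksums and skips verification, so its output is not a valid TFRecord file. `write_record` and `read_records` are hypothetical helpers, and an in-memory buffer stands in for the Pipe mode stream.

```python
import io
import struct


def write_record(stream, payload):
    """Write one length-prefixed record (checksums are zero placeholders)."""
    stream.write(struct.pack("<Q", len(payload)))  # 8-byte little-endian length
    stream.write(b"\x00" * 4)                      # placeholder length checksum
    stream.write(payload)
    stream.write(b"\x00" * 4)                      # placeholder payload checksum


def read_records(stream):
    """Yield payloads one at a time from a sequential, forward-only stream."""
    while True:
        header = stream.read(8)
        if not header:
            return                                 # clean end of stream
        if len(header) < 8:
            raise EOFError("truncated record header")
        (length,) = struct.unpack("<Q", header)
        stream.read(4)                             # skip length checksum
        payload = stream.read(length)
        if len(payload) < length:
            raise EOFError("truncated record payload")
        stream.read(4)                             # skip payload checksum
        yield payload


buffer = io.BytesIO()
for item in (b"first", b"second", b""):
    write_record(buffer, item)
buffer.seek(0)

decoded = list(read_records(buffer))
print(decoded)
```

Because the reader only moves forward and never seeks, the same loop shape works when the data arrives as a stream rather than as a file on disk.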

To run a training job in Pipe input mode, add the `input_mode='Pipe'` argument to your TensorFlow Estimator, as in the following example.

```python
from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train-with-pipemodedataset.py', role='SageMakerRole',
                          train_instance_count=1, train_instance_type='ml.c5.2xlarge',
                          framework_version='1.14.0', input_mode='Pipe')

tf_estimator.fit('s3://bucket/path/to/training/data')
```

## Create a training script that supports Pipe mode datasets

---- 
### TODO 1.
In `cifar10_keras_pipe_tf2.py`, import `PipeModeDataset` as follows.
```python
from sagemaker_tensorflow import PipeModeDataset
```
----
### TODO 2.
```python
def _input(epochs, batch_size, channel, channel_name):
```
Inside this function, replace
```python
dataset = tf.data.TFRecordDataset(filenames)
```
with
```python
dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')
```

For details, see the SageMaker Python SDK [documentation](https://sagemaker.readthedocs.io/en/stable/using_tf.html#training-with-pipe-mode-using-pipemodedataset).


In [15]:
from sagemaker.tensorflow import TensorFlow

train_instance_type='ml.p3.8xlarge'
train_instance_count = 1
gpus_per_host = 4

num_of_shards = gpus_per_host * train_instance_count

In [16]:
distributions = {'mpi': {
                    'enabled': True,
                    'processes_per_host': gpus_per_host
                        }
                }

metric_definitions = [
    {'Name': 'train:loss', 'Regex': '.*loss: ([0-9\\.]+) - accuracy: [0-9\\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': '.*loss: [0-9\\.]+ - accuracy: ([0-9\\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': '.*step - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_accuracy: ([0-9\\.]+).*'},
    {'Name': 'validation:loss', 'Regex': '.*step - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: ([0-9\\.]+) - val_accuracy: [0-9\\.]+.*'},
    {'Name': 'sec/steps', 'Regex': '.* - \\d+s (\\d+)[mu]s/step - loss: [0-9\\.]+ - accuracy: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_accuracy: [0-9\\.]+'}
]

hyperparameters = {'epochs': 10, 'batch-size' : 256}

input_mode = 'Pipe'

In [17]:
remote_inputs = shard_data_and_upload('./data', num_of_shards)

Generating ./data/train/0/train_0.tfrecords
Generating ./data/train/1/train_1.tfrecords
Generating ./data/train/2/train_2.tfrecords
Generating ./data/train/3/train_3.tfrecords


's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf'

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


shuffle_config: <sagemaker.session.ShuffleConfig object at 0x7ff861331fd0>


In [18]:
remote_inputs

{'train_0': <sagemaker.inputs.s3_input at 0x7ff861331f98>,
 'validation_0': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_1': <sagemaker.inputs.s3_input at 0x7ff861522940>,
 'validation_1': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_2': <sagemaker.inputs.s3_input at 0x7ff86132a048>,
 'validation_2': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'train_3': <sagemaker.inputs.s3_input at 0x7ff86132a668>,
 'validation_3': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'validation': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/validation',
 'eval': 's3://sagemaker-us-east-2-870180618679/data/DEMO-cifar10-tf/eval'}

In [19]:
source_dir = os.path.join(os.getcwd(), 'training_script')
estimator_dist = TensorFlow(base_job_name='horovod-cifar10-tf',
                       entry_point='cifar10_keras_dist_tf2.py',
                       source_dir=source_dir,
                       role=role,
                       framework_version='2.1.0',
                       py_version='py3',
                       hyperparameters=hyperparameters,
                       train_instance_count=train_instance_count,
                       train_instance_type=train_instance_type,
                       tags = [{'Key' : 'Project', 'Value' : 'cifar10'},{'Key' : 'TensorBoard', 'Value' : 'horovod'}],
                       metric_definitions=metric_definitions,
                       distributions=distributions,
                       input_mode=input_mode)

Parameter distribution will be renamed to {'mpi': {'enabled': True, 'processes_per_host': 4}} in SageMaker Python SDK v2.


In [20]:
estimator_dist.fit(remote_inputs, wait=False)

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
'create_image_uri' will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.


After training completes, also check the Billable seconds value. Billable seconds is the time you are actually charged for the training run.
```
Billable seconds: <time>
```

For reference, training for 10 epochs on an `ml.p2.xlarge` instance takes about 6 to 7 minutes in total, of which about 3 to 4 minutes is actual training time.

**Well done.**  

You can now use SageMaker training jobs for distributed training.
Before moving on to the next notebook, take a look at the distributed job metrics in CloudWatch and TensorBoard.
You can use TensorBoard to compare the different jobs you have run.

When launching TensorBoard, refer to the following argument.<br>
`--logdir dist:dist_model_dir,pipe:pipe_model_dir,file:normal_job_model_dir`
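
If you compare more than a few runs, it can be convenient to build this comma-separated `name:path` string programmatically. The sketch below does that with a hypothetical helper, `build_logdir_arg`. The directory names are the placeholders from the argument above, and whether your TensorBoard version accepts this syntax under `--logdir` is something to verify for your installation.

```python
def build_logdir_arg(runs):
    """Join run names and directories into a 'name:path,name:path' string."""
    for name, path in runs.items():
        if ":" in name or "," in name or "," in path:
            raise ValueError(f"unsupported character in run entry: {name!r}")
    return ",".join(f"{name}:{path}" for name, path in runs.items())


runs = {
    "dist": "dist_model_dir",
    "pipe": "pipe_model_dir",
    "file": "normal_job_model_dir",
}
logdir_arg = build_logdir_arg(runs)
print("--logdir", logdir_arg)
```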