Added new Sample (TensorFlow Multinode Training with Horovod) (#197)
* Added new Sample (TensorFlow Multinode Training with Horovod)

Signed-off-by: Shailen Sobhee <shailen.sobhee@intel.com>

* Fixed assert reported by bandit code checker tool.

Signed-off-by: Shailen Sobhee <shailen.sobhee@gmail.com>

* Fix CI issue (MPI bug) - Upload to new folder structure

Signed-off-by: Shailen Sobhee <shailen.sobhee@gmail.com>

* Minor little fix in sample.json; A comma was missing.

Signed-off-by: Shailen Sobhee <shailen.sobhee@gmail.com>

* Removed old references to old folder structure

Signed-off-by: Shailen Sobhee <shailen.sobhee@gmail.com>
shailensobhee committed Oct 6, 2020
1 parent 9347ac7 commit d420ff5
Showing 5 changed files with 393 additions and 0 deletions.
@@ -0,0 +1,114 @@
# Distributed TensorFlow with Horovod Sample
Modern computer systems are increasingly distributed, so it is important to capitalize on scaling techniques to maximize the efficiency and performance of neural network training, which is a resource-intensive process.

| Optimized for | Description
|:--- |:---
| OS | Linux* Ubuntu* 18.04
| Hardware | Intel® Xeon® Scalable processor family or newer
| Software | Intel® oneAPI AI Analytics Toolkit
| What you will learn | How to scale out (distribute) the training of a model across multiple compute nodes
| Time to complete | 10 minutes

## Purpose
This sample code shows how to get started with scaling out the training of a neural network in TensorFlow across multiple compute nodes in a cluster. The sample uses [Horovod](https://github.com/horovod/horovod)*, a distributed deep learning training framework, to distribute the training workload. Horovod's core principles are based on MPI concepts such as size, rank, local rank, allreduce, allgather, and broadcast.
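
The snippet below is a minimal sketch, separate from this sample, that illustrates those MPI-style primitives with `horovod.tensorflow`; it assumes a TensorFlow 2.x installation with eager execution (the sample script itself uses the `tf.compat.v1` graph APIs).

```
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod; afterwards every process knows its rank and the world size.
hvd.init()
print("rank", hvd.rank(), "of", hvd.size(), "(local rank", hvd.local_rank(), ")")

# Allreduce averages a per-rank tensor across all ranks; this is the same
# collective Horovod uses to combine gradients during distributed training.
local_value = tf.constant([float(hvd.rank())])
averaged = hvd.allreduce(local_value)
print("averaged across ranks:", averaged.numpy())
```

Launched with, say, `horovodrun -np 2 python allreduce_demo.py` (a hypothetical file name), each of the two ranks would print an average of 0.5.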

Intel-optimized TensorFlow is available as part of the Intel® AI Analytics Toolkit. For more information on the optimizations and performance data, see the blog post [TensorFlow* Optimizations on Modern Intel® Architecture](https://software.intel.com/content/www/us/en/develop/articles/tensorflow-optimizations-on-modern-intel-architecture.html).

## Key implementation details

- The training data comes from the Keras* built-in MNIST dataset.
- The dataset is split based on the number of MPI ranks.
- Horovod is loaded and initialized with `hvd.init()`.
- The optimizer is wrapped with Horovod's distributed optimizer: `opt = hvd.DistributedOptimizer(opt)`.
- The appropriate hooks are configured, and only rank 0 writes the checkpoints (see the condensed sketch after this list).
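
The full training script added by this commit wires these pieces together; the following condensed excerpt (a sketch, not a complete training loop) shows only the Horovod-specific lines:

```
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Scale the learning rate by the number of ranks and let Horovod average gradients.
opt = tf.compat.v1.train.RMSPropOptimizer(0.001 * hvd.size())
opt = hvd.DistributedOptimizer(opt)

hooks = [
    # Broadcast initial variable states from rank 0 to all other ranks.
    hvd.BroadcastGlobalVariablesHook(0),
    # Divide the total number of training steps among the ranks.
    tf.estimator.StopAtStepHook(last_step=1000 // hvd.size()),
]

# Only rank 0 writes checkpoints, so workers cannot corrupt each other's files.
checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
```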

Runtime settings for `OMP_NUM_THREADS`, `KMP_AFFINITY`, and the inter-/intra-op threads are set within the script. You can read more about these settings in [Maximize TensorFlow Performance on CPU: Considerations and Recommendations for Inference Workloads](https://software.intel.com/en-us/articles/maximize-tensorflow-performance-on-cpu-considerations-and-recommendations-for-inference).
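
As a rough sketch of how such settings are typically applied (the sample script sets `KMP_AFFINITY` itself and leaves the inter-/intra-op settings commented out), the values below are placeholders rather than recommendations:

```
import os

# Environment variables are best set before TensorFlow creates its thread pools.
os.environ["OMP_NUM_THREADS"] = "4"                          # example value
os.environ["KMP_AFFINITY"] = "granularity=fine,compact,1,0"  # thread pinning, as in the sample script

import tensorflow as tf

config = tf.compat.v1.ConfigProto()
config.inter_op_parallelism_threads = 2   # example value
config.intra_op_parallelism_threads = 4   # example value
# The config object is then passed to the training session, for example:
# tf.compat.v1.train.MonitoredTrainingSession(config=config, ...)
```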

## License
This code sample is licensed under the MIT license.

## Build and Run the Sample

### Running Samples In DevCloud (Optional)
If you are running a sample in the Intel DevCloud, follow the steps below to build the Python environment. Remember that you must also specify the compute node (CPU, GPU, FPGA) and whether to run in batch or interactive mode. For more information, see the Intel® oneAPI Base Toolkit Get Started Guide (https://devcloud.intel.com/oneapi/get-started/base-toolkit/).

### Prerequisites

TensorFlow is ready for use once you finish the Intel AI Analytics Toolkit installation and have run the post-installation script.

You can refer to the oneAPI [main page](https://software.intel.com/en-us/oneapi) for toolkit installation, and the Toolkit [Getting Started Guide for Linux](https://software.intel.com/en-us/get-started-with-intel-oneapi-linux-get-started-with-the-intel-ai-analytics-toolkit) for post-installation steps and scripts.


### On a Linux* System
#### Activate conda environment With Root Access

In a Linux shell, navigate to your oneAPI installation path, typically `/opt/intel/oneapi`, and activate the conda environment with the following commands:

```
source /opt/intel/oneapi/setvars.sh
source activate tensorflow
```

#### Activate conda environment Without Root Access (Optional)

By default, the Intel AI Analytics Toolkit is installed in the `/opt/intel/oneapi` folder, which requires root privileges to manage. If you would like to avoid using root access to manage your conda environment, you can clone your desired conda environment with the following command:

```
conda create --name user_tensorflow --clone tensorflow
```

Then activate your conda environment with the following command:

```
source activate user_tensorflow
```

## Running the Sample

Before running the sample, you need to install the third-party [Horovod](https://github.com/horovod/horovod) framework.

After you have activated your conda environment, install `horovod` with the following commands:
```
export HOROVOD_WITHOUT_MPI=1 # Optional, in case you encounter MPI-related install issues
pip install horovod
```
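
To confirm what Horovod was built with, recent Horovod releases also provide `horovodrun --check-build`, which lists the available frameworks and controllers (this flag is not used by the sample itself).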

To run the script on a single machine without invoking Horovod, type the following command in a terminal with Python installed:
```
python TensorFlow_Multinode_Training_with_Horovod.py
```

To run the script with Horovod, launch it with `horovodrun`:
```
horovodrun -np 2 python TensorFlow_Multinode_Training_with_Horovod.py
```

In the example above, we run the script with two MPI processes on the same node. To use multiple nodes, pass the `-H` (hosts) flag, where `host1` and `host2` are the hostnames of two nodes in your cluster.

Example:

```
horovodrun -np 2 -H host1,host2 python TensorFlow_Multinode_Training_with_Horovod.py
```


### Example of Output
With successful execution, the script prints output similar to the following:

```
[...]
I0930 19:50:23.496505 140411946298240 basic_session_run_hooks.py:606] Saving checkpoints for 0 into ./checkpoints/model.ckpt.
INFO:tensorflow:loss = 2.2964332, step = 1
I0930 19:50:25.164514 140410195934080 basic_session_run_hooks.py:262] loss = 2.2964332, step = 1
INFO:tensorflow:loss = 2.2851412, step = 1
I0930 19:50:25.186133 140411946298240 basic_session_run_hooks.py:262] loss = 2.2851412, step = 1
INFO:tensorflow:loss = 1.1958275, step = 101 (19.356 sec)
I0930 19:50:44.521067 140410195934080 basic_session_run_hooks.py:260] loss = 1.1958275, step = 101 (19.356 sec)
INFO:tensorflow:global_step/sec: 5.16882
[...]
============================
Number of tasks: 2
Total time is: 96.9508
```


@@ -0,0 +1,235 @@
#!/usr/bin/env python
# encoding: utf-8

'''
==============================================================
Copyright © 2019 Intel Corporation
SPDX-License-Identifier: MIT
==============================================================
==============================================================
Copyright 2015 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================
'''

"""A deep MNIST classifier using convolutional layers.
See extensive documentation at
https://www.tensorflow.org/get_started/mnist/pros
"""
"""Convolutional Neural Network Estimator for MNIST, built with tf.layers."""

import time
import os
import errno
import tensorflow as tf
import horovod.tensorflow as hvd
import numpy as np

from tensorflow import keras

tf.compat.v1.disable_eager_execution()
'''
Environment settings:
Set MKLDNN_VERBOSE=1 to show DNNL run time verbose
Set KMP_AFFINITY=verbose to show OpenMP thread information
'''
# os.environ["MKLDNN_VERBOSE"] = "1"
os.environ["KMP_AFFINITY"] = "granularity=fine,compact,1,0"

def cnn_model_fn(feature, target, mode):

"""Model function for CNN."""
"""2-layer convolution model."""

# Convert the target to a one-hot tensor of shape (batch_size, 10) and
# with an on-value of 1 for each one-hot vector of length 10.
target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)


# Input Layer
# Reshape X to 4-D tensor: [batch_size, width, height, channels]
# MNIST images are 28x28 pixels, and have one color channel
feature = tf.reshape(feature, [-1, 28, 28, 1])

# Convolutional Layer #1
# Computes 32 features using a 5x5 filter with ReLU activation.
# Padding is added to preserve width and height.
# Input Tensor Shape: [batch_size, 28, 28, 1]
# Output Tensor Shape: [batch_size, 28, 28, 32]
conv1 = tf.compat.v1.layers.conv2d(
inputs=feature,
filters=32,
kernel_size=[5, 5],
padding="SAME",
activation=tf.nn.relu)

# Pooling Layer #1
# First max pooling layer with a 2x2 filter and stride of 2
# Input Tensor Shape: [batch_size, 28, 28, 32]
# Output Tensor Shape: [batch_size, 14, 14, 32]
pool1 = tf.nn.max_pool2d(input=conv1,
ksize=[1, 2, 2, 1],
strides=[1, 2, 2, 1],
padding="SAME")

# Convolutional Layer #2
# Computes 64 features using a 5x5 filter.
# Padding is added to preserve width and height.
# Input Tensor Shape: [batch_size, 14, 14, 32]
# Output Tensor Shape: [batch_size, 14, 14, 64]
conv2 = tf.compat.v1.layers.conv2d(
inputs=pool1,
filters=64,
kernel_size=[5, 5],
padding="SAME",
activation=tf.nn.relu)

# Pooling Layer #2
# Second max pooling layer with a 2x2 filter and stride of 2
# Input Tensor Shape: [batch_size, 14, 14, 64]
# Output Tensor Shape: [batch_size, 7, 7, 64]
pool2 = tf.nn.max_pool2d(input=conv2,
ksize=[1, 2, 2, 1],
strides=[1, 2, 2, 1],
padding="SAME")

# Flatten tensor into a batch of vectors
# Input Tensor Shape: [batch_size, 7, 7, 64]
# Output Tensor Shape: [batch_size, 7 * 7 * 64]
pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])

# Dense Layer
# Densely connected layer with 1024 neurons
# Input Tensor Shape: [batch_size, 7 * 7 * 64]
# Output Tensor Shape: [batch_size, 1024]
dense = tf.compat.v1.layers.dense(inputs=pool2_flat,
units=1024,
activation=tf.nn.relu)

# Add dropout operation; rate=0.6 means each element is dropped with probability 0.6
dropout = tf.compat.v1.layers.dropout(inputs=dense,
rate=0.6,
training=mode == tf.estimator.ModeKeys.TRAIN)

# Logits layer
# Input Tensor Shape: [batch_size, 1024]
# Output Tensor Shape: [batch_size, 10]
logits = tf.compat.v1.layers.dense(dropout, 10, activation=None)

# Calculate Loss (for both TRAIN and EVAL modes)
loss = tf.compat.v1.losses.softmax_cross_entropy(target, logits)

return tf.argmax(input=logits, axis=1), loss

def train_input_generator(x_train, y_train, batch_size=64):
    # Shuffle the data once, then yield shuffled mini-batches of size `batch_size`.
    if len(x_train) != len(y_train):
        raise ValueError('x_train and y_train must have the same length')
    p = np.random.permutation(len(x_train))
    x_train, y_train = x_train[p], y_train[p]
    index = 0
    while index <= len(x_train) - batch_size:
        yield x_train[index:index + batch_size], \
              y_train[index:index + batch_size],
        index += batch_size

def main(unused_argv):
# Initialize Horovod
hvd.init()
# Keras automatically creates a cache directory in ~/.keras/datasets for
# storing the downloaded MNIST data. This creates a race
# condition among the workers that share the same filesystem. If the
# directory already exists by the time this worker gets around to creating
# it, ignore the resulting exception and continue.
cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
if not os.path.exists(cache_dir):
try:
os.mkdir(cache_dir)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
pass
else:
raise

# Load training and eval data
# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = \
keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())
# The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
# into (-1, 784) to feed into our network. Also, need to normalize the
# features between 0 and 1.
x_train = np.reshape(x_train, (-1, 784)) / 255.0
x_test = np.reshape(x_test, (-1, 784)) / 255.0
# Build model...
with tf.compat.v1.name_scope('input'):
image = tf.compat.v1.placeholder(tf.float32, [None, 784], name='image')
label = tf.compat.v1.placeholder(tf.float32, [None], name='label')
predict, loss = cnn_model_fn(image, label, tf.estimator.ModeKeys.TRAIN)

# Horovod: adjust learning rate based on number of MPI Tasks.
opt = tf.compat.v1.train.RMSPropOptimizer(0.001 * hvd.size())
opt = hvd.DistributedOptimizer(opt)

global_step = tf.compat.v1.train.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)

hooks = [
# Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
# from rank 0 to all other processes. This is necessary to ensure consistent
# initialization of all workers when training is started with random weights
# or restored from a checkpoint.
hvd.BroadcastGlobalVariablesHook(0),

# Horovod: adjust number of steps based on number of MPI tasks.
tf.estimator.StopAtStepHook(last_step=1000 // hvd.size()),

tf.estimator.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
every_n_iter=100),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting them.
checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
training_batch_generator = train_input_generator(x_train,
y_train, batch_size=100)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.

config = tf.compat.v1.ConfigProto()
# config.inter_op_parallelism_threads = 2
# config.intra_op_parallelism_threads = 4


time_start = time.time()

with tf.compat.v1.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
hooks=hooks,
config=config) as mon_sess:
while not mon_sess.should_stop():
# Run a training step synchronously.
image_, label_ = next(training_batch_generator)
mon_sess.run(train_op, feed_dict={image: image_, label: label_})

if hvd.rank() == 0:
print('============================')
print('Number of tasks: ', hvd.size())
print('Total time is: %g' % (time.time() - time_start))


if __name__ == '__main__':
tf.compat.v1.app.run()
@@ -0,0 +1,22 @@
{
"guid": "AA458E3A-932C-460E-97A7-5962AF0C41FA",
"name": "TensorFlow Multinode Training with Horovod",
"categories": ["Toolkit/Intel® AI Analytics Toolkit/TensorFlow"],
"description": "This sample shows how to train a TensorFlow model on multiple nodes in a cluster using Horovod.",
"builder": ["cli"],
"languages": [{"python":{}}],
"os":["linux"],
"ciTests": {
"linux": [
{
"id": "tensorflow horovod",
"steps": [
"source activate tensorflow",
"export HOROVOD_WITHOUT_MPI=1",
"pip install horovod",
"horovodrun -np 2 python TensorFlow_Multinode_Training_with_Horovod.py"
]
}
]
}
}
@@ -0,0 +1,2 @@
to delete - moved to new folder structure

@@ -0,0 +1,20 @@
{
"guid": "AA458E3A-932C-460E-97A7-5962AF0C41FA",
"name": "TensorFlow Multinode Training with Horovod",
"categories": ["Toolkit/Intel® AI Analytics Toolkit/TensorFlow"],
"description": "This sample shows how to train a TensorFlow and run inference with oneMKL and oneDNN.",
"builder": ["cli"],
"languages": [{"python":{}}],
"os":["linux"],
"ciTests": {
"linux": [
{
"id": "tensorflow horovod",
"steps": [
"source activate tensorflow",
"mpirun -n 2 python TensorFlow_Multinode_Training_with_Horovod.py"
]
}
]
}
}
