# Distributed Deep Learning training con Horovod y TensorFlow
## Sobre Horovod y Databricks

HorovodRunner es una API para ejecutar workloads de deep learning distribuidos en Databricks utilizando el framework Horovod (proyecto iniciado por Uber). Al integrar Horovod con el modo barrier de Spark, Databricks es capaz de ofrecer entrenamiento de modelos de machine learning utilizando deep learning. HorovodRunner permite ejecutar un metodo en Python que especifica una rutina de entrenamiento de un modelo que incluye hooks para Horovod. 

<img src="https://docs.databricks.com/_images/horovod-runner.png" />

In [2]:
import warnings
warnings.filterwarnings("ignore")

## Demo: MNIST dataset

The MNIST database (Modified National Institute of Standards and Technology database) is a large database of handwritten digits that is commonly used for training various image processing systems. The database is also widely used for training and testing in the field of machine learning.

<img src="https://greydanus.github.io/assets/subspace-nn/mnist.png" />

### Utilizando Deep Learning para resolver el problema
#### Preparando un directorio para checkpointing (Deep Learning Storage)
TensorFlow utiliza checkpoints para almacenar el estado del entrenamiento del modelo. Crearemos un directorio dentro de DBFS para este proposito. Durante un entrenamiento distribuido y para evitar que todos los nodos modifiquen este estado, el checkpoint será utilizado por solo uno de los nodos

In [5]:
import os
import time

checkpoint_dir = '/dbfs/ml/MNISTDemo/train/{}/'.format(time.time())
os.makedirs(checkpoint_dir)

Creamos una función get_dataset que nos permite obtener un dataset de MNIST. Las imagenes las obtenemos de los datasets standard de Keras

In [7]:
from tensorflow import keras

def get_dataset(num_classes, rank=0, size=1):
  (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data('MNIST-data-%d' % rank)
  x_train = x_train[rank::size]
  y_train = y_train[rank::size]
  x_test = x_test[rank::size]
  y_test = y_test[rank::size]
  x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
  x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
  x_train = x_train.astype('float32')
  x_test = x_test.astype('float32')
  x_train /= 255
  x_test /= 255
  y_train = keras.utils.to_categorical(y_train, num_classes)
  y_test = keras.utils.to_categorical(y_test, num_classes)
  
  return (x_train, y_train), (x_test, y_test)

## Constuimos el modelo utilizando Keras API

Crearemos un modelo basado en CNN utilizando la API secuencial de Keras con 2 capas de convolucion, dropout para agregar regularizaton y finalmente un fully connected layer

In [9]:
from tensorflow.keras import models
from tensorflow.keras import layers

def get_model(num_classes):
  model = models.Sequential()
  model.add(layers.Conv2D(32, kernel_size=(3, 3),
                   activation='relu',
                   input_shape=(28, 28, 1)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.MaxPooling2D(pool_size=(2, 2)))
  model.add(layers.Dropout(0.25))
  model.add(layers.Flatten())
  model.add(layers.Dense(128, activation='relu'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(num_classes, activation='softmax'))
  
  return model

Inspeccionemos como luce este modelo

In [11]:
model = get_model(10)
model.summary()

#### Rutina de entrenamiento tradicional

Normalmente, en TensorFlow armariamos una rutina de entrenamiento como la siguiente, donde especificariamos el optimizer a utilizar, compilariamos el modelo e iniciariamos el entrenamiento utilizando el metodo fit.

In [13]:
num_classes = 10

In [14]:
def train(learning_rate=1.0, batch_size = 128, epochs = 5):
  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
  model = get_model(num_classes)

  optimizer = keras.optimizers.Adadelta(lr=learning_rate)

  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  training_history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=epochs,
                        verbose=2,
                        validation_data=(x_test, y_test))

#### Runtina de entrenamiento distribuida con Horovod

Para utilizar horovod necesitamos realizar algunas modificaciones a nuestra rutina de entrenamiento. Principalmente cambiaremos:


- Inicializar Horovod
- Configurar la session de TensorFlow para que utilice tantos procesos como nodos tenemos disponibles
- Configuramos un optimizer especifico para Horovod como un wrapper del optimizer utilizado en nuestro modelo
- Nos aseguramos que la inicializacion del modelo se realiza consistentemente en todos los nodos
- Guardamos el checkpoint del entrenamiento en el directorio creado anteriormente, pero solo en el nodo driver

In [16]:
# Horovod: Import the relevant submodule
import horovod.tensorflow.keras as hvd

from tensorflow.keras import backend as K
import tensorflow as tf
import matplotlib.pyplot as plt

def train_hvd(learning_rate=1.0, batch_size=512, epochs=5):
  # Horovod: initialize Horovod.
  hvd.init()

  # Horovod: pin GPU to be used to process local rank (one GPU per process)
  config = tf.ConfigProto()
  config.gpu_options.allow_growth = True
  config.gpu_options.visible_device_list = str(hvd.local_rank())
  K.set_session(tf.Session(config=config))

  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
  model = get_model(num_classes)

  #hvd.size() returns the numer of GPUs
  optimizer = keras.optimizers.Adadelta(lr=learning_rate * hvd.size())

  # Add Horovod Distributed Optimizer.
  optimizer = hvd.DistributedOptimizer(optimizer)

  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  callbacks = [
      # Horovod: broadcast initial variable states from rank 0 to all other processes.
      # This is necessary to ensure consistent initialization of all workers when
      # training is started with random weights or restored from a checkpoint.
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
  ]

  # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
  if hvd.rank() == 0:
      callbacks.append(keras.callbacks.ModelCheckpoint(checkpoint_dir + '/checkpoint-{epoch}.ckpt', save_weights_only = True))

  train_history = model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
  
  
  if hvd.rank() == 0:
    # Plot training & validation accuracy values
    plt.plot(train_history.history['acc'])
    plt.plot(train_history.history['val_acc'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()
    
    plt.savefig(checkpoint_dir + '/train_acc.png')

    # Plot training & validation loss values
    plt.plot(train_history.history['loss'])
    plt.plot(train_history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()
    
    plt.savefig(checkpoint_dir + '/train_loss.png')

#### Iniciamos el entrenamiento

La siguiente porción de código inicia la rutina de entrenamiento utilizando un Horovod Runner. Como argumentos especificamos los parametros que deben ser enviados a la función que indicamos anteriormente

In [18]:
from sparkdl import HorovodRunner

hr = HorovodRunner(np=2)
model = hr.run(train_hvd, learning_rate=0.1, batch_size=512, epochs=5)

#### Veamos la performance del modelo en terminos de su loss function

Durante la rutina de entrenamiento estamos guardando el historial donde registramos el loss como el accuracy tanto para validation como para training en cada uno de los epochs

In [20]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
img=mpimg.imread('/dbfs/ml/MNISTDemo/train/1575377203.3795166/train_acc.png')
imgplot = plt.imshow(img)
display(plt.show())

In [21]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
img=mpimg.imread('/dbfs/ml/MNISTDemo/train/1575377203.3795166/train_loss.png')
imgplot = plt.imshow(img)
display(plt.show())