# Horovod + Hyperopt + MLflow integration example: Keras on Spark with MNIST

In [None]:
import argparse
import os
import subprocess
import sys
from distutils.version import LooseVersion

import numpy as np

import pyspark
import pyspark.sql.types as T
from pyspark import SparkConf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
if LooseVersion(pyspark.__version__) < LooseVersion('3.0.0'):
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
else:
    from pyspark.ml.feature import OneHotEncoder
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D

import horovod.spark.keras as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

import mlflow

## Configure the Spark application (the SparkSession is created once the master URL is known)

In [None]:
# Spark settings for this notebook; the master URL is added later, once it has
# been read from spark-defaults.conf.
conf = (
    SparkConf()
    .setAppName('keras_spark_mnist')
    .set('spark.sql.shuffle.partitions', '16')
)

##### Locate `spark-defaults.conf` and read the Spark master from it

In [None]:
# IPython shell capture of the expanded spark-defaults.conf path; the cells
# below use `spark_conf.s` as the file path to read Spark properties from.
spark_conf = !echo $SPARK_HOME/conf/spark-defaults.conf

In [None]:
# Show which defaults file will be parsed.
print(f"{spark_conf.s}")

In [None]:
def get_master(conf_path=None):
    """Return the ``spark.master`` value from a Spark defaults file.

    Args:
        conf_path: Path of ``spark-defaults.conf``. When omitted, the path
            captured in the notebook's ``spark_conf`` variable is used.

    Returns:
        The first whitespace-separated token after the ``spark.master`` key,
        or ``None`` when the file does not define that key.
    """
    if conf_path is None:
        conf_path = spark_conf.s
    with open(conf_path, encoding="utf-8") as f:
        for raw_line in f:
            fields = raw_line.split()
            # Skip blank lines and comment lines (including indented ones).
            if not fields or fields[0].startswith('#'):
                continue
            key, *values = fields
            # A bare key without a value is ignored instead of raising IndexError.
            if key == "spark.master" and values:
                return values[0]
    return None

set_master = get_master()
# Fail here with a clear message: passing None on to conf.setMaster() in the
# next cell would not give Spark a usable master URL.
if set_master is None:
    raise RuntimeError(
        f"spark.master is not defined in {spark_conf.s}; "
        "add it there or call conf.setMaster(...) manually."
    )

print(set_master)

In [None]:
# Apply the master URL read from spark-defaults.conf, then start (or reuse)
# the SparkSession with the full configuration.
spark = (
    SparkSession.builder
    .config(conf=conf.setMaster(set_master))
    .getOrCreate()
)

#### Set up the store for intermediate data, then download the MNIST dataset and upload it to HDFS

##### Get Master IP 

In [None]:
# First IP address reported by `hostname -I` (IPython shell capture; use
# `master_ip.s` for the plain string). It is used below as the HDFS NameNode
# and MLflow server host — assumes both run on this machine, TODO confirm.
master_ip = !hostname -I | awk '{print $1}'

In [None]:
# Show the captured host address.
print(f"{master_ip}")

##### Set up our store HDFS path for intermediate data

In [None]:
# HDFS prefix for the Horovod store; assumes the NameNode listens on port 9000
# of this host — adjust for your cluster.
hdfs_path = f"hdfs://{master_ip.s}:9000/tmp"

In [None]:
# Show the store prefix that will be used.
print(f"{hdfs_path}")

In [None]:
# Horovod store rooted at the HDFS prefix above; it is passed to every
# KerasEstimator below as the location for intermediate data.
store = Store.create(hdfs_path)

##### Download MNIST dataset

In [None]:
# MNIST in LIBSVM format (bzip2-compressed), cached locally under /tmp.
data_url = 'https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2'
libsvm_path = os.path.join('/tmp', 'mnist.bz2')
if not os.path.exists(libsvm_path):
    # `wget -O` creates the output file even when the download fails, which
    # would leave an empty/partial file that later runs mistake for a cached
    # copy. Download to a temporary name and move it into place only after
    # wget exits successfully.
    partial_path = libsvm_path + '.part'
    try:
        subprocess.check_output(['wget', data_url, '-O', partial_path])
        os.replace(partial_path, libsvm_path)
    finally:
        # Only present if the download (or the rename) did not complete.
        if os.path.exists(partial_path):
            os.remove(partial_path)

##### Upload the MNIST dataset to HDFS

In [None]:
!hadoop fs -mkdir /tmp
!hadoop fs -put   /tmp/mnist.bz2  /tmp

## Load dataset into a Spark DataFrame

In [None]:
# Read MNIST in LIBSVM format with 784 (= 28 x 28) features per row.
# NOTE(review): `libsvm_path` has no URI scheme, so Spark resolves it against
# the default filesystem — presumably the HDFS copy uploaded above; confirm.
df = (
    spark.read.format('libsvm')
    .option('numFeatures', '784')
    .load(libsvm_path)
)

## One-hot encode labels into SparseVectors

In [None]:
# One-hot encode 'label' into 'label_vec'; dropLast=False keeps a slot for
# every category, including the last one.
encoder = OneHotEncoder(inputCols=['label'], outputCols=['label_vec'], dropLast=False)
model = encoder.fit(df)

# Encode, then hold out roughly 10% of the rows as the test set.
train_df, test_df = model.transform(df).randomSplit([0.9, 0.1])

In [None]:
# Hide GPUs from this notebook process while building the model, to prevent
# memory leaks.
if LooseVersion(tf.__version__) < LooseVersion('2.0.0'):
    # TF 1.x: install a Keras session whose config exposes no GPU devices.
    keras.backend.set_session(
        tf.Session(config=tf.ConfigProto(device_count={'GPU': 0}))
    )
else:
    # TF 2.x: hide all CUDA devices from this process.
    # See https://github.com/tensorflow/tensorflow/issues/33168
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

## Set training parameters
##### Set the number of worker processes for training

The number of worker processes for training is taken from `spark.executor.cores` in `spark-defaults.conf`.

In [None]:
def get_executor_cores(conf_path=None):
    """Return the ``spark.executor.cores`` value from a Spark defaults file.

    Args:
        conf_path: Path of ``spark-defaults.conf``. When omitted, the path
            captured in the notebook's ``spark_conf`` variable is used.

    Returns:
        The first whitespace-separated token after the ``spark.executor.cores``
        key, as a string, or ``None`` when the file does not define that key.
    """
    if conf_path is None:
        conf_path = spark_conf.s
    with open(conf_path, encoding="utf-8") as f:
        for raw_line in f:
            fields = raw_line.split()
            # Skip blank lines and comment lines (including indented ones).
            if not fields or fields[0].startswith('#'):
                continue
            key, *values = fields
            # A bare key without a value is ignored instead of raising IndexError.
            if key == "spark.executor.cores" and values:
                return values[0]
    return None

executor_cores = get_executor_cores()
# Fail with a clear message instead of an opaque `int(None)` TypeError.
if executor_cores is None:
    raise RuntimeError(
        f"spark.executor.cores is not defined in {spark_conf.s}; "
        "it is used as the number of Horovod worker processes."
    )
# Passed to SparkBackend(num_proc=...) when training.
set_num_proc = int(executor_cores)

print(set_num_proc)

##### Set the batch size
Input batch size for training (default: `128`; press Enter to keep the default).

In [None]:
# Ask for the batch size; an empty answer keeps the default of 128.
input_size = input('Please set the input batch size for training: ').strip()
if input_size:
    set_batch_size = int(input_size)
else:
    set_batch_size = 128

print(set_batch_size)

##### Set the number of epochs to train
Number of epochs to train (default: `1`; press Enter to keep the default).

In [None]:
# Ask for the epoch count; an empty answer keeps the default of 1.
input_epochs = input('Please set the number of epochs to train: ').strip()
if not input_epochs:
    set_epochs = 1
else:
    set_epochs = int(input_epochs)

print(set_epochs)

## Define the Keras model and the Hyperopt objective

In [None]:

def train(learning_rate):
    """Hyperopt objective: train the CNN with Horovod and score it on ``test_df``.

    Builds a fresh Keras model and fits it on ``train_df`` through a Horovod
    ``KerasEstimator`` running on Spark. It then measures classification
    accuracy on ``test_df`` and records the trial in its own MLflow run.

    Args:
        learning_rate: Learning rate for the Adadelta optimizer (the value
            Hyperopt samples from the search space).

    Returns:
        Hyperopt result dict ``{'loss': 1 - accuracy, 'status': STATUS_OK}``.
        Hyperopt minimizes ``loss``, so it maximizes test accuracy.
    """
    # Imported here so the objective does not rely on a later cell having
    # already imported STATUS_OK into the notebook namespace.
    from hyperopt import STATUS_OK

    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=(28, 28, 1)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(10, activation='softmax'))

    optimizer = keras.optimizers.Adadelta(learning_rate)
    loss = keras.losses.categorical_crossentropy

    backend = SparkBackend(num_proc=set_num_proc,
                           stdout=sys.stdout, stderr=sys.stderr,
                           prefix_output_with_timestamp=True)
    keras_estimator = hvd.KerasEstimator(backend=backend,
                                         store=store,
                                         model=model,
                                         optimizer=optimizer,
                                         loss=loss,
                                         metrics=['accuracy'],
                                         feature_cols=['features'],
                                         label_cols=['label_vec'],
                                         batch_size=set_batch_size,
                                         epochs=set_epochs,
                                         verbose=1)

    # Predictions (softmax probabilities) are written to 'label_prob'.
    keras_model = keras_estimator.fit(train_df).setOutputCols(['label_prob'])

    pred_df = keras_model.transform(test_df)
    # Predicted class = index of the largest probability, as a double column.
    argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
    pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
    evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred', labelCol='label', metricName='accuracy')

    accuracy = evaluator.evaluate(pred_df)
    print('Test accuracy:', accuracy)
    trial_loss = 1 - accuracy
    with mlflow.start_run():
        # The learning rate is an input of the trial, so it is recorded as a
        # parameter; the results are recorded as metrics.
        mlflow.log_param("learning_rate", learning_rate)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("loss", trial_loss)
    return {'loss': trial_loss, 'status': STATUS_OK}


In [None]:
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK, Trials
# Sample the Adadelta learning rate uniformly between 0 and 1.
search_space = hp.uniform('learning_rate', 0, 1)
# `master_ip` is the list-like IPython capture of `!hostname -I ...`. Use `.s`
# for the plain string, as the HDFS path cell does. Formatting the object itself
# would embed its list representation (e.g. "['10.0.0.1']") in the URL.
mlflow.set_tracking_uri(f"http://{master_ip.s}:5001")
mlflow.set_experiment("HyperOpt + Horovod on Spark + Mlflow")
# No `trials` object is passed, so fmin runs the 16 TPE trials one after another;
# each trial is a full Horovod training job.
argmin = fmin(
    fn=train,
    space=search_space,
    algo=tpe.suggest,
    max_evals=16)
print("Best value found: ", argmin)


In [None]:
def train_and_returnModel(learning_rate):
    """Train the CNN with Horovod on Spark and return the fitted transformer.

    Uses the same architecture and estimator settings as the Hyperopt
    objective, but skips evaluation and MLflow logging. It is meant for
    retraining with the best learning rate found by ``fmin``.

    Args:
        learning_rate: Learning rate for the Adadelta optimizer.

    Returns:
        The fitted Horovod Keras model, configured to write its predictions
        to the ``label_prob`` column.
    """
    # Two conv layers and a max-pool, then a dense softmax head with 10 outputs.
    cnn = Sequential([
        Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(10, activation='softmax'),
    ])

    adadelta = keras.optimizers.Adadelta(learning_rate)
    xent = keras.losses.categorical_crossentropy

    # Runs `set_num_proc` Horovod worker processes on the Spark cluster.
    spark_backend = SparkBackend(
        num_proc=set_num_proc,
        stdout=sys.stdout,
        stderr=sys.stderr,
        prefix_output_with_timestamp=True,
    )
    estimator = hvd.KerasEstimator(
        backend=spark_backend,
        store=store,
        model=cnn,
        optimizer=adadelta,
        loss=xent,
        metrics=['accuracy'],
        feature_cols=['features'],
        label_cols=['label_vec'],
        batch_size=set_batch_size,
        epochs=set_epochs,
        verbose=1,
    )

    fitted = estimator.fit(train_df)
    return fitted.setOutputCols(['label_prob'])

In [None]:
# Retrain with the best learning rate found by Hyperopt.
model_2_mlflow = train_and_returnModel(argmin.get('learning_rate'))
# Capture Horovod's metadata and float type (private Horovod APIs); the next
# cell passes them back in when re-wrapping the model loaded from MLflow.
metadata, floatx = model_2_mlflow._get_metadata(), model_2_mlflow._get_floatx()
# Log the underlying Keras model and register it under a fixed registry name.
mlflow.keras.log_model(
    model_2_mlflow.getModel(),
    "Keras-Sequential-model",
    registered_model_name="Keras-Sequential-model-reg",
)

In [None]:
from mlflow.tracking import MlflowClient

registered_model_name = "Keras-Sequential-model-reg"
# Each run of the logging cell registers a new version under this name. A
# hard-coded ".../1" would keep loading the first run's model while `metadata`
# and `floatx` come from the latest one, so resolve the newest version instead.
latest_version = max(
    int(mv.version)
    for mv in MlflowClient().search_model_versions(f"name='{registered_model_name}'")
)
model_uri = f"models:/{registered_model_name}/{latest_version}"
loaded_model = mlflow.keras.load_model(model_uri)

# Re-wrap the plain Keras model as a Horovod Spark transformer. The private
# `_floatx`/`_metadata` arguments reuse the values captured from the trained
# model (presumably needed to map DataFrame columns to model inputs — confirm
# against the Horovod version in use).
hvdKerasModel_from_mlfow = hvd.KerasModel(model=loaded_model,
                                          feature_columns=['features'],
                                          label_columns=['label_vec'],
                                          _floatx=floatx,
                                          _metadata=metadata)

pred_df = hvdKerasModel_from_mlfow.transform(test_df)
pred_df.show(10)

In [None]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()