# Horovod with Petastorm

[Petastorm](https://github.com/uber/petastorm) enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.

**Required Libraries**: 
* `petastorm==0.8.2` via PyPI

Run the following cell to set up our environment.

In [0]:
%run "./Includes/Classroom-Setup"

## Load data

In [0]:
# Import from the public sklearn.datasets namespace; the private
# sklearn.datasets.california_housing module is not available in newer scikit-learn releases.
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd

# Seed NumPy's global random state so the cell behaves the same on every run.
np.random.seed(0)

cal_housing = fetch_california_housing()

# split 80/20 train-test
X_train, X_test, y_train, y_test = train_test_split(cal_housing.data,
                                                    cal_housing.target,
                                                    test_size=0.2,
                                                    random_state=1)

# Fit the scaler on the training split only, then apply the same scaling to the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

## Spark DataFrame

Let's concatenate our features and label, then create a Spark DataFrame from our Pandas DataFrame.

In [0]:
# Build one pandas frame: the scaled feature columns followed by a "label" column.
features_pdf = pd.DataFrame(X_train, columns=cal_housing.feature_names)
label_pdf = pd.DataFrame(y_train, columns=["label"])
data = pd.concat([features_pdf, label_pdf], axis=1)

# Hand the pandas frame to Spark and preview the result.
trainDF = spark.createDataFrame(data)
display(trainDF)

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,label
-0.3623260524334147,1.85890297046532,-0.1687733398561354,-0.168415371967935,-0.4861138032788396,0.0019013873805031,0.9722904618294768,-1.4225094202651083,2.556
-0.1410232927060988,1.064348229072506,-0.206655229462896,-0.2809812292178221,-0.4042430759059984,0.0094549615001264,1.0845962598673802,-1.382659193976686,1.146
-0.6614495628340722,-1.0014940985488086,1.1065836102381363,1.11813068653877,-1.2505372251622158,0.0205537201063545,1.061199218609482,-0.8297373042249129,1.375
-1.24864730585636,0.4287044359582555,-0.6256134552400134,0.098447157197629,1.8801281976383863,-0.0443537481674696,-0.7544111830032698,0.6098521204441189,1.188
1.828920193428455,0.7465263325153809,0.1736234464271357,-0.0833522671580553,-0.6009108014429322,-0.060915119605702,-0.8152434902738014,0.5899270072999078,4.227
-1.4284491897150795,-0.2863948312952766,-0.7472267955849092,-0.1547632542196419,-0.5617552361776603,-0.0862274356932861,2.446304061076944,-2.2494016157497407,0.75
0.936995000079304,-0.2069393571559952,0.8374018880054728,-0.1208691045883395,-0.6872310248686453,-0.0272696378989432,1.374719571465293,-0.8895126436575392,1.295
-0.8653589618470336,-1.0014940985488086,-0.0121878476501166,0.1651077095036401,0.3672995396619726,0.026621673885657,-1.086649168865398,1.3072310804913962,1.17
-0.6825965059518282,-0.7631276761309647,-0.6575323153922634,-0.0822285622132811,5.234158322293153,-0.0300057578517395,-0.6561436097201082,0.555058059297547,1.454
0.0985187054602814,0.1903380135404115,0.024679680663782,-0.0634553369065674,-0.0616318798348694,0.0615689251314448,-0.8479993480348564,0.779215582169882,2.259


## Create Dense Vectors for Features

In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack the individual feature columns into a single "features" vector column,
# then keep only that column and the label.
vecAssembler = VectorAssembler(outputCol="features", inputCols=cal_housing.feature_names)
assembledDF = vecAssembler.transform(trainDF)
vecTrainDF = assembledDF.select("features", "label")
display(vecTrainDF)

features,label
"List(1, 8, List(), List(-0.3623260524334147, 1.8589029704653197, -0.1687733398561354, -0.16841537196793502, -0.48611380327883963, 0.0019013873805031264, 0.9722904618294769, -1.4225094202651083))",2.556
"List(1, 8, List(), List(-0.14102329270609887, 1.0643482290725061, -0.206655229462896, -0.28098122921782215, -0.40424307590599845, 0.00945496150012646, 1.0845962598673802, -1.382659193976686))",1.146
"List(1, 8, List(), List(-0.6614495628340722, -1.0014940985488086, 1.1065836102381363, 1.1181306865387701, -1.2505372251622158, 0.020553720106354596, 1.0611992186094819, -0.8297373042249129))",1.375
"List(1, 8, List(), List(-1.2486473058563599, 0.4287044359582555, -0.6256134552400134, 0.09844715719762903, 1.8801281976383866, -0.04435374816746968, -0.7544111830032698, 0.6098521204441189))",1.188
"List(1, 8, List(), List(1.8289201934284547, 0.7465263325153809, 0.17362344642713573, -0.08335226715805534, -0.6009108014429322, -0.06091511960570208, -0.8152434902738014, 0.5899270072999078))",4.227
"List(1, 8, List(), List(-1.4284491897150797, -0.2863948312952766, -0.7472267955849092, -0.1547632542196419, -0.5617552361776603, -0.08622743569328611, 2.446304061076944, -2.2494016157497407))",0.75
"List(1, 8, List(), List(0.9369950000793041, -0.20693935715599523, 0.8374018880054728, -0.12086910458833956, -0.6872310248686453, -0.027269637898943263, 1.374719571465293, -0.8895126436575392))",1.295
"List(1, 8, List(), List(-0.8653589618470336, -1.0014940985488086, -0.012187847650116622, 0.16510770950364012, 0.3672995396619726, 0.026621673885657098, -1.0866491688653979, 1.3072310804913962))",1.17
"List(1, 8, List(), List(-0.6825965059518282, -0.7631276761309647, -0.6575323153922634, -0.08222856221328118, 5.234158322293153, -0.030005757851739524, -0.6561436097201082, 0.555058059297547))",1.454
"List(1, 8, List(), List(0.09851870546028144, 0.1903380135404115, 0.02467968066378206, -0.06345533690656746, -0.06163187983486943, 0.06156892513144482, -0.8479993480348564, 0.779215582169882))",2.259


## Array

Petastorm requires an Array as input, not a Vector. Let's register a UDF in Scala and invoke it from Python for optimal performance.

In [0]:
%scala
// Spark ML vector type; this is the type the UDF below accepts.
import org.apache.spark.ml.linalg.Vector
// UDF that converts an ML Vector into a plain array via Vector.toArray.
val toArray = udf { v: Vector => v.toArray }
// Register the UDF under the SQL name "toArray" so SQL expressions can call it by name.
spark.udf.register("toArray", toArray)

## Save Data 

Let's write our DataFrame out as Parquet files to DBFS.

In [0]:
workingDir = "dbfs:/pinky.gtm@mail.kmutt.ac.th"
file_path = f"{workingDir}/deep-learning/petastorm.parquet"

# Convert the vector column to an array with the registered "toArray" UDF.
arrayDF = vecTrainDF.selectExpr("toArray(features) AS features", "label")

# Write 8 partitions as Parquet, replacing any output from a previous run.
parquet_writer = arrayDF.repartition(8).write.mode("overwrite")
parquet_writer.parquet(file_path)

## Remove committed/started metadata

Petastorm + Horovod do not work if the `_committed_`/`_started_` metadata files are left in the Parquet folder, so we need to remove them.

In [0]:
# Delete the committed/started metadata files (names containing "_committed_" or
# "_started_") from the Parquet folder.
# A plain loop is used because the deletions are side effects; no result list is needed.
for file_info in dbutils.fs.ls(file_path):
  if "_committed_" in file_info.name or "_started_" in file_info.name:
    dbutils.fs.rm(file_info.path)

# Show what is left in the folder.
display(dbutils.fs.ls(file_path))

path,name,size
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/_SUCCESS,_SUCCESS,0
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00000-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-22-1-c000.snappy.parquet,part-00000-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-22-1-c000.snappy.parquet,122253
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00001-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-23-1-c000.snappy.parquet,part-00001-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-23-1-c000.snappy.parquet,121772
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00002-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-24-1-c000.snappy.parquet,part-00002-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-24-1-c000.snappy.parquet,122049
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00003-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-25-1-c000.snappy.parquet,part-00003-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-25-1-c000.snappy.parquet,122169
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00004-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-26-1-c000.snappy.parquet,part-00004-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-26-1-c000.snappy.parquet,121958
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00005-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-27-1-c000.snappy.parquet,part-00005-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-27-1-c000.snappy.parquet,122356
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00006-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-28-1-c000.snappy.parquet,part-00006-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-28-1-c000.snappy.parquet,121418
dbfs:/pinky.gtm@mail.kmutt.ac.th/deep-learning/petastorm.parquet/part-00007-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-29-1-c000.snappy.parquet,part-00007-tid-8151245934285474382-83834c3d-e2f7-4921-9223-cbe4716500b1-29-1-c000.snappy.parquet,122205


## Define Model

In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers
# tf.set_random_seed(42)

def build_model():
  """Return an uncompiled Keras Sequential regression model.

  Architecture: 8 inputs -> Dense(20, relu) -> Dense(20, relu) -> Dense(1, linear).
  """
  # Imported locally as well as at cell level, as in the original definition.
  from tensorflow.keras import models, layers

  hidden_1 = layers.Dense(20, input_dim=8, activation='relu')
  hidden_2 = layers.Dense(20, activation='relu')
  output = layers.Dense(1, activation='linear')
  return models.Sequential([hidden_1, hidden_2, output])

## Single Node

Define shape of the input tensor and output tensor and fit the model (on the driver). We need to use Petastorm's [make_batch_reader](https://petastorm.readthedocs.io/en/latest/api.html#petastorm.reader.make_batch_reader) to create an instance of Reader for reading batches out of a non-Petastorm Parquet store.

In [0]:
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

# Swap the dbfs:/ scheme for the /dbfs/ prefix so the path can be used in a file:// URL.
abs_file_path = file_path.replace("dbfs:/", "/dbfs/")

def _to_features_and_label(batch):
  """Reshape a Petastorm batch into a (features [-1, 8], label [-1, 1]) tensor pair."""
  return (tf.reshape(batch.features, [-1,8]), tf.reshape(batch.label, [-1,1]))

# NOTE(review): num_epochs=None is understood to make the reader repeat indefinitely
# (confirm against the Petastorm docs); training length is bounded by fit() below.
with make_batch_reader("file://" + abs_file_path, num_epochs=None) as reader:
  dataset = make_petastorm_dataset(reader).map(_to_features_and_label)
  model = build_model()
  optimizer = keras.optimizers.Adam(lr=0.001)
  model.compile(loss='mse', metrics=['mse'], optimizer=optimizer)
  # 10 epochs of 10 batches each.
  model.fit(dataset, epochs=10, steps_per_epoch=10)

## Horovod

Let's do the same thing, but let's add in Horovod for distributed model training.

In [0]:
import horovod.tensorflow.keras as hvd

def run_training_horovod():
  """Train the Keras model with Horovod, reading one Petastorm shard per process.

  Each Horovod process reads the shard selected by its rank, builds the model,
  wraps the Adam optimizer in Horovod's DistributedOptimizer and fits for
  10 epochs of 10 steps. Returns nothing.
  """
  # Horovod: initialize Horovod.
  hvd.init()
  with make_batch_reader("file://" + abs_file_path, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as reader:
    dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
    model = build_model()
    from tensorflow.keras import optimizers
    # Horovod: scale the learning rate by the number of processes.
    optimizer = optimizers.Adam(lr=0.001*hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(optimizer=optimizer,
                  loss='mse',
                  metrics=['mse'])
    # Horovod: broadcast the initial variable state from rank 0 so that every
    # process starts from the same weights instead of its own random initialization.
    callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
    model.fit(dataset, steps_per_epoch=10, epochs=10, callbacks=callbacks)

## Train on driver

In [0]:
from sparkdl import HorovodRunner
# A negative np keeps training on the driver; -1 is taken to mean a single local
# process (see the HorovodRunner documentation).
driver_process_count = -1
hr = HorovodRunner(np=driver_process_count)
hr.run(run_training_horovod)

## Better Horovod

In [0]:
import horovod.tensorflow.keras as hvd

ml_working_path = workingDir
# Remove anything at the checkpoint location left over from a previous run.
dbutils.fs.rm(f"{ml_working_path}/petastorm_checkpoint_weights.ckpt", True)

def run_training_horovod():
  """Train the Keras model with Horovod, adding callbacks and checkpointing.

  Compared with the basic version this adds callbacks for initial-state
  broadcast, metric averaging, learning-rate warmup and reduce-on-plateau,
  plus a weights-only checkpoint written by rank 0. Returns nothing.
  """
  # Horovod: initialize Horovod.
  hvd.init()
  with make_batch_reader("file://" + abs_file_path, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as reader:
    dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
    model = build_model()
    from tensorflow.keras import optimizers
    # Horovod: scale the learning rate by the number of processes.
    optimizer = optimizers.Adam(lr=0.001*hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(optimizer=optimizer,
                  loss='mse',
                  metrics=['mse'])

    # TensorFlow writes checkpoints through the local file API, which does not
    # understand the dbfs:/ scheme, so use the /dbfs/ path (the same conversion
    # that is applied to the Parquet path for Petastorm).
    checkpoint_path = ml_working_path.replace("dbfs:/", "/dbfs/") + "/petastorm_checkpoint_weights.ckpt"

    callbacks = [
      # Start every process from rank 0's initial weights.
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
      # Average metrics across processes at the end of each epoch.
      hvd.callbacks.MetricAverageCallback(),
      hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
      tf.keras.callbacks.ReduceLROnPlateau(monitor="loss", patience=10, verbose=1),
    ]

    # Only rank 0 saves checkpoints so that processes do not overwrite each other's files.
    if hvd.rank() == 0:
      callbacks.append(tf.keras.callbacks.ModelCheckpoint(checkpoint_path, save_weights_only=True))

    model.fit(dataset, steps_per_epoch=10, epochs=10, callbacks=callbacks)

In [0]:
import horovod.tensorflow.keras as hvd
from sparkdl import HorovodRunner
# Run the redefined training function on the driver first; a negative np keeps
# the job local (see the HorovodRunner documentation).
local_np = -1
hr = HorovodRunner(np=local_np)
hr.run(run_training_horovod)

## Run on all workers

In [0]:
from sparkdl import HorovodRunner
# np=0 is the setting this notebook uses to run on all workers
# (see the HorovodRunner documentation for the exact semantics).
all_workers_np = 0
hr = HorovodRunner(np=all_workers_np)
hr.run(run_training_horovod)