# Horovod with Petastorm

[Petastorm](https://github.com/uber/petastorm) enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can also be used from pure Python code.

**Required Libraries**: 
* `petastorm==0.8.2` via PyPI

Imagine you and a bunch of friends are working together to build a super big LEGO castle. Each friend is in a different part of the room, and you all have your own pile of LEGO blocks. You want to build faster, so you each start building different parts of the castle at the same time, instead of waiting for one person to do it all.

- **Petastorm** is the helper who gives each of you just the right LEGO blocks you need to build your part of the castle. Instead of having one giant pile that everyone fights over, Petastorm organizes it, making sure everyone has their own pile to build with.

- **Horovod** is like the messenger who runs back and forth between all of you. It makes sure that everyone is building the castle in the same way, sharing any cool tricks or fixes along the way. If someone finds a better way to put pieces together, Horovod tells everyone else, so you all stay in sync.

So, Petastorm gives each friend the pieces they need, and Horovod helps everyone work together smoothly, making sure the LEGO castle comes together perfectly and really fast!
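
To make the analogy concrete, here is a minimal pure-Python sketch of the two roles. It is not how either library is implemented. It assumes a toy one-weight model `y = w * x`, four simulated workers, and hypothetical helper names (`shard`, `local_gradient`, `allreduce_mean`). Each worker reads only its own slice of the data (Petastorm's job), and the per-worker gradients are averaged so that every worker applies the same update (Horovod's job).

```python
# Toy data for y = 3 * x; the model has a single weight w.
data = [(float(x), 3.0 * x) for x in range(1, 9)]

def shard(rows, rank, size):
    """Rows that worker `rank` out of `size` workers reads (hypothetical helper)."""
    return [row for i, row in enumerate(rows) if i % size == rank]

def local_gradient(w, rows):
    """Gradient of mean squared error for y_hat = w * x on one worker's rows."""
    return sum(2.0 * (w * x - y) * x for x, y in rows) / len(rows)

def allreduce_mean(values):
    """Stand-in for gradient averaging: every worker receives the same mean."""
    return sum(values) / len(values)

size = 4   # number of simulated workers
w = 0.0    # every worker starts from the same weight
lr = 0.01

for step in range(200):
    # Each worker computes a gradient on its own shard only.
    grads = [local_gradient(w, shard(data, rank, size)) for rank in range(size)]
    # All workers apply the same averaged gradient, so they stay in sync.
    w -= lr * allreduce_mean(grads)

print(round(w, 3))
```

The learned weight ends up very close to 3, the slope used to generate the toy data, even though no single worker ever saw all of the rows.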

Run the following cell to set up our environment.

In [0]:
%run "./Includes/Classroom-Setup"

## Load data

In [0]:
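# Import from the public sklearn.datasets namespace; the california_housing submodule is not available in newer scikit-learn releases
from sklearn.datasets import fetch_california_housing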
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
np.random.seed(0)

cal_housing = fetch_california_housing()

# split 80/20 train-test
X_train, X_test, y_train, y_test = train_test_split(cal_housing.data,
                                                        cal_housing.target,
                                                        test_size=0.2,
                                                        random_state=1)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

## Spark DataFrame

Let's concatenate our features and label, then create a Spark DataFrame from our Pandas DataFrame.

In [0]:
data = pd.concat([pd.DataFrame(X_train, columns=cal_housing.feature_names), pd.DataFrame(y_train, columns=["label"])], axis=1)
trainDF = spark.createDataFrame(data)
display(trainDF)

## Create Dense Vectors for Features

In [0]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=cal_housing.feature_names, outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF).select("features", "label")
display(vecTrainDF)

## Array

Petastorm requires an Array as input, not a Vector. Let's register a UDF in Scala and invoke it from Python for optimal performance.

In [0]:
%scala
import org.apache.spark.ml.linalg.Vector
val toArray = udf { v: Vector => v.toArray }
spark.udf.register("toArray", toArray)


Imagine you have a special box of crayons (we'll call this a **vector**) where all the crayons are organized in a specific way—maybe in rows or slots. But you want them just laid out in a single line (an **array**) so you can see each color one after another without the slots.

In this case:
1. **Vector** is like your crayon box, where the crayons are organized in a specific structure.
2. **Array** is like a row of crayons lined up one by one.

#### What This Code Does

1. **Define the Function** (`toArray`): This tells Spark how to take the crayons out of the box, that is, how to convert a vector to an array. It is written in **Scala**, which runs directly on Spark's JVM and so avoids the overhead of a Python UDF.
   
   ```scala
   val toArray = udf { v: Vector => v.toArray }
   ```

   - **Scala UDF** (User-Defined Function): We create this function in Scala to take each vector (organized crayons) and flatten it out into a single line (array of crayons).

2. **Register the Function** with Spark:
   
   ```scala
   spark.udf.register("toArray", toArray)
   ```

   - Registering `toArray` makes it callable by name in Spark SQL expressions, so **Python code** can use it (for example through `selectExpr`) to turn any vector column in a Spark DataFrame into an array.

3. **Why This Matters for Petastorm**: 
   - Petastorm needs the crayon colors (data) laid out in a single row (array) instead of in a structured box (vector). So, we register `toArray` to help make this format change before Petastorm can use the data.

In simpler terms, we’re creating a special “crayon layout” tool in Scala that Python code can use to change the shape of data, making it easy for Petastorm to work with it!
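
As a rough illustration of what `toArray` does to each row, the sketch below mimics the conversion in plain Python. The `SparseVec` class is a hypothetical stand-in for a sparse ML vector (a size plus the positions and values of the non-zero entries). It is not the Spark API, and the numbers are made up.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class SparseVec:
    """Hypothetical stand-in for a sparse vector: only non-zero slots are stored."""
    size: int
    indices: List[int]
    values: List[float]

    def to_array(self) -> List[float]:
        # Start from all zeros, then drop each stored value into its slot.
        dense = [0.0] * self.size
        for i, v in zip(self.indices, self.values):
            dense[i] = v
        return dense

row = SparseVec(size=8, indices=[0, 3, 7], values=[1.5, -0.2, 4.0])
flat = row.to_array()
print(flat)
```

However the vector is stored internally, the result is a plain list with one number per feature, which is the shape Petastorm expects.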

## Save Data 

Let's write our DataFrame out as Parquet files to DBFS.

In [0]:
file_path = f"{workingDir}/deep-learning/petastorm.parquet"
vecTrainDF.selectExpr("toArray(features) AS features", "label").repartition(8).write.mode("overwrite").parquet(file_path)

## Remove committed/started metadata

When the DataFrame is written, `_committed_` and `_started_` metadata files are left alongside the Parquet data. Petastorm and Horovod do not work if these files remain in the Parquet folder, so we need to remove them.

In [0]:
[dbutils.fs.rm(i.path) for i in dbutils.fs.ls(file_path) if ("_committed_" in i.name) | ("_started_" in i.name)]

display(dbutils.fs.ls(file_path))
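
The same filtering logic can be tried outside Databricks. The sketch below is an assumption-laden stand-in: it uses a temporary local folder and the standard library instead of `dbutils`, and the file names are invented to resemble a Parquet output folder. `remove_commit_markers` is a hypothetical helper.

```python
import os
import shutil
import tempfile

def remove_commit_markers(folder):
    """Delete files whose names contain '_committed_' or '_started_'; return their names."""
    removed = []
    for name in sorted(os.listdir(folder)):
        if "_committed_" in name or "_started_" in name:
            os.remove(os.path.join(folder, name))
            removed.append(name)
    return removed

# Build a throwaway folder with invented file names.
demo_dir = tempfile.mkdtemp()
for name in ["_started_123", "_committed_123", "_SUCCESS", "part-00000.snappy.parquet"]:
    open(os.path.join(demo_dir, name), "w").close()

removed = remove_commit_markers(demo_dir)
remaining = sorted(os.listdir(demo_dir))
print(removed, remaining)

shutil.rmtree(demo_dir)  # clean up the throwaway folder
```

Only the two marker files are deleted; the data file and any other files stay in place.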

## Define Model

In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers
tf.set_random_seed(42)

def build_model():
  from tensorflow.keras import models, layers
  model = models.Sequential()
  model.add(layers.Dense(20, input_dim=8, activation='relu'))
  model.add(layers.Dense(20, activation='relu'))
  model.add(layers.Dense(1, activation='linear'))
  return model

## Single Node

Define the shapes of the input and output tensors, then fit the model on the driver. Because Spark wrote the data as plain Parquet rather than with Petastorm's own writer, we use Petastorm's [make_batch_reader](https://petastorm.readthedocs.io/en/latest/api.html#petastorm.reader.make_batch_reader) to create a Reader that yields batches of rows from a non-Petastorm Parquet store.

In [0]:
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

abs_file_path = file_path.replace("dbfs:/", "/dbfs/")

with make_batch_reader("file://" + abs_file_path, num_epochs=None) as reader: 
  dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
  model = build_model()
  optimizer = keras.optimizers.Adam(lr=0.001)
  model.compile(optimizer=optimizer,
                loss='mse',
                metrics=['mse'])
  model.fit(dataset, steps_per_epoch=10, epochs=10)
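
Because `num_epochs=None` makes the reader loop over the data indefinitely, Keras needs `steps_per_epoch` to know where one epoch ends. The sketch below imitates that behaviour with a plain Python generator. `endless_batches` is a hypothetical stand-in rather than Petastorm's reader, and the ten toy rows and batch size of 4 are arbitrary.

```python
import itertools

def endless_batches(rows, batch_size):
    """Yield batches forever by cycling over the rows (imitates num_epochs=None)."""
    cycled = itertools.cycle(rows)
    while True:
        yield [next(cycled) for _ in range(batch_size)]

rows = list(range(10))  # ten toy "rows"
reader = endless_batches(rows, batch_size=4)

steps_per_epoch, epochs = 10, 2
batches_seen = 0
for epoch in range(epochs):
    # Pull exactly steps_per_epoch batches, then move on to the next epoch.
    for batch in itertools.islice(reader, steps_per_epoch):
        batches_seen += 1

print(batches_seen)
```

Without a step limit the inner loop would never finish, which is why an endless dataset is always paired with `steps_per_epoch`.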

## Horovod

Let's do the same thing, but let's add in Horovod for distributed model training.

In [0]:
import horovod.tensorflow.keras as hvd

def run_training_horovod():
  # Horovod: initialize Horovod.
  hvd.init()
  with make_batch_reader("file://" + abs_file_path, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as reader:
    dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
    model = build_model()
    from tensorflow.keras import optimizers
    optimizer = optimizers.Adam(lr=0.001*hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(optimizer=optimizer,
                  loss='mse',
                  metrics=['mse'])
    history = model.fit(dataset, steps_per_epoch=10, epochs=10)
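
The `lr=0.001*hvd.size()` line follows the common practice of scaling the learning rate with the number of workers, since each step now averages gradients over `hvd.size()` batches. Here is a small sketch of that arithmetic. The per-worker batch size of 32 is a made-up value for illustration (the notebook does not set one), and `scaled_settings` is a hypothetical helper.

```python
def scaled_settings(base_lr, per_worker_batch, num_workers):
    """Return (learning rate, effective batch size) under linear scaling."""
    return base_lr * num_workers, per_worker_batch * num_workers

for workers in (1, 2, 4, 8):
    lr, effective_batch = scaled_settings(0.001, 32, workers)
    print(workers, lr, effective_batch)
```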

## Train on driver

In [0]:
from sparkdl import HorovodRunner
hr = HorovodRunner(np=-1)
hr.run(run_training_horovod)

## Better Horovod

In [0]:
import horovod.tensorflow.keras as hvd


dbutils.fs.rm(f"{ml_working_path}/petastorm_checkpoint_weights.ckpt", True)
def run_training_horovod():
  # Horovod: initialize Horovod.
  hvd.init()
  with make_batch_reader("file://" + abs_file_path, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as reader:
    dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
    model = build_model()
    from tensorflow.keras import optimizers
    optimizer = optimizers.Adam(lr=0.001*hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(optimizer=optimizer,
                  loss='mse',
                  metrics=['mse'])
    checkpoint_dir = f"{ml_working_path}/petastorm_checkpoint_weights.ckpt"
    callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
    hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
    tf.keras.callbacks.ReduceLROnPlateau(monitor="loss", patience=10, verbose=1)
    ]

    if hvd.rank() == 0:
      callbacks.append(tf.keras.callbacks.ModelCheckpoint(checkpoint_dir, save_weights_only=True))
  
    history = model.fit(dataset, steps_per_epoch=10, epochs=10, callbacks=callbacks)
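
The warmup callback exists because starting at the full scaled learning rate can make early training unstable. The sketch below shows the general idea as a simplified linear ramp from the base rate up to the scaled rate. It is an assumption about the shape of the schedule, not Horovod's exact formula, and `warmup_lr` is a hypothetical helper. Four workers are assumed.

```python
def warmup_lr(epoch, base_lr, num_workers, warmup_epochs):
    """Linearly ramp from base_lr to base_lr * num_workers over warmup_epochs (simplified)."""
    target_lr = base_lr * num_workers
    if epoch >= warmup_epochs:
        return target_lr
    fraction = epoch / warmup_epochs
    return base_lr + fraction * (target_lr - base_lr)

schedule = [warmup_lr(e, base_lr=0.001, num_workers=4, warmup_epochs=5) for e in range(10)]
print([round(lr, 4) for lr in schedule])
```

With `warmup_epochs=5` and `epochs=10`, the first half of training is spent ramping up and the second half runs at the full scaled rate.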

In [0]:
import horovod.tensorflow.keras as hvd
from sparkdl import HorovodRunner
hr = HorovodRunner(np=-1)
hr.run(run_training_horovod)

## Run on all workers

In [0]:
from sparkdl import HorovodRunner
hr = HorovodRunner(np=0)
hr.run(run_training_horovod)

Let's go through each step:

### 6. **Saving Data to Parquet Format**

Imagine you just baked a batch of cookies, and you need to put them in little bags so your friends can each grab a bag. Here:
- **Saving as Parquet**: We’re saving our data (like the cookies) in a special format called **Parquet**, which makes it easy for Petastorm to read.
  
   ```python
   file_path = f"{workingDir}/deep-learning/petastorm.parquet"
   vecTrainDF.selectExpr("toArray(features) AS features", "label").repartition(8).write.mode("overwrite").parquet(file_path)
   ```

- **Repartitioning**: Instead of putting all cookies in one giant bag, we split them into 8 smaller bags. This way, when training starts, it’s faster for each computer to pick up a bag of cookies (data) and start working.
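
Here is a rough sketch of why the number of bags matters. It assumes a simple round-robin assignment of pieces to workers; Petastorm's real sharding works on Parquet row groups and may assign them differently. The helper `pieces_for_worker` is hypothetical.

```python
def pieces_for_worker(num_pieces, cur_shard, shard_count):
    """Round-robin assignment: worker cur_shard takes every shard_count-th piece."""
    return [p for p in range(num_pieces) if p % shard_count == cur_shard]

num_pieces = 8  # matches repartition(8)
for shard_count in (2, 3, 8):
    assignment = {rank: pieces_for_worker(num_pieces, rank, shard_count)
                  for rank in range(shard_count)}
    print(shard_count, assignment)
```

With 2 or 8 workers every worker gets the same number of pieces. With 3 workers one of them gets only two pieces, so a partition count that divides evenly among the workers keeps the load balanced.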

### 7. **Removing Metadata Files**

Some extra "sticky notes" or tags (metadata files) get left behind in our Parquet folder, which can confuse Petastorm and Horovod when they try to read data for training. We remove these extra tags so training runs smoothly.

   ```python
   [dbutils.fs.rm(i.path) for i in dbutils.fs.ls(file_path) if ("_committed_" in i.name) | ("_started_" in i.name)]
   ```

### 8. **Defining the Model**

Now we’re building our "brain" model, or neural network, which will learn from the housing data and make predictions. Think of this as setting up a robot that will be taught to guess house prices.

   ```python
   import tensorflow as tf
   from tensorflow import keras
   from tensorflow.keras import models, layers

   def build_model():
       model = models.Sequential()
       model.add(layers.Dense(20, input_dim=8, activation='relu'))
       model.add(layers.Dense(20, activation='relu'))
       model.add(layers.Dense(1, activation='linear'))
       return model
   ```

- **Sequential Model**: We create a model with two hidden layers of 20 units each (ReLU activation) to process the 8 input features, and a single-unit linear output layer to predict the house price.
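
To get a feel for how small this network is, the sketch below counts its trainable parameters by hand. Each `Dense` layer has one weight per input-unit pair plus one bias per unit. `dense_params` is a hypothetical helper, not a Keras function.

```python
def dense_params(inputs, units):
    """Weights plus biases for one fully connected layer."""
    return inputs * units + units

layer_sizes = [(8, 20), (20, 20), (20, 1)]  # (inputs, units) for each Dense layer
per_layer = [dense_params(i, u) for i, u in layer_sizes]
total = sum(per_layer)
print(per_layer, total)  # [180, 420, 21] 621
```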

### 9. **Single Node Training**

This is where we begin training the model. We take small batches of data from the Parquet file and give them to our model to learn step-by-step.

   ```python
   from petastorm import make_batch_reader
   from petastorm.tf_utils import make_petastorm_dataset

   abs_file_path = file_path.replace("dbfs:/", "/dbfs/")
   with make_batch_reader("file://" + abs_file_path, num_epochs=None) as reader:
       dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
       model = build_model()
       optimizer = keras.optimizers.Adam(lr=0.001)
       model.compile(optimizer=optimizer, loss='mse', metrics=['mse'])
       model.fit(dataset, steps_per_epoch=10, epochs=10)
   ```

- **Petastorm Reader**: It’s like having a teacher who hands out homework one page at a time so the model can practice on small pieces instead of the whole dataset at once.
- **Model Training**: We tell the model to minimize mistakes (using the "mse" loss function) as it learns, making better predictions over time.
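
The "mse" loss is the mean squared error: the average of the squared differences between the true and predicted values. Here is a minimal sketch with made-up numbers (`mse` is our own helper, not the Keras implementation):

```python
def mse(y_true, y_pred):
    """Mean squared error: average of squared differences."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

actual = [2.0, 1.5, 3.0]     # made-up house values
predicted = [2.5, 1.5, 2.0]  # made-up model outputs
print(mse(actual, predicted))
```

Squaring makes large mistakes count much more than small ones: the miss of 1.0 on the last house contributes four times as much as the miss of 0.5 on the first.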

### 10. **Distributed Training with Horovod**

Now we want to speed up training by splitting the work among multiple computers. Horovod makes it possible for these computers to work together on the same task, so they each work on a portion of the data and share their progress.

   ```python
   import horovod.tensorflow.keras as hvd

   def run_training_horovod():
       hvd.init()
       with make_batch_reader("file://" + abs_file_path, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as reader:
           dataset = make_petastorm_dataset(reader).map(lambda x: (tf.reshape(x.features, [-1,8]), tf.reshape(x.label, [-1,1])))
           model = build_model()
           optimizer = keras.optimizers.Adam(lr=0.001 * hvd.size())
           optimizer = hvd.DistributedOptimizer(optimizer)
           model.compile(optimizer=optimizer, loss='mse', metrics=['mse'])
           history = model.fit(dataset, steps_per_epoch=10, epochs=10)
   ```

- **Horovod Initialization**: `hvd.init()` gives each process a rank and tells it how many processes there are in total. Passing `cur_shard=hvd.rank()` and `shard_count=hvd.size()` to the reader then gives each process its own "shard" of the data, like assigning each friend a part of a puzzle to work on.
- **Distributed Training**: Each computer works on its shard of data, and Horovod's `DistributedOptimizer` averages their gradients to keep them all in sync, so the model can learn faster than if just one computer were working on the task.
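
Staying in sync also requires that every worker starts from the same weights, which is the purpose of `BroadcastGlobalVariablesCallback(0)` in the "Better Horovod" cell. The sketch below illustrates the idea with plain Python lists. The helpers `init_weights` and `broadcast` are hypothetical, and seeding each worker with its rank is just a way to simulate different random initialisations.

```python
import random

def init_weights(seed, n=3):
    """Each worker initialises its own weights from its own random seed."""
    rng = random.Random(seed)
    return [rng.uniform(-1.0, 1.0) for _ in range(n)]

def broadcast(weights_by_rank, root_rank=0):
    """Copy the root worker's weights to every worker."""
    root = weights_by_rank[root_rank]
    return {rank: list(root) for rank in weights_by_rank}

workers = {rank: init_weights(seed=rank) for rank in range(4)}
in_sync_before = all(w == workers[0] for w in workers.values())

workers = broadcast(workers, root_rank=0)
in_sync_after = all(w == workers[0] for w in workers.values())

print(in_sync_before, in_sync_after)
```

If the workers began from different weights, averaging their gradients would not keep the copies of the model identical, so the broadcast happens once before training starts.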

### 11. **Running Horovod on All Workers**

Finally, we need to launch our training function across all computers in the cluster. The HorovodRunner helps us do this, sending the `run_training_horovod` function to all computers so they can work together.

   ```python
   from sparkdl import HorovodRunner
   hr = HorovodRunner(np=0)  # np=0 runs on all workers; np=-1 would run only on the driver
   hr.run(run_training_horovod)
   ```

- **Horovod Runner**: Think of it as the coordinator who tells everyone (all computers) to start working on their pieces of the data. The computers work in sync to complete the task quickly.
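
The meaning of `np` is easy to mix up. The helper below summarises the behaviour as we understand it from the HorovodRunner documentation: a negative value runs that many processes on the driver, `0` uses all task slots on the cluster, and a positive value requests that many task slots. Treat these semantics as an assumption to check against the version you have installed. `describe_np` is a hypothetical helper for illustration only.

```python
def describe_np(np):
    """Summarise where HorovodRunner would launch processes for a given np (assumed semantics)."""
    if np < 0:
        return f"{-np} process(es) on the driver"
    if np == 0:
        return "all available task slots on the cluster"
    return f"{np} task slot(s) on the cluster workers"

for value in (-1, 0, 2):
    print(value, "->", describe_np(value))
```

Running with `np=-1` first is a cheap way to debug the training function on the driver before occupying the whole cluster.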

And that’s how everything works together to train the model! Petastorm helps with reading the data, Horovod splits the task among many computers, and together they make training faster and more efficient.