Maggy is an open-source framework that simplifies writing and maintaining distributed machine learning programs.
By encapsulating your training logic in a function,
you can run the same code unchanged with Python on your laptop, or distribute it with PySpark for hyperparameter tuning,
data-parallel training, or model-parallel training.
With the arrival of GPU support in Spark 3.0,
PySpark can now be used to orchestrate distributed deep learning applications in TensorFlow and PyTorch.
We are pleased to announce we have now added support for Maggy on Databricks, 
so training machine learning models with many workers should be as easy as running Python programs on your laptop. 

### 0. Spark Session

First, make sure you have a running Spark Session/Context available.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # on Databricks this returns the existing session

Make sure you have the TensorFlow version this example uses (2.4.1, CPU build).

In [None]:
%pip install tensorflow-cpu==2.4.1
import tensorflow as tf

### 1. Model definition

Let's define the model we want to train. The layers of the model have to be defined in the `__init__` function.

Do not instantiate the class: pass the class itself to Maggy, which instantiates the model inside the experiment (see `model(nl=...)` in the training function below).

In [None]:
from tensorflow import keras
from tensorflow.keras.layers import Dense
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam

# You can use keras.Sequential: subclass it in a custom class
# and add the layers in __init__()
class NeuralNetwork(Sequential):

    def __init__(self, nl=4):
        super().__init__()
        # Input layer over the 4 iris features
        self.add(Dense(10, input_shape=(None, 4), activation='tanh'))
        # nl - 2 hidden layers, so the network has nl Dense layers in total
        for _ in range(nl - 2):
            self.add(Dense(8, activation='tanh'))
        # Output layer: one probability per iris class
        self.add(Dense(3, activation='softmax'))

# Pass the class, not an instance, to Maggy
model = NeuralNetwork
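To sanity-check how `nl` maps onto the depth of the network without starting TensorFlow, you can list the layer sizes in plain Python. This is a minimal sketch. `layer_widths` is a hypothetical helper, not part of Maggy or Keras. It assumes the intended depth is exactly `nl` Dense layers: a 10-unit input layer, `nl - 2` hidden layers of 8 units, and a 3-unit softmax output.

```python
def layer_widths(nl=4):
    """Units of each Dense layer, assuming the network has exactly nl layers:
    a 10-unit input layer, nl - 2 hidden 8-unit layers and a 3-unit output."""
    # [8] * k is empty for k <= 0, so nl = 2 gives input and output only
    return [10] + [8] * (nl - 2) + [3]


# The searchspace used later ranges from 2 to 8 layers
for nl in range(2, 9):
    print(nl, layer_widths(nl))
```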

### 2. Dataset creation

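In this example, we are using the Iris dataset. Download it from https://www.kaggle.com/uciml/iris and upload it to your Databricks workspace (the code below reads from `/FileStore/tables/`). The loading code expects the data to be already split into a train CSV file and a test CSV file. Each file should contain four feature columns followed by three one-hot-encoded label columns. The labels must be one-hot encoded because the model is trained with `categorical_crossentropy` on a 3-unit softmax output.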
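The Kaggle file itself has an `Id` column and a string `Species` column instead of one-hot labels. Below is a standard-library sketch of one way to turn it into train and test files that the loading code can read. It assumes the Kaggle column names listed in `FEATURES` and `CLASSES`. The helper names, the 80/20 split and the seed are illustrative choices; the original files were not necessarily produced this way.

```python
import csv
import io
import random

# Assumed column names of the Kaggle Iris.csv file
FEATURES = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]
CLASSES = ["Iris-setosa", "Iris-versicolor", "Iris-virginica"]


def to_model_rows(csv_file):
    """Convert Kaggle-style records into [4 features + 3 one-hot labels]."""
    rows = []
    for record in csv.DictReader(csv_file):
        features = [float(record[name]) for name in FEATURES]
        one_hot = [1.0 if record["Species"] == c else 0.0 for c in CLASSES]
        rows.append(features + one_hot)
    return rows


def split_rows(rows, test_fraction=0.2, seed=42):
    """Shuffle a copy of the rows and split it into (train, test)."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_fraction)
    return shuffled[n_test:], shuffled[:n_test]


def write_rows(rows, out_file):
    """Write rows with a leading unnamed index column, like pandas' to_csv.

    Spark names a column with an empty header "_c0", which is the column
    the loading code drops.
    """
    writer = csv.writer(out_file)
    writer.writerow([""] + FEATURES + CLASSES)
    for i, row in enumerate(rows):
        writer.writerow([i] + row)


# Usage with two in-memory sample records
sample = io.StringIO(
    "Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species\n"
    "1,5.1,3.5,1.4,0.2,Iris-setosa\n"
    "51,7.0,3.2,4.7,1.4,Iris-versicolor\n"
)
rows = to_model_rows(sample)
print(rows[1])  # [7.0, 3.2, 4.7, 1.4, 0.0, 1.0, 0.0]
```

To create the two files, call `split_rows` on the converted rows. Then pass each half to `write_rows` together with a file opened via `open(path, "w", newline="")`.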

You can either process the dataset in the notebook and pass the result to the configuration classes, or process it during the experiment.
For the latter, wrap the processing logic in a function and pass it to the training configuration (this is currently supported only by TfDistributedConfig).
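The two options differ in when the processing runs. Here is a minimal plain-Python sketch of the second option. All names in it (`DataSpec`, `materialize`, `split_features_labels`) are hypothetical. The configuration stores the raw data and the processing function, and the function is only called where the data is consumed. In a distributed setting, that lets each worker do the processing itself. This illustrates the idea only, not Maggy's internals.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class DataSpec:
    """Hypothetical holder for raw data plus an optional processing function."""
    train_set: Any
    test_set: Any
    process_data: Optional[Callable[[Any, Any], Tuple[Any, Any]]] = None

    def materialize(self):
        """Apply process_data (if given) at the point where data is consumed."""
        if self.process_data is None:
            return self.train_set, self.test_set
        return self.process_data(self.train_set, self.test_set)


def split_features_labels(train_rows, test_rows):
    """List-based counterpart of process_data: 4 features, then labels."""
    def split(rows):
        return [r[:4] for r in rows], [r[4:] for r in rows]
    return split(train_rows), split(test_rows)


raw = [[5.1, 3.5, 1.4, 0.2, 1.0, 0.0, 0.0]]
spec = DataSpec(train_set=raw, test_set=raw, process_data=split_features_labels)
(x_train, y_train), _ = spec.materialize()  # processing happens only here
print(x_train, y_train)  # [[5.1, 3.5, 1.4, 0.2]] [[1.0, 0.0, 0.0]]
```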

Make sure the dataset paths below match the location of your uploaded files.

In [None]:
display(dbutils.fs.ls("/FileStore/tables/iris_train-2.csv"))

In [None]:
train_set_path = "dbfs:/FileStore/tables/iris_train-2.csv"
test_set_path = "dbfs:/FileStore/tables/iris_test-1.csv"

train_set = spark.read.format("csv").option("header","true")\
  .option("inferSchema", "true").load(train_set_path).drop('_c0')

test_set = spark.read.format("csv").option("header","true")\
  .option("inferSchema", "true").load(test_set_path).drop('_c0')

raw_train_set = train_set.toPandas().values
raw_test_set = test_set.toPandas().values

We can also wrap the data processing in a function and pass it to the training configuration, as we'll see later.

In [None]:
def process_data(train_set, test_set):
    
    X_train = train_set[:,0:4]
    y_train = train_set[:,4:]
    X_test = test_set[:,0:4]
    y_test = test_set[:,4:]

    return (X_train, y_train), (X_test, y_test)
  
train_set, test_set = process_data(raw_train_set, raw_test_set)

### 3. Wrap the training logic

The programming model is that you wrap the code containing the logic of your experiment in a function.

For hyperparameter optimization (HPO), we define a function whose parameters are the hyperparameters (HPs) to be optimized. Inside the function we simply write
the training logic as if we were training our model on a single machine with TensorFlow. The `reporter` argument is supplied by Maggy.
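Because the experiment logic lives in a plain function, a driver only needs the function object and the hyperparameter values to run it. The sketch below is a hypothetical single-machine stand-in for that idea, not Maggy's implementation. `run_trial` passes the hyperparameters as keyword arguments and adds a reporter only if the function declares a `reporter` parameter. `DummyReporter` and the toy objective are placeholders for the real reporter and for actual training.

```python
import inspect


class DummyReporter:
    """Hypothetical stand-in for a reporter: just records messages."""

    def __init__(self):
        self.messages = []

    def log(self, message):
        self.messages.append(message)


def run_trial(train_fn, hparams, reporter=None):
    """Call train_fn with hparams as keyword arguments.

    A reporter is added only if train_fn declares a 'reporter' parameter.
    """
    kwargs = dict(hparams)
    if "reporter" in inspect.signature(train_fn).parameters:
        kwargs["reporter"] = reporter if reporter is not None else DummyReporter()
    return train_fn(**kwargs)


def toy_hpo_function(number_layers, reporter):
    """Toy objective in place of real training: best (0) at 4 layers."""
    reporter.log(f"training with {number_layers} layers")
    return -abs(number_layers - 4)


print(run_trial(toy_hpo_function, {"number_layers": 4}))  # 0
print(run_trial(toy_hpo_function, {"number_layers": 6}))  # -2
```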

In [None]:
def hpo_function(number_layers, reporter):
    # Build the model with the number of layers chosen for this trial
    model = NeuralNetwork(nl=number_layers)
    model.build()

    # Compile, fit and evaluate the model
    model.compile(Adam(learning_rate=0.04), 'categorical_crossentropy', metrics=['accuracy'])
    train_input, test_input = process_data(raw_train_set, raw_test_set)

    train_batch_size = 75
    test_batch_size = 15
    epochs = 10

    model.fit(x=train_input[0], y=train_input[1],
              batch_size=train_batch_size,
              epochs=epochs,
              verbose=1)

    score = model.evaluate(x=test_input[0], y=test_input[1], batch_size=test_batch_size, verbose=1)

    print(f'Test loss: {score[0]}')
    print(f'Test accuracy: {score[1]}')

    # Return the test accuracy as the metric to optimize
    return score[1]

We do the same for the distributed training function. This time the function receives the model, train_set, test_set and hparams as arguments.

In [None]:
def training_function(model, train_set, test_set, hparams):
    # Instantiate the model class passed in by Maggy
    model = model(nl=hparams['number_layers'])
    model.build()

    # Compile, fit and evaluate the model
    model.compile(Adam(learning_rate=hparams['learning_rate']), 'categorical_crossentropy', metrics=['accuracy'])

    model.fit(train_set, epochs=hparams['epochs'])

    # evaluate() returns [loss, accuracy] because of metrics=['accuracy']
    loss, accuracy = model.evaluate(test_set)

    return accuracy

In the next step we have to create a configuration instance for Maggy. Since in this example we are using Maggy for hyperparameter optimization and distributed training using TensorFlow, we will use OptimizationConfig and TfDistributedConfig.

### 4. Configure and run distributed HPO


OptimizationConfig contains the information about the hyperparameter optimization.
We need to define a Searchspace object that contains the hyperparameters we want to optimize. In this example we search for the optimal number of layers of the neural network, from 2 to 8 layers.

OptimizationConfig takes the following parameters:
* num_trials: Controls how many separate runs are conducted during the hyperparameter search.
* optimizer: Optimizer type for searching the hyperparameter searchspace.
* searchspace: A Searchspace object configuring the names, types and ranges of the hyperparameters.
* optimization_key: Name of the metric to use for evaluating the hyperparameter search.
* direction: Direction of optimization ("max" or "min").
* es_interval: Early stopping polling frequency during an experiment run.
* es_min: Minimum number of trials to conduct before starting the early stopping mechanism. Useful to establish a baseline for performance estimates.
* es_policy: Early stopping policy which formulates a rule for triggering aborts.
* name: Experiment name.
* description: A description of the experiment.
* hb_interval: Heartbeat interval with which the server is polling.
* fixed_hp: Hyperparameters not to be tuned.
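The following plain-Python sketch shows how `num_trials`, `searchspace` and `direction` interact in random search over a single `'INTEGER'` hyperparameter. It is an illustration under simplifying assumptions: trials run sequentially on one machine, both range bounds are inclusive, and there is no early stopping. It does not show how Maggy schedules trials on Spark executors. `random_search` and `toy_objective` are hypothetical names.

```python
import random


def random_search(train_fn, name, low, high, num_trials, direction="max", seed=0):
    """Sample integer values for one hyperparameter and keep the best trial.

    Assumes both bounds are inclusive; direction is "max" or "min".
    """
    if direction not in ("max", "min"):
        raise ValueError("direction must be 'max' or 'min'")
    rng = random.Random(seed)
    best_config, best_metric = None, None
    for _ in range(num_trials):
        config = {name: rng.randint(low, high)}  # randint includes both ends
        metric = train_fn(**config)
        better = (
            best_metric is None
            or (direction == "max" and metric > best_metric)
            or (direction == "min" and metric < best_metric)
        )
        if better:
            best_config, best_metric = config, metric
    return {"best_config": best_config, "best_metric": best_metric}


def toy_objective(number_layers):
    """Toy stand-in for hpo_function: best value (0) at 4 layers."""
    return -abs(number_layers - 4)


sketch_result = random_search(toy_objective, "number_layers", 2, 8, num_trials=4)
print(sketch_result)
```

The returned dictionary mirrors the `result['best_config']` lookup used in step 5, but that shape is an assumption of this sketch.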

In [None]:
from maggy.experiment_config import OptimizationConfig
from maggy import Searchspace

# The searchspace can be instantiated with parameters
sp = Searchspace(number_layers=('INTEGER', [2, 8]))

hpo_config = OptimizationConfig(num_trials=4, optimizer="randomsearch", searchspace=sp, direction="max", es_interval=1, es_min=5, name="hp_tuning_test")
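`es_interval`, `es_min` and `es_policy` control early stopping. One widely used policy is median stopping. Once at least `es_min` trials have finished, a running trial is stopped if its current metric is worse than the median of the finished trials' metrics at the same step. The sketch below implements that rule in plain Python as an illustration only. Whether it matches Maggy's built-in policy is an assumption, and `should_stop` is a hypothetical name. Under this rule, the configuration above (`es_min=5` with `num_trials=4`) would never stop a trial, because at most three other trials can have finished.

```python
import statistics


def should_stop(current_metric, step, finished_histories, es_min, direction="max"):
    """Median stopping rule (illustrative, not Maggy's code).

    finished_histories: one list of metrics per finished trial, indexed by step.
    Returns True if the running trial is worse than the median of the
    finished trials at the same step; never stops before es_min trials finish.
    """
    if len(finished_histories) < es_min:
        return False
    peers = [h[step] for h in finished_histories if len(h) > step]
    if not peers:
        return False
    median = statistics.median(peers)
    if direction == "max":
        return current_metric < median
    return current_metric > median


history = [[0.50, 0.70, 0.80], [0.40, 0.60, 0.75], [0.55, 0.72, 0.85]]
print(should_stop(0.30, 1, history, es_min=3))  # True: 0.30 < median 0.70
print(should_stop(0.30, 1, history, es_min=5))  # False: only 3 finished trials
```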

Our HPO function and configuration object are now ready, so we can run the HPO experiment. To do so, we call the lagom function, passing it our HPO function and the configuration object we instantiated in the previous step.
Lagom is a Swedish word meaning "just the right amount".

In [None]:
from maggy import experiment

result = experiment.lagom(train_fn=hpo_function, config=hpo_config)

print(result)

### 5. Configure and run distributed training


Now it's time to run the final step of our ML program: distributed training. Before initializing its configuration class, we define the hyperparameters, taking the best number of layers found by the HPO.

In [None]:
#define the constructor parameters of your model
model_params = {
    #train dataset entries / num_workers
    'train_batch_size': 75,
    #test dataset entries / num_workers
    'test_batch_size': 15,
    'learning_rate': 0.04,
    'epochs': 20,
    'number_layers': result['best_config']['number_layers'],
}

The TfDistributedConfig class takes the following parameters:
* name: the name of the experiment.
* model: the model class to be trained (defined in step 1 of this guide).
* train_set: the train set as a tuple (x_train, y_train), or the path to the train set.
* test_set: the test set as a tuple (x_test, y_test), or the path to the test set.
* process_data: the function that processes the data (if needed).
* hparams: the model and dataset parameters. In this case we also need to provide 'train_batch_size' and 'test_batch_size'. These values represent the subset sizes of the sharded dataset (one subset per worker). Their value is usually dataset_size/number_workers, but you can change it depending on your needs.
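A small calculation illustrates the rule of thumb in the `hparams` bullet (subset size of roughly dataset size / number of workers). The sketch below assumes the rows are divided as evenly as possible, with any remainder going to the first workers. `shard_sizes` is a hypothetical helper; the actual sharding is done by Maggy and TensorFlow, not by this code.

```python
def shard_sizes(dataset_size, num_workers):
    """Split dataset_size rows as evenly as possible across num_workers."""
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    base, extra = divmod(dataset_size, num_workers)
    # The first `extra` workers get one additional row
    return [base + 1 if i < extra else base for i in range(num_workers)]


# Hypothetical sizes: 150 rows over 2 workers gives 75 rows per worker,
# so a per-worker batch size of 75 covers each shard in one step.
print(shard_sizes(150, 2))  # [75, 75]
print(shard_sizes(10, 3))   # [4, 3, 3]
```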

In [None]:
from maggy.experiment_config.tf_distributed import TfDistributedConfig

training_config = TfDistributedConfig(name="tf_test", model=model, train_set=train_set, test_set=test_set, process_data=process_data, hparams = model_params)

Finally, we are ready to launch the Maggy experiment. You just need to pass two arguments: the training function and the configuration object we defined in the previous steps.

In [None]:
experiment.lagom(training_function, training_config)