In [None]:
import os
import boto3
import zipfile
import ray

from ray import tune
from ray import serve
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer
from ray.train.xgboost import XGBoostPredictor
from ray.train.batch_predictor import BatchPredictor
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import pandas_read_json
from ray.tune import Tuner, TuneConfig

import requests

# Start Ray for this notebook. `ignore_reinit_error=True` keeps the cell
# re-runnable in a live kernel: a second call no longer raises because Ray is
# already initialized.
ray.init(ignore_reinit_error=True)

# Ray Train

## Intro

### Outline

-   Goals
-   Ray Air `Trainer`
    - Design
    - Flavors
    - In-depth with Tensorflow Trainer

### Model scenarios with Ray + Tensorflow Trainer

- Start with a minimal model and focus on key elements for Ray Train workflow
- Port a Tensorflow tutorial word2vec model to Ray Train

### Context: Ray AIR

Ray AIR is the Ray AI Runtime, a set of high-level easy-to-use APIs for
ingesting data, training models – including reinforcement learning
models – tuning those models and then serving them.

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width=600 loading="lazy"/>

Key principles behind Ray and Ray AIR are
* Performance
* Developer experience and simplicity

__Read, preprocess with Ray Data__

In [None]:
# Read the Parquet file from S3 into a Ray Dataset.
dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)

# Hold out 30% of the data for validation; the rest is used for training.
dataset_splits = dataset.train_test_split(test_size=0.3)
train_dataset, valid_dataset = dataset_splits

__Fit model with Ray Train__

In [None]:
# Distributed XGBoost training: 32 CPU-only workers fit a binary logistic
# objective on the `is_big_tip` label column. Both splits are passed in under
# the names "train" and "valid".
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(use_gpu=False, num_workers=32),
    datasets={"train": train_dataset, "valid": valid_dataset},
    label_column="is_big_tip",
    params={"objective": "binary:logistic"},
)

# Run the training job; `result.checkpoint` is reused later when serving.
result = trainer.fit()

__Optimize hyperparams with Ray Tune__

In [None]:
# Search over tree depth with Ray Tune, reusing the XGBoost `trainer`.
# Trials are ranked by log-loss on the "valid" dataset rather than on the
# training data, so the selected `max_depth` reflects held-out performance
# instead of rewarding overfitting.
tuner = Tuner(
    trainer,
    param_space={'params': {'max_depth': tune.randint(2, 12)}},
    tune_config=TuneConfig(num_samples=10, metric='valid-logloss', mode='min'),
)

# Run the 10 sampled trials and keep the checkpoint of the best one.
checkpoint = tuner.fit().get_best_result().checkpoint

__Batch prediction__

In [None]:
# Build a batch predictor from the best tuning checkpoint.
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint,
    XGBoostPredictor,
)

# Score the validation rows with the label column removed.
unlabeled_valid_dataset = valid_dataset.drop_columns(['is_big_tip'])
predicted_probabilities = batch_predictor.predict(unlabeled_valid_dataset)

__Online prediction with Ray Serve__

In [None]:
# Wrap the XGBoost predictor in a Serve deployment that parses request bodies
# with `pandas_read_json`.
# Note: this serves `result.checkpoint` (from the plain `trainer.fit()` run),
# not the tuned `checkpoint` selected by the Tuner.
deployment = PredictorDeployment.bind(
    XGBoostPredictor,
    result.checkpoint,
    http_adapter=pandas_read_json,
)

# Start serving the deployment.
serve.run(deployment)

__HTTP or Python services__

In [None]:
# Use the first validation row as an example request payload.
sample_input = dict(valid_dataset.take(1)[0])

# Strip the label and the index column so only the remaining fields are sent.
for unwanted_key in ('is_big_tip', '__index_level_0__'):
    sample_input.pop(unwanted_key)

# POST the single-row batch to the local Serve endpoint and show the JSON reply.
response = requests.post("http://localhost:8000/", json=[sample_input])
response.json()

## Ray Train Goals

* Developer experience
* Flexibility
* Performance and simplicity via delegation

## API design and usage

### Ideas

* `Trainer` and `Checkpoint` are key classes
    * Supported by additional classes, e.g., `ScalingConfig`
* Train does not re-implement distributed optimizers
    * Train coordinates and delegates native library distributed training
* `Trainer` produces `Checkpoint`(s)
    * `Checkpoint` encapsulates outputs
        * Model weights/assets
            * Typically by reference to file storage
        * Scores/stats
* `Trainer` is used by Train, Tune
* `Checkpoint` is used for inference (Ray Data [batch], Serve [online]) and reporting
* These API patterns apply across __all__ `Trainer` flavors

### Trainer Flavors

* Tree - e.g., XGBoost
* Library - e.g., Huggingface
* DL Trainers
    * PyTorch, TensorFlow, Horovod, Lightning, Accelerate

### Focus: Tensorflow Trainer

Tensorflow Trainer automates deployment of MultiWorkerMirroredStrategy

> This strategy implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to tf.distribute.MirroredStrategy, it replicates all variables and computations to each local device. The difference is that it uses a distributed collective implementation (e.g. all-reduce), so that multiple workers can work together.
> (https://www.tensorflow.org/api_docs/python/tf/distribute/MultiWorkerMirroredStrategy)

## "Hello TF Trainer World"

We'll build a minimal example with the iris dataset and a trivial model -- the goal is to look at the data/train code structure using Ray

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

from ray.data.preprocessors import Concatenator, OneHotEncoder
from ray.air import session
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig

In this example, we'll start with a Ray Dataset. Using Ray Data is optional -- we can use existing `tf.data` datasets if we like.

In [None]:
# Read the iris CSV from S3 into a Ray Dataset, then display the dataset object.
ds = ray.data.read_csv("s3://air-example-data/iris.csv")
ds

In [None]:
# Peek at the first two records.
ds.take(2)

Ray Data directly supports NumPy tensor data, but sometimes we are ingesting tabular business data with distinct columns.

If your dataset contains multiple features but your model accepts a single tensor as input, combine features with Concatenator.

https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.to_tf.html

In [None]:
# Combine every column except "target" into one "features" column.
feature_preprocessor = Concatenator(
    exclude="target",
    output_column_name="features",
)

# NOTE: `ds` is rebound here, so this cell is not meant to be run twice in a
# row without re-reading the CSV.
ds = feature_preprocessor.transform(ds)

ds

One-hot encode target category label -- similar to `tf.keras.utils.to_categorical`

In [None]:
# One-hot encode the "target" label column.
target_onehot = OneHotEncoder(
    columns=["target"],
)

# Fit the encoder on the dataset and apply it in one step; `ds` is rebound to
# the encoded dataset.
ds = target_onehot.fit_transform(ds)

ds

Collect one-hot encoded columns into vectors

In [None]:
# Combine every column except "features" (i.e. the one-hot label columns) into
# a single "target" column.
target_concat = Concatenator(
    exclude="features",
    output_column_name="target",
)

# `ds` is rebound to the dataset with the combined "target" column.
ds = target_concat.transform(ds)

ds

Define a function for building our model

In [None]:
def build_model() -> tf.keras.Model:
    """Build the (uncompiled) classifier used in the iris example.

    Returns:
        A ``tf.keras.Sequential`` model taking inputs of shape ``(4,)``,
        followed by a 5-unit ReLU dense layer and a 3-unit softmax dense
        layer.
    """
    network = tf.keras.Sequential()
    network.add(tf.keras.layers.InputLayer(input_shape=(4,)))
    network.add(tf.keras.layers.Dense(5, activation='relu'))
    network.add(tf.keras.layers.Dense(3, activation='softmax'))
    return network

### Per-worker training function

The key element of the Ray Train interface pattern is the per-worker training function.

* Will run on each worker
* Receives a dict of configurations
* Can interact with other parts of the distributed training collective via `session`

In [None]:
def train_func(config: dict):
    """Per-worker training loop for the iris classifier.

    Ray Train runs this function on every training worker.

    Args:
        config: Hyperparameters for the run. Keys read here:
            ``batch_size`` (default 64), ``epochs`` (default 3) and
            ``lr`` (SGD learning rate, default 1e-3).
    """
    batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    learning_rate = config.get("lr", 1e-3)

    # Build and compile the model inside the strategy scope so it is created
    # under MultiWorkerMirroredStrategy.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        multi_worker_model = build_model()
        multi_worker_model.compile(
            optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
            loss=tf.keras.losses.categorical_crossentropy,
            metrics=[tf.keras.metrics.categorical_crossentropy],
        )

    # This worker's shard of the dataset registered under "train" on the trainer.
    dataset = session.get_dataset_shard("train")
    tf_dataset = dataset.to_tf(
        feature_columns="features",
        label_columns="target",
        batch_size=batch_size,
    )

    # Output goes through the callback, so fit()'s return value is not kept.
    multi_worker_model.fit(
        tf_dataset,
        epochs=epochs,
        callbacks=[ReportCheckpointCallback()],
    )

__Notes:__
* Model building/compiling takes place inside `strategy.scope()` context block
* Data access and `model.fit(...)` are inside the function
* Inputs are provided to this function via `config` and `session`

<img src='https://docs.ray.io/en/latest/_images/session.svg' width=800 />

(https://docs.ray.io/en/latest/ray-air/api/session.html)

Function output is via Callbacks with Checkpoints
* ray.air.integrations.keras.ReportCheckpointCallback https://docs.ray.io/en/latest/tune/api/doc/ray.air.integrations.keras.ReportCheckpointCallback.html
* *To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in Checkpoint passed to session.report().*
    * https://docs.ray.io/en/latest/train/api/doc/ray.train.tensorflow.TensorflowTrainer.html#ray.train.tensorflow.TensorflowTrainer

If we don't want to use Callbacks, we can use `session` to manually report information including Checkpoints:

```python
session.report(
    {},
    checkpoint=Checkpoint.from_dict(dict(epoch=epoch, model=model.get_weights())
    ),
)
```
*In this latter example, we're responsible for checkpointing frequency, which means we may need to train on one epoch at a time*

(https://docs.ray.io/en/latest/train/dl_guide.html)

### Using the Trainer

The per-worker training function contains our logic. The `TensorflowTrainer` instance ties that function together with configuration and orchestrates the training operation.

In [None]:
# Hyperparameters passed to `train_func` as its `config` argument.
train_config = {
    "lr": 1e-3,
    "batch_size": 32,
    "epochs": 4,
}

# Two CPU-only training workers.
scaling_config = ScalingConfig(use_gpu=False, num_workers=2)

# Tie together the per-worker function, its config, the scaling settings and
# the dataset (registered under the name "train").
trainer = TensorflowTrainer(
    datasets={"train": ds},
    scaling_config=scaling_config,
    train_loop_config=train_config,
    train_loop_per_worker=train_func,
)

We fit a trainer to get a result, which wraps metadata and a model checkpoint

In [None]:
# Run distributed training and keep the returned result object.
result = trainer.fit()

In [None]:
# Show the metrics attached to the training result.
result.metrics

In [None]:
# Show the checkpoint attached to the training result.
result.checkpoint

## Porting a word2vec model from TF to Ray

In this example, we'll use the skip-gram word2vec model from the Tensorflow word2vec tutorial (https://www.tensorflow.org/tutorials/text/word2vec)
* Focus on adapting training to Ray
* Start with existing training dataset in `tf.data.Dataset` form

In [None]:
class Word2Vec(tf.keras.Model):
    """Skip-gram word2vec model that scores (target, context) word pairs.

    Holds two embedding layers over the same vocabulary size: one looked up
    with target words and one looked up with context words. The forward pass
    returns the dot product of the target embedding with each context
    embedding.
    """

    def __init__(self, vocab_size, embedding_dim, num_ns=4):
        """Create the target and context embedding layers.

        Args:
            vocab_size: Number of entries in each embedding table.
            embedding_dim: Size of each embedding vector.
            num_ns: Determines the context embedding's ``input_length``
                (``num_ns + 1``). Presumably the number of negative samples
                per positive context word, as in the TensorFlow word2vec
                tutorial -- confirm against how the dataset was built.
                Defaults to 4, the value previously hard-coded here.
        """
        super().__init__()
        self.target_embedding = layers.Embedding(vocab_size,
                                      embedding_dim,
                                      input_length=1,
                                      name="w2v_embedding")

        self.context_embedding = layers.Embedding(vocab_size,
                                       embedding_dim,
                                       input_length=num_ns+1)

    def call(self, pair):
        """Score each context candidate against its target word.

        Args:
            pair: A ``(target, context)`` pair of index tensors.

        Returns:
            The ``'be,bce->bc'`` einsum of the target embeddings with the
            context embeddings: one dot product per (batch row, context
            candidate).
        """
        target, context = pair
        # A rank-2 target is squeezed on axis 1 so that the embedding lookup
        # yields the rank-2 tensor the einsum below expects.
        if len(target.shape) == 2:
            target = tf.squeeze(target, axis=1)
        word_emb = self.target_embedding(target)
        context_emb = self.context_embedding(context)
        dots = tf.einsum('be,bce->bc', word_emb, context_emb)
        return dots

The following code is adapted from Ray's documentation walkthrough for porting TF code to Ray AIR: https://docs.ray.io/en/latest/ray-air/examples/convert_existing_tf_code_to_ray_air.html

In [None]:
# 1. Pass in the hyperparameter config
def train_func(config: dict):
    """Per-worker training loop for the word2vec model.

    Ray Train runs this function on every training worker.

    Args:
        config: Training configuration. Keys read here:
            ``epochs`` (default 5), ``batch_size`` (per-worker batch size,
            default 32), ``buffer_size`` (shuffle buffer size, default 8192)
            and ``tf_data`` (required; object key of the zipped dataset in
            the ``anyscale-training-data`` S3 bucket).

    Raises:
        KeyError: If ``config`` has no ``tf_data`` entry.
    """
    epochs = config.get("epochs", 5)
    batch_size_per_worker = config.get("batch_size", 32)
    buffer_size = config.get("buffer_size", 8192)
    # Fail fast with a clear error instead of handing `None` to boto3.
    tf_data_key = config["tf_data"]

    # 2. Synchronized model setup
    #
    # Important: The strategy must be instantiated at the beginning
    #     of the function, since the tf.Dataset that we load later needs
    #     to be auto-sharded.
    #     See https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
    #     for more details.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        vocab_size = 4096
        embedding_dim = 128
        model = Word2Vec(vocab_size, embedding_dim)
        model.compile(optimizer='adam',
                 loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 metrics=['accuracy'])

    # 3. Set a `global_batch_size` so that every worker gets the specified
    #    `batch_size_per_worker` regardless of the number of workers.
    #    This is needed because the datasets are sharded across `session.get_world_size()` workers.
    global_batch_size = batch_size_per_worker * session.get_world_size()

    # Download data
    #
    # Everything -- including the zip archive itself -- goes into a directory
    # that is unique per world rank. Workers that share a node (and working
    # directory) therefore never write to the same file concurrently.
    unique_tempdir = os.path.join('/tmp', str(session.get_world_rank()))
    os.makedirs(unique_tempdir, exist_ok=True)
    zip_path = os.path.join(unique_tempdir, 'tfdata.zip')

    s3 = boto3.client('s3')
    s3.download_file('anyscale-training-data', tf_data_key, zip_path)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(unique_tempdir)

    ds_path = os.path.join(unique_tempdir, 'w2v.data.tf')
    train_ds = (
        tf.data.Dataset.load(ds_path)
        .shuffle(buffer_size)
        .batch(global_batch_size)
        .cache()
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )
    # ^ Even though we are loading the dataset as a standard TF dataset,
    #   TF will automatically shard the datasets across workers, according to the strategy.

    # 4. Report metrics and checkpoint the model
    report_metrics_and_checkpoint_callback = ReportCheckpointCallback(report_metrics_on="epoch_end")
    # `train_ds` is already batched above, so no `batch_size` is passed to
    # fit(): Keras documents that it must not be specified for dataset inputs.
    model.fit(
        train_ds,
        epochs=epochs,
        callbacks=[report_metrics_and_checkpoint_callback],
    )

The `TensorflowTrainer` setup links our configurations and training function.

In [None]:
# Shuffle-buffer size and per-worker batch size for the word2vec run.
BUFFER_SIZE = 10000
BATCH_SIZE = 1024

# Configuration passed to `train_func`; "tf_data" is the S3 object key of the
# zipped training dataset.
train_config = {
    "batch_size": BATCH_SIZE,
    "epochs": 4,
    "buffer_size": BUFFER_SIZE,
    "tf_data": "tf-w2v-sample-data/w2v.data.tf.zip",
}

# Eight CPU-only training workers.
scaling_config = ScalingConfig(use_gpu=False, num_workers=8)

# No `datasets` argument here: the training function loads its own data.
trainer = TensorflowTrainer(
    scaling_config=scaling_config,
    train_loop_config=train_config,
    train_loop_per_worker=train_func,
)

In [None]:
# Run distributed word2vec training and keep the returned result object.
result = trainer.fit()

In [None]:
# Display the training result.
result

In [None]:
# Show the checkpoint attached to the training result.
result.checkpoint

We could use this checkpoint for batch processing or online inference.

There are a few additional options for checkpoints (e.g., number of checkpoints to retain and the mechanism for ranking them to support the `best_checkpoints` API) which can be set via `CheckpointConfig` https://docs.ray.io/en/latest/ray-air/api/doc/ray.air.CheckpointConfig.html#ray.air.CheckpointConfig