# Quickstart with Ray AI Runtime

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">

## Preliminaries

### Install libraries

In [1]:
#! pip install -U ray==2.3.0 xgboost_ray==0.1.18 --quiet
! pip install xgboost_ray==0.1.18 --quiet


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Imports

In [2]:
import ray
from ray.air.config import ScalingConfig
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer

### Initialize Ray runtime

In [3]:
# Install the `datasets` library, then import the pieces needed from codeflare-sdk
!pip install datasets --quiet
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [4]:
# Create an authentication object for user permissions.
# If unused, the SDK automatically checks for the default kubeconfig, then the in-cluster config.
# KubeConfigFileAuthentication can also be used to specify a kubeconfig path manually.
# Replace the placeholder with your own token; never commit a real token to a notebook.
auth_token = "<your-openshift-token>"  # The auth_token is used later for the RayJobClient
auth = TokenAuthentication(
    token=auth_token,
    server="https://api.cluster-98dpx.sandbox1573.opentlc.com:6443",
    skip_tls=False,
)
auth.login()

'Logged into https://api.cluster-98dpx.sandbox1573.opentlc.com:6443'

In [5]:
# Create and configure our cluster object (and appwrapper)
cluster = Cluster(ClusterConfiguration(
    name='quickstart-ray-air',
    namespace='ray-distributed-workload',
    num_workers=2,
    min_cpus=8,
    max_cpus=8,
    min_memory=16,
    max_memory=16,
    num_gpus=2,
    image="quay.io/project-codeflare/ray:latest-py39-cu118",
    instascale=False,  # InstaScale disabled
    # machine_types=["m5.xlarge", "g4dn.xlarge"]
    # machine_types=["m5.2xlarge", "g5.2xlarge"],
    openshift_oauth=True
))

Written to: /opt/app-root/src/.codeflare/appwrapper/quickstart-ray-air.yaml


In [6]:
cluster.up()
cluster.status()
cluster.wait_ready()
cluster.status()
cluster.details()


Waiting for requested resources to be set up...
Requested cluster is up and running!
Dashboard is ready!




RayCluster(name='quickstart-ray-air', status=<RayClusterStatus.READY: 'ready'>, head_cpus=2, head_mem=8, head_gpu=0, workers=2, worker_mem_min=16, worker_mem_max=16, worker_cpu=8, worker_gpu=2, namespace='ray-distributed-workload', dashboard='https://ray-dashboard-quickstart-ray-air-ray-distributed-workload.apps.cluster-98dpx.sandbox1573.opentlc.com')
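
As a rough sanity check, the worker resources requested above can be totaled and compared with what the training job will later ask for. This is a sketch only: `WorkerGroup` and `fits` are hypothetical helpers, the head node is ignored, only aggregate totals are compared (not how requests pack onto individual pods), and it assumes each training worker requests 1 CPU and 1 GPU when `use_gpu=True`.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class WorkerGroup:
    """Hypothetical description of the Ray worker pods requested through ClusterConfiguration."""
    num_workers: int
    cpus_per_worker: int
    gpus_per_worker: int
    memory_gb_per_worker: int

    def totals(self) -> Dict[str, int]:
        # Aggregate resources across all worker pods (head node excluded)
        return {
            "CPU": self.num_workers * self.cpus_per_worker,
            "GPU": self.num_workers * self.gpus_per_worker,
            "memory_gb": self.num_workers * self.memory_gb_per_worker,
        }


def fits(group: WorkerGroup, train_workers: int,
         cpus_per_train_worker: int = 1, gpus_per_train_worker: int = 1) -> bool:
    """Compare a training job's total demand against the workers' aggregate totals.

    Assumption: each training worker asks for 1 CPU and 1 GPU when use_gpu=True.
    """
    totals = group.totals()
    return (train_workers * cpus_per_train_worker <= totals["CPU"]
            and train_workers * gpus_per_train_worker <= totals["GPU"])


# Values taken from the ClusterConfiguration above
workers = WorkerGroup(num_workers=2, cpus_per_worker=8, gpus_per_worker=2, memory_gb_per_worker=16)
print(workers.totals())                # {'CPU': 16, 'GPU': 4, 'memory_gb': 32}
print(fits(workers, train_workers=2))  # True
```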

In [7]:
ray_dashboard_uri = cluster.cluster_dashboard_uri()
ray_cluster_uri = cluster.cluster_uri()
print(ray_dashboard_uri)
print(ray_cluster_uri)

https://ray-dashboard-quickstart-ray-air-ray-distributed-workload.apps.cluster-98dpx.sandbox1573.opentlc.com
ray://quickstart-ray-air-head-svc.ray-distributed-workload.svc:10001


In [8]:
# Before proceeding, make sure the cluster exists and the URI is not empty
assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding"

import ray

# Reset the Ray context in case one already exists
ray.shutdown()

# Install additional libraries that will be required for model training
runtime_env = {"pip": ["transformers", "datasets", "evaluate", "pyarrow<7.0.0", "ipython", "xgboost_ray", "accelerate"]}

# Establish a connection to the Ray cluster.
# NOTE: This will work for in-cluster notebook servers (RHODS/ODH), but not for local machines.
# To see how to connect from your laptop, go to demo-notebooks/additional-demos/local_interactive.ipynb
ray.init(address=ray_cluster_uri, runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

Ray cluster is up and running:  True


[2m[36m(TunerInternal pid=1180)[0m [output] This will use the new output engine with verbosity 1. To disable the new output and use the legacy output engine, set the environment variable RAY_AIR_NEW_OUTPUT=0. For more information, please see https://github.com/ray-project/ray/issues/36949


[2m[36m(TunerInternal pid=1180)[0m 
[2m[36m(TunerInternal pid=1180)[0m View detailed results here: /home/ray/ray_results/XGBoostTrainer_2024-04-13_03-14-51
[2m[36m(TunerInternal pid=1180)[0m To visualize your results with TensorBoard, run: `tensorboard --logdir /home/ray/ray_results/XGBoostTrainer_2024-04-13_03-14-51`


[2m[36m(TunerInternal pid=1180)[0m AIR_VERBOSITY is set, ignoring passed-in ProgressReporter for now.


[2m[36m(TunerInternal pid=1180)[0m 
[2m[36m(TunerInternal pid=1180)[0m Training started without custom configuration.


[2m[36m(XGBoostTrainer pid=1311)[0m [RayXGBoost] Created 2 new actors (2 total actors). Waiting until actors are ready for training.
[2m[36m(XGBoostTrainer pid=1311)[0m Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
[2m[36m(XGBoostTrainer pid=1311)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(XGBoostTrainer pid=1311)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(XGBoostTrainer pid=1311)[0m Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
[2m[36m(XGBoostTrainer pid=1311)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enable

[2m[36m(TunerInternal pid=1180)[0m 
[2m[36m(TunerInternal pid=1180)[0m Training finished iteration 1 at 2024-04-13 03:15:02. Total running time: 7s
[2m[36m(TunerInternal pid=1180)[0m ╭───────────────────────────────╮
[2m[36m(TunerInternal pid=1180)[0m │ Training result               │
[2m[36m(TunerInternal pid=1180)[0m ├───────────────────────────────┤
[2m[36m(TunerInternal pid=1180)[0m │ checkpoint_dir_name           │
[2m[36m(TunerInternal pid=1180)[0m │ time_this_iter_s      6.42428 │
[2m[36m(TunerInternal pid=1180)[0m │ time_total_s          6.42428 │
[2m[36m(TunerInternal pid=1180)[0m │ training_iteration          1 │
[2m[36m(TunerInternal pid=1180)[0m │ train-error           0.39698 │
[2m[36m(TunerInternal pid=1180)[0m │ train-logloss         0.66884 │
[2m[36m(TunerInternal pid=1180)[0m │ valid-error           0.39574 │
[2m[36m(TunerInternal pid=1180)[0m │ valid-logloss         0.66836 │
[2m[36m(TunerInternal pid=1180)[0m ╰───────────────

[2m[36m(XGBoostTrainer pid=1311)[0m Training in progress (30 seconds since last restart).


[2m[36m(TunerInternal pid=1180)[0m 
[2m[36m(TunerInternal pid=1180)[0m Training finished iteration 62 at 2024-04-13 03:15:30. Total running time: 35s
[2m[36m(TunerInternal pid=1180)[0m ╭───────────────────────────────╮
[2m[36m(TunerInternal pid=1180)[0m │ Training result               │
[2m[36m(TunerInternal pid=1180)[0m ├───────────────────────────────┤
[2m[36m(TunerInternal pid=1180)[0m │ checkpoint_dir_name           │
[2m[36m(TunerInternal pid=1180)[0m │ time_this_iter_s      1.01915 │
[2m[36m(TunerInternal pid=1180)[0m │ time_total_s          34.0501 │
[2m[36m(TunerInternal pid=1180)[0m │ training_iteration         62 │
[2m[36m(TunerInternal pid=1180)[0m │ train-error           0.38648 │
[2m[36m(TunerInternal pid=1180)[0m │ train-logloss         0.65601 │
[2m[36m(TunerInternal pid=1180)[0m │ valid-error           0.38817 │
[2m[36m(TunerInternal pid=1180)[0m │ valid-logloss         0.65845 │
[2m[36m(TunerInternal pid=1180)[0m ╰─────────────

[2m[36m(XGBoostTrainer pid=1311)[0m [RayXGBoost] Finished XGBoost training on training data with total N=1,893,432 in 51.27 seconds (48.23 pure XGBoost training time).
[2m[36m(XGBoostTrainer pid=1311)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ray/ray_results/XGBoostTrainer_2024-04-13_03-14-51/XGBoostTrainer_ab0d8_00000_0_2024-04-13_03-14-54/checkpoint_000000)


## Load and prepare data with Ray Datasets

### Read Parquet file to Ray Dataset

In [9]:
dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)

2024-04-13 10:13:08,897	INFO read_api.py:406 -- To satisfy the requested parallelism of 147, each read task output is split into 147 smaller blocks.


The returned `dataset` is a [Ray Dataset](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray-data-dataset), the standard way to load and exchange data in Ray AI Runtime.

In AIR, Datasets are used extensively for data loading and transformation. They are meant as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

### Split data into training and validation subsets

In [10]:
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)


### Split datasets into blocks for parallel preprocessing

In [11]:
train_dataset = train_dataset.repartition(num_blocks=3)
valid_dataset = valid_dataset.repartition(num_blocks=3)

Keep `num_blocks` lower than the number of cores in the cluster, so that every block can be processed in parallel.
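
The split and repartition steps can be pictured on a plain list of rows. This is a minimal pure-Python sketch of the idea, not Ray's implementation: `split_rows` and `repartition` are hypothetical helpers, and the split assumes no shuffling (which is Ray's default for `train_test_split` unless `shuffle=True` is passed), so the last 30% of rows becomes the validation set.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_rows(rows: Sequence[T], test_size: float) -> Tuple[List[T], List[T]]:
    """Unshuffled split: the last `test_size` fraction of rows becomes the validation set."""
    num_test = round(len(rows) * test_size)
    split_index = len(rows) - num_test
    return list(rows[:split_index]), list(rows[split_index:])


def repartition(rows: Sequence[T], num_blocks: int) -> List[List[T]]:
    """Divide rows into `num_blocks` contiguous blocks whose sizes differ by at most one."""
    base, extra = divmod(len(rows), num_blocks)
    blocks: List[List[T]] = []
    start = 0
    for i in range(num_blocks):
        size = base + (1 if i < extra else 0)  # the first `extra` blocks get one extra row
        blocks.append(list(rows[start:start + size]))
        start += size
    return blocks


rows = list(range(10))
train_rows, valid_rows = split_rows(rows, test_size=0.3)
print(train_rows, valid_rows)                                   # [0, 1, 2, 3, 4, 5, 6] [7, 8, 9]
print([len(b) for b in repartition(train_rows, num_blocks=3)])  # [3, 2, 2]
```

Each block is an independent unit of work, which is why having more blocks than cores does not add parallelism.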

### Define a preprocessor to normalize the columns by their range

In [12]:
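preprocessor = MinMaxScaler(columns=["trip_distance", "trip_duration"])
# Fit the scaler on the training data only, then apply the same transform to both splits
train_dataset = preprocessor.fit_transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)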

[Preprocessors](https://docs.ray.io/en/latest/ray-air/key-concepts.html#preprocessors) are primitives that transform input data into features. They operate on Datasets, making them scalable and compatible with a variety of datasources and dataframe libraries.

Ray AI Runtime comes with a collection of built-in preprocessors, and you can also define your own with simple templates (see [Using preprocessors](https://docs.ray.io/en/latest/ray-air/preprocessors.html) for more information).
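
Min-max scaling maps each value to `(x - min) / (max - min)`, using statistics computed once on the data the scaler is fitted on. The sketch below shows that arithmetic in plain Python; `fit_min_max` and `transform_min_max` are hypothetical helpers (not Ray's `MinMaxScaler` internals), the column values are toy numbers rather than taxi data, and mapping constant columns to `0.0` is a choice made for this sketch.

```python
from typing import Dict, List, Tuple

Columns = Dict[str, List[float]]


def fit_min_max(columns: Columns) -> Dict[str, Tuple[float, float]]:
    """Record each column's (min, max) from the data the scaler is fitted on."""
    return {name: (min(values), max(values)) for name, values in columns.items()}


def transform_min_max(columns: Columns, stats: Dict[str, Tuple[float, float]]) -> Columns:
    """Apply (x - min) / (max - min) using previously fitted statistics."""
    scaled: Columns = {}
    for name, values in columns.items():
        lo, hi = stats[name]
        span = hi - lo
        # Constant columns would divide by zero; map them to 0.0 instead (sketch-specific choice)
        scaled[name] = [(v - lo) / span if span else 0.0 for v in values]
    return scaled


# Toy values, not taken from the NYC taxi dataset
train_cols = {"trip_distance": [1.0, 3.0, 5.0], "trip_duration": [10.0, 20.0, 30.0]}
valid_cols = {"trip_distance": [2.0, 6.0], "trip_duration": [15.0, 40.0]}

stats = fit_min_max(train_cols)              # fit on training data only
print(transform_min_max(train_cols, stats))  # every column becomes [0.0, 0.5, 1.0]
print(transform_min_max(valid_cols, stats))  # values outside the training range fall outside [0, 1]
```

Fitting only on the training split keeps validation statistics from leaking into preprocessing; the cost is that unseen extremes can land outside `[0, 1]`.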

## Train the model with Ray Train

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Scaling_model_training/data_parallelism.png" width="50%" loading="lazy">|
|:--|
|Ray Train provides distributed data-parallel training. A large dataset is sharded across multiple worker nodes, each holding a copy of the model. Gradients computed independently on each node are continuously synchronized with the other nodes to produce the final trained model.|
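
The synchronization described in the table can be illustrated with a toy model. This is a minimal sketch of generic synchronous data parallelism with gradient averaging, assuming a one-parameter linear model `y = w * x` trained with mean squared error; `local_gradient`, `allreduce_mean`, and `train_data_parallel` are hypothetical names. Distributed XGBoost works differently in detail (roughly, workers synchronize the gradient statistics used to choose tree splits), so treat this as the general picture only.

```python
from typing import List, Tuple

Shard = List[Tuple[float, float]]  # (x, y) pairs held by one worker


def local_gradient(w: float, shard: Shard) -> float:
    """d/dw of mean((w*x - y)^2) over this worker's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def allreduce_mean(values: List[float]) -> float:
    """Stand-in for an all-reduce: every worker ends up with the average gradient."""
    return sum(values) / len(values)


def train_data_parallel(shards: List[Shard], lr: float = 0.01, steps: int = 200) -> float:
    w = 0.0  # every worker starts from an identical model copy
    for _ in range(steps):
        grads = [local_gradient(w, shard) for shard in shards]  # computed independently per worker
        w -= lr * allreduce_mean(grads)  # same update applied everywhere keeps the copies in sync
    return w


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # ground truth: y = 3x
shards = [data[0::2], data[1::2]]                  # two workers with equal-sized shards
w = train_data_parallel(shards)
print(round(w, 3))  # 3.0
```

With equal-sized shards, the average of the per-shard gradients equals the gradient over the full dataset, so the distributed run follows the same trajectory as single-machine training.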

### Create XGBoost trainer

In [13]:
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=100,
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=True,
    ),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        # "approx" builds trees on the CPU; use "gpu_hist" to train on the GPUs requested above
        "tree_method": "approx",
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
)


During training, `trainer` launches `num_workers` training workers (two here, as set in `ScalingConfig`), and each worker receives a shard of the training dataset.

Ray AI Runtime comes with built-in integrations with many popular ML projects like PyTorch, Keras, LightGBM, and more. Refer to the [Ray Train docs](https://docs.ray.io/en/latest/train/train.html#quick-start-to-distributed-training-with-ray-train) for more details. Optionally, read more about the Ray-XGBoost integration in the [Introducing Distributed XGBoost Training with Ray](https://www.anyscale.com/blog/distributed-xgboost-training-with-ray) blog post.

### Invoke training (a computationally intensive operation)

In [14]:
result = trainer.fit()

The returned `Result` object gives access to the final metrics, checkpoints, and any errors raised during training.

### Report results

In [15]:
print(f"train acc = {1 - result.metrics['train-error']:.4f}")
print(f"valid acc = {1 - result.metrics['valid-error']:.4f}")
print(f"iteration = {result.metrics['training_iteration']}")

train acc = 0.6153
valid acc = 0.6117
iteration = 101
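
The accuracies above are computed as `1 - error`. XGBoost's `error` metric for binary classification is the fraction of rows where a predicted probability greater than 0.5 disagrees with the label, and `logloss` is the mean negative log-likelihood. The sketch below computes both on four toy predictions; `binary_error` and `logloss` are hypothetical helper names, and the clipping constant `eps` is an assumption added to avoid `log(0)`.

```python
import math
from typing import Sequence


def binary_error(labels: Sequence[int], probs: Sequence[float], threshold: float = 0.5) -> float:
    """Fraction of rows where (prob > threshold) disagrees with the 0/1 label."""
    wrong = sum(int((p > threshold) != bool(y)) for y, p in zip(labels, probs))
    return wrong / len(labels)


def logloss(labels: Sequence[int], probs: Sequence[float], eps: float = 1e-15) -> float:
    """Mean of -[y*log(p) + (1-y)*log(1-p)], with p clipped away from 0 and 1."""
    total = 0.0
    for y, p in zip(labels, probs):
        p = min(max(p, eps), 1 - eps)
        total += y * math.log(p) + (1 - y) * math.log(1 - p)
    return -total / len(labels)


labels = [1, 0, 1, 0]
probs = [0.8, 0.3, 0.4, 0.6]  # rows 3 and 4 fall on the wrong side of 0.5
err = binary_error(labels, probs)
print(f"acc = {1 - err:.4f}")                        # acc = 0.5000
print(f"logloss = {logloss(labels, probs):.4f}")     # logloss = 0.6031
```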


## Shutdown Ray runtime

In [16]:
ray.shutdown()

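`ray.shutdown()` disconnects the worker and terminates the processes started by `ray.init()`. The Ray cluster itself keeps running; call `cluster.down()` to release its resources.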

# Connect with the Ray community

You can learn and get more involved with the Ray community of developers and researchers:

* [**Ray documentation**](https://docs.ray.io/en/latest)

* [**Official Ray site**](https://www.ray.io/)  
Browse the ecosystem and use this site as a hub to get the information that you need to get going and building with Ray.

* [**Join the community on Slack**](https://forms.gle/9TSdDYUgxYs8SA9e8)  
Find friends to discuss your new learnings in our Slack space.

* [**Use the discussion board**](https://discuss.ray.io/)  
Ask questions, follow topics, and view announcements on this community forum.

* [**Join a meetup group**](https://www.meetup.com/Bay-Area-Ray-Meetup/)  
Tune in on meet-ups to listen to compelling talks, get to know other users, and meet the team behind Ray.

* [**Open an issue**](https://github.com/ray-project/ray/issues/new/choose)  
Ray is constantly evolving to improve developer experience. Submit feature requests, bug-reports, and get help via GitHub issues.

* [**Become a Ray contributor**](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html)  
We welcome community contributions to improve our documentation and Ray framework.
