## Introduction

This example notebook reproduces the official Ray example [Ray AIR XGBoostTrainer on Kubernetes](https://docs.ray.io/en/latest/cluster/kubernetes/examples/ml-example.html#) without deploying a Kubernetes cluster. It shows how to use the `ray_cluster` component to launch Ray tasks in either a distributed or a non-distributed manner.

The serverless deployment of the Ray cluster removes the complexity of managing Kubernetes clusters and reduces operational cost. A traditional Kubernetes cluster runs continuously and incurs costs even during idle periods with no active Ray tasks (such as training jobs or predictions). This implementation instead relies on the serverless nature of Vertex AI: users are charged only for the resources used during each task execution, which gives a more efficient and cost-effective setup.
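
The billing difference described above can be illustrated with a little arithmetic. This is a minimal sketch with made-up figures: the hourly rate, the period length, and the task durations are hypothetical placeholders rather than Vertex AI or Kubernetes prices, and it assumes the same hourly rate applies in both setups.

```python
from typing import Sequence


def always_on_cost(hourly_rate: float, hours_in_period: float) -> float:
    """Cost of a cluster that is billed for the whole period, busy or idle."""
    return hourly_rate * hours_in_period


def per_task_cost(hourly_rate: float, task_hours: Sequence[float]) -> float:
    """Cost when only the time spent executing tasks is billed."""
    return hourly_rate * sum(task_hours)


# Hypothetical figures, chosen only to make the arithmetic easy to follow.
HOURLY_RATE = 1.0        # currency units per hour (assumed, not a real price)
HOURS_IN_PERIOD = 24.0   # one day
TASK_HOURS = [0.5, 1.5]  # two tasks executed during that day

always_on = always_on_cost(HOURLY_RATE, HOURS_IN_PERIOD)
per_task = per_task_cost(HOURLY_RATE, TASK_HOURS)
print(f"always-on: {always_on:.2f}, per-task: {per_task:.2f}")
```

The gap between the two numbers depends on how much of the period the cluster would otherwise sit idle, so the saving shrinks as utilization increases.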

### Environment Setup
1. Go to the `ray_cluster` directory and install the dependencies required to run this notebook.
2. Restart the notebook and check whether the kernel is detected automatically. If not, run `python -m ipykernel install --user --name ray_cluster` and restart the notebook again.
3. Select the kernel named `ray_cluster`.



In [1]:
# Import dependencies
from kfp.components import load_component_from_file
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp import compiler
from kfp.dsl import pipeline

In [2]:
def trigger_pipeline(
    project_id: str,
    project_location: str,
    staging_bucket: str,
    vertex_pipe_sa: str,
    pipeline_func_name,
    parameter_values: dict = None,
    enable_cache: bool = False,
):
    """Compile and submit a pipeline job to Vertex AI.

    Args:
        project_id (str): GCP project ID running the pipeline
        project_location (str): GCP project location where pipeline runs
        staging_bucket (str): GCS bucket to host pipeline files
        vertex_pipe_sa (str): Vertex service account to run the pipeline
        pipeline_func_name (Callable): The pipeline function (decorated with @pipeline) to compile
        parameter_values (dict, optional): The parameters passed to the pipeline function. Defaults to None.
        enable_cache (bool, optional): Whether to enable cache or not. Defaults to False.
    """
    # Derive a readable file name. Formatting the pipeline object itself would
    # embed its repr (e.g. "<function ... at 0x...>") in the package path.
    pipeline_name = getattr(pipeline_func_name, "__name__", None) or getattr(
        pipeline_func_name, "name", "pipeline"
    )
    package_path = f"{pipeline_name}.json"

    # Compile the pipeline definition to a local JSON file.
    compiler.Compiler().compile(pipeline_func_name, package_path=package_path)
    aiplatform.init(
        project=project_id,
        location=project_location,
        staging_bucket=staging_bucket,
    )

    job = pipeline_jobs.PipelineJob(
        display_name="dummy_pipeline",
        template_path=package_path,
        parameter_values=parameter_values,
        enable_caching=enable_cache,
    )

    # Submit without blocking; the run is executed by the given service account.
    job.submit(
        service_account=vertex_pipe_sa,
    )

In [3]:
# Setup variables
project_id = "" # GCP project id
project_location = "" # GCP project location
pipeline_staging_path = ""  # Vertex pipeline staging path
vertex_pipe_sa = "" # Vertex pipeline service account

ray_train_bucket = "" # GCS bucket to host ray model
smoke_test = " --smoke-test --disable-check"
batch_mark_100G = " --size 100G --disable-check"
ray_script = (
    # Clone ray. If ray is already present, don't clone again.
    "git clone --branch ray-2.6.3 https://github.com/ray-project/ray || true;"
    # Run the benchmark.
    "cd ray/release/air_tests/air_benchmarks/workloads;"
    f"sed -i 's#/mnt/cluster_storage#{ray_train_bucket}#g' xgboost_benchmark.py;"
    "cat xgboost_benchmark.py;"
    "python xgboost_benchmark.py"
)
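
Two details of the cell above are easy to miss: the f-string is evaluated when the cell runs, so `ray_train_bucket` must be set before `ray_script` is built, and the flag strings (`smoke_test`, `batch_mark_100G`) are appended to the final `python xgboost_benchmark.py` command. The sketch below makes both explicit. The helper `build_ray_task_cmd` and the path `/gcs/my-bucket` are hypothetical and not part of the component; the shell steps themselves are copied from the cell above.

```python
def build_ray_task_cmd(storage_path: str, extra_flags: str = "") -> str:
    """Assemble the shell command string passed to the component as ray_task_cmd.

    Hypothetical helper for illustration only.
    """
    steps = [
        # Clone ray. If ray is already present, the failure is ignored.
        "git clone --branch ray-2.6.3 https://github.com/ray-project/ray || true",
        "cd ray/release/air_tests/air_benchmarks/workloads",
        # Point the benchmark at the chosen storage path instead of /mnt/cluster_storage.
        f"sed -i 's#/mnt/cluster_storage#{storage_path}#g' xgboost_benchmark.py",
        "cat xgboost_benchmark.py",
        # Extra flags, if any, go on the benchmark invocation itself.
        f"python xgboost_benchmark.py {extra_flags}".strip(),
    ]
    return ";".join(steps)


# "/gcs/my-bucket" is a placeholder path, not a value taken from this notebook.
cmd = build_ray_task_cmd("/gcs/my-bucket", "--smoke-test --disable-check")
print(cmd)
```

Building the command inside a function means the storage path is substituted at call time, which avoids silently producing `s#/mnt/cluster_storage##g` when the bucket variable is still empty.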

In [4]:
# Import the Ray component
# from datatonic_pipeline_components.ray_cluster import ray_cluster
ray_cluster = load_component_from_file("../ray_cluster/component.yaml")

### Case 1: Launch a non-distributed training task

The following example shows how to use this component to run a non-distributed task.

In [5]:
@pipeline(name='ray-non-dist-example')
def smoke_test_pipeline():
    ray_cluster(
        ray_task_cmd=ray_script+smoke_test,
        node_config_cmd="pip install xgboost xgboost_ray"
    ).set_display_name("ray_smoke_test")

In [6]:
trigger_pipeline(
    project_id=project_id,
    project_location=project_location,
    staging_bucket=pipeline_staging_path,
    vertex_pipe_sa=vertex_pipe_sa,
    pipeline_func_name=smoke_test_pipeline,
)

#### Results
The [job](https://console.cloud.google.com/vertex-ai/locations/europe-west2/pipelines/runs/ray-non-dist-example-20230821134932?project=dt-pc-dev) succeeded with the following metrics:

- training_time: 29.78 seconds
- prediction_time: 19.95 seconds

### Case 2: Launch a distributed training job

The following example shows how to use this component to run a distributed training job. First, create a custom training job from the component, specifying the machine type, replica count, and other compute resource parameters. The output is a new component with the distributed spec configured. Then embed the new component in a Vertex AI pipeline definition.

In [7]:
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component

In [8]:
dist_ray = create_custom_training_job_from_component(
    component_spec=ray_cluster,
    display_name="ray_cluster",
    machine_type="e2-standard-16",
    replica_count=10)
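
To get a feel for the size of the cluster requested above, the aggregate capacity can be computed from the machine type and replica count. This is a rough sketch: `MachineSpec`, `MACHINE_SPECS`, and `cluster_capacity` are hypothetical names introduced here, and the result is nominal capacity only, since some CPU and memory on each node is taken by the system and by Ray itself. It assumes every replica uses the same machine type, as in the call above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MachineSpec:
    vcpus: int
    memory_gb: int


# An e2-standard-16 machine provides 16 vCPUs and 64 GB of memory.
MACHINE_SPECS = {"e2-standard-16": MachineSpec(vcpus=16, memory_gb=64)}


def cluster_capacity(machine_type: str, replica_count: int) -> MachineSpec:
    """Nominal total capacity across all replicas of one machine type."""
    spec = MACHINE_SPECS[machine_type]
    return MachineSpec(
        vcpus=spec.vcpus * replica_count,
        memory_gb=spec.memory_gb * replica_count,
    )


total = cluster_capacity("e2-standard-16", 10)
print(f"{total.vcpus} vCPUs, {total.memory_gb} GB memory")
```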

In [9]:
@pipeline(name='ray-dist-example-100g')
def ray_training_100g():
    dist_ray(
        project=project_id,
        location=project_location,
        ray_task_cmd=ray_script+batch_mark_100G,
        node_config_cmd="pip install xgboost xgboost_ray"
    ).set_display_name("ray_100G")

In [10]:
trigger_pipeline(
    project_id=project_id,
    project_location=project_location,
    staging_bucket=pipeline_staging_path,
    vertex_pipe_sa=vertex_pipe_sa,
    pipeline_func_name=ray_training_100g,
)

#### Results
The [job](https://console.cloud.google.com/vertex-ai/locations/europe-west2/pipelines/runs/ray-dist-example-100g-20230807152129?project=dt-pc-dev) succeeded with the following metrics:

- training_time: 687.145 seconds
- prediction_time: 709.824 seconds