In this notebook, we will go over how to use the SDK to work interactively with a Ray cluster during development.

In [None]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

In [None]:
# Authenticate with a token so the SDK acts with this user's permissions.
# Replace the "XXXXX" placeholders with your own token and API server URL.
# This step is optional: if it is skipped, the SDK automatically checks for the
# default kubeconfig and then for an in-cluster config.
# To point at a specific kubeconfig path, use KubeConfigFileAuthentication instead.
auth = TokenAuthentication(token="XXXXX", server="XXXXX", skip_tls=False)
auth.login()

Once again, let's start by running through the same cluster setup as before:

NOTE: The default images used by the CodeFlare SDK for creating a RayCluster resource depend on the installed Python version:

- For Python 3.9: 'quay.io/modh/ray:2.35.0-py39-cu121'
- For Python 3.11: 'quay.io/modh/ray:2.35.0-py311-cu121'

If you prefer to use a custom Ray image that better suits your needs, you can specify it in the image field to override the default.

In [None]:
# Describe the Ray cluster we want, then wrap that description in a Cluster object.
# Unless `local_queue` is set below, the SDK will try to find the name of your
# default local queue based on the annotation "kueue.x-k8s.io/default-queue": "true".
cluster_name = "interactivetest"
cluster_config = ClusterConfiguration(
    name=cluster_name,
    # --- head node ---
    head_cpu_requests=1,
    head_cpu_limits=1,
    head_memory_requests=6,
    head_memory_limits=6,
    # For GPU enabled workloads set both head_extended_resource_requests
    # and worker_extended_resource_requests
    head_extended_resource_requests={"nvidia.com/gpu": 1},
    # --- worker nodes ---
    num_workers=2,
    worker_cpu_requests="250m",
    worker_cpu_limits=1,
    worker_memory_requests=4,
    worker_memory_limits=6,
    worker_extended_resource_requests={"nvidia.com/gpu": 1},
    # --- optional fields ---
    # image="",  # Custom Ray image overriding the default
    write_to_file=False,  # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources
    # local_queue="local-queue-name",  # Specify the local queue manually
)
cluster = Cluster(cluster_config)

In [None]:
# Bring up the cluster, then wait on it before moving to the next cell
# NOTE(review): wait_ready() presumably blocks until the cluster reports ready — confirm in the SDK docs
cluster.up()
cluster.wait_ready()

In [None]:
# Inspect the details of the cluster we just brought up
cluster.details()

This time we will demonstrate another potential method of use: working with the Ray cluster interactively.

Using the SDK, we can get both the Ray cluster URI and dashboard URI:

In [None]:
# Ask the SDK for the dashboard URI and the Ray cluster URI.
# The cluster URI is the address handed to ray.init() further down.
ray_dashboard_uri = cluster.cluster_dashboard_uri()
ray_cluster_uri = cluster.cluster_uri()

# Show both, one per line
print(ray_dashboard_uri, ray_cluster_uri, sep="\n")

Now we can connect directly to our Ray cluster via the Ray Python client:

In [None]:
from codeflare_sdk import generate_cert

# Create the required TLS cert for this cluster, then export the
# environment variables that enable TLS
namespace = cluster.config.namespace
generate_cert.generate_tls_cert(cluster_name, namespace)
generate_cert.export_env(cluster_name, namespace)

In [None]:
# Sanity check before proceeding: the cluster must exist and its URI must not be empty
assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding"

import ray

# Reset the Ray context in case one is already active
ray.shutdown()

# Additional libraries that will be required for model training
runtime_env = {
    "pip": [
        "transformers==4.41.2",
        "datasets==2.17.0",
        "accelerate==0.31.0",
        "scikit-learn==1.5.0",
    ]
}

# Establish the connection to the Ray cluster
# NOTE: This will work for in-cluster notebook servers (RHODS/ODH), but not for local machines
# To see how to connect from your laptop, go to demo-notebooks/additional-demos/local_interactive.ipynb
ray.init(address=ray_cluster_uri, runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

Now that we are connected (and have passed in some package requirements), let's try writing some training code:

In [None]:
@ray.remote
def train_fn():
    """Fine-tune DistilBERT on a small IMDB sample with Ray Train.

    Meant to be launched with ``train_fn.remote()``. The imports live inside
    the function body so they are executed when the task itself runs; the
    extra packages are the ones requested through the ``runtime_env`` passed
    to ``ray.init``.

    The function builds a ``TorchTrainer`` around the nested ``train_func``
    and calls ``fit()``. The fit result is kept in a local variable only;
    nothing is returned.
    """
    import numpy as np
    from datasets import load_dataset, load_metric
    from transformers import (
        Trainer,
        TrainingArguments,
        AutoTokenizer,
        AutoModelForSequenceClassification,
    )
    import ray.train.huggingface.transformers
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    # When running in a multi-node cluster you will need persistent storage that is accessible across all worker nodes.
    # See www.github.com/project-codeflare/codeflare-sdk/tree/main/docs/s3-compatible-storage.md for more information.

    def train_func():
        """Training loop handed to TorchTrainer (Hugging Face Trainer inside)."""
        # Datasets
        dataset = load_dataset("imdb")
        tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

        def tokenize_function(examples):
            return tokenizer(examples["text"], padding="max_length", truncation=True)

        # Only the first 100 examples of each split are used to keep the demo short
        small_train_dataset = (
            dataset["train"].select(range(100)).map(tokenize_function, batched=True)
        )
        small_eval_dataset = (
            dataset["test"].select(range(100)).map(tokenize_function, batched=True)
        )

        # Model
        model = AutoModelForSequenceClassification.from_pretrained(
            "distilbert-base-uncased", num_labels=2
        )

        # Load the metric once, up front, instead of on every evaluation pass
        metric = load_metric("accuracy")

        def compute_metrics(eval_pred):
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            return metric.compute(predictions=predictions, references=labels)

        # Hugging Face Trainer
        training_args = TrainingArguments(
            output_dir="test_trainer",
            evaluation_strategy="epoch",
            save_strategy="epoch",
            report_to="none",
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=small_train_dataset,
            eval_dataset=small_eval_dataset,
            compute_metrics=compute_metrics,
        )

        # Attach Ray Train's reporting callback and let Ray prepare the trainer
        callback = ray.train.huggingface.transformers.RayTrainReportCallback()
        trainer.add_callback(callback)

        trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

        trainer.train()

    ray_trainer = TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(
            # num_workers = number of worker nodes with the ray head node included
            num_workers=3,
            use_gpu=True,
            resources_per_worker={
                "CPU": 1,
            },
            trainer_resources={
                "CPU": 0,
            },
        ),  # trailing comma so that run_config below can be uncommented as-is
        # Configure persistent storage that is accessible across
        # all worker nodes.
        # Uncomment and update the RunConfig below to include your storage details.
        # run_config=ray.train.RunConfig(storage_path="storage path"),
    )
    # Kept for inspection while experimenting; not returned to the caller
    result: ray.train.Result = ray_trainer.fit()

When we are ready to test our code, we can run the training function we defined above remotely on our Ray cluster:

In [None]:
# Submit train_fn (defined in the cell above) as a remote Ray task and wait for its result
ray.get(train_fn.remote())

Once complete, we can bring our Ray cluster down and clean up:

In [None]:
# Bring the Ray cluster down now that we are finished with it
cluster.down()

In [None]:
# Log out (counterpart to the auth.login() call at the top of the notebook)
auth.logout()