# Submitting Jobs to KubeRay - GPU

In this demo we will go over the basics of the Ray Job Submission Client in the CodeFlare SDK.

In [13]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

In [2]:
# List the installed packages whose name contains "codeflare" to confirm the SDK version in use
!pip list | grep codeflare


codeflare-sdk             0.16.1
codeflare-torchx          0.6.0.dev2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Authenticate to the cluster either using the SDK or OpenShift console login

In [None]:
# Create an authentication object carrying the user's permissions.
# If no auth object is used, the SDK automatically checks for a default kubeconfig,
# then for in-cluster config.

# Option 1: token authentication through the SDK (uncomment and fill in to use).
# KubeConfigFileAuthentication can also be used to specify a kubeconfig path manually.
# auth = TokenAuthentication(
#     token = "XXXXX",
#     server = "XXXXX",
#     skip_tls=False
# )
# auth.login()

# Option 2: paste in the oc login command from
# the OpenShift console "Copy login command" after the "!"
# (replace the XXXX placeholders; do not commit a real token with the notebook).
!oc login --token=sha256~XXXX --server=https://XXXX 

### Create Cluster

In [14]:
# Configuration of our Ray cluster
name = "raycluster-gpu"

# Take the namespace from the service-account file mounted into the pod this
# notebook runs in. Reading the file in Python (rather than capturing `!cat`)
# fails with a clear FileNotFoundError when the file is absent, instead of
# depending on whatever `cat` happened to print.
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as namespace_file:
    # Only the first line is used, matching the previous `namespace[0]`.
    namespace = namespace_file.readline().strip()

# The image reference is assembled from its parts so that a single component
# (Ray, Python or CUDA version) can be bumped on its own.
ray_version = "2.33.0"
python_version = "py311"
cuda_version = "cu118"
image = f"docker.io/rayproject/ray:{ray_version}-{python_version}-{cuda_version}"

# Alternative images (uncomment one to override the image built above):
# image = "quay.io/project-codeflare/ray:latest-py39-cu118"
# image = "rayproject/ray-ml:2.23.0-py311-gpu"
# image = "docker.io/rayproject/ray:2.23.0-py39-cu121"

print(name, namespace, image)

raycluster-gpu chase-dev docker.io/rayproject/ray:2.33.0-py311-cu118


In [15]:
# Show the local queues available in the current project
!oc get localqueue

NAME                  CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
local-queue-default   cluster-queue   0                   2


The SDK will try to find the name of your default local queue based on the annotation `"kueue.x-k8s.io/default-queue": "true"`, unless you specify the local queue manually below.


In [None]:
# Collect the cluster settings in one configuration object first,
# then wrap that object in a Cluster.
cluster_config = ClusterConfiguration(
    name=name,
    namespace=namespace,
    image=image,
    # GPU requests
    head_gpus=1,
    num_gpus=1,
    # Worker count and CPU / memory bounds
    num_workers=2,
    min_cpus=1,
    max_cpus=6,
    min_memory=8,
    max_memory=28,
    # When enabled, the Ray Cluster yaml files are written to ~/.codeflare/resources
    write_to_file=True,
    # local_queue="local-queue-name",  # Specify the local queue manually
)
cluster = Cluster(cluster_config)

In [None]:
import os
import yaml

# Path of the manifest for this cluster under ~/.codeflare/resources.
file_path = os.path.expanduser(f"~/.codeflare/resources/{name}.yaml")

# Load the manifest as a plain dictionary.
# A YAML parse error is deliberately allowed to propagate: printing it and carrying on
# would leave `mod_cluster` undefined, and the write further down would then truncate
# the file on open before failing with a NameError.
with open(file_path, "r") as file:
    mod_cluster = yaml.safe_load(file)

# Optional customisation: uncomment to add a toleration for the "nvidia.com/gpu"
# NoSchedule taint to the head pod and to the first worker group.
# mod_cluster["spec"]["headGroupSpec"]["template"]["spec"]["tolerations"] = [{
#     "effect": "NoSchedule",
#     "key": "nvidia.com/gpu",
#     "operator": "Exists",
# }]

# mod_cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["tolerations"] = [{
#     "effect": "NoSchedule",
#     "key": "nvidia.com/gpu",
#     "operator": "Exists",
# }]

# Serialise BEFORE opening the file for writing, so that a failure while dumping
# cannot leave an empty or half-written manifest behind.
serialized_cluster = yaml.dump(mod_cluster)
with open(file_path, "w") as file:
    file.write(serialized_cluster)

# Read the manifest back so the written result can be inspected.
with open(file_path, "r") as file:
    check_cluster = yaml.safe_load(file)
# print(check_cluster["spec"]["headGroupSpec"]["template"]["spec"]["tolerations"])
# print(check_cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]["tolerations"])



In [None]:
# Bring up the cluster
cluster.up()
cluster.wait_ready()

### Alternatively, get a running cluster object

In [16]:
from codeflare_sdk import get_cluster

# Look up the existing cluster by name and namespace instead of creating a new one
cluster = get_cluster(name, namespace=namespace)

Yaml resources loaded for raycluster-gpu


In [17]:
# Display the cluster summary (status, head/worker resources, namespace, dashboard URL)
cluster.details()

RayCluster(name='raycluster-gpu', status=<RayClusterStatus.READY: 'ready'>, head_cpus=2, head_mem='8G', head_gpu=0, workers=2, worker_mem_min='8G', worker_mem_max='28G', worker_cpu='1', worker_gpu=1, namespace='chase-dev', dashboard='https://ray-dashboard-raycluster-gpu-chase-dev.apps.dev.rhoai.rh-aiservices-bu.com')

### Upload data to S3


In [11]:
import sys
# Add ./utils to the module search path; this must happen before the import below
sys.path.append('./utils')

import utils.s3

# Upload the local "data" directory; presumably the arguments are
# (local directory, destination prefix) - confirm against utils/s3.py
utils.s3.upload_directory_to_s3("data", "data")
# List objects using an empty prefix argument
utils.s3.list_objects("")

data/train.csv -> data/train.csv
data/validate.csv -> data/validate.csv
data/card_transdata.csv -> data/card_transdata.csv
data/test.csv -> data/test.csv
data/card_transdata.csv
data/test.csv
data/train.csv
data/validate.csv
generated-images/000.png
models/Meta-Llama-3-8B/LICENSE
models/Meta-Llama-3-8B/README.md
models/Meta-Llama-3-8B/USE_POLICY.md
models/Meta-Llama-3-8B/config.json
models/Meta-Llama-3-8B/generation_config.json
models/Meta-Llama-3-8B/model-00001-of-00004.safetensors
models/Meta-Llama-3-8B/model-00002-of-00004.safetensors
models/Meta-Llama-3-8B/model-00003-of-00004.safetensors
models/Meta-Llama-3-8B/model-00004-of-00004.safetensors
models/Meta-Llama-3-8B/model.safetensors.index.json
models/Meta-Llama-3-8B/original/consolidated.00.pth
models/Meta-Llama-3-8B/original/params.json
models/Meta-Llama-3-8B/original/tokenizer.model
models/Meta-Llama-3-8B/special_tokens_map.json
models/Meta-Llama-3-8B/tokenizer.json
models/Meta-Llama-3-8B/tokenizer_config.json
models/Meta-Llama-

### Ray Job Submission

* Initialize the Cluster Job Client 
* Provide an entrypoint command directed to your job script
* Set up your [runtime environment](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments)

Some common runtime environment configurations include:

```python
runtime_env={
    "working_dir": "./", # relative path to files uploaded to the job
    "excludes": ["local_data/"], # directories and files to exclude from being uploaded to the job
    "pip": ["boto3", "botocore"], # can also be a string path to a requirements.txt file
    "env_vars": {
        "MY_ENV_VAR": "MY_ENV_VAR_VALUE",
        "MY_ENV_VAR_2": os.environ.get("MY_ENV_VAR_2"),
    },
}
```

In [18]:
# Initialize the Job Submission Client
"""
The SDK will automatically gather the dashboard address and authenticate using the Ray Job Submission Client
"""
# `job_client` is read as an attribute of the cluster object; it is not called
client = cluster.job_client

See if there are any existing jobs

In [19]:
# List all existing jobs (the bare expression shows the returned list as the cell output)
client.list_jobs()

[JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id='07000000', submission_id='raysubmit_bp9anRkY8cLxFaQV', driver_info=DriverInfo(id='07000000', node_ip_address='10.128.42.23', pid='22072'), status=<JobStatus.FAILED: 'FAILED'>, entrypoint='python mnist_fashion_2.py', message='Job entrypoint command failed with exit code 1, last available logs (truncated to 20,000 chars):\n\nTraceback (most recent call last):\n  File "/tmp/ray/session_2024-08-02_10-07-26_937748_1/runtime_resources/working_dir_files/_ray_pkg_78756766d4a0f4f0/mnist_fashion_2.py", line 74, in <module>\n    result = trainer.fit()\n             ^^^^^^^^^^^^^\n  File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/train/base_trainer.py", line 638, in fit\n    raise TrainingFailedError(\nray.train.base_trainer.TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rathe

#### Some Sample Runtime Environments

In [38]:
import os

# Entrypoint script, resolved relative to the uploaded working_dir.
script = "train.py"

# Environment variables to forward from the notebook environment into the Ray job.
forwarded_env_var_names = [
    "HF_USER",
    "HF_TOKEN",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_S3_ENDPOINT",
    "AWS_DEFAULT_REGION",
    "AWS_S3_BUCKET",
]

# Forward only the variables that are actually set. os.environ.get() returns None
# for an unset variable, and Ray expects env_vars to map str -> str, so a None
# value would make the job submission fail validation.
env_vars = {
    var_name: os.environ[var_name]
    for var_name in forwarded_env_var_names
    if var_name in os.environ
}

# Make the omission visible rather than silent.
missing_env_vars = [
    var_name for var_name in forwarded_env_var_names if var_name not in env_vars
]
if missing_env_vars:
    print(
        "Warning: not set in this environment, so not forwarded to the job: "
        + ", ".join(missing_env_vars)
    )

# NOTE: the runtime_env (env_vars values included) is reported back by
# client.get_job_info(); clear cell outputs before sharing if secrets are forwarded.
runtime_env = {
    "working_dir": "./ray-scripts",
    "excludes": [],
    "pip": "./ray-scripts/requirements.txt",
    "env_vars": env_vars,
}

### Submit the configured job

In [39]:
# Launch the script through the job client. The returned submission id is kept
# so that later cells can query, stop, or delete this job.
entrypoint_command = f"python {script}"
submission_id = client.submit_job(entrypoint=entrypoint_command, runtime_env=runtime_env)

print(submission_id)

2024-08-02 20:48:54,064	INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_f741ade6e8726522.zip.
2024-08-02 20:48:54,065	INFO packaging.py:518 -- Creating a file package for local directory './ray-scripts'.


raysubmit_GA2ShMGYXdwYWcKN


### Query Important Job Information

In [31]:
# Get the job's status
print(client.get_job_status(submission_id), "\n")

# Get job related info
# NOTE: the printed info includes the job's runtime_env, env_vars values included;
# clear this cell's output before sharing the notebook if secrets were passed.
print(client.get_job_info(submission_id), "\n")

# Get the job's logs
print(client.get_job_logs(submission_id))

FAILED 

type=<JobType.SUBMISSION: 'SUBMISSION'> job_id=None submission_id='raysubmit_aR6kczrLJFcWUF4Y' driver_info=None status=<JobStatus.FAILED: 'FAILED'> entrypoint='python train.py' message='Job entrypoint command failed with exit code 1, last available logs (truncated to 20,000 chars):\n2024-08-02 13:44:44,585\tINFO job_manager.py:531 -- Runtime env is setting up.\n  File "/tmp/ray/session_2024-08-02_10-07-26_937748_1/runtime_resources/working_dir_files/_ray_pkg_6af8ab6f7aa01120/train.py", line 95\n    storage_path=f"s3://{bucket_name}/ray/fraud-training/"        \n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nSyntaxError: invalid syntax. Perhaps you forgot a comma?\n' error_type=None start_time=1722631484583 end_time=1722631487661 metadata={} runtime_env={'working_dir': 'gcs://_ray_pkg_6af8ab6f7aa01120.zip', 'pip': {'packages': ['boto3', 'botocore', 'torch', 'torchvision'], 'pip_check': False}, 'env_vars': {'HF_USER': 'cfchase', 'HF_TOKEN': '<REDACTED>'

In [None]:
# Iterate through the logs of a job
# (a top-level `async for` needs a kernel that supports top-level await, as IPython does)
async for lines in client.tail_job_logs(submission_id):
    # Print each chunk as received; end="" avoids adding extra newlines
    print(lines, end="")

### Delete a job

In [None]:
# Show the current job list before stopping and deleting the job
print(client.list_jobs())

In [None]:
import time

# Ask Ray to stop the job. The stop is requested asynchronously, so the job may
# still be running when this call returns.
client.stop_job(submission_id)

# delete_job only succeeds for a job in a terminal state, so wait (bounded) until
# the job has reached one before deleting it.
terminal_statuses = ("SUCCEEDED", "FAILED", "STOPPED")
wait_deadline = time.monotonic() + 60  # seconds
while client.get_job_status(submission_id) not in terminal_statuses:
    if time.monotonic() >= wait_deadline:
        # Give up waiting; delete_job below will report the problem if the job
        # is still not terminal.
        break
    time.sleep(1)

client.delete_job(submission_id)

In [None]:
# Show the job list again to check the result of the stop/delete above
print(client.list_jobs())

In [None]:
# Bring the cluster down once the demo is finished
cluster.down()