# In this demo we go over the basics of the Ray Job Submission Client in the CodeFlare SDK

# Install the desired version of the CodeFlare SDK

In [None]:
%%capture
# Pin the SDK release this demo was written against; %%capture hides pip's output.
!pip install codeflare_sdk==0.19.1

## Import the desired packages

In [187]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

### Create authentication object for user permissions
### If unused, the SDK will automatically check for the default kubeconfig, then for the in-cluster config
### KubeConfigFileAuthentication can also be used to specify kubeconfig path manually

In [None]:
import os
from getpass import getpass

# Never hard-code credentials in a notebook: anyone who can read the file (or
# its git history) can reuse the token. If the token previously embedded here
# is still valid, revoke it.
# Supply the token via the OCP_TOKEN environment variable, or type it at the
# prompt. OCP_SERVER optionally overrides the API server URL.
auth = TokenAuthentication(
    token=os.environ.get("OCP_TOKEN") or getpass("OpenShift API token: "),
    server=os.environ.get(
        "OCP_SERVER",
        "https://api.cluster-7jljr.7jljr.sandbox586.opentlc.com:6443",
    ),
    skip_tls=False,  # keep TLS verification on for the API server
)
auth.login()

#### Here, we want to define our cluster by specifying the resources we require for our batch workload. Below, we define our cluster object (which generates a corresponding RayCluster).

#### NOTE: 'quay.io/rhoai/ray:2.23.0-py39-cu121' is the default community image used by the CodeFlare SDK for creating a RayCluster resource. 
# If you have your own Ray image which suits your purposes, specify it in image field to override the default image.

#### Create and configure our cluster object
#### The SDK will try to find the name of your default local queue based on the annotation "kueue.x-k8s.io/default-queue": "true" unless you specify the local queue manually below

In [None]:
cluster = Cluster(
    ClusterConfiguration(
        # --- Identity and placement ---
        name="raytest",    # name of the RayCluster resource
        namespace="vllm",  # namespace in which the RayCluster is created
        # --- Head node (the cluster coordinator) ---
        head_cpus=8,
        head_memory=12,  # reported by details() as head_mem='12G'
        # No GPU on the head node; request one here only if you have a specific
        # need to run GPU work on the head.
        head_extended_resource_requests={"nvidia.com/gpu": 0},
        # --- Worker nodes ---
        num_workers=3,  # number of Ray worker nodes
        worker_cpu_requests="250m",
        worker_cpu_limits=10,
        worker_memory_requests=4,   # reported by details() as worker_mem_min='4G'
        worker_memory_limits=16,    # reported by details() as worker_mem_max='16G'
        # One GPU per worker for GPU-enabled workloads.
        worker_extended_resource_requests={"nvidia.com/gpu": 1},
        # --- Image, output and queueing ---
        # Optional. If omitted, quay.io/rhoai/ray:2.23.0-py39-cu121 is used.
        # NOTE(review): a ":latest-*" tag is mutable; pin a versioned tag for
        # reproducible runs.
        image="quay.io/project-codeflare/ray:latest-py39-cu118",
        write_to_file=False,  # True writes the RayCluster YAML to ~/.codeflare/resources
        local_queue="local-queue-test",  # explicit Kueue local queue (skips default-queue lookup)
    )
)

#### Initialize the Ray cluster

In [207]:
# Bring up the cluster: creates the RayCluster resource described by the
# configuration in the previous cell.
cluster.up()

#### Wait for the Ray cluster to start

In [None]:
cluster.wait_ready()  # block until the Ray cluster is up before submitting jobs

#### Details of the Ray cluster

In [192]:
cluster.details()  # summary of the cluster: status, resources, dashboard URL

RayCluster(name='raytest', status=<RayClusterStatus.READY: 'ready'>, head_cpus=8, head_mem='12G', workers=3, worker_mem_min='4G', worker_mem_max='16G', worker_cpu='250m', namespace='vllm', dashboard='https://ray-dashboard-raytest-vllm.apps.cluster-7jljr.7jljr.sandbox586.opentlc.com', worker_extended_resources={'nvidia.com/gpu': 1}, head_extended_resources={'nvidia.com/gpu': 0})

### Ray Job Submission

* Initialise the Cluster Job Client 
* Provide an entrypoint command directed to your job script
* Set up your runtime environment

In [193]:
# Initialize the Job Submission Client.
# The SDK gathers the Ray dashboard address and authenticates the Ray Job
# Submission Client on our behalf, so no URL or token is passed here.
client = cluster.job_client

### Submit the Job

In [None]:
# Submit the Llama fine-tuning script as a Ray job.
#   working_dir: the current directory is shipped to the cluster, so
#                rayfinetunellama.py must live next to this notebook.
#   pip:         path to a requirements file installed in the job's environment.
submission_id = client.submit_job(
    entrypoint="python rayfinetunellama.py",
    runtime_env={
        "working_dir": "./",
        "pip": "requirements.txt",
    },
)
print(submission_id)  # keep this ID; every call below refers to the job by it

### Logs for the job

In [None]:
# Get the job's logs as a one-off snapshot (see the tail_job_logs cell below
# for streaming output while the job runs).
client.get_job_logs(submission_id)

### Status of the job

In [None]:
# Get the job's current status (e.g. PENDING, RUNNING, SUCCEEDED, FAILED).
client.get_job_status(submission_id)

### Job info

In [None]:
# Get detailed information about the job identified by submission_id.
client.get_job_info(submission_id)

### Get all the jobs

In [None]:
# List all jobs known to this Ray cluster, not just the one submitted above.
client.list_jobs()

### Stream all the logs

In [None]:
# Iterate through the logs of a job 
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="")

### After the job is done, delete the job

In [None]:
# Delete the job's record from the Ray cluster.
# Ray only deletes jobs that have reached a terminal state. If the job is still
# running, call client.stop_job(submission_id) first and wait until
# client.get_job_status(submission_id) reports it has stopped.
client.delete_job(submission_id)

### Tear down the ray cluster

In [203]:
cluster.down()  # tear down the RayCluster created by cluster.up()

### Log out of the OCP cluster

In [None]:
auth.logout()  # end the session opened by auth.login()