# Remote Offline Batch Inference with Ray Data & vLLM Example

This notebook presumes:
- You are working on OpenShift AI
- You have been given the URL of a Ray Cluster to run workloads on


In [4]:
from codeflare_sdk import RayJobClient

# Set up the authentication configuration
auth_token = "XXXX"  # Replace with your authentication token
header = {"Authorization": f"Bearer {auth_token}"}
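
Hard-coding a token in a notebook makes it easy to leak. A minimal sketch of one alternative, assuming the token has been exported in an environment variable. The variable name `RAY_AUTH_TOKEN` and the helper `build_auth_header` are hypothetical and are not part of `codeflare_sdk`:

```python
import os


def build_auth_header(env_var: str = "RAY_AUTH_TOKEN") -> dict:
    """Build a Bearer authorization header from an environment variable.

    The variable name is an assumption for illustration; use whatever
    name your environment provides.
    """
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set {env_var} before creating the RayJobClient")
    return {"Authorization": f"Bearer {token}"}


# Usage (commented out so the sketch runs without a token set):
# header = build_auth_header()
```

The resulting dictionary has the same shape as the `header` defined above, so it could be passed to `RayJobClient` in the same way.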

In [6]:
# Gather the dashboard URL (provided by the creator of the RayCluster)
ray_dashboard = "XXXX"  # Replace with the Ray dashboard URL

# Initialize the RayJobClient
client = RayJobClient(address=ray_dashboard, headers=header, verify=True)

### Simple Example Explanation

With the RayJobClient instantiated, let's run some batch inference. The following code is stored in `simple_batch_inf.py` and is used as the entrypoint for the RayJob.

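What this processor configuration does:
- Sets up a vLLM engine with your model (`model_source`)
- Passes engine settings for GPU processing (`enable_lora=False`, half-precision `dtype`, and a `max_model_len` of 1024 tokens)
- Defines batch processing parameters (`batch_size=8` requests per batch, `concurrency=2` GPU workers)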

```python
import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

processor_config = vLLMEngineProcessorConfig(
    model_source="replace-me",
    engine_kwargs=dict(
        enable_lora=False,
        dtype="half",
        max_model_len=1024,
    ),
    batch_size=8,
    concurrency=2,
)
```

With the config defined, we can instantiate the processor. This enables batch inference by processing multiple requests through the vLLM engine, with two key steps:
- **Preprocess**: Converts each row into a structured chat format with system instructions and user queries, preparing the input for the LLM
- **Postprocess**: Extracts only the generated text from the model response, cleaning up the output

The processor defines the pipeline that will be applied to each row in the dataset, enabling efficient batch processing through Ray Data's distributed execution framework.

```python
processor = build_llm_processor(
    processor_config,
    preprocess=lambda row: dict(
        messages=[
            {
                "role": "system",
                "content": "You are a calculator. Please only output the answer "
                "of the given equation.",
            },
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: {
        "resp": row["generated_text"],
    },
)
```
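
Because `preprocess` and `postprocess` are plain Python callables, their shape can be checked on a single row without a GPU or a running cluster. The sketch below restates the two lambdas as named functions. The sample row and the fake engine output (`"27"`) are made up for illustration, and real rows coming out of the engine may carry additional fields.

```python
def preprocess(row: dict) -> dict:
    """Turn a dataset row into a chat request plus sampling parameters."""
    return dict(
        messages=[
            {
                "role": "system",
                "content": "You are a calculator. Please only output the answer "
                "of the given equation.",
            },
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(temperature=0.3, max_tokens=20, detokenize=False),
    )


def postprocess(row: dict) -> dict:
    """Keep only the generated text from the engine output."""
    return {"resp": row["generated_text"]}


# Dry run on one hypothetical row, as produced by ray.data.range().
request = preprocess({"id": 3})
print(request["messages"][1]["content"])  # 3 ** 3 = ?

# Hypothetical engine output, used only to exercise postprocess.
print(postprocess({"generated_text": "27"}))  # {'resp': '27'}
```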

Now we can run the batch inference pipeline on our data. The script will:
- Download the model in the background and load it into memory, where vLLM serves it locally (on the Ray Cluster) for inference
- Generate a sample Ray Dataset with 30 rows (0-29) to process
- Apply the LLM processor to the dataset, which adds the preprocessing, inference, and postprocessing steps to the pipeline
- Execute the lazy pipeline and load the results into memory
- Iterate through all outputs and print each response

```python
ds = ray.data.range(30)
ds = processor(ds)
ds = ds.materialize()

for out in ds.take_all():
    print(out)
    print("==========")
```
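
As a back-of-the-envelope check on these numbers: each batch holds at most `batch_size` rows, so `ray.data.range(30)` with `batch_size=8` needs at least four batches. This is only a lower bound worked out in plain Python, not a description of the scheduler; Ray Data may form more, smaller batches depending on how it partitions the dataset into blocks.

```python
import math

num_rows = 30    # rows produced by ray.data.range(30)
batch_size = 8   # batch_size from the processor config

# Each batch carries at most batch_size rows, so this is a lower bound.
min_batches = math.ceil(num_rows / batch_size)

# In the ideal case: three full batches and one partial batch of six rows.
full_batches, remainder = divmod(num_rows, batch_size)

print(min_batches)              # 4
print(full_batches, remainder)  # 3 6
```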

### Job Submission

Now we can submit this job to the Ray Cluster using the `RayJobClient` from earlier.

In [None]:
entrypoint_command = "python simple_batch_inf.py"

submission_id = client.submit_job(
    entrypoint=entrypoint_command,
    runtime_env={"working_dir": "./", "pip": "requirements.txt"},
)

print(submission_id + " successfully submitted")

2025-06-23 16:56:53,008	INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_d3badb03645503e8.zip.
2025-06-23 16:56:53,010	INFO packaging.py:576 -- Creating a file package for local module './'.


raysubmit_AJhmqzWsvHu6SqZD successfully submitted


In [12]:
# Get the job's status
client.get_job_status(submission_id)

<JobStatus.PENDING: 'PENDING'>

In [None]:
# Get the job's logs
client.get_job_logs(submission_id)
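
A freshly submitted job reports `PENDING`, so the logs are only complete once the job has reached a terminal state. Rather than re-running the status cell by hand, a small polling loop can wait for that. This is a sketch under assumptions: `wait_for_job` is a hypothetical helper, not part of `codeflare_sdk`, and it assumes `get_job_status` returns either a `JobStatus` enum (as in the output above) or a plain string, with `SUCCEEDED`, `FAILED`, and `STOPPED` as the terminal states.

```python
import time

# Job states after which the status no longer changes (assumed set).
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, submission_id, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll client.get_job_status until the job reaches a terminal state.

    Returns the final state name, or raises TimeoutError if the job is
    still pending or running when the timeout expires.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_job_status(submission_id)
        # Enum members expose .value; plain strings fall back to str().
        name = getattr(status, "value", str(status))
        if name in TERMINAL_STATES:
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {submission_id} still {name} after timeout")
        time.sleep(poll_seconds)


# Usage (requires the client and submission_id from the cells above):
# final_state = wait_for_job(client, submission_id)
# print(final_state)
# print(client.get_job_logs(submission_id))
```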

