no-jira: add remote offline batch inference with vllm example #848
Open
kryanbeane wants to merge 2 commits into project-codeflare:main from kryanbeane:remote-offline-bi
demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb
227 changes: 227 additions & 0 deletions
@@ -0,0 +1,227 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Remote Offline Batch Inference with Ray Data & vLLM Example\n", | ||
"\n", | ||
"This notebook presumes:\n", | ||
"- You are working on Openshift AI\n", | ||
"- You have a Ray Cluster URL given to you to run workloads on\n" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 4, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from codeflare_sdk import RayJobClient\n", | ||
"\n", | ||
"# Setup Authentication Configuration\n", | ||
"auth_token = \"XXXX\"\n", | ||
"header = {\"Authorization\": f\"Bearer {auth_token}\"}" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 6, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Gather the dashboard URL (provided by the creator of the RayCluster)\n", | ||
"ray_dashboard = \"XXXX\" # Replace with the Ray dashboard URL\n", | ||
"\n", | ||
"# Initialize the RayJobClient\n", | ||
"client = RayJobClient(address=ray_dashboard, headers=header, verify=True)" | ||
] | ||
}, | ||
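{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"If you created the Ray cluster yourself with the codeflare-sdk rather than being handed a URL, the dashboard address can usually be read from the `Cluster` object instead. A minimal sketch; the cluster name and namespace below are illustrative:\n", | ||
"\n", | ||
"```python\n", | ||
"from codeflare_sdk import Cluster, ClusterConfiguration\n", | ||
"\n", | ||
"# Assumes you own the cluster definition; adjust name/namespace to yours.\n", | ||
"cluster = Cluster(ClusterConfiguration(name=\"batch-inf\", namespace=\"my-project\"))\n", | ||
"ray_dashboard = cluster.cluster_dashboard_uri()\n", | ||
"```" | ||
] | ||
}, | ||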
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"### Simple Example Explanation\n", | ||
"\n", | ||
"With the RayJobClient instantiated, lets run some batch inference. The following code is stored in `simple_batch_inf.py`, and is used as the entrypoint for the RayJob.\n", | ||
"\n", | ||
"What this processor configuration does:\n", | ||
"- Set up a vLLM engine with your model\n", | ||
"- Configure some settings for GPU processing\n", | ||
"- Defines batch processing parameters (8 requests per batch, 2 GPU workers)\n", | ||
"\n", | ||
"```python\n", | ||
"import ray\n", | ||
"from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig\n", | ||
"\n", | ||
"processor_config = vLLMEngineProcessorConfig(\n", | ||
" model_source=\"replace-me\",\n", | ||
" engine_kwargs=dict(\n", | ||
" enable_lora=False,\n", | ||
" dtype=\"half\",\n", | ||
" max_model_len=1024,\n", | ||
" ),\n", | ||
" batch_size=8,\n", | ||
" concurrency=2,\n", | ||
")\n", | ||
"```" | ||
] | ||
}, | ||
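{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"As a hypothetical variant (not used in this example), larger models can be sharded across several GPUs per worker by passing vLLM's `tensor_parallel_size` through `engine_kwargs`:\n", | ||
"\n", | ||
"```python\n", | ||
"processor_config = vLLMEngineProcessorConfig(\n", | ||
"    model_source=\"replace-me\",\n", | ||
"    engine_kwargs=dict(\n", | ||
"        tensor_parallel_size=2,  # shard the model across 2 GPUs per replica\n", | ||
"        max_model_len=1024,\n", | ||
"    ),\n", | ||
"    batch_size=8,\n", | ||
"    concurrency=1,\n", | ||
")\n", | ||
"```" | ||
] | ||
}, | ||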
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"With the config defined, we can instantiate the processor. This enables batch inference by processing multiple requests through the vLLM engine, with two key steps:\n", | ||
"- **Preprocess**: Converts each row into a structured chat format with system instructions and user queries, preparing the input for the LLM\n", | ||
"- **Postprocess**: Extracts only the generated text from the model response, cleaning up the output\n", | ||
"\n", | ||
"The processor defines the pipeline that will be applied to each row in the dataset, enabling efficient batch processing through Ray Data's distributed execution framework.\n", | ||
"\n", | ||
"```python\n", | ||
"processor = build_llm_processor(\n", | ||
" processor_config,\n", | ||
" preprocess=lambda row: dict(\n", | ||
" messages=[\n", | ||
" {\n", | ||
" \"role\": \"system\",\n", | ||
" \"content\": \"You are a calculator. Please only output the answer \"\n", | ||
" \"of the given equation.\",\n", | ||
" },\n", | ||
" {\"role\": \"user\", \"content\": f\"{row['id']} ** 3 = ?\"},\n", | ||
" ],\n", | ||
" sampling_params=dict(\n", | ||
" temperature=0.3,\n", | ||
" max_tokens=20,\n", | ||
" detokenize=False,\n", | ||
" ),\n", | ||
" ),\n", | ||
" postprocess=lambda row: {\n", | ||
" \"resp\": row[\"generated_text\"],\n", | ||
" },\n", | ||
")\n", | ||
"```" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Now we can run the batch inference pipeline on our data, it will:\n", | ||
"- In the background, the processor will download the model into memory where vLLM serves it locally (on Ray Cluster) for use in inference\n", | ||
"- Generate a sample Ray Dataset with 32 rows (0-31) to process\n", | ||
"- Run the LLM processor on the dataset, triggering the preprocessing, inference, and postprocessing steps\n", | ||
"- Execute the lazy pipeline and loads results into memory\n", | ||
"- Iterate through all outputs and print each response \n", | ||
"\n", | ||
"```python\n", | ||
"ds = ray.data.range(30)\n", | ||
"ds = processor(ds)\n", | ||
"ds = ds.materialize()\n", | ||
"\n", | ||
"for out in ds.take_all():\n", | ||
" print(out)\n", | ||
" print(\"==========\")\n", | ||
"```\n", | ||
"\n", | ||
"### Job Submission\n", | ||
"\n", | ||
"Now we can submit this job against the Ray Cluster using the `RayJobClient` from earlier " | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"name": "stderr", | ||
"output_type": "stream", | ||
"text": [ | ||
"2025-06-23 16:56:53,008\tINFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_d3badb03645503e8.zip.\n", | ||
"2025-06-23 16:56:53,010\tINFO packaging.py:576 -- Creating a file package for local module './'.\n" | ||
] | ||
}, | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"raysubmit_AJhmqzWsvHu6SqZD successfully submitted\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"entrypoint_command = \"python simple_batch_inf.py\"\n", | ||
"\n", | ||
"submission_id = client.submit_job(\n", | ||
" entrypoint=entrypoint_command,\n", | ||
" runtime_env={\"working_dir\": \"./\", \"pip\": \"requirements.txt\"},\n", | ||
")\n", | ||
"\n", | ||
"print(submission_id + \" successfully submitted\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 12, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/plain": [ | ||
"<JobStatus.PENDING: 'PENDING'>" | ||
] | ||
}, | ||
"execution_count": 12, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"# Get the job's status\n", | ||
"client.get_job_status(submission_id)" | ||
] | ||
}, | ||
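{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"For longer-running jobs it can be handy to poll until the job reaches a terminal state. A minimal sketch, assuming the standard `JobStatus` values from `ray.job_submission`:\n", | ||
"\n", | ||
"```python\n", | ||
"import time\n", | ||
"from ray.job_submission import JobStatus\n", | ||
"\n", | ||
"# Poll every 10 seconds until the job succeeds, fails, or is stopped.\n", | ||
"while True:\n", | ||
"    status = client.get_job_status(submission_id)\n", | ||
"    print(f\"Job status: {status}\")\n", | ||
"    if status in {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}:\n", | ||
"        break\n", | ||
"    time.sleep(10)\n", | ||
"```" | ||
] | ||
}, | ||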
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/plain": [ | ||
"'2025-06-23 15:47:22,272\\tINFO job_manager.py:531 -- Runtime env is setting up.\\nINFO 06-23 15:53:36 [__init__.py:244] Automatically detected platform cuda.\\n2025-06-23 15:53:54,307\\tINFO worker.py:1554 -- Using address 10.128.2.45:6379 set in the environment variable RAY_ADDRESS\\n2025-06-23 15:53:54,308\\tINFO worker.py:1694 -- Connecting to existing Ray cluster at address: 10.128.2.45:6379...\\n2025-06-23 15:53:54,406\\tINFO worker.py:1879 -- Connected to Ray cluster. View the dashboard at \\x1b[1m\\x1b[32mhttp://10.128.2.45:8265 \\x1b[39m\\x1b[22m\\nNo cloud storage mirror configured\\n2025-06-23 15:53:57,501\\tWARNING util.py:589 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.\\n2025-06-23 15:53:58,095\\tINFO logging.py:290 -- Registered dataset logger for dataset dataset_33_0\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:117 -- Starting execution of Dataset dataset_33_0. Full logs are in /tmp/ray/session_2025-06-23_10-53-41_019757_1/logs/ray-data\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:118 -- Execution plan of Dataset dataset_33_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(_preprocess)] -> ActorPoolMapOperator[MapBatches(ChatTemplateUDF)] -> ActorPoolMapOperator[MapBatches(TokenizeUDF)] -> ActorPoolMapOperator[MapBatches(vLLMEngineStageUDF)] -> ActorPoolMapOperator[MapBatches(DetokenizeUDF)] -> TaskPoolMapOperator[Map(_postprocess)]\\n\\nRunning 0: 0.00 row [00:00, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:54:00,800 E 829 829] (raylet) node_manager.cc:3287: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:55:00,824 E 829 829] (raylet) node_manager.cc:3287: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. 
To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [01:01, ? row/s]'" | ||
| ||
] | ||
}, | ||
"execution_count": 15, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"# Get the job's logs\n", | ||
"client.get_job_logs(submission_id)" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": ".venv", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.11.12" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
demo-notebooks/additional-demos/batch-inference/requirements.txt
4 changes: 4 additions & 0 deletions
@@ -0,0 +1,4 @@ | ||
vllm | ||
transformers | ||
triton>=2.0.0 | ||
torch>=2.0.0 |
demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py
62 changes: 62 additions & 0 deletions
@@ -0,0 +1,62 @@ | ||
import ray | ||
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig | ||
| ||
| ||
# 1. Construct a vLLM processor config. | ||
processor_config = vLLMEngineProcessorConfig( | ||
# The base model. | ||
model_source="unsloth/Llama-3.2-1B-Instruct", | ||
# vLLM engine config. | ||
engine_kwargs=dict( | ||
enable_lora=False, | ||
# Older GPUs (e.g. T4) don't support bfloat16. You should remove | ||
# this line if you're using newer GPUs. | ||
dtype="half", | ||
# Reduce the model length to fit small GPUs. You should remove | ||
# this line if you're using large GPUs. | ||
max_model_len=1024, | ||
), | ||
# The batch size used in Ray Data. | ||
batch_size=8, | ||
# Use one GPU in this example. | ||
concurrency=1, | ||
# If you save the LoRA adapter in S3, you can set the following path. | ||
# dynamic_lora_loading_path="s3://your-lora-bucket/", | ||
) | ||
| ||
# 2. Construct a processor using the processor config. | ||
processor = build_llm_processor( | ||
processor_config, | ||
preprocess=lambda row: dict( | ||
# Build a chat-format request for each input row. | ||
messages=[ | ||
{ | ||
"role": "system", | ||
"content": "You are a calculator. Please only output the answer " | ||
"of the given equation.", | ||
}, | ||
{"role": "user", "content": f"{row['id']} ** 3 = ?"}, | ||
], | ||
sampling_params=dict( | ||
temperature=0.3, | ||
max_tokens=20, | ||
detokenize=False, | ||
), | ||
), | ||
postprocess=lambda row: { | ||
"resp": row["generated_text"], | ||
}, | ||
) | ||
| ||
# 3. Synthesize a dataset with 32 rows. | ||
ds = ray.data.range(32) | ||
# 4. Apply the processor to the dataset. Note that this line won't kick off | ||
# anything because the processor executes lazily. | ||
ds = processor(ds) | ||
# Materialization kicks off the pipeline execution. | ||
ds = ds.materialize() | ||
| ||
# 5. Print all outputs. | ||
for out in ds.take_all(): | ||
print(out) | ||
print("==========") |