<a href="https://colab.research.google.com/github/kd303/data_engineering/blob/main/collab_test/Ray_vLLMCustomTransform.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### This notebook demonstrates how to write a custom transform with IBM's Data Prep Kit (DPK), which runs on Ray. The transform is designed to handle data-parallel, tensor-parallel, pipeline-parallel, and distributed inference for a variety of synthetic data generation use cases.
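In vLLM, one model replica is sharded over `tensor_parallel_size * pipeline_parallel_size` GPUs, and data parallelism runs several such replicas side by side. The hypothetical helper below sketches that arithmetic. `plan_replicas` is not a DPK or vLLM function, and we are not claiming that `VLLMTransform` decides placement this way. Its `max_replicas` argument simply mirrors the config key used in Step 5.

```python
from typing import Optional


def plan_replicas(num_gpus: int, tensor_parallel_size: int = 1,
                  pipeline_parallel_size: int = 1,
                  max_replicas: Optional[int] = None) -> int:
    """Return how many data-parallel model replicas fit on num_gpus GPUs (hypothetical helper)."""
    if tensor_parallel_size < 1 or pipeline_parallel_size < 1:
        raise ValueError("parallel sizes must be >= 1")
    # One replica is sharded across tensor_parallel_size * pipeline_parallel_size GPUs.
    gpus_per_replica = tensor_parallel_size * pipeline_parallel_size
    replicas = num_gpus // gpus_per_replica
    if replicas == 0:
        raise ValueError(
            f"each replica needs {gpus_per_replica} GPUs, but only {num_gpus} are available"
        )
    if max_replicas is not None:
        replicas = min(replicas, max_replicas)  # cap the degree of data parallelism
    return replicas


print(plan_replicas(1))                                                    # 1 (the single A100 here)
print(plan_replicas(8, tensor_parallel_size=2, pipeline_parallel_size=2))  # 2
print(plan_replicas(8, tensor_parallel_size=2, max_replicas=3))            # 3
```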

In [1]:
!git clone https://github.com/kd303/data_engineering.git

Cloning into 'data_engineering'...
remote: Enumerating objects: 65, done.
remote: Counting objects: 100% (65/65), done.
remote: Compressing objects: 100% (55/55), done.
remote: Total 65 (delta 16), reused 55 (delta 9), pack-reused 0 (from 0)
Receiving objects: 100% (65/65), 46.54 KiB | 15.51 MiB/s, done.
Resolving deltas: 100% (16/16), done.


In [1]:
!ls /content/data_engineering/dpk_extn/

__init__.py  README.md	sglang_transform.py  vllm_transform.py


### Install all required packages. Since this runs in Colab, Ray will use the current Colab environment.

In [None]:
!pip uninstall numpy -y
!pip install numpy==2.0.0

In [None]:
!pip install vllm "ray[default]" data-prep-toolkit-transforms jinja2 pandas pyarrow huggingface_hub

#### Add the transforms directory to `sys.path` to be on the safe side. This only affects the notebook (driver) process: Ray runs its workers as separate processes with their own environment (loosely, a "virtual environment"), so the path must also be passed to the workers as `PYTHONPATH`, as Step 3 does.
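Ray workers are separate OS processes, so they behave like any child process: they receive environment variables but not the driver's in-memory `sys.path`. This stdlib sketch uses `subprocess` as a stand-in for a Ray worker to make the difference visible. `child_sees_path` is a hypothetical helper, not a Ray API.

```python
import os
import subprocess
import sys
import tempfile


def child_sees_path(path: str, set_pythonpath: bool) -> bool:
    """Start a fresh Python process and report whether `path` is on its sys.path."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)  # start from a clean environment
    if set_pythonpath:
        env["PYTHONPATH"] = path
    # The child normalizes both sides so trivial spelling differences don't matter.
    code = (
        "import os, sys; "
        "print(os.path.abspath(sys.argv[1]) in [os.path.abspath(p) for p in sys.path])"
    )
    result = subprocess.run(
        [sys.executable, "-c", code, path],
        env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.strip() == "True"


extra = tempfile.mkdtemp()
sys.path.append(extra)  # changes only this (parent) process
print(child_sees_path(extra, set_pythonpath=False))  # False: sys.path edits are not inherited
print(child_sees_path(extra, set_pythonpath=True))   # True: PYTHONPATH is read at interpreter startup
```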

In [1]:
import sys
import os

sys.path.append("/content/data_engineering")

In [2]:
sys.path

['/content',
 '/env/python',
 '/usr/lib/python312.zip',
 '/usr/lib/python3.12',
 '/usr/lib/python3.12/lib-dynload',
 '',
 '/usr/local/lib/python3.12/dist-packages',
 '/usr/local/lib/python3.12/dist-packages/nvidia_cutlass_dsl/python_packages',
 '/usr/lib/python3/dist-packages',
 '/usr/local/lib/python3.12/dist-packages/IPython/extensions',
 '/root/.ipython',
 '/content/data_engineering']

In [3]:
from dpk_extn.vllm_transform import VLLMTransform

In [4]:
import pyarrow as pa

In [15]:
!nvidia-smi

Sat Jan  3 12:38:04 2026       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA A100-SXM4-80GB          Off |   00000000:00:05.0 Off |                    0 |
| N/A   34C    P0             53W /  400W |       5MiB /  81920MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+
                                                

In [7]:
import torch
torch.cuda.empty_cache()

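## Sanity check: local vLLM (debugging only)

This checks that vLLM works locally, outside the DPK transform. Do not forget to release the GPU memory when you are done (`del llm`, then garbage-collect); otherwise the other GPU cells in this notebook may fail to allocate memory.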
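In a notebook, an engine stored in a global variable stays reachable until it is explicitly deleted. One way to avoid this, assuming only the generated text is needed afterwards, is to create the engine inside a function so it goes out of scope when the function returns. `run_and_release` and `DummyEngine` are hypothetical names used only for this sketch.

```python
import gc
import weakref
from typing import Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_and_release(factory: Callable[[], T], work: Callable[[T], R]) -> R:
    """Create a resource, use it, and drop the only reference before returning."""
    resource = factory()
    try:
        return work(resource)
    finally:
        # Drop our reference and collect reference cycles. With a real vLLM engine you
        # would also call torch.cuda.empty_cache() here.
        del resource
        gc.collect()


class DummyEngine:
    """Hypothetical stand-in for an LLM engine."""

    def generate(self, prompt: str) -> str:
        return prompt.upper()


refs = []


def make_engine() -> DummyEngine:
    engine = DummyEngine()
    refs.append(weakref.ref(engine))  # lets us check later whether it was freed
    return engine


print(run_and_release(make_engine, lambda e: e.generate("hello")))  # HELLO
print(refs[0]() is None)  # True: no reference to the engine survives the call
```

With vLLM, the factory could be `lambda: LLM(model="Qwen/Qwen2.5-1.5B-Instruct")` and the work function could return `[o.outputs[0].text for o in llm.generate(prompts)]`, so only plain strings remain in the notebook. Whether this frees all GPU memory depends on the vLLM version, since the engine may run in separate processes, so confirm with `nvidia-smi`.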

In [9]:
from vllm import LLM

## Step 1: Library Patching & Compatibility

We need to address some version mismatches between the installed `vllm` library and the `dpk_extn` code:
1.  **Remove `best_of`**: The `best_of` parameter was deprecated and removed in recent `vllm` versions, but `dpk_extn/vllm_transform.py` still passes it. We'll strip it out.
2.  **Force Memory Limit**: We'll inject a `gpu_memory_utilization` setting (0.5, i.e. 50% of GPU memory) directly into `vllm_transform.py` to prevent out-of-memory (OOM) errors on the shared GPU.
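The two `sed` commands are not idempotent: re-running the cell inserts the memory line a second time. The following pure-Python sketch applies the same two edits but skips the insert when the line is already present. It runs on a made-up snippet (`SAMPLE` is an assumption, since the real contents of `vllm_transform.py` are not shown here), and `patch_source` is a hypothetical helper.

```python
import re

MEMORY_LINE = 'vllm_args["gpu_memory_utilization"] = 0.5'


def patch_source(source: str) -> str:
    """Apply both Step 1 edits to Python source text; safe to run repeatedly."""
    out = []
    for line in source.splitlines(keepends=True):
        if "best_of=" in line:
            continue  # same effect as: sed '/best_of=/d'
        if "self.llm = LLM" in line and not (out and MEMORY_LINE in out[-1]):
            indent = re.match(r"[ \t]*", line).group(0)  # copy the target line's indentation
            out.append(f"{indent}{MEMORY_LINE}\n")
        out.append(line)
    return "".join(out)


# Made-up snippet standing in for vllm_transform.py.
SAMPLE = (
    "    def __init__(self, config):\n"
    "        vllm_args = {'model': config['model']}\n"
    "        self.params = SamplingParams(\n"
    "            best_of=1,\n"
    "            max_tokens=256)\n"
    "        self.llm = LLM(**vllm_args)\n"
)

patched = patch_source(SAMPLE)
print(patched)
print(patch_source(patched) == patched)  # True: a second run changes nothing
```

To apply it to the real file you could use `Path(p).write_text(patch_source(Path(p).read_text()))` from `pathlib`, after checking that `vllm_transform.py` really builds a `vllm_args` dict, which the `sed` command also assumes.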

In [None]:
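import ray
import gc
import torch
import pyarrow as pa
import os
import time
import importlib
import pandas as pd

print("🛠️ Step 1: Patching library files for compatibility...")
# Remove the incompatible 'best_of' argument (deletes every line containing 'best_of=')
!sed -i '/best_of=/d' /content/data_engineering/dpk_extn/vllm_transform.py

# Inject a gpu_memory_utilization limit (50%) just before the 'self.llm = LLM' line.
# Not idempotent: re-running this cell inserts the line again.
!sed -i '/self.llm = LLM/i \ \ \ \ \ \ \ \ vllm_args["gpu_memory_utilization"] = 0.5' /content/data_engineering/dpk_extn/vllm_transform.py

# dpk_extn.vllm_transform was already imported earlier, so reload it to pick up the patched file
import dpk_extn.vllm_transform
importlib.reload(dpk_extn.vllm_transform)
from dpk_extn.vllm_transform import VLLMTransform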

## Step 2: Aggressive Resource Cleanup

To ensure a clean start, we aggressively clear GPU resources. Python garbage collection only frees objects inside the notebook process; it cannot terminate separate "zombie" processes (such as Ray actors or vLLM workers) that still hold GPU memory. We use `pkill` to make sure they are terminated.
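`pkill -f PATTERN` matches a regular expression against each process's full command line, so a short pattern like `ray` can also hit unrelated processes. The sketch below uses Python's `re` as a stand-in for pkill's POSIX regex engine, over hypothetical command lines, to show the over-match. Treat the stricter pattern as illustrative rather than a drop-in pkill argument; `matching` is a hypothetical helper.

```python
import re

# Hypothetical command lines, for illustration only.
COMMAND_LINES = [
    "/usr/bin/python3 /usr/local/lib/python3.12/dist-packages/ray/_private/workers/default_worker.py",
    "ray::IDLE",
    "python3 -c import numpy; numpy.array([1, 2])",
    "/usr/bin/python3 -m colab_kernel_launcher",
]


def matching(pattern: str, command_lines: list) -> list:
    """Approximate `pkill -f PATTERN`: regex-search each full command line."""
    return [cmd for cmd in command_lines if re.search(pattern, cmd)]


print(len(matching("ray", COMMAND_LINES)))       # 3: also hits "numpy.array"
print(len(matching(r"\bray\b", COMMAND_LINES)))  # 2: word boundaries skip "array"
```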

In [None]:
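print("🧹 Step 2: Cleaning up GPU resources & lingering processes...")
if ray.is_initialized():
    ray.shutdown()

# Kill any Ray or vLLM processes.
# Note: `pkill -f` matches the full command line, so 'ray' also matches any
# unrelated process whose command line contains that substring (e.g. "array").
os.system("pkill -9 -f 'ray'")
os.system("pkill -9 -f 'vllm'")
time.sleep(3)  # Give the killed processes time to exit and release GPU memory

# Python garbage collection and CUDA cache clear
gc.collect()
torch.cuda.empty_cache()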

## Step 3: Initialize Ray Runtime

We initialize Ray with a custom environment. We set `PYTHONPATH` to `/content/data_engineering` so that the worker processes can import the `dpk_extn` module without needing complex file transfers.
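The `runtime_env` argument is a plain dict whose `env_vars` mapping is applied to the worker processes. This sketch of a hypothetical `build_runtime_env` helper builds the same structure passed to `ray.init` in this step. It also prepends the project path to any existing `PYTHONPATH` instead of overwriting it.

```python
import os
from typing import List, Optional


def build_runtime_env(extra_paths: List[str], cuda_devices: str = "0",
                      existing_pythonpath: Optional[str] = None) -> dict:
    """Build a Ray-style runtime_env dict that puts extra_paths first on PYTHONPATH."""
    paths = list(extra_paths)
    if existing_pythonpath:
        # Keep entries that were already set, after ours, without duplicates.
        for p in existing_pythonpath.split(os.pathsep):
            if p and p not in paths:
                paths.append(p)
    return {
        "env_vars": {
            "CUDA_VISIBLE_DEVICES": cuda_devices,
            "PYTHONPATH": os.pathsep.join(paths),
        }
    }


print(build_runtime_env(["/content/data_engineering"]))
# {'env_vars': {'CUDA_VISIBLE_DEVICES': '0', 'PYTHONPATH': '/content/data_engineering'}}
```

Usage would be `ray.init(num_gpus=1, runtime_env=build_runtime_env(["/content/data_engineering"], existing_pythonpath=os.environ.get("PYTHONPATH")))`. Passing the existing value explicitly avoids relying on how Ray merges a worker's own `PYTHONPATH`, which we have not verified.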

In [None]:
print("🚀 Step 3: Initializing Ray with custom environment...")
ray.init(num_gpus=1, runtime_env={
    "env_vars": {
        "CUDA_VISIBLE_DEVICES": "0",
        "PYTHONPATH": "/content/data_engineering"
    }
})

## Step 4: Data Preparation

We create a simple PyArrow Table containing a few instructions to test the LLM inference.
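`pa.Table.from_pylist` takes row-oriented records, while Arrow stores each column as a contiguous array. The stdlib sketch below approximates that pivot when no schema is given. To our understanding, pyarrow also takes the column names from the first row in that case. `rows_to_columns` is a hypothetical helper, not a pyarrow function.

```python
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Pivot row-oriented records into one list per column."""
    if not rows:
        return {}
    names = list(rows[0].keys())  # column names taken from the first record
    # A record missing a key contributes None, so every column has len(rows) entries.
    return {name: [row.get(name) for row in rows] for name in names}


rows = [
    {"instruction": "What is the capital of France?", "id": 1},
    {"instruction": "Write a python function to add two numbers.", "id": 2},
    {"instruction": "Explain quantum computing in one sentence.", "id": 3},
]
columns = rows_to_columns(rows)
print(columns["id"])  # [1, 2, 3]
```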

In [None]:
print("📊 Step 4: Creating dummy data...")
data = [
    {"instruction": "What is the capital of France?", "id": 1},
    {"instruction": "Write a python function to add two numbers.", "id": 2},
    {"instruction": "Explain quantum computing in one sentence.", "id": 3}
]
table = pa.Table.from_pylist(data)

## Step 5: Configuration & Execution

We configure the `VLLMTransform`. Note that the memory utilization setting was handled in Step 1 via patching.
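The `map_column`, `system_prompt`, and `batch_size` keys suggest how rows become model inputs. We have not inspected `VLLMTransform`'s internals, so this sketch rests on an assumption: each row becomes an OpenAI-style message list (the format vLLM's `LLM.chat` accepts), and the message lists are grouped into batches. `build_chat_batches` is hypothetical.

```python
from typing import Any, Dict, List

Message = Dict[str, str]
Conversation = List[Message]


def build_chat_batches(rows: List[Dict[str, Any]], map_column: str,
                       system_prompt: str, batch_size: int) -> List[List[Conversation]]:
    """Turn each row into a system+user conversation, then group them into batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    conversations = [
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": str(row[map_column])},
        ]
        for row in rows
    ]
    # Consecutive slices of at most batch_size conversations each.
    return [conversations[i:i + batch_size] for i in range(0, len(conversations), batch_size)]


rows = [
    {"instruction": "What is the capital of France?", "id": 1},
    {"instruction": "Write a python function to add two numbers.", "id": 2},
    {"instruction": "Explain quantum computing in one sentence.", "id": 3},
]
batches = build_chat_batches(rows, "instruction", "You are a helpful assistant.", batch_size=10)
print(len(batches), len(batches[0]))  # 1 3: all three rows fit in one batch of 10
```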

In [None]:
config = {
    "model": "Qwen/Qwen2.5-1.5B-Instruct",
    "tensor_parallel_size": 1,
    "max_replicas": 1,
    "batch_size": 10,
    "map_column": "instruction",
    "system_prompt": "You are a helpful assistant."
}

print("🤖 Step 5: Initializing and Running VLLM Transform (this may take a minute)...")
transform = VLLMTransform(config)
result_table = transform.transform(table)

## Step 6: Results

Finally, we convert the result to a Pandas DataFrame and display the generated completions.
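The `completions` column pairs each instruction with its generated text. In vLLM's offline API, `LLM.generate` returns one `RequestOutput` per prompt, in the same order as the prompts, and the text is available at `.outputs[0].text` (the same access used in the local debugging cell of this notebook). This sketch attaches those texts as a new column using stand-in dataclasses rather than vLLM itself. Whether `VLLMTransform` builds the column this way is an assumption, and the sample completions are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Stand-ins for vLLM's CompletionOutput / RequestOutput; only the .outputs[i].text shape is mirrored.
@dataclass
class FakeCompletion:
    text: str


@dataclass
class FakeRequestOutput:
    outputs: List[FakeCompletion] = field(default_factory=list)


def add_completions(columns: Dict[str, List[Any]], request_outputs: List[Any]) -> Dict[str, List[Any]]:
    """Return a copy of `columns` with a 'completions' column holding each request's first text."""
    texts: List[Optional[str]] = [ro.outputs[0].text if ro.outputs else None for ro in request_outputs]
    num_rows = len(next(iter(columns.values()), []))
    if len(texts) != num_rows:
        raise ValueError(f"{len(texts)} outputs for {num_rows} rows")
    return {**columns, "completions": texts}


table_columns = {"instruction": ["What is the capital of France?", "Say hi."]}
outputs = [FakeRequestOutput([FakeCompletion("Paris.")]), FakeRequestOutput([FakeCompletion("Hi!")])]
print(add_completions(table_columns, outputs)["completions"])  # ['Paris.', 'Hi!']
```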

In [None]:
print("✅ Step 6: Inference Complete. Displaying results:")
df = result_table.to_pandas()
pd.set_option('display.max_colwidth', None)
display(df[["instruction", "completions"]])

# Optional: shut down Ray
ray.shutdown()

In [10]:
# vLLM args
vllm_args = {
  "model": "Qwen/Qwen2.5-1.5B-Instruct"
}

# Initialize vLLM engine
llm = LLM(**vllm_args)

INFO 01-03 12:32:07 [utils.py:253] non-default args: {'disable_log_stats': True, 'model': 'Qwen/Qwen2.5-1.5B-Instruct'}
INFO 01-03 12:32:11 [model.py:514] Resolved architecture: Qwen2ForCausalLM
INFO 01-03 12:32:11 [model.py:1661] Using max model len 32768
INFO 01-03 12:32:11 [arg_utils.py:1394] Using ray runtime env (env vars redacted): {'env_vars': {'CUDA_VISIBLE_DEVICES': '***', 'PYTHONPATH': '***'}}
INFO 01-03 12:32:11 [scheduler.py:230] Chunked prefill is enabled with max_num_batched_tokens=8192.
INFO 01-03 12:33:48 [llm.py:360] Supported tasks: ['generate']


In [11]:
output = llm.generate("you are a good boy and")

In [12]:
output[0].outputs[0].text

" you will help me with my homework\nI'd be happy to help you with"

In [13]:
del llm

import gc
gc.collect()

771