# Test vLLM Streaming Pipeline (Single GPU)

Simple test to verify the streaming pipeline works with vLLM on a single GPU.

**Key steps:**
1. Download model files BEFORE starting Ray
2. Initialize Ray with a `runtime_env` that sets `PYTHONPATH` so workers can import `src`
3. Run the streaming pipeline

In [None]:
import time
import hashlib
import uuid
from pathlib import Path
from loguru import logger

In [None]:
# Configuration
# Upper bound on the audio files picked up for the test; also used as the
# datasource's max_items and as the number of results to wait for.
NUM_TEST_FILES = 5
# Forwarded to warmup_and_stream() as warmup_timeout (presumably seconds - confirm).
WARMUP_TIMEOUT = 300.0
# Parent directory under which the model files are stored locally.
MODELS_DIR = "/app/models"
# Hugging Face repo id of the model to download.
MODEL_NAME = "Qwen/Qwen3-Omni-30B-A3B-Instruct"

## Step 1: Download Model Files (BEFORE Ray)

This downloads all model files to a local directory without loading them into GPU memory.
vLLM then loads the model from this local path instead of from the Hugging Face repo name.

In [None]:
def download_model_for_vllm(model_name: str, models_dir: str) -> str:
    """Ensure the files of a Hugging Face model exist locally and return their path.

    The model is stored under ``models_dir`` in a directory named after
    ``model_name`` with every ``/`` replaced by ``_``. When that directory
    already exists and contains at least one entry it is reused as-is and
    nothing is downloaded; otherwise the repository is fetched with
    ``huggingface_hub.snapshot_download``.

    Args:
        model_name: Hugging Face repo id, e.g. ``"org/model"``.
        models_dir: Parent directory that holds downloaded models.

    Returns:
        The local model directory as a string.
    """
    local_path = Path(models_dir) / model_name.replace("/", "_")

    # is_dir() rather than exists(): a stray regular file at this path must not
    # reach iterdir(), which would raise NotADirectoryError.
    # NOTE(review): any non-empty directory counts as "already downloaded", so
    # an interrupted download would be reused as-is - consider a completion
    # marker if that turns out to be a problem in practice.
    if local_path.is_dir() and any(local_path.iterdir()):
        print(f"Model already downloaded at {local_path}")
        return str(local_path)

    # Imported only when a download is really needed, so the cached path above
    # keeps working even where huggingface_hub is not importable.
    from huggingface_hub import snapshot_download

    print(f"Downloading model {model_name} to {local_path}...")
    print("This may take a while on first run...")
    local_path.mkdir(parents=True, exist_ok=True)

    # NOTE(review): local_dir_use_symlinks is reported as deprecated in recent
    # huggingface_hub releases - confirm against the installed version.
    snapshot_download(
        repo_id=model_name,
        local_dir=str(local_path),
        local_dir_use_symlinks=False,
    )

    print(f"Model downloaded successfully to {local_path}")
    return str(local_path)


# Fetch (or reuse) the local copy of the model BEFORE Ray is started;
# MODEL_PATH is the local directory handed to the detector stage later on.
MODEL_PATH = download_model_for_vllm(
    model_name=MODEL_NAME,
    models_dir=MODELS_DIR,
)
print(f"Using model from: {MODEL_PATH}")

## Step 2: Initialize Ray with runtime_env

In [None]:
import ray
from ray.util.queue import Queue

# Tear down any Ray instance left over from an earlier run of this notebook
ray.shutdown()

# Workers receive PYTHONPATH=/app through runtime_env so they can find 'src'
worker_runtime_env = {"env_vars": {"PYTHONPATH": "/app"}}
ray.init(ignore_reinit_error=True, runtime_env=worker_runtime_env)

print(f"Ray initialized: {ray.cluster_resources()}")

In [None]:
# Import pipeline components
from src.streaming_pipeline import (
    AgentRayComputeConfig,
    AgentStage,
    QueueStreamingDatasource,
    StreamingDatasourceConfig,
    StreamingPipeline,
)
from src.pipelines.instrument_detection.agents.audio_preprocessor import (
    AudioPreprocessorAgent,
)
from src.pipelines.instrument_detection.agents.instrument_detector import (
    InstrumentDetectorAgent,
)

In [None]:
# Find audio files
audio_dir = Path("../audio_files")

# Path.glob() gives no ordering guarantee; sorting makes the selected subset
# (and therefore the warmup file, audio_files[0]) the same on every run.
audio_files = sorted(audio_dir.glob("*.mp3"))[:NUM_TEST_FILES]

# Fail here with a clear message instead of later with an IndexError when the
# warmup step reads audio_files[0].
if not audio_files:
    raise FileNotFoundError(f"No .mp3 files found in {audio_dir.resolve()}")

print(f"Found {len(audio_files)} audio files to test:")
for f in audio_files:
    print(f"  - {f.name}")

if len(audio_files) < NUM_TEST_FILES:
    print(
        f"WARNING: only {len(audio_files)} of the requested "
        f"{NUM_TEST_FILES} files are available"
    )

In [None]:
# Helper functions
def create_job_row(filepath: Path) -> dict:
    """Build the job dictionary for a single audio file.

    The file is read fully into memory. ``job_id`` and ``song_id`` each get a
    fresh random 8-hex-character suffix, and ``song_hash`` is the first 16 hex
    characters of the SHA-256 digest of the file contents.
    """

    def _random_suffix() -> str:
        return uuid.uuid4().hex[:8]

    payload = filepath.read_bytes()
    content_digest = hashlib.sha256(payload).hexdigest()

    return dict(
        job_id=f"job_{_random_suffix()}",
        song_id=f"song_{_random_suffix()}",
        song_hash=content_digest[:16],
        filename=filepath.name,
        audio_bytes=payload,
    )


def identity(x):
    """Return ``x`` unchanged.

    Passed to the datasource as ``item_to_row_fn``: the items put on the job
    queue are already row dictionaries, so no conversion is applied.
    """
    return x

## Step 3: Create Pipeline with Local Model Path

In [None]:
# Job queue (bounded at 100 entries) that feeds the streaming datasource
job_queue = Queue(maxsize=100)

# Datasource settings: one item per batch, capped at NUM_TEST_FILES items
datasource_config = StreamingDatasourceConfig(
    parallelism=1,
    batch_size=1,
    batch_timeout=0.5,
    poll_interval=0.1,
    max_items=NUM_TEST_FILES,
)

# Queue items are already row dicts, hence the identity conversion function
datasource = QueueStreamingDatasource(
    queue=job_queue,
    item_to_row_fn=identity,
    config=datasource_config,
)
print("Datasource created")

In [None]:
# Create pipeline stages

# Stage 1: Audio Preprocessor (CPU) - two actors with one CPU each
preprocessor_agent = AudioPreprocessorAgent(target_sr=16000)
preprocessor_compute = AgentRayComputeConfig(
    num_actors=2,
    batch_size=1,
    num_cpus=1.0,
    max_concurrency=1,
)
preprocessor_stage = AgentStage(
    agent=preprocessor_agent,
    config=preprocessor_compute,
    name="AudioPreprocessor",
)

# Stage 2: Instrument Detector with vLLM (GPU) - a single actor on one GPU
# NOTE: the agent is given MODEL_PATH (local path), not MODEL_NAME (HuggingFace repo)
detector_agent = InstrumentDetectorAgent(
    model_name=MODEL_PATH,  # Use local path!
    use_vllm=True,
    tensor_parallel_size=1,
    gpu_memory_utilization=0.90,
    max_model_len=16384,
    max_num_seqs=4,
)
detector_compute = AgentRayComputeConfig(
    num_actors=1,
    batch_size=1,
    num_gpus=1.0,
    max_concurrency=1,
)
detector_stage = AgentStage(
    agent=detector_agent,
    config=detector_compute,
    name="InstrumentDetector",
)

print(f"Stages created (detector using model from: {MODEL_PATH})")

In [None]:
# Wire the datasource to the two stages: preprocessor first, then detector
pipeline_stages = [preprocessor_stage, detector_stage]
pipeline = StreamingPipeline(
    name="vLLM_InstrumentDetection",
    datasource=datasource,
    stages=pipeline_stages,
)
print("Pipeline created")

In [None]:
# Warmup data provider - a real audio file is used to warm up the pipeline actors.
# The intent is that vLLM loads the model ONCE and the same actors then handle the real data.
def get_warmup_data():
    """Return a one-element list holding a warmup row built from the first audio file."""
    sample = audio_files[0]
    warmup_row = {
        "job_id": "warmup_001",
        "song_id": "warmup",
        "song_hash": "warmup",
        "filename": sample.name,
        "audio_bytes": sample.read_bytes(),
    }
    return [warmup_row]


print("Warmup function defined")

## Step 4: Warmup and Stream Results

Using `warmup_and_stream()` ensures a SINGLE `iter_batches()` call handles both warmup and real data.
This is critical for vLLM - the model loads once during warmup, then the same actors process real data.

In [None]:
# Obtain the streaming iterator together with warmup
# (warmup items go into the queue FIRST)
separator = "-" * 60
print("Starting warmup_and_stream...")
print("(vLLM model will load on first batch - may take a few minutes)")
print(separator)

streaming_iterator = pipeline.warmup_and_stream(
    warmup_data_fn=get_warmup_data,
    warmup_timeout=WARMUP_TIMEOUT,
    batch_size=1,
)

# Real jobs are submitted only NOW, so they sit AFTER the warmup items in the queue
total_jobs = len(audio_files)
print(f"\nSubmitting {total_jobs} jobs...")
for position, audio_path in enumerate(audio_files, start=1):
    job = create_job_row(audio_path)
    job_queue.put(job)
    print(f"  [{position}/{total_jobs}] Submitted: {job['filename']}")
    time.sleep(0.1)  # short pause between submissions

print(f"All {total_jobs} jobs submitted")

In [None]:
# Stream results - warmup items are automatically discarded by the generator
results = []
start_time = time.time()

# Wait only for as many results as jobs were actually submitted: when fewer
# than NUM_TEST_FILES audio files were found, waiting for NUM_TEST_FILES
# results would never terminate.
expected_results = min(NUM_TEST_FILES, len(audio_files))

print("\nStreaming results (warmup handled internally)...")
print("-" * 60)

for batch in streaming_iterator:
    if not batch:
        continue

    # Batches are column-oriented (key -> sequence of values); skip key-less ones.
    keys = list(batch.keys())
    if not keys:
        continue

    # Turn the columns back into one dict per item.
    for values in zip(*(batch[k] for k in keys)):
        result = dict(zip(keys, values))
        results.append(result)

        elapsed = time.time() - start_time

        if result.get("error"):
            print(f"[{elapsed:.1f}s] {result['filename']} -> ERROR: {result['error']}")
        else:
            instruments = result.get("instruments", [])
            print(f"[{elapsed:.1f}s] {result['filename']} -> {instruments}")

    if len(results) >= expected_results:
        print("-" * 60)
        print(f"All {expected_results} results received!")
        break

total_time = time.time() - start_time
print(f"\nTotal streaming time: {total_time:.1f}s")
# Guard the average: the stream may end without having produced any result.
if results:
    print(f"Average per file: {total_time / len(results):.1f}s")
else:
    print("No results received")

In [None]:
# Summary of the run: counts first, then one line per result
banner = "=" * 60
print("\n" + banner)
print("RESULTS SUMMARY")
print(banner)

# A result counts as failed when it carries a truthy "error" value
successful = []
failed = []
for entry in results:
    if entry.get("error"):
        failed.append(entry)
    else:
        successful.append(entry)

print(f"Total: {len(results)}")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")

print("\nDetailed results:")
for entry in results:
    error = entry.get("error")
    if not error:
        print(f"  - {entry['filename']}: {entry.get('instruments', [])}")
        continue
    print(f"  - {entry['filename']}: ERROR - {error}")

In [None]:
# Cleanup
# try/finally so that Ray is shut down even when stopping the pipeline raises;
# otherwise a failed stop() would leave the Ray instance running.
try:
    pipeline.stop()
finally:
    ray.shutdown()
print("Cleanup complete")