# Distributed PDF Processing with Ray Data, Docling, and RayCluster

This notebook demonstrates how to create a **RayCluster** and submit document processing jobs to it using the CodeFlare SDK. Unlike the RayJob approach, here you manage the cluster lifecycle yourself, which allows you to submit multiple jobs to the same cluster.

## What you will do

1. Authenticate to your OpenShift cluster
2. Create a Ray Data processing script that converts PDFs to structured JSON using Docling
3. Create a RayCluster with PVC mounts and configured resources
4. Submit a processing job to the running cluster
5. Monitor the job status and retrieve logs
6. Clean up resources when done

## When to use this approach

Use the **RayCluster** approach when you want a long-lived cluster to submit multiple jobs interactively or iteratively. You control when the cluster is created and torn down.

> **Note:** If you have a single batch job and want automatic cluster lifecycle management, see the companion notebook `ray-data-with-docling.ipynb`.

## Pipeline overview

The submitted script (`ray_data_process_async.py`) uses [Docling](https://github.com/DS4SD/docling) and [Ray Data](https://docs.ray.io/en/latest/data/data.html) to:

- Read PDF files from a Persistent Volume Claim (PVC)
- Convert PDFs to structured JSON using distributed actor pools
- Write results back to the PVC

## Import SDK components

We import the following components:

- **CodeFlare SDK**: `Cluster` to create and manage a RayCluster, and `ClusterConfiguration` to define cluster resources (authentication is handled with `oc login` in the next section)
- **Kubernetes client**: `V1VolumeMount`, `V1Volume`, and `V1PersistentVolumeClaimVolumeSource` to configure PVC mounts on the cluster pods, plus the `client` module and `ApiException` to check that the PVC exists

In [None]:
# Import pieces from codeflare-sdk and the Kubernetes Python client
from codeflare_sdk import Cluster, ClusterConfiguration
from kubernetes import client as kclient
from kubernetes.client import (
    V1PersistentVolumeClaimVolumeSource,
    V1Volume,
    V1VolumeMount,
)
from kubernetes.client.rest import ApiException

## Authentication

Configure authentication to your OpenShift cluster. If you're running within an OpenShift environment with default kubeconfig, authentication may be automatic. Otherwise, provide your token and server URL.

> **Note**: Replace the token and server values with your own credentials.
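As an alternative to pasting credentials into the notebook, here is a minimal sketch that reads them from environment variables you export before starting Jupyter. The variable names `OPENSHIFT_TOKEN` and `OPENSHIFT_API_URL` and the helper `load_openshift_credentials` are hypothetical choices for this example. Neither `oc` nor the CodeFlare SDK reads these variables on its own.

```python
import os


def load_openshift_credentials(
    token_var: str = "OPENSHIFT_TOKEN",  # hypothetical variable name
    url_var: str = "OPENSHIFT_API_URL",  # hypothetical variable name
) -> tuple[str, str]:
    """Read the OAuth token and API server URL from the environment.

    Fails loudly instead of silently falling back to placeholder values.
    """
    token = os.environ.get(token_var, "").strip()
    api_url = os.environ.get(url_var, "").strip()
    missing = [name for name, value in ((token_var, token), (url_var, api_url)) if not value]
    if missing:
        raise RuntimeError(f"Set these environment variables first: {', '.join(missing)}")
    if not api_url.startswith("https://"):
        raise ValueError(f"{url_var} should be an https:// URL, got {api_url!r}")
    return token, api_url
```

With the variables exported, `TOKEN, API_URL = load_openshift_credentials()` can replace the literal assignments in the next cell. The `oc login` cell stays unchanged.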

In [None]:
TOKEN = "sha256~your-long-oauth-token-here"
API_URL = "https://api.your-cluster.example.com:6443"

In [None]:
# Run the below `oc login` command using your Token and Server URL. Ensure the command is prepended by `!` and not `%`. This will work when running both locally and within RHOAI.
!oc login --token={TOKEN} --server={API_URL}

## Create the processing script

The cell below writes `ray_data_process_async.py`, the script that will be submitted as a job to the RayCluster. This script is **optimized for maximum throughput**:

| Optimization | Implementation | Impact |
|---|---|---|
| **One-time model loading** | `DoclingProcessor` loads Docling models once per actor | Avoids repeated startup overhead per file |
| **Parallel PVC writes** | Each actor writes its own JSON files directly to the PVC | N actors = up to N concurrent writes |
| **Streaming execution** | Ray Data's streaming executor overlaps reading and conversion while `iter_batches()` consumes results | Keeps all actors busy |
| **Prefetching** | `iter_batches(prefetch_batches=2)` fetches result batches ahead of the driver loop | Reduces driver-side wait time |
| **Configurable scaling** | `MIN_ACTORS`/`MAX_ACTORS` environment variables | Tune for your cluster size |
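The one-time model loading row depends on passing a class to `map_batches`. Ray constructs each actor once and then calls it on many batches. The pure-Python sketch below mimics that lifecycle without Ray, so the difference in load counts is easy to see. `FakeModel` and its counter are hypothetical stand-ins for Docling's `DocumentConverter`.

```python
class FakeModel:
    """Hypothetical stand-in for an expensive-to-load model such as a DocumentConverter."""

    loads = 0  # counts how many times a model was constructed

    def __init__(self) -> None:
        FakeModel.loads += 1

    def convert(self, data: bytes) -> int:
        return len(data)


class StatefulProcessor:
    """Same shape as DoclingProcessor: load in __init__, reuse in __call__."""

    def __init__(self) -> None:
        self.model = FakeModel()  # paid once per actor

    def __call__(self, batch: dict) -> dict:
        return {"sizes": [self.model.convert(b) for b in batch["bytes"]]}


def stateless_process(batch: dict) -> dict:
    """Function-style alternative: reloads the model for every batch."""
    model = FakeModel()
    return {"sizes": [model.convert(b) for b in batch["bytes"]]}


batches = [{"bytes": [b"a" * n]} for n in range(1, 101)]  # 100 single-file batches

FakeModel.loads = 0
actor = StatefulProcessor()  # one "actor" handles every batch
actor_results = [actor(batch) for batch in batches]
print("stateful loads:", FakeModel.loads)   # 1

FakeModel.loads = 0
function_results = [stateless_process(batch) for batch in batches]
print("stateless loads:", FakeModel.loads)  # 100
```

In the real pipeline, the stateful case costs one model load per actor (so `MAX_ACTORS` loads in total) instead of one load per batch.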

In [None]:
%%writefile ray_data_process_async.py
"""
Ray Data Pipeline: PDF to JSON Conversion with Docling

This script reads PDFs from a PVC (Persistent Volume Claim), converts them with
Docling inside a pool of stateful Ray actors (DoclingProcessor), and writes one
JSON file per PDF back to the PVC. Markdown export is present but commented out.

Key optimizations:
1. Stateful actors: each actor loads the Docling models once and reuses them.
2. Direct PVC writes: every actor writes its own outputs, so writes run in parallel across actors.
3. Streaming execution: Ray Data overlaps reading and conversion across blocks.
4. Tunable scaling: MIN_ACTORS, MAX_ACTORS, CPUS_PER_ACTOR, and BATCH_SIZE come from env vars.
"""

import os
import ray
import time
from pathlib import Path
from typing import Dict, List

# --- PVC Configuration ---
PVC_MOUNT_PATH = os.environ.get("PVC_MOUNT_PATH", "/mnt/data")
INPUT_PATH = os.environ.get("INPUT_PATH", "input/pdfs/10000")
OUTPUT_PATH = os.environ.get("OUTPUT_PATH", "output")

# --- Performance Tuning Parameters ---
NUM_FILES = int(os.environ.get("NUM_FILES", "5000"))
MIN_ACTORS = int(os.environ.get("MIN_ACTORS", "8"))
MAX_ACTORS = int(os.environ.get("MAX_ACTORS", "8"))
CPUS_PER_ACTOR = int(os.environ.get("CPUS_PER_ACTOR", "8"))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "1"))
TIMEOUT_SECONDS = 600

class DoclingProcessor:
    """
    Stateful Ray Actor that handles PDF conversion.
    """
    def __init__(self):
        # Limit BLAS/OpenMP threads to this actor's CPU share. Set these before
        # importing Docling so libraries that read them at import time pick them up.
        os.environ["OMP_NUM_THREADS"] = str(CPUS_PER_ACTOR)
        os.environ["MKL_NUM_THREADS"] = str(CPUS_PER_ACTOR)

        import socket
        from docling.document_converter import DocumentConverter, PdfFormatOption
        from docling.datamodel.pipeline_options import PdfPipelineOptions, AcceleratorOptions
        from docling.datamodel.base_models import InputFormat

        self.hostname = socket.gethostname()
        self.processed_count = 0

        # Initialize the Docling converter once per actor (OCR off, table structure on)
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = False
        pipeline_options.do_table_structure = True
        pipeline_options.accelerator_options = AcceleratorOptions(
            num_threads=CPUS_PER_ACTOR,
            device="cpu",
        )

        # To try the alternative pypdfium2 backend, also pass
        # backend=PyPdfiumDocumentBackend (from docling.backend.pypdfium2_backend).
        pdf_format_config = PdfFormatOption(pipeline_options=pipeline_options)

        self.converter = DocumentConverter(
            format_options={InputFormat.PDF: pdf_format_config}
        )

        # Prepare output directories on the shared PVC
        self.output_base = Path(PVC_MOUNT_PATH) / OUTPUT_PATH
        self.markdown_dir = self.output_base / "markdown"
        self.json_dir = self.output_base / "json"
        self.markdown_dir.mkdir(parents=True, exist_ok=True)
        self.json_dir.mkdir(parents=True, exist_ok=True)

        print(f"[{self.hostname}] 🟢 Actor initialized (CPUs: {CPUS_PER_ACTOR})")

    def __call__(self, batch: Dict[str, List]) -> Dict[str, List]:
        """
        Processes a batch of PDFs. 
        """
        from docling.datamodel.base_models import DocumentStream
        import io
        import orjson

        results = []

        for file_bytes, file_path in zip(batch["bytes"], batch["path"]):
            fname = os.path.basename(file_path)
            fname_base = fname.rsplit('.', 1)[0]
            status = "success"
            error_msg = ""
            page_count = 0
            docling_duration = 0.0
            output_size_bytes = 0

            try:
                if not file_bytes or len(file_bytes) < 100:
                    raise ValueError("File empty or too small")

                docling_start = time.time()

                # --- Step 1: CPU-heavy conversion ---
                stream = DocumentStream(name=fname, stream=io.BytesIO(file_bytes))
                doc = self.converter.convert(stream)

                page_count = len(doc.document.pages) if hasattr(doc.document, 'pages') else 0
                # md_out = doc.document.export_to_markdown()  # Markdown export disabled
                json_out = doc.document.export_to_dict()

                docling_duration = time.time() - docling_start

                # --- Step 2: Write JSON to the PVC (one file per PDF, written by this actor) ---
                # md_path = self.markdown_dir / f"{fname_base}.md"
                json_path = self.json_dir / f"{fname_base}.json"

                json_bytes = orjson.dumps(json_out, option=orjson.OPT_INDENT_2)

                # self._write_with_retry(md_path, md_out.encode("utf-8"))
                output_size_bytes = len(json_bytes)
                self._write_with_retry(json_path, json_bytes)

                self.processed_count += 1

            except Exception as e:
                status = "error"
                error_msg = str(e)[:150]

            results.append({
                "filename": str(fname),
                "status": str(status),
                "page_count": int(page_count),  
                "error": str(error_msg),
                
                # Timing
                "docling_duration_s": float(round(docling_duration, 2)),
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                
                # Size metrics (output size is the JSON actually written to the PVC)
                "file_size_mb": float(round(len(file_bytes) / (1024 * 1024), 3)),
                "output_size_kb": float(round(output_size_bytes / 1024, 2)) if status == "success" else 0.0,
                
                # Efficiency
                "pages_per_second": float(round(page_count / docling_duration, 2)) if docling_duration > 0 else 0.0,
                
                # Distribution tracking
                "actor_hostname": self.hostname,
            })

        return {"results": results}
    
    def _write_with_retry(self, path: Path, content: bytes, max_retries: int = 3):
        """Write bytes to the shared PVC, retrying transient failures."""
        for attempt in range(max_retries):
            try:
                path.write_bytes(content)
                return
            except Exception:
                if attempt == max_retries - 1:
                    raise
                time.sleep(1)

def ray_data_process():
    input_full_path = os.path.join(PVC_MOUNT_PATH, INPUT_PATH)
    
    # 1. Configure Ray Data execution before reading
    ctx = ray.data.DataContext.get_current()
    ctx.execution_options.preserve_order = False  # let finished files flow out in any order
    ctx.execution_options.actor_locality_enabled = True
    # Minimum block count used when a read does not set override_num_blocks
    ctx.min_parallelism = 100
    # Keep blocks small (1-2 MB) so work spreads evenly across actors
    ctx.target_min_block_size = 1 * 1024 * 1024
    ctx.target_max_block_size = 2 * 1024 * 1024
    ctx.target_shuffle_block_size = 1 * 1024 * 1024
    target_blocks = MAX_ACTORS * 4

    # 2. Read with explicit parallelism
    # override_num_blocks splits the read into MAX_ACTORS * 4 blocks
    ds = ray.data.read_binary_files(
        input_full_path, 
        include_paths=True,
        override_num_blocks=target_blocks
    )
    
    ds = ds.filter(lambda row: row["path"].lower().endswith(".pdf")).limit(NUM_FILES)

    # 3. (Optional) Repartition before conversion
    # Uncomment to redistribute rows into target_blocks blocks. shuffle=True moves
    # rows between all blocks; shuffle=False builds output blocks from adjacent
    # input blocks with minimal data movement.
    # ds = ds.repartition(num_blocks=target_blocks, shuffle=False)

    # 4. Convert PDFs with a pool of DoclingProcessor actors
    results_ds = ds.map_batches(
        DoclingProcessor,
        compute=ray.data.ActorPoolStrategy(
            min_size=MIN_ACTORS,
            max_size=MAX_ACTORS,
        ),
        batch_size=BATCH_SIZE,
        num_cpus=CPUS_PER_ACTOR,
    )
    print(f"DEBUG: Target blocks: {target_blocks}, Max actors: {MAX_ACTORS}")    
    
    # 5. Collect results and report (iterating the dataset triggers execution)
    success_count = 0
    error_count = 0
    total_pages = 0
    total_docling_time = 0.0
    start_time = time.time()
    # Additional tracking variables
    total_file_size_mb = 0.0
    total_output_size_kb = 0.0
    actor_distribution = {}
    file_durations = []
    errors_list = []
    
    for batch in results_ds.iter_batches(batch_size=10, prefetch_batches=2):
        for result_list in batch.get("results", []):
            items = result_list if isinstance(result_list, list) else [result_list]
            for item in items:
                if item["status"] == "success":
                    success_count += 1
                    file_durations.append((item["filename"], item["docling_duration_s"]))
                else:
                    error_count += 1
                    errors_list.append((item["filename"], item.get("error", "")))
                
                total_pages += item["page_count"]
                total_docling_time += item["docling_duration_s"]
                total_file_size_mb += item.get("file_size_mb", 0)
                total_output_size_kb += item.get("output_size_kb", 0)
                
                # Track actor distribution
                actor = item.get("actor_hostname", "unknown")
                actor_distribution[actor] = actor_distribution.get(actor, 0) + 1

    # Calculate derived metrics
    wall_clock = time.time() - start_time
    total_files = success_count + error_count
    error_rate = (error_count / total_files * 100) if total_files > 0 else 0
    avg_pages_per_file = total_pages / success_count if success_count > 0 else 0
    parallelization_efficiency = (total_docling_time / wall_clock / MAX_ACTORS * 100) if wall_clock > 0 else 0
    
    # Sort for fastest/slowest
    if file_durations:
        file_durations.sort(key=lambda x: x[1])
        fastest = file_durations[0]
        slowest = file_durations[-1]
    
    # Enhanced Report
    print("\n" + "=" * 70)
    print("PERFORMANCE REPORT")
    print("=" * 70)
    
    print("\n--- Results Summary ---")
    print(f"Total Files:    {total_files}")
    print(f"Success:        {success_count} ({100-error_rate:.1f}%)")
    print(f"Errors:         {error_count} ({error_rate:.1f}%)")
    print(f"Total Pages:    {total_pages}")
    print(f"Avg Pages/File: {avg_pages_per_file:.1f}")
    
    print("\n--- Data Volume ---")
    print(f"Input Size:     {total_file_size_mb:.2f} MB")
    print(f"Output Size:    {total_output_size_kb/1024:.2f} MB")
    print(f"Output/Input:   {total_output_size_kb / 1024 / total_file_size_mb:.2f}x" if total_file_size_mb > 0 else "Output/Input:   N/A")
    
    print("\n--- Timing ---")
    print(f"Wall Clock:     {wall_clock:.2f}s")
    print(f"Docling Time (sum): {total_docling_time:.2f}s")
    
    print("\n--- Throughput ---")
    print(f"Files/second:   {success_count / wall_clock:.2f}")
    print(f"Pages/second:   {total_pages / wall_clock:.2f}")
    print(f"MB/second:      {total_file_size_mb / wall_clock:.2f}")
    
    print("\n--- Efficiency ---")
    print(f"Parallelization: {parallelization_efficiency:.1f}% of ideal")
    print(f"Speedup:         {total_docling_time / wall_clock:.1f}x vs sequential")
    
    if file_durations:
        print("\n--- Outliers ---")
        print(f"Fastest: {fastest[0]} ({fastest[1]:.2f}s)")
        print(f"Slowest: {slowest[0]} ({slowest[1]:.2f}s)")
    
    print("\n--- Actor Distribution ---")
    for actor, count in sorted(actor_distribution.items()):
        pct = count / total_files * 100
        print(f"  {actor}: {count} files ({pct:.1f}%)")
    
    if errors_list:
        print("\n--- Errors (first 5) ---")
        for fname, err in errors_list[:5]:
            print(f"  {fname}: {err[:60]}")
    
    print("=" * 70)
    
if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    ray_data_process()
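The report's derived figures are a few ratios over the collected totals. The sketch below reproduces them as a standalone function, so you can check them by hand or recompute them from saved totals. `summarize` and `RunSummary` are hypothetical names. The formulas follow the script: speedup is summed Docling time divided by wall-clock time, and parallelization efficiency further divides that by `MAX_ACTORS`.

```python
from dataclasses import dataclass


@dataclass
class RunSummary:
    files_per_second: float
    pages_per_second: float
    speedup: float
    parallel_efficiency_pct: float
    output_to_input_ratio: float


def summarize(
    success_count: int,
    total_pages: int,
    total_docling_time_s: float,
    wall_clock_s: float,
    max_actors: int,
    input_mb: float,
    output_kb: float,
) -> RunSummary:
    """Recompute the report's throughput and efficiency figures from raw totals."""
    if wall_clock_s <= 0 or max_actors <= 0:
        raise ValueError("wall_clock_s and max_actors must be positive")
    # Summed per-file conversion time over elapsed time ~= actors busy on average.
    speedup = total_docling_time_s / wall_clock_s
    return RunSummary(
        files_per_second=success_count / wall_clock_s,
        pages_per_second=total_pages / wall_clock_s,
        speedup=speedup,
        # 100% means every actor was converting PDFs for the entire run.
        parallel_efficiency_pct=speedup / max_actors * 100,
        # Values above 1 mean the JSON output is larger than the input PDFs.
        output_to_input_ratio=(output_kb / 1024) / input_mb if input_mb > 0 else float("nan"),
    )
```

As an example with made-up totals (not measured results), consider 1,000 files, 12,000 pages, 3,200 s of summed Docling time, 250 s of wall clock, 16 actors, 800 MB in, and 409,600 KB out. These give a speedup of about 12.8× and roughly 80% efficiency.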

## PVC requirements

To use PVC storage with your RayCluster, you need to:

1. **Create a PVC** with `ReadWriteMany` (RWX) access mode
2. **Mount the PVC** on both the Ray head and worker pods
3. **Upload your PDF files** to the PVC before running the job

### Step 1: Create the PVC

See the Setup section in the [README](./README.md) for the PVC YAML configuration.

### Step 2: Verify the PVC

Run the cell below to verify the PVC exists and has the correct access mode.

In [None]:
# Cluster and namespace configuration
EXISTING_CLUSTER_NAME = "rayjobtest"
NAMESPACE = "ray-docling"

# PVC Configuration
PVC_NAME = "my-rwx-pvc2"
PVC_MOUNT_PATH = "/mnt/data"
INPUT_PATH = "input/pdfs/10000"
OUTPUT_PATH = "output"


def verify_pvc():
    # Build a Kubernetes API client from the token and API URL defined above
    # (this does not read ~/.kube/config)

    configuration = kclient.Configuration()
    configuration.host = API_URL
    configuration.verify_ssl = True
    configuration.api_key["authorization"] = f"Bearer {TOKEN}"

    # Initialize client with manual config
    v1 = kclient.CoreV1Api(kclient.ApiClient(configuration))

    print(f"🔍 Checking PVC '{PVC_NAME}' in namespace '{NAMESPACE}'...")

    try:
        # Fetch the PVC object
        pvc = v1.read_namespaced_persistent_volume_claim(
            name=PVC_NAME, namespace=NAMESPACE
        )

        # Check status
        pvc_status = pvc.status.phase
        print(f"✅ PVC '{PVC_NAME}' found, status: {pvc_status}")

        # Check access modes (a PVC can list more than one)
        access_modes = pvc.spec.access_modes or []
        if access_modes:
            print(f"   Access Modes: {', '.join(access_modes)}")

            if "ReadWriteMany" not in access_modes:
                print(
                    "   ⚠️  Warning: PVC should use 'ReadWriteMany' access mode for concurrent writes from multiple workers."
                )
        else:
            print("   ⚠️  Warning: No access modes defined for this PVC.")

    except ApiException as e:
        if e.status == 404:
            print(f"❌ PVC '{PVC_NAME}' not found in namespace '{NAMESPACE}'")
            print(
                "\n   Create the PVC first using the YAML in the README Setup section."
            )
        else:
            print(f"❌ An error occurred while fetching PVC: {e}")


if __name__ == "__main__":
    verify_pvc()

## Configure the RayCluster

Define the cluster configuration, prepare the PVC volume mount, and create the cluster.

### Prepare the PVC volume mount

Define the volume mount and volume objects that will attach the PVC to the RayCluster pods.
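Kubernetes links a mount to a volume by name. The `name` on the `V1VolumeMount` must match the `name` on the `V1Volume`, and `claim_name` must match the existing PVC. The plain-dictionary sketch below shows roughly the pod-spec fragment these two objects describe and checks that the names line up. Field names follow the Kubernetes Pod API; the helpers are hypothetical and not part of the CodeFlare SDK.

```python
def pvc_pod_fragment(pvc_name: str, mount_path: str, volume_name: str = "shared-data") -> dict:
    """Build the volumes/volumeMounts fragment a pod needs to mount a PVC (illustrative only)."""
    return {
        "volumes": [
            {"name": volume_name, "persistentVolumeClaim": {"claimName": pvc_name}},
        ],
        "containers": [
            {"volumeMounts": [{"name": volume_name, "mountPath": mount_path}]},
        ],
    }


def mounts_resolve(fragment: dict) -> bool:
    """Return True if every volumeMount refers to a declared volume."""
    declared = {v["name"] for v in fragment["volumes"]}
    return all(
        m["name"] in declared
        for c in fragment["containers"]
        for m in c.get("volumeMounts", [])
    )


fragment = pvc_pod_fragment("my-rwx-pvc2", "/mnt/data")
print(mounts_resolve(fragment))  # True
```

The `mountPath` must also equal the `PVC_MOUNT_PATH` that the job passes to the script. Otherwise the actors read from and write to an empty directory inside the container.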

In [None]:
shared_mount = V1VolumeMount(mount_path=PVC_MOUNT_PATH, name="shared-data")
data_volume = V1Volume(
    name="shared-data",
    persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=PVC_NAME),
)

### Define cluster resources

Define the cluster resource specifications (CPU, memory, number of workers) for the RayCluster.
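Before applying the configuration, it helps to total what the worker pool provides. This is a back-of-the-envelope sketch with several assumptions. Integer memory values are treated as gigabytes per worker. The object store is assumed to take roughly `RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION` of each node's memory. Head-node resources are ignored. `cluster_capacity` is a hypothetical helper.

```python
def cluster_capacity(num_workers: int, cpus_per_worker: int, mem_gb_per_worker: float,
                     object_store_fraction: float = 0.3) -> dict:
    """Rough totals for the worker pool (head node not included)."""
    return {
        "total_worker_cpus": num_workers * cpus_per_worker,
        "total_worker_mem_gb": num_workers * mem_gb_per_worker,
        # Approximate: Ray sizes each node's object store when the node starts.
        "object_store_gb_per_worker": mem_gb_per_worker * object_store_fraction,
    }


capacity = cluster_capacity(num_workers=8, cpus_per_worker=8, mem_gb_per_worker=8)
print(capacity["total_worker_cpus"])    # 64
print(capacity["total_worker_mem_gb"])  # 64
```

With these settings, each worker keeps roughly 2.4 GB for the object store. The rest of its 8 GB is shared by the Docling actors placed on it.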

In [None]:
cluster_config = ClusterConfiguration(
    name="ray-data-processor",
    namespace=NAMESPACE,
    num_workers=8,
    worker_cpu_requests=8,
    worker_cpu_limits=8,
    worker_memory_requests=8,
    worker_memory_limits=8,
    volume_mounts=[shared_mount],
    volumes=[data_volume],
    image="quay.io/cathaloconnor/docling-ray:latest",
    envs={"RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION": "0.3"},
)
cluster = Cluster(config=cluster_config)

### Create the cluster

Start the RayCluster and wait until it is ready to accept job submissions.

In [None]:
cluster.apply()
cluster.wait_ready(dashboard_check=False)

### Cluster details

Fetch the cluster details, including the Ray dashboard URL.

In [None]:
cluster.details()

## Submit a job to the cluster

Initialize a job submission client from the cluster, then submit the processing job with performance tuning parameters:

| Parameter | Description | Tuning guidance |
|---|---|---|
| `MAX_ACTORS` | Maximum parallel actors | Set to `total_cluster_cpus / CPUS_PER_ACTOR` |
| `MIN_ACTORS` | Warm actors (avoids cold start) | 2-4 for steady workloads |
| `CPUS_PER_ACTOR` | CPUs per Docling actor | 2 for most PDFs, 4 for complex documents |
| `BATCH_SIZE` | PDFs per actor batch | 1 for large PDFs, 2-4 for small PDFs |
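The `total_cluster_cpus / CPUS_PER_ACTOR` rule treats CPUs as one pool. An actor cannot span pods, however, so each worker hosts at most `cpus_per_worker // CPUS_PER_ACTOR` actors. Here is a small sketch under that assumption. `max_actors_for` is a hypothetical helper, and it ignores the head node and any CPUs used by Ray Data read tasks.

```python
def max_actors_for(num_workers: int, cpus_per_worker: int, cpus_per_actor: int) -> int:
    """Upper bound on placeable actors when each actor must fit on a single worker."""
    if cpus_per_actor <= 0:
        raise ValueError("cpus_per_actor must be positive")
    return num_workers * (cpus_per_worker // cpus_per_actor)


print(max_actors_for(8, 8, 4))  # 16: the configuration used in this notebook
print(max_actors_for(4, 8, 2))  # 16: the example in the tuning cell
print(max_actors_for(8, 8, 6))  # 8, not 64 // 6 == 10: two CPUs per worker sit idle
```

Choosing a `CPUS_PER_ACTOR` that divides the worker CPU count evenly avoids stranded CPUs. If `MAX_ACTORS` is set above this bound, the extra actors simply wait for resources.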

### Configure and submit the job

Set the performance tuning parameters and submit the job with its runtime environment.

In [None]:
# ============================================
# ⚡ PERFORMANCE TUNING PARAMETERS
# ============================================
# Adjust these based on your cluster size for maximum throughput

# How many files to process
NUM_FILES = "5000"

# Actor pool sizing (critical for throughput)
# Formula: MAX_ACTORS ≈ total_worker_cpus / CPUS_PER_ACTOR
# Example: 4 workers × 8 CPUs each = 32 CPUs → MAX_ACTORS = 16 with 2 CPUs each
# This notebook: 8 workers × 8 CPUs each = 64 CPUs → MAX_ACTORS = 16 with CPUS_PER_ACTOR = 4
MIN_ACTORS = "16"  # Keep minimum actors warm (avoids cold start)
MAX_ACTORS = "16"  # Increase for larger clusters

# CPUs per Docling actor
CPUS_PER_ACTOR = "4"

# PDFs per batch (1 for large PDFs, 2-4 for small PDFs < 1MB)
BATCH_SIZE = "1"

# Initialize the Job Submission Client

# The SDK will automatically gather the dashboard address and authenticate using the Ray Job Submission Client
client = cluster.job_client

# Submit the Docling processing job using the Job Submission Client
submission_id = client.submit_job(
    entrypoint="python ray_data_process_async.py",
    runtime_env={
        "working_dir": ".",
        "pip": ["opencv-python-headless", "pypdfium2", "orjson"],
        "env_vars": {
            # PVC configuration
            "PVC_MOUNT_PATH": PVC_MOUNT_PATH,
            "INPUT_PATH": INPUT_PATH,
            "OUTPUT_PATH": OUTPUT_PATH,
            # Performance tuning
            "NUM_FILES": NUM_FILES,
            "MIN_ACTORS": MIN_ACTORS,
            "MAX_ACTORS": MAX_ACTORS,
            "CPUS_PER_ACTOR": CPUS_PER_ACTOR,
            "BATCH_SIZE": BATCH_SIZE,
            # 💡 Enable detailed progress monitoring
            "RAY_DATA_ENABLE_RICH_PROGRESS_BARS": "true",
            "RAY_record_task_actor_creation_sites": "true",
            # Object store size is fixed when each Ray node starts, so it is set via
            # `envs` in ClusterConfiguration; a job-level value here would not resize it.
            # Cache directories
            "HF_HOME": "/tmp/huggingface",
            "XDG_CACHE_HOME": "/tmp/cache",
        },
    },
)
print(f"✅ Job submitted with ID: {submission_id}")
print(f"   → PVC mount:   {PVC_MOUNT_PATH}")
print(f"   → Input path:  {PVC_MOUNT_PATH}/{INPUT_PATH}")
print(f"   → Output path: {PVC_MOUNT_PATH}/{OUTPUT_PATH}")
print(f"   → Actors:      {MIN_ACTORS}-{MAX_ACTORS} × {CPUS_PER_ACTOR} CPUs each")

## Monitor job status

Check the status of the submitted job. Re-run these cells to see updates.
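Instead of re-running the cell, you can poll until the job reaches a terminal state. This is a minimal sketch that assumes Ray's job statuses `SUCCEEDED`, `FAILED`, and `STOPPED` are the terminal ones. `wait_for_job` is a hypothetical helper. It accepts any zero-argument status callable, so you can try it without a cluster.

```python
import time
from typing import Callable

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(get_status: Callable[[], object], poll_s: float = 10.0,
                 timeout_s: float = 3600.0) -> str:
    """Poll get_status() until it returns a terminal state or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        # Enum statuses expose .value; plain strings are used as-is.
        name = str(getattr(status, "value", status))
        if name in TERMINAL_STATES:
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {name} after {timeout_s:.0f}s")
        time.sleep(poll_s)
```

With the client from the submission cell, `wait_for_job(lambda: client.get_job_status(submission_id), poll_s=30)` blocks until the run ends and returns the final status name.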

In [None]:
# List all existing jobs
print("Job ID", " - ", "Submission ID", " - ", "Status")
for j in client.list_jobs():
    print(j.job_id, " - ", j.submission_id, " - ", j.status)

In [None]:
client.get_job_status(submission_id)

## Retrieve job logs

View or stream the job logs using the job submission client.
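The job logs end with the script's `PERFORMANCE REPORT`. To pull numbers out of that text, for example to compare tuning runs, here is a sketch that scans `Label: value` lines after the report banner and keeps the leading number. `parse_report` is a hypothetical helper. It assumes the report layout printed by `ray_data_process_async.py` above.

```python
import re

# Matches lines such as "Files/second:   12.34" or "Wall Clock:     56.78s".
_REPORT_LINE = re.compile(r"^\s*([A-Za-z][A-Za-z /()]*?):\s+(-?\d+(?:\.\d+)?)")


def parse_report(logs: str) -> dict[str, float]:
    """Extract numeric 'Label: value' pairs that appear after the PERFORMANCE REPORT banner."""
    _, found, report = logs.partition("PERFORMANCE REPORT")
    if not found:
        return {}
    metrics: dict[str, float] = {}
    for line in report.splitlines():
        match = _REPORT_LINE.match(line)
        if match:
            metrics[match.group(1).strip()] = float(match.group(2))
    return metrics
```

For example, `parse_report(client.get_job_logs(submission_id)).get("Files/second")` returns the throughput of a finished run. Lines whose value is not a number, such as the fastest and slowest file names, are skipped.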

In [None]:
# this could be a large object
client.get_job_logs(submission_id)

In [None]:
# Iterate through the logs of a job
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="")

## Cleanup

Clean up resources when you are done processing.
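A job can only be deleted after it has finished, so a cleanup cell typically stops it first if needed. This sketch shows that order of operations. It assumes the `get_job_status`, `stop_job`, and `delete_job` methods of Ray's `JobSubmissionClient`, which is what `cluster.job_client` returns. `cleanup_job` is a hypothetical helper, and `FakeClient` exists only to exercise it without a cluster.

```python
import time

TERMINAL = {"SUCCEEDED", "FAILED", "STOPPED"}


def _status_name(status) -> str:
    # Enum statuses expose .value; plain strings are used as-is.
    return str(getattr(status, "value", status))


def cleanup_job(client, submission_id: str, poll_s: float = 5.0, timeout_s: float = 300.0) -> bool:
    """Stop the job if it is still active, wait for a terminal state, then delete it."""
    if _status_name(client.get_job_status(submission_id)) not in TERMINAL:
        client.stop_job(submission_id)
    deadline = time.monotonic() + timeout_s
    while _status_name(client.get_job_status(submission_id)) not in TERMINAL:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{submission_id} did not stop within {timeout_s:.0f}s")
        time.sleep(poll_s)
    return client.delete_job(submission_id)


class FakeClient:
    """Minimal in-memory stand-in for the job client, for local testing only."""

    def __init__(self, status: str = "RUNNING") -> None:
        self.status = status
        self.deleted = False

    def get_job_status(self, submission_id: str) -> str:
        return self.status

    def stop_job(self, submission_id: str) -> bool:
        self.status = "STOPPED"
        return True

    def delete_job(self, submission_id: str) -> bool:
        if self.status not in TERMINAL:
            raise RuntimeError("job is not in a terminal state")
        self.deleted = True
        return True
```

Against the real cluster, `cleanup_job(client, submission_id)` followed by `cluster.down()` removes both the job and the RayCluster.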

### Remove the job

In [None]:
# Delete a job
# Can run client.stop_job(submission_id) first if job is still running

# client.delete_job(submission_id)

### Remove the cluster

In [None]:
# cluster.down()