In [1]:
from orcapod.execution_engines import RayEngine
import orcapod as op
import pyarrow as pa

In [2]:
# Address of the Ray head service that the Ray Client connects to.
RAY_HEAD_ADDRESS = "ray://op-pipe-kuberay-head-svc.ray.svc.cluster.local:10001"

ray_engine = RayEngine(RAY_HEAD_ADDRESS)

2025-11-12 08:26:40,429	INFO client_builder.py:242 -- Passing the following kwargs to ray.init() on the server: log_to_driver
    Ray: 2.48.0
    Python: 3.12.12
This process on Ray Client was started with:
    Ray: 2.48.0
    Python: 3.12.10



In [3]:
# Fifty rows: id runs over 0..49, x is twice the id and y is three times the id.
rows = [{"id": i, "x": 2 * i, "y": 3 * i} for i in range(50)]
input_table = pa.Table.from_pylist(rows)

# "id" is declared as the tag column of the stream.
input_stream = op.streams.TableStream(input_table, tag_columns=["id"])

In [4]:
from pathlib import Path


@op.function_pod("sum")
def add_numbers(x: int, y: int) -> int:
    """
    Return the sum of ``x`` and ``y`` after a short artificial delay.

    The 0.2 second sleep makes every call take a noticeable amount of time.
    """
    from time import sleep

    sleep(0.2)
    total = x + y
    return total

First run synchronously

In [5]:
# Apply the function pod to the input stream, run it without passing an
# execution engine, and display the result via as_df() (last expression).
result_stream1 = add_numbers.pod(input_stream)
result_stream1.run()
result_stream1.as_df()

id,sum
i64,i64
0,0
1,5
2,10
3,15
4,20
…,…
45,225
46,230
47,235
48,240


Next, we run the same computation using the Ray engine

In [7]:
# Same pod and same input as the synchronous run, but executed through
# run_async with the Ray engine passed as the execution engine.
result_stream2 = add_numbers.pod(input_stream)
await result_stream2.run_async(execution_engine=ray_engine)
result_stream2.as_df()

id,sum
i64,i64
0,0
1,5
2,10
3,15
4,20
…,…
45,225
46,230
47,235
48,240


**NOTE**: Depending on the availability of nodes and how Ray was configured, you may *not* see any improvement in running speed for the example above (it may even take longer due to overhead!). If you are not getting any speedup, please consult your Ray cluster administrator.

## Integration with pipeline system

In [22]:
# Make sure we are starting with a clean slate: remove any existing
# ./test_store directory (ignore_errors=True keeps this quiet if it is absent).
import shutil

shutil.rmtree("./test_store", ignore_errors=True)

In [23]:
# Create a DeltaTableDatabase from the path ./test_store and hand it to a
# pipeline named "pipeline_with_ray".
database = op.databases.DeltaTableDatabase("./test_store")
pipeline = op.Pipeline("pipeline_with_ray", database)

In [24]:
# Invoke the pod inside the pipeline context. The following cells access the
# node as `pipeline.add_numbers` -- presumably registered by this context.
with pipeline:
    result_stream = add_numbers.pod(input_stream)

In [25]:
# Inspect the add_numbers node before the pipeline has run (saved output has no rows).
pipeline.add_numbers

*id,sum,_source_sum,_context_key
i64,i64,str,str


In [26]:
# Execute the pipeline, passing the Ray engine as its execution engine.
pipeline.run(execution_engine=ray_engine)

In [27]:
# Inspect the node again after the pipeline run; the saved output now lists rows.
pipeline.add_numbers

*id,sum
i64,i64
0,0
1,5
2,10
3,15
4,20
…,…
45,225
46,230
47,235
48,240


In [55]:
# Run only the add_numbers node; no execution engine is passed in this call.
pipeline.add_numbers.run()

In [58]:
# Display the add_numbers node after the node-level run.
pipeline.add_numbers

*id,sum
i64,i64
0,0
1,5
2,10
3,15
4,20
…,…
45,225
46,230
47,235
48,240


In [39]:
# Display the add_numbers node once more.
pipeline.add_numbers

*id,sum
i64,i64
0,0
1,5
2,10
3,15
4,20
…,…
45,225
46,230
47,235
48,240


[36m(autoscaler +2m31s)[0m Removing 1 nodes of type cpuOnlyGroup (idle).
[36m(autoscaler +2m31s)[0m Resized to 30 CPUs.
[36m(autoscaler +3m12s)[0m Removing 1 nodes of type cpuOnlyGroup (idle).
[36m(autoscaler +3m12s)[0m Resized to 0 CPUs.


In [38]:
# Run the whole pipeline with the Ray engine as the execution engine.
pipeline.run(execution_engine=ray_engine)

In [40]:
import asyncio

In [12]:
def synchronous_run(async_func, *args, **kwargs):
    """
    Run ``async_func(*args, **kwargs)`` to completion and return its result.

    Works whether or not an event loop is already running in the calling
    thread (as is the case inside a notebook cell):

    - No running loop: the coroutine is executed directly with ``asyncio.run``.
    - Running loop: ``asyncio.run`` cannot be called in this thread, so the
      coroutine is executed with ``asyncio.run`` on a fresh loop in a worker
      thread. The calling thread (and therefore its event loop) is blocked
      until the coroutine finishes.

    Args:
        async_func: Callable returning a coroutine (e.g. an ``async def``).
        *args: Positional arguments forwarded to ``async_func``.
        **kwargs: Keyword arguments forwarded to ``async_func``.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated unchanged.
    """
    import asyncio
    import concurrent.futures

    # Only the loop detection may be interpreted as "no loop running". Keeping
    # the execution outside this try block ensures that a RuntimeError raised
    # by the coroutine itself is not mistaken for a missing event loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    if not loop_is_running:
        # No event loop running, safe to use asyncio.run() in this thread.
        return asyncio.run(async_func(*args, **kwargs))

    def run_in_thread():
        # The worker thread has no running loop, so asyncio.run() is allowed.
        return asyncio.run(async_func(*args, **kwargs))

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(run_in_thread)
        # result() re-raises any exception from the coroutine in the caller.
        return future.result()


In [55]:
async def show_message():
    """Wait ten seconds without blocking the event loop, then print a greeting."""
    delay_seconds = 10
    greeting = "Hello, World!"
    await asyncio.sleep(delay_seconds)
    print(greeting)

In [56]:
# Drive the async show_message from synchronous code; the saved output shows its greeting.
synchronous_run(show_message)

Hello, World!


In [47]:
async def test():
    """Await ``show_message()`` so that its body actually executes."""
    # A bare call only creates a coroutine object; without ``await`` it is
    # never scheduled and Python emits a "never awaited" RuntimeWarning.
    await show_message()
    

In [16]:
# Show the add_numbers node via as_df(); the saved output has column headers but no rows.
pipeline.add_numbers.as_df()

id,sum,_source_sum,_context_key
i64,i64,str,str
