Merged

150 commits
1f030b4
initial translation sketch
rjzamora Sep 26, 2025
1ff02fc
add basic dataframe_scan support
rjzamora Sep 26, 2025
2821413
update names
rjzamora Sep 26, 2025
704b2e6
simplify and update test
rjzamora Sep 30, 2025
bc691d2
linting
rjzamora Sep 30, 2025
b011083
refactor
rjzamora Sep 30, 2025
1f1fba6
more refactoring
rjzamora Sep 30, 2025
34de625
begin larger refactor
rjzamora Oct 1, 2025
ce7cfdb
adjust fallback
rjzamora Oct 1, 2025
4107d59
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 1, 2025
7788abb
scan testing
rjzamora Oct 1, 2025
2fdccae
create new test directory
rjzamora Oct 1, 2025
ce9658b
update comments
rjzamora Oct 1, 2025
2c4b54c
adjust fallback
rjzamora Oct 1, 2025
3785cc9
futher notes
rjzamora Oct 1, 2025
ccaa0e1
Merge branch 'branch-25.12' into rapidsmpf-translation
rjzamora Oct 1, 2025
cc017c3
generalize ScanPartitionPlan to IOPartitionPlan
rjzamora Oct 1, 2025
6206dce
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 1, 2025
711708c
Merge branch 'rapidsmpf-translation' of github.com:rjzamora/cudf into…
rjzamora Oct 1, 2025
7b05a34
update Rechunk class
rjzamora Oct 2, 2025
ccfd7c2
split union into distinct file
rjzamora Oct 2, 2025
53a8cb2
avoid using task-engine lowering
rjzamora Oct 2, 2025
4b666eb
add note
rjzamora Oct 2, 2025
88613b5
track whether data is broadcasted
rjzamora Oct 2, 2025
70495cc
linting
rjzamora Oct 2, 2025
854306e
simplify
rjzamora Oct 2, 2025
9d2767e
reusing task-engine lowering in most places - still debugging
rjzamora Oct 2, 2025
7f03abe
debugging deadlock
rjzamora Oct 2, 2025
e1ac8dc
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 3, 2025
ce15be1
small revisions - code still broken
rjzamora Oct 3, 2025
e206277
minor cleanup (still a mess)
rjzamora Oct 3, 2025
2bb97df
still debugging - cursor probably changes some things incorrectly
rjzamora Oct 3, 2025
3c64329
bug fixes
rjzamora Oct 4, 2025
087e5b9
cleanup
rjzamora Oct 4, 2025
0ae9b24
add throttling - though it probably isn't working
rjzamora Oct 4, 2025
0097266
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 4, 2025
a2805c6
fix subtle bug
rjzamora Oct 4, 2025
b094aa5
standardize argument order
rjzamora Oct 6, 2025
5046925
add --rapidsmpf-engine option to pdsh
rjzamora Oct 6, 2025
9b8b6e2
minor cleanup
rjzamora Oct 6, 2025
2315621
removing print statements
rjzamora Oct 6, 2025
1ef3907
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 7, 2025
bbb4be1
very-basic join support
rjzamora Oct 8, 2025
95bb092
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 8, 2025
d12011d
remove final concatenation from lowering
rjzamora Oct 8, 2025
66c208f
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 8, 2025
c480075
cleanup
rjzamora Oct 8, 2025
7848c8e
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 8, 2025
c89505a
avoid concurrent sends - too much memory pressure
rjzamora Oct 8, 2025
56a074c
use default UVM resource for now
rjzamora Oct 8, 2025
eebfeb2
improve bcast join a bit (for inner joins)
rjzamora Oct 8, 2025
9f6e46c
simplify max_io_threads
rjzamora Oct 8, 2025
f01899a
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 8, 2025
d409644
fix scan bug
rjzamora Oct 9, 2025
147a5d3
make io-throttling local to IO node
rjzamora Oct 9, 2025
54ad773
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 9, 2025
a31450b
address partial code review
rjzamora Oct 9, 2025
92b9cbe
set lower bound on estimate for string column size
rjzamora Oct 9, 2025
563eb42
fix bug
rjzamora Oct 9, 2025
2d8ca4e
incorperate lower-limit on storage_size (for now)
rjzamora Oct 9, 2025
2e63d4c
pull out unnecessary changes
rjzamora Oct 10, 2025
72152fa
Merge branch 'branch-25.12' into rapidsmpf-translation
rjzamora Oct 13, 2025
12730dc
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 13, 2025
46970de
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 14, 2025
93fd831
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 16, 2025
f078e2d
update config name to runtime
rjzamora Oct 16, 2025
a113047
add DEFAULT_RUNTIME - still need to debug
rjzamora Oct 16, 2025
5a93792
updated experimental tests to work with rapidsmpf runtime
rjzamora Oct 16, 2025
f2e98fc
remove redundant tests
rjzamora Oct 16, 2025
f7574d3
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 16, 2025
653190a
revise multicast behavior - still pretty ugly
rjzamora Oct 16, 2025
a509492
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 16, 2025
a7a3004
account for recent DataFrame.from_table change
rjzamora Oct 16, 2025
adc46b5
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 17, 2025
a5b3b35
use channel-pair concept
rjzamora Oct 17, 2025
cfa0113
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 17, 2025
7b003ad
update comment
rjzamora Oct 17, 2025
5403cb4
tweak code-coverage
rjzamora Oct 17, 2025
53382ba
revise problematic union_dependancy attribute
rjzamora Oct 20, 2025
4f18016
Merge remote-tracking branch 'upstream/branch-25.12' into rapidsmpf-t…
rjzamora Oct 20, 2025
78a8dcd
revise multicast
rjzamora Oct 21, 2025
ae05e00
adjust test coverage
rjzamora Oct 21, 2025
720d023
adopt ChannelManager class
rjzamora Oct 21, 2025
80ee31d
multicast revisions
rjzamora Oct 22, 2025
7ef2f81
Rename multicast to fanout in rapidsmpf module
rjzamora Oct 22, 2025
4f3b138
Move shutdown_on_error to utils module
rjzamora Oct 22, 2025
85deeb8
partially address code review (simplify join logic for now)
rjzamora Oct 22, 2025
a9dad1e
Merge branch 'main' into rapidsmpf-translation
rjzamora Oct 22, 2025
c1c58d3
rename
rjzamora Oct 22, 2025
9364d0f
Merge branch 'rapidsmpf-translation' of github.com:rjzamora/cudf into…
rjzamora Oct 22, 2025
940b1a1
Replace 'streaming engine' with 'streaming runtime' terminology
rjzamora Oct 22, 2025
f3e22c7
Use operator.add instead of lambda in process_children
rjzamora Oct 22, 2025
15e68d2
update comments
rjzamora Oct 22, 2025
47d166a
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 22, 2025
f5c34b3
minor simplification
rjzamora Oct 22, 2025
217d347
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 23, 2025
02175de
avoid using DEFAULT_STREAM in most places
rjzamora Oct 23, 2025
a2f4635
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 23, 2025
495da51
remove DEFAULT_STREAM usage from io.py
rjzamora Oct 23, 2025
a7739de
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 23, 2025
db55a75
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 27, 2025
8a3aebd
update Message import
rjzamora Oct 27, 2025
d38f58f
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 27, 2025
2d5dd16
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 28, 2025
ad537a8
use pool
rjzamora Oct 28, 2025
1128365
add tests; add --stream-policy pdsh arg
rjzamora Oct 28, 2025
74afb07
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 28, 2025
424617c
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 28, 2025
ac22f6e
make sure at least two IO nodes are always active
rjzamora Oct 28, 2025
3362f0a
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 29, 2025
ff2b50e
adjust for breaking rapidsmpf change
rjzamora Oct 29, 2025
687fe5b
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 29, 2025
15dc89e
use --rmm-async for single cluster as well
rjzamora Oct 29, 2025
42c3f92
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 29, 2025
fc220ae
enable spilling in local shuffle (via env variable for now)
rjzamora Oct 29, 2025
e1713d7
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 30, 2025
455bd4f
update rapidsmpf tests to use the rapidsmpf runtime
rjzamora Oct 30, 2025
debbed0
use Lineariser
rjzamora Oct 30, 2025
024dc49
add Lineariser
rjzamora Oct 30, 2025
d58fc11
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translati…
rjzamora Oct 30, 2025
01dc8bf
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 30, 2025
028ad18
Reduce memory usage
rjzamora Oct 30, 2025
db775bc
reduce memory usage further
rjzamora Oct 30, 2025
037e3f1
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translati…
rjzamora Oct 30, 2025
97e2dbf
rename Lineariser
rjzamora Oct 30, 2025
0941cf9
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translati…
rjzamora Oct 30, 2025
bd8ae2f
Merge branch 'rapidsmpf-translation-linearise' into rapidsmpf-transla…
rjzamora Oct 30, 2025
a805be7
avoid possible stream-race conditions
rjzamora Oct 31, 2025
961898d
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 31, 2025
f32ae12
revise type check
rjzamora Oct 31, 2025
36277b6
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Oct 31, 2025
55eb6ed
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 3, 2025
87d849c
use rapidsmpf Lineariser
rjzamora Nov 3, 2025
546540b
change default CUDAStreamPolicy (for now)
rjzamora Nov 3, 2025
16ad915
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 3, 2025
f066a7f
Use default stream policy in pdsh for now
rjzamora Nov 3, 2025
4cbb36c
fix lineariser logic
rjzamora Nov 3, 2025
7aa7dc8
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 3, 2025
6f13d5f
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 4, 2025
71c470e
adjust defaults and configs
rjzamora Nov 4, 2025
8b463fc
remove comment
rjzamora Nov 4, 2025
81aef3f
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 4, 2025
4e784de
add test
rjzamora Nov 4, 2025
60edcc2
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 4, 2025
f91fb11
roll back dramatic default change (for now)
rjzamora Nov 4, 2025
0f7936b
partial code review
rjzamora Nov 4, 2025
9bfa355
update comments
rjzamora Nov 4, 2025
7d7a007
Merge remote-tracking branch 'upstream/main' into rapidsmpf-translation
rjzamora Nov 4, 2025
5a52ef5
adjust LocalShuffle
rjzamora Nov 4, 2025
87bf05b
more comments
rjzamora Nov 4, 2025
10 changes: 5 additions & 5 deletions ci/run_cudf_polars_with_rapidsmpf_pytests.sh
@@ -5,16 +5,16 @@
set -euo pipefail

# Test cudf_polars with rapidsmpf integration
-# This script runs experimental tests with single cluster mode
+# This script runs experimental tests with single cluster mode and the rapidsmpf runtime

# It is essential to cd into python/cudf_polars as `pytest-xdist` + `coverage` seem to work only at this directory level.

# Support invoking run_cudf_polars_with_rapidsmpf_pytests.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

-# Run experimental tests with the "single" cluster mode
-# These tests use rapidsmpf.integrations.single for single-GPU shuffling
-rapids-logger "Running experimental tests with single cluster mode"
+# Run experimental tests with the "single" cluster mode and the "rapidsmpf" runtime
+rapids-logger "Running experimental tests with the 'rapidsmpf' runtime"
python -m pytest --cache-clear "$@" "tests/experimental" \
    --executor streaming \
-   --cluster single
+   --cluster single \
+   --runtime rapidsmpf
2 changes: 2 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/base.py
@@ -402,6 +402,7 @@ class IOPartitionFlavor(IntEnum):
    SINGLE_FILE = enum.auto()  # 1:1 mapping between files and partitions
    SPLIT_FILES = enum.auto()  # Split each file into >1 partition
    FUSED_FILES = enum.auto()  # Fuse multiple files into each partition
+   SINGLE_READ = enum.auto()  # One worker/task reads everything


class IOPartitionPlan:
@@ -414,6 +415,7 @@ class IOPartitionPlan:
    - SINGLE_FILE: `factor` must be `1`.
    - SPLIT_FILES: `factor` is the number of partitions per file.
    - FUSED_FILES: `factor` is the number of files per partition.
+   - SINGLE_READ: `factor` is the total number of files.
    """

    __slots__ = ("factor", "flavor")
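For orientation, a minimal sketch (not part of the diff) of how a plan's `(flavor, factor)` pair could map a file list onto a partition count; `partition_count` is a hypothetical helper named here only for illustration:

```python
from enum import IntEnum, auto


class IOPartitionFlavor(IntEnum):
    SINGLE_FILE = auto()  # 1:1 mapping between files and partitions
    SPLIT_FILES = auto()  # Split each file into >1 partition
    FUSED_FILES = auto()  # Fuse multiple files into each partition
    SINGLE_READ = auto()  # One worker/task reads everything


def partition_count(flavor: IOPartitionFlavor, factor: int, n_files: int) -> int:
    """Hypothetical: output partition count implied by (flavor, factor)."""
    if flavor == IOPartitionFlavor.SINGLE_FILE:
        assert factor == 1  # `factor` must be 1 for SINGLE_FILE
        return n_files
    if flavor == IOPartitionFlavor.SPLIT_FILES:
        return n_files * factor  # `factor` partitions per file
    if flavor == IOPartitionFlavor.FUSED_FILES:
        return -(-n_files // factor)  # `factor` files per partition, rounded up
    assert factor == n_files  # SINGLE_READ: `factor` is the total file count
    return 1  # one worker/task reads everything
```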
75 changes: 59 additions & 16 deletions python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -227,6 +227,8 @@ class RunConfig:
    queries: list[int]
    suffix: str
    executor: ExecutorType
+    runtime: str
+    stream_policy: str
    cluster: str
    scheduler: str  # Deprecated, kept for backward compatibility
    n_workers: int
@@ -267,6 +269,15 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
        executor: ExecutorType = args.executor
        cluster = args.cluster
        scheduler = args.scheduler
+        runtime = args.runtime
+        stream_policy = args.stream_policy
+
+        # Handle "auto" stream policy
+        if stream_policy == "auto":
+            # TODO: Use pool by default for rapidsmpf runtime
+            # once stream-ordering bugs are fixed.
+            # See: https://github.com/rapidsai/cudf/issues/20484
+            stream_policy = "default"

        # Deal with deprecated scheduler argument
        # and non-streaming executors
@@ -335,6 +346,8 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
            executor=executor,
            cluster=cluster,
            scheduler=scheduler,
+            runtime=runtime,
+            stream_policy=stream_policy,
            n_workers=args.n_workers,
            shuffle=args.shuffle,
            gather_shuffle_stats=args.rapidsmpf_dask_statistics,
@@ -374,7 +387,9 @@ def summarize(self) -> None:
        print(f"path: {self.dataset_path}")
        print(f"scale_factor: {self.scale_factor}")
        print(f"executor: {self.executor}")
+        print(f"stream_policy: {self.stream_policy}")
        if self.executor == "streaming":
+            print(f"runtime: {self.runtime}")
            print(f"cluster: {self.cluster}")
            print(f"blocksize: {self.blocksize}")
            print(f"shuffle_method: {self.shuffle}")
@@ -415,20 +430,25 @@ def get_executor_options(
    """Generate executor_options for GPUEngine."""
    executor_options: dict[str, Any] = {}

-    if run_config.blocksize:
-        executor_options["target_partition_size"] = run_config.blocksize
-    if run_config.max_rows_per_partition:
-        executor_options["max_rows_per_partition"] = run_config.max_rows_per_partition
-    if run_config.shuffle:
-        executor_options["shuffle_method"] = run_config.shuffle
-    if run_config.broadcast_join_limit:
-        executor_options["broadcast_join_limit"] = run_config.broadcast_join_limit
-    if run_config.rapidsmpf_spill:
-        executor_options["rapidsmpf_spill"] = run_config.rapidsmpf_spill
-    if run_config.cluster == "distributed":
-        executor_options["cluster"] = "distributed"
-    if run_config.stats_planning:
-        executor_options["stats_planning"] = {"use_reduction_planning": True}
+    if run_config.executor == "streaming":
+        if run_config.blocksize:
+            executor_options["target_partition_size"] = run_config.blocksize
+        if run_config.max_rows_per_partition:
+            executor_options["max_rows_per_partition"] = (
+                run_config.max_rows_per_partition
+            )
+        if run_config.shuffle:
+            executor_options["shuffle_method"] = run_config.shuffle
+        if run_config.broadcast_join_limit:
+            executor_options["broadcast_join_limit"] = run_config.broadcast_join_limit
+        if run_config.rapidsmpf_spill:
+            executor_options["rapidsmpf_spill"] = run_config.rapidsmpf_spill
+        if run_config.cluster == "distributed":
+            executor_options["cluster"] = "distributed"
+        if run_config.stats_planning:
+            executor_options["stats_planning"] = {"use_reduction_planning": True}
+        executor_options["client_device_threshold"] = run_config.spill_device
+        executor_options["runtime"] = run_config.runtime

    if (
        benchmark
@@ -467,7 +487,7 @@ def print_query_plan(
    if args.explain_logical:
        print(f"\nQuery {q_id} - Logical plan\n")
        print(explain_query(q, engine, physical=False))
-    if args.explain:
+    if args.explain and run_config.executor == "streaming":
        print(f"\nQuery {q_id} - Physical plan\n")
        print(explain_query(q, engine))
    else:
@@ -671,6 +691,22 @@ def parse_args(
            - synchronous : Run locally in a single process
            - distributed : Use Dask for multi-GPU execution"""),
    )
+    parser.add_argument(
+        "--runtime",
+        type=str,
+        choices=["tasks", "rapidsmpf"],
+        default="tasks",
+        help="Runtime to use for the streaming executor (tasks or rapidsmpf).",
+    )
+    parser.add_argument(
+        "--stream-policy",
+        type=str,
+        choices=["auto", "default", "new", "pool"],
+        default="auto",
+        help=textwrap.dedent("""\
+            CUDA stream policy (auto, default, new, pool).
+            Default: auto (use the default policy for the runtime)"""),
+    )
    parser.add_argument(
        "--n-workers",
        default=1,
@@ -869,6 +905,10 @@ def run_polars(
    executor_options = get_executor_options(run_config, benchmark=benchmark)
    engine = pl.GPUEngine(
        raise_on_fail=True,
+        memory_resource=rmm.mr.CudaAsyncMemoryResource()
+        if run_config.rmm_async
+        else None,
+        cuda_stream_policy=run_config.stream_policy,
        executor=run_config.executor,
        executor_options=executor_options,
    )
@@ -928,7 +968,10 @@ def run_polars(
        if args.print_results:
            print(result)

-        print(f"Query {q_id} - Iteration {i} finished in {record.duration:0.4f}s")
+        print(
+            f"Query {q_id} - Iteration {i} finished in {record.duration:0.4f}s",
+            flush=True,
+        )
        records[q_id].append(record)

    run_config = dataclasses.replace(run_config, records=dict(records))
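Taken together, the benchmark plumbing above would surface roughly as in the sketch below. The `cuda_stream_policy`, `executor`, and `executor_options["runtime"]` names come from the diff; the query and file path are made up for illustration, and this assumes the PR branch is installed:

```python
import polars as pl

# Roughly what `--executor streaming --runtime rapidsmpf --stream-policy auto`
# resolves to ("auto" currently falls back to "default"; see cudf#20484).
engine = pl.GPUEngine(
    raise_on_fail=True,
    cuda_stream_policy="default",
    executor="streaming",
    executor_options={"runtime": "rapidsmpf"},  # or "tasks" (the default)
)

q = pl.scan_parquet("dataset/*.parquet").group_by("key").agg(pl.col("x").sum())
print(q.collect(engine=engine))
```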
12 changes: 11 additions & 1 deletion python/cudf_polars/cudf_polars/experimental/explain.py
@@ -61,7 +61,17 @@ def explain_query(
    ir = Translator(q._ldf.visit(), engine).translate_ir()

    if physical:
-        lowered_ir, partition_info = lower_ir_graph(ir, config)
+        if (
+            config.executor.name == "streaming"
+            and config.executor.runtime == "rapidsmpf"
+        ):  # pragma: no cover; rapidsmpf runtime not tested in CI yet
+            from cudf_polars.experimental.rapidsmpf.core import (
+                lower_ir_graph as rapidsmpf_lower_ir_graph,
+            )
+
+            lowered_ir, partition_info = rapidsmpf_lower_ir_graph(ir, config)
+        else:
+            lowered_ir, partition_info = lower_ir_graph(ir, config)
        return _repr_ir_tree(lowered_ir, partition_info)
    else:
        if config.executor.name == "streaming":
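A usage sketch for the change above (illustrative query; assumes this branch): with `runtime="rapidsmpf"` selected, `explain_query(..., physical=True)` now routes through the rapidsmpf lowering rather than the task-engine one.

```python
import polars as pl
from cudf_polars.experimental.explain import explain_query

engine = pl.GPUEngine(
    executor="streaming",
    executor_options={"runtime": "rapidsmpf"},
)
q = pl.scan_parquet("dataset/*.parquet").filter(pl.col("x") > 0)

# Physical plan, lowered via cudf_polars.experimental.rapidsmpf.core:
print(explain_query(q, engine, physical=True))
# Logical plan, unchanged code path:
print(explain_query(q, engine, physical=False))
```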
35 changes: 19 additions & 16 deletions python/cudf_polars/cudf_polars/experimental/join.py
@@ -144,6 +144,7 @@ def _make_bcast_join(
    left: IR,
    right: IR,
    shuffle_method: ShuffleMethod,
+    streaming_runtime: str,
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    if ir.options[0] != "Inner":
        left_count = partition_info[left].count
@@ -162,22 +163,23 @@
        # - In some cases, we can perform the partial joins
        #   sequentially. However, we are starting with a
        #   catch-all algorithm that works for all cases.
-        if left_count >= right_count:
-            right = _maybe_shuffle_frame(
-                right,
-                ir.right_on,
-                partition_info,
-                shuffle_method,
-                right_count,
-            )
-        else:
-            left = _maybe_shuffle_frame(
-                left,
-                ir.left_on,
-                partition_info,
-                shuffle_method,
-                left_count,
-            )
+        if streaming_runtime == "tasks":
+            if left_count >= right_count:
+                right = _maybe_shuffle_frame(
+                    right,
+                    ir.right_on,
+                    partition_info,
+                    shuffle_method,
+                    right_count,
+                )
+            else:
+                left = _maybe_shuffle_frame(
+                    left,
+                    ir.left_on,
+                    partition_info,
+                    shuffle_method,
+                    left_count,
+                )

    new_node = ir.reconstruct([left, right])
    partition_info[new_node] = PartitionInfo(count=output_count)
@@ -288,6 +290,7 @@ def _(
            left,
            right,
            config_options.executor.shuffle_method,
+            config_options.executor.runtime,
        )
    else:
        # Create a hash join
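The behavioral change above is narrow: for non-inner broadcast joins, the side selection is unchanged, but the optional pre-shuffle now runs only under the `"tasks"` runtime (the rapidsmpf runtime skips it). A tiny sketch of that selection, using a hypothetical helper name:

```python
def side_to_maybe_shuffle(left_count: int, right_count: int) -> str:
    # Mirrors _make_bcast_join: the side with fewer partitions (the
    # broadcast candidate) is the one that may be shuffled on its keys.
    return "right" if left_count >= right_count else "left"


assert side_to_maybe_shuffle(left_count=8, right_count=2) == "right"
assert side_to_maybe_shuffle(left_count=2, right_count=8) == "left"
```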
37 changes: 34 additions & 3 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -224,6 +224,29 @@ def post_process_task_graph(
    return graph


+def evaluate_rapidsmpf(
+    ir: IR,
+    config_options: ConfigOptions,
+) -> DataFrame:  # pragma: no cover; rapidsmpf runtime not tested in CI yet
+    """
+    Evaluate with the RapidsMPF streaming runtime.
+
+    Parameters
+    ----------
+    ir
+        Logical plan to evaluate.
+    config_options
+        GPUEngine configuration options.
+
+    Returns
+    -------
+    A cudf-polars DataFrame object.
+    """
+    from cudf_polars.experimental.rapidsmpf.core import evaluate_logical_plan
+
+    return evaluate_logical_plan(ir, config_options)
+
+
def evaluate_streaming(
    ir: IR,
    config_options: ConfigOptions,
@@ -245,11 +268,19 @@ def evaluate_streaming(
    # Clear source info cache in case data was overwritten
    _clear_source_info_cache()

-    ir, partition_info = lower_ir_graph(ir, config_options)
+    assert config_options.executor.name == "streaming", "Executor must be streaming"
+    if (
+        config_options.executor.runtime == "rapidsmpf"
+    ):  # pragma: no cover; rapidsmpf runtime not tested in CI yet
+        # Using the RapidsMPF streaming runtime.
+        return evaluate_rapidsmpf(ir, config_options)
+    else:
+        # Using the default task engine.
+        ir, partition_info = lower_ir_graph(ir, config_options)

-    graph, key = task_graph(ir, partition_info, config_options)
+        graph, key = task_graph(ir, partition_info, config_options)

-    return get_scheduler(config_options)(graph, key)
+        return get_scheduler(config_options)(graph, key)


@generate_ir_tasks.register(IR)
@@ -0,0 +1,8 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+
+"""RapidsMPF streaming-engine support."""
+
+from __future__ import annotations
+
+__all__: list[str] = []