-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update explicit-comms for dask-expr support #1323
Changes from 14 commits
85c64d4
be984be
46ae012
c11c47c
dc32127
ad2ea62
107e31a
71b9956
0f8bb29
d04c755
1b6ef7d
fcc0e04
69421df
c79b1f1
68079bb
af6aa67
8c35fba
4a93bc9
a394334
92b496c
206101f
48e6615
5dcd2f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,7 @@ | |
import pandas as pd | ||
|
||
import dask | ||
from dask.base import tokenize | ||
from dask.dataframe.core import new_dd_object | ||
import dask.dataframe as dd | ||
from dask.distributed import performance_report, wait | ||
from dask.utils import format_bytes, parse_bytes | ||
|
||
|
@@ -25,12 +24,20 @@ | |
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955> | ||
|
||
|
||
def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu): | ||
# Set default shuffle method to "tasks" | ||
if dask.config.get("dataframe.shuffle.method", None) is None: | ||
dask.config.set({"dataframe.shuffle.method": "tasks"}) | ||
|
||
|
||
def generate_chunk(input): | ||
i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input | ||
|
||
# Setting a seed that triggers max amount of comm in the two-GPU case. | ||
if gpu: | ||
import cupy as xp | ||
|
||
import cudf as xdf | ||
import dask_cudf # noqa: F401 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generating a collection in this way puts us in a strange edge case where |
||
else: | ||
import numpy as xp | ||
import pandas as xdf | ||
|
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): | |
|
||
parts = [chunk_size for _ in range(num_chunks)] | ||
device_type = True if args.type == "gpu" else False | ||
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type) | ||
meta = generate_chunk((0, 4, 1, chunk_type, None, device_type)) | ||
divisions = [None] * (len(parts) + 1) | ||
|
||
name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type) | ||
|
||
graph = { | ||
(name, i): ( | ||
generate_chunk, | ||
i, | ||
part, | ||
len(parts), | ||
chunk_type, | ||
frac_match, | ||
device_type, | ||
) | ||
for i, part in enumerate(parts) | ||
} | ||
|
||
ddf = new_dd_object(graph, name, meta, divisions) | ||
ddf = dd.from_map( | ||
generate_chunk, | ||
[ | ||
( | ||
i, | ||
part, | ||
len(parts), | ||
chunk_type, | ||
frac_match, | ||
device_type, | ||
) | ||
for i, part in enumerate(parts) | ||
], | ||
meta=meta, | ||
divisions=divisions, | ||
) | ||
|
||
if chunk_type == "build": | ||
if not args.no_shuffle: | ||
|
@@ -368,9 +375,27 @@ def parse_args(): | |
|
||
|
||
if __name__ == "__main__": | ||
args = parse_args() | ||
|
||
# Raise error early if "explicit-comms" is not allowed | ||
if ( | ||
args.backend == "explicit-comms" | ||
and dask.config.get( | ||
"dataframe.query-planning", | ||
None, | ||
) | ||
is not False | ||
): | ||
raise NotImplementedError( | ||
"The 'explicit-comms' config is not yet supported when " | ||
"query-planning is enabled in dask. Please use the legacy " | ||
"dask-dataframe API (set the 'dataframe.query-planning' " | ||
"config to `False` before executing).", | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems you're not addressing this for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right - sorry. I'll use the same message for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'd still suggest simply moving this to
Yes, sorry, that is what I meant. 😅
But would using it directly still work with query-planning enabled? If not then moving the exception above to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ah - I see what you mean. Using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, then how about still moving this to def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[], *, force_explicit_comms=False):
...
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")
# Raise error early if "explicit-comms" is not allowed
if (
args.backend == "explicit-comms"
and dask.config.get(
"dataframe.query-planning",
None,
)
is not False
and force_explicit_comms is False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API (set the 'dataframe.query-planning' "
"config to `False` before executing).",
)
return args Then WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, sounds good. Pushed something very similar to your solution just now (just used a |
||
|
||
execute_benchmark( | ||
Config( | ||
args=parse_args(), | ||
args=args, | ||
bench_once=bench_once, | ||
create_tidy_results=create_tidy_results, | ||
pretty_print_results=pretty_print_results, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note on these changes: For 24.06, we want to explicitly test with both
DASK_DATAFRAME__QUERY_PLANNING=True
andDASK_DATAFRAME__QUERY_PLANNING=False
.