-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update explicit-comms for dask-expr support #1323
Changes from 21 commits
85c64d4
be984be
46ae012
c11c47c
dc32127
ad2ea62
107e31a
71b9956
0f8bb29
d04c755
1b6ef7d
fcc0e04
69421df
c79b1f1
68079bb
af6aa67
8c35fba
4a93bc9
a394334
92b496c
206101f
48e6615
5dcd2f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,7 @@ | |
import pandas as pd | ||
|
||
import dask | ||
from dask.base import tokenize | ||
from dask.dataframe.core import new_dd_object | ||
import dask.dataframe as dd | ||
from dask.distributed import performance_report, wait | ||
from dask.utils import format_bytes, parse_bytes | ||
|
||
|
@@ -25,12 +24,20 @@ | |
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955> | ||
|
||
|
||
def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu): | ||
# Set default shuffle method to "tasks" | ||
if dask.config.get("dataframe.shuffle.method", None) is None: | ||
dask.config.set({"dataframe.shuffle.method": "tasks"}) | ||
|
||
|
||
def generate_chunk(input): | ||
i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input | ||
|
||
# Setting a seed that triggers max amount of comm in the two-GPU case. | ||
if gpu: | ||
import cupy as xp | ||
|
||
import cudf as xdf | ||
import dask_cudf # noqa: F401 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generating a collection in this way puts us in a strange edge case where |
||
else: | ||
import numpy as xp | ||
import pandas as xdf | ||
|
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): | |
|
||
parts = [chunk_size for _ in range(num_chunks)] | ||
device_type = True if args.type == "gpu" else False | ||
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type) | ||
meta = generate_chunk((0, 4, 1, chunk_type, None, device_type)) | ||
divisions = [None] * (len(parts) + 1) | ||
|
||
name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type) | ||
|
||
graph = { | ||
(name, i): ( | ||
generate_chunk, | ||
i, | ||
part, | ||
len(parts), | ||
chunk_type, | ||
frac_match, | ||
device_type, | ||
) | ||
for i, part in enumerate(parts) | ||
} | ||
|
||
ddf = new_dd_object(graph, name, meta, divisions) | ||
ddf = dd.from_map( | ||
generate_chunk, | ||
[ | ||
( | ||
i, | ||
part, | ||
len(parts), | ||
chunk_type, | ||
frac_match, | ||
device_type, | ||
) | ||
for i, part in enumerate(parts) | ||
], | ||
meta=meta, | ||
divisions=divisions, | ||
) | ||
|
||
if chunk_type == "build": | ||
if not args.no_shuffle: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note on these changes: For 24.06, we want to explicitly test with both
DASK_DATAFRAME__QUERY_PLANNING=True
andDASK_DATAFRAME__QUERY_PLANNING=False
.