Update explicit-comms for dask-expr support #1323

Merged: 23 commits, Apr 3, 2024
43 changes: 41 additions & 2 deletions ci/test_python.sh
Member Author commented:

Note on these changes: For 24.06, we want to explicitly test with both DASK_DATAFRAME__QUERY_PLANNING=True and DASK_DATAFRAME__QUERY_PLANNING=False.

@@ -42,8 +42,9 @@ set_exit_code() {
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda"
rapids-logger "pytest dask-cuda (dask-expr)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
@@ -62,13 +63,51 @@ timeout 60m pytest \
tests -k "not ucxx"
popd

rapids-logger "Run local benchmark"
rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 30m pytest \
Member commented:

Unfortunately, 15 minutes was insufficient; the explicit-comms tests are some of the longer-running ones. It also took me a few minutes to figure out what exactly happened, so I've opened #1330 to make this debugging process easier in the future.

-vv \
--durations=0 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
--cov-config=../pyproject.toml \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
12 changes: 12 additions & 0 deletions dask_cuda/__init__.py
@@ -20,6 +20,18 @@
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
"explicit-comms", False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the shuffle "
"API directly, or use the legacy dask-dataframe API "
"(set the 'dataframe.query-planning' config to `False`"
"before importing `dask.dataframe`).",
)


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
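For context, a minimal sketch (not part of this diff) of what the new guard asks users to do: the 'dataframe.query-planning' config must be set to `False` before `dask.dataframe` is first imported, which is exactly what the `DASK_DATAFRAME__QUERY_PLANNING=False` environment variable accomplishes.

import dask

# Equivalent to exporting DASK_DATAFRAME__QUERY_PLANNING=False before
# starting Python; must happen before dask.dataframe is first imported.
dask.config.set({"dataframe.query-planning": False})
dask.config.set({"explicit-comms": True})

import dask.dataframe  # legacy (non-expr) implementation selected here
import dask_cuda  # the guard above now passes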
47 changes: 27 additions & 20 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,8 +7,7 @@
import pandas as pd

import dask
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
import dask.dataframe as dd
from dask.distributed import performance_report, wait
from dask.utils import format_bytes, parse_bytes

@@ -25,12 +24,20 @@
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955>


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
dask.config.set({"dataframe.shuffle.method": "tasks"})


def generate_chunk(input):
i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input

# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
import cupy as xp

import cudf as xdf
import dask_cudf # noqa: F401
Member Author commented:

Generating a collection in this way puts us in a strange edge case where dask_cudf may never get imported on the worker, and so some type-based dispatching may fail. The only way to avoid this possibility is to import dask_cudf before we deploy the cluster, or to import dask_cudf here.

else:
import numpy as xp
import pandas as xdf
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):

parts = [chunk_size for _ in range(num_chunks)]
device_type = True if args.type == "gpu" else False
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
divisions = [None] * (len(parts) + 1)

name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)

graph = {
(name, i): (
generate_chunk,
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
}

ddf = new_dd_object(graph, name, meta, divisions)
ddf = dd.from_map(
generate_chunk,
[
(
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
],
meta=meta,
divisions=divisions,
)

if chunk_type == "build":
if not args.no_shuffle:
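The change above replaces a hand-built task graph (a raw dict of keys fed to `new_dd_object`) with the public `dd.from_map` API, which works under both the legacy dask-dataframe implementation and dask-expr. A minimal standalone sketch of the pattern (illustrative names, not from this diff):

import pandas as pd
import dask.dataframe as dd

def make_part(args):
    # One element of the input sequence produces one partition.
    start, size = args
    return pd.DataFrame({"x": range(start, start + size)})

ddf = dd.from_map(make_part, [(0, 3), (3, 3), (6, 3)])  # 3 partitions
print(len(ddf))  # 9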
26 changes: 16 additions & 10 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -8,8 +8,6 @@

import dask
import dask.dataframe
from dask.dataframe.core import new_dd_object
from dask.dataframe.shuffle import shuffle
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, parse_bytes

@@ -33,7 +31,7 @@


def shuffle_dask(df, args):
result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
if args.backend == "dask-noop":
result = as_noop(result)
t1 = perf_counter()
@@ -94,18 +92,24 @@ def create_data(
)

# Create partitions based on the specified partition distribution
dsk = {}
futures = []
for i, part_size in enumerate(dist):
for _ in range(part_size):
# We use `client.submit` to control placement of the partition.
dsk[(name, len(dsk))] = client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
futures.append(
client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
)
)
wait(dsk.values())
wait(futures)

df_meta = create_df(0, args.type)
divs = [None] * (len(dsk) + 1)
ret = new_dd_object(dsk, name, df_meta, divs).persist()
divs = [None] * (len(futures) + 1)
ret = dask.dataframe.from_delayed(
futures,
meta=df_meta,
divisions=divs,
).persist()
wait(ret)

data_processed = args.in_parts * args.partition_size
@@ -254,7 +258,9 @@ def parse_args():
]

return parse_benchmark_args(
description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
description="Distributed shuffle (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


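`create_data` now follows the same direction: partitions are created as distributed futures (so their placement on specific workers can be controlled via `client.submit`) and assembled with `dask.dataframe.from_delayed` instead of `new_dd_object`. A sketch of the idiom, assuming a running local cluster (hypothetical example, not from this diff):

import pandas as pd
import dask.dataframe as dd
from distributed import Client, wait

client = Client()  # assumes a local cluster is acceptable for illustration

def make_part(i):
    return pd.DataFrame({"x": [i] * 4})

# pure=False forces one distinct future per call, as in the benchmark.
futures = [client.submit(make_part, i, pure=False) for i in range(4)]
wait(futures)

ddf = dd.from_delayed(futures, meta=make_part(0))
print(ddf.x.sum().compute())  # (0 + 1 + 2 + 3) * 4 == 24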
25 changes: 24 additions & 1 deletion dask_cuda/benchmarks/utils.py
@@ -11,6 +11,7 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
@@ -47,7 +48,11 @@ def as_noop(dsk):
raise RuntimeError("Requested noop computation but dask-noop not installed.")


def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
def parse_benchmark_args(
description="Generic dask-cuda Benchmark",
args_list=[],
check_explicit_comms=True,
):
parser = argparse.ArgumentParser(description=description)
worker_args = parser.add_argument_group(description="Worker configuration")
worker_args.add_argument(
@@ -317,6 +322,24 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")

# Raise error early if "explicit-comms" is not allowed
if (
check_explicit_comms
and args.backend == "explicit-comms"
and config.get(
"dataframe.query-planning",
None,
)
is not False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API by setting the following environment "
"variable before executing:",
" DASK_DATAFRAME__QUERY_PLANNING=False",
)

return args


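The guard tests `is not False` rather than truthiness because the config is tri-state: `None` means the user never set it (query planning then becomes the default once `dask.dataframe` is imported), `True` explicitly requests dask-expr, and only an explicit `False` selects the legacy API. A small illustration of that logic (assumed behavior, not from this diff):

from dask import config

for value in (None, True, False):
    with config.set({"dataframe.query-planning": value}):
        blocked = config.get("dataframe.query-planning", None) is not False
        print(value, "-> explicit-comms blocked:", blocked)
# None -> blocked, True -> blocked, False -> allowed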
32 changes: 19 additions & 13 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -11,10 +11,12 @@
import dask
import dask.config
import dask.dataframe
import dask.dataframe as dd
import dask.utils
import distributed.worker
from dask.base import tokenize
from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object
from dask.dataframe import DataFrame, Series
from dask.dataframe.core import _concat as dd_concat
from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
from distributed import wait
from distributed.protocol import nested_deserialize, to_serialize
@@ -468,18 +470,19 @@ def shuffle(
npartitions = df.npartitions

# Step (a):
df = df.persist() # Make sure optimizations are apply on the existing graph
df = df.persist() # Make sure optimizations are applied on the existing graph
wait([df]) # Make sure all keys have been materialized on workers
persisted_keys = [f.key for f in c.client.futures_of(df)]
name = (
"explicit-comms-shuffle-"
f"{tokenize(df, column_names, npartitions, ignore_index)}"
f"{tokenize(df, column_names, npartitions, ignore_index, batchsize)}"
)
df_meta: DataFrame = df._meta

# Stage all keys of `df` on the workers and cancel them, which makes it possible
# for the shuffle to free memory as the partitions of `df` are consumed.
# See CommsContext.stage_keys() for a description of staging.
rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
rank_to_inkeys = c.stage_keys(name=name, keys=persisted_keys)
c.client.cancel(df)

# Get batchsize
@@ -526,23 +529,26 @@
# TODO: can we do this without using `submit()` to avoid the overhead
# of creating a Future for each dataframe partition?

dsk = {}
futures = []
for rank in ranks:
for part_id in rank_to_out_part_ids[rank]:
dsk[(name, part_id)] = c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
futures.append(
c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
)
)

# Create a distributed Dataframe from all the pieces
divs = [None] * (len(dsk) + 1)
ret = new_dd_object(dsk, name, df_meta, divs).persist()
divs = [None] * (len(futures) + 1)
kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
ret = dd.from_delayed(futures, **kwargs).persist()
wait([ret])

# Release all temporary dataframes
for fut in [*shuffle_result.values(), *dsk.values()]:
for fut in [*shuffle_result.values(), *futures]:
fut.release()
return ret

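One subtlety in the staging change in this file: with dask-expr, the key names of a persisted collection come from the optimized expression rather than the original graph, so the keys are now read off the realized futures instead of `df.__dask_keys__()`. The idiom, reduced to a self-contained sketch (assumes a running `Client`; `futures_of` is also available as a method on the client, as used in the diff):

import pandas as pd
import dask.dataframe as dd
from distributed import Client, futures_of, wait

client = Client()
ddf = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=4).persist()
wait([ddf])
# Keys as actually materialized on the workers -- valid whether or not
# the collection was optimized/renamed at persist time:
persisted_keys = [f.key for f in futures_of(ddf)]
print(persisted_keys)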
22 changes: 17 additions & 5 deletions dask_cuda/tests/test_explicit_comms.py
@@ -22,14 +22,23 @@
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")

QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

# Skip these tests when dask-expr is active (for now)
pytestmark = pytest.mark.skipif(
dask.config.get("dataframe.query-planning", None) is not False,
reason="https://github.com/rapidsai/dask-cuda/issues/1311",
query_planning_skip = pytest.mark.skipif(
QUERY_PLANNING_ON,
reason=(
"The 'explicit-comms' config is not supported "
"when query planning is enabled."
),
)

mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")
# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
dask.config.set({"dataframe.shuffle.method": "tasks"})


# Notice, all of the following tests are executed in a new process such
@@ -89,6 +98,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
pd.testing.assert_frame_equal(got, expected)


@query_planning_skip
def test_dataframe_merge_empty_partitions():
# Notice, we use more partitions than rows
p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -227,6 +237,7 @@ def check_shuffle():
check_shuffle()


@query_planning_skip
@pytest.mark.parametrize("in_cluster", [True, False])
def test_dask_use_explicit_comms(in_cluster):
def _timeout(process, function, timeout):
@@ -289,6 +300,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
assert_eq(got, expected)


@query_planning_skip
@pytest.mark.parametrize("nworkers", [1, 2, 4])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
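For reference, the process-isolation pattern these tests rely on, reduced to a standalone sketch: each test body runs in a fresh "spawn" process so dask config, environment variables, and CUDA state cannot leak between tests. The exit-code assertion is illustrative; the real tests also layer in timeouts and UCX setup.

import multiprocessing as mp

def _test_body():
    # Real tests start a cluster and run their assertions here.
    assert 1 + 1 == 2

def test_in_subprocess():
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=_test_body)
    p.start()
    p.join(timeout=60)
    assert p.exitcode == 0  # nonzero exit means the child's assertions failed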