From ec539e7ef78f347e336c52e6a904880f2cfff975 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Wed, 28 Sep 2022 10:57:13 -0500 Subject: [PATCH] Revert "Update rearrange_by_column patch for explicit comms (#992)" This reverts commit 8de9ce3a07f34a1fbf7d5de903f003a4856ea7bf. --- dask_cuda/__init__.py | 8 ++++-- dask_cuda/explicit_comms/dataframe/shuffle.py | 28 ++++--------------- dask_cuda/tests/test_explicit_comms.py | 16 +---------- docs/source/explicit_comms.rst | 5 ++-- 4 files changed, 14 insertions(+), 43 deletions(-) diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 47f0abdb..ed8e6ae9 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -10,7 +10,7 @@ from ._version import get_versions from .cuda_worker import CUDAWorker -from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_wrapper +from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator @@ -19,8 +19,10 @@ # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( - dask.dataframe.shuffle.rearrange_by_column +dask.dataframe.shuffle.rearrange_by_column_tasks = ( + get_rearrange_by_column_tasks_wrapper( + dask.dataframe.shuffle.rearrange_by_column_tasks + ) ) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 37d0330c..ff72fad2 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -1,7 +1,6 @@ import asyncio import functools import inspect -import warnings from collections import defaultdict from operator import getitem from typing import Dict, List, Optional @@ -345,7 +344,7 @@ def shuffle( return new_dd_object(dsk, name, meta, divs).persist() -def get_rearrange_by_column_wrapper(func): +def get_rearrange_by_column_tasks_wrapper(func): """Returns a function wrapper that dispatch the shuffle to explicit-comms. Notice, this is monkey patched into Dask at dask_cuda import @@ -355,40 +354,23 @@ def get_rearrange_by_column_wrapper(func): @functools.wraps(func) def wrapper(*args, **kwargs): - # Use explicit-comms shuffle if the shuffle kwarg is - # set to "explicit-comms". For now, we also use - # explicit-comms if the shuffle kwarg is set to "tasks" - # and the `dask.config` specifies "explicit-comms". - # However, this behavior should be deprecated in the - # next dask-cuda release (after 22.10) - shuffle_arg = kwargs.pop("shuffle", None) or dask.config.get("shuffle", "disk") - if shuffle_arg == "tasks" and dask.config.get("explicit-comms", False): - shuffle_arg = "explicit-comms" - warnings.warn( - "The 'explicit-comms' config field is now deprecated and " - "will be removed in a future dask-cuda version. Please set " - "the 'shuffle' config to 'explicit-comms' instead, or pass " - "`shuffle='explicit-comms'` to the collection API.", - FutureWarning, - ) - - if shuffle_arg == "explicit-comms": + if dask.config.get("explicit-comms", False): try: import distributed.worker # Make sure we have an activate client. distributed.worker.get_client() except (ImportError, ValueError): - shuffle_arg = "tasks" # Fall back to task-based shuffle + pass else: # Convert `*args, **kwargs` to a dict of `keyword -> values` kw = func_sig.bind(*args, **kwargs) kw.apply_defaults() kw = kw.arguments - column = kw["col"] + column = kw["column"] if isinstance(column, str): column = [column] return shuffle(kw["df"], column, kw["npartitions"], kw["ignore_index"]) - return func(*args, shuffle=shuffle_arg, **kwargs) + return func(*args, **kwargs) return wrapper diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index da664920..dd92e2a6 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -168,30 +168,16 @@ def check_shuffle(in_cluster): """ name = "explicit-comms-shuffle" ddf = dd.from_pandas(pd.DataFrame({"key": np.arange(10)}), npartitions=2) - # TODO: Update when "explicit-comms" config is deprecated - # See: https://github.com/rapidsai/dask-cuda/pull/992 with dask.config.set(explicit_comms=False): res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks") assert all(name not in str(key) for key in res.dask) with dask.config.set(explicit_comms=True): - with pytest.warns(FutureWarning): - res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks") + res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks") if in_cluster: assert any(name in str(key) for key in res.dask) else: # If not in cluster, we cannot use explicit comms assert all(name not in str(key) for key in res.dask) - # Passing explicit `shuffle="explicit-comms"` argument - res = dd.shuffle.shuffle(ddf, "key", npartitions=4, shuffle="explicit-comms") - if in_cluster: - assert any(name in str(key) for key in res.dask) - else: # If not in cluster, we cannot use explicit comms - assert all(name not in str(key) for key in res.dask) - - # Passing explicit `shuffle="tasks"` argument - res = dd.shuffle.shuffle(ddf, "key", npartitions=4, shuffle="tasks") - assert all(name not in str(key) for key in res.dask) - with LocalCluster( protocol="tcp", dashboard_address=None, diff --git a/docs/source/explicit_comms.rst b/docs/source/explicit_comms.rst index 75fb9ea6..56ad9775 100644 --- a/docs/source/explicit_comms.rst +++ b/docs/source/explicit_comms.rst @@ -2,7 +2,7 @@ Explicit-comms ============== Communication and scheduling overhead can be a major bottleneck in Dask/Distributed. Dask-CUDA addresses this by introducing an API for explicit communication in Dask tasks. -The idea is that Dask/Distributed spawns workers and distributes the data as usual while the user can submit tasks on the workers that communicate explicitly. +The idea is that Dask/Distributed spawns workers and distribute data as usually while the user can submit tasks on the workers that communicate explicitly. This makes it possible to bypass Distributed's scheduler and write hand-tuned computation and communication patterns. Currently, Dask-CUDA includes an explicit-comms implementation of the Dataframe `shuffle `_ operation used for merging and sorting. @@ -11,6 +11,7 @@ implementation of the Dataframe `shuffle `_. +In order to use explicit-comms in Dask/Distributed automatically, simply define the environment variable ``DASK_EXPLICIT_COMMS=True`` or setting the ``"explicit-comms"`` +key in the `Dask configuration `_. It is also possible to use explicit-comms in tasks manually, see the `API `_ and our `implementation of shuffle `_ for guidance.