diff --git a/ci/test_python_other.sh b/ci/test_python_other.sh index 9cdceb295db..8ecd02f70a1 100755 --- a/ci/test_python_other.sh +++ b/ci/test_python_other.sh @@ -29,6 +29,14 @@ rapids-logger "pytest dask_cudf" --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \ --cov-report=term +# Run tests in dask_cudf/tests and dask_cudf/io/tests with dask-expr +rapids-logger "pytest dask_cudf + dask_expr" +DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-expr.xml" \ + --numprocesses=8 \ + --dist=loadscope \ + . + rapids-logger "pytest custreamz" ./ci/run_custreamz_pytests.sh \ --junitxml="${RAPIDS_TESTS_DIR}/junit-custreamz.xml" \ diff --git a/ci/test_wheel_dask_cudf.sh b/ci/test_wheel_dask_cudf.sh index 59f6ecd8483..398eed43ea4 100755 --- a/ci/test_wheel_dask_cudf.sh +++ b/ci/test_wheel_dask_cudf.sh @@ -38,3 +38,12 @@ python -m pytest \ --numprocesses=8 \ . popd + +# Run tests in dask_cudf/tests and dask_cudf/io/tests with dask-expr +rapids-logger "pytest dask_cudf + dask_expr" +pushd python/dask_cudf/dask_cudf +DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-expr.xml" \ + --numprocesses=8 \ + . +popd diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index c152a9e6a81..c66e85ed2af 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -1,29 +1,75 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. +from dask import config + +# For dask>2024.2.0, we can silence the loud deprecation +# warning before importing `dask.dataframe` (this won't +# do anything for dask==2024.2.0) +config.set({"dataframe.query-planning-warning": False}) + +import dask.dataframe as dd from dask.dataframe import from_delayed import cudf from . import backends from ._version import __git_commit__, __version__ -from .core import DataFrame, Series, concat, from_cudf, from_dask_dataframe -from .groupby import groupby_agg -from .io import read_csv, read_json, read_orc, read_text, to_orc +from .core import concat, from_cudf, from_dask_dataframe +from .expr import QUERY_PLANNING_ON + + +def read_csv(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_csv(*args, **kwargs) + + +def read_json(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_json(*args, **kwargs) + + +def read_orc(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_orc(*args, **kwargs) + + +def read_parquet(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_parquet(*args, **kwargs) + + +def raise_not_implemented_error(attr_name): + def inner_func(*args, **kwargs): + raise NotImplementedError( + f"Top-level {attr_name} API is not available for dask-expr." + ) + + return inner_func + + +if QUERY_PLANNING_ON: + from .expr._collection import DataFrame, Index, Series + + groupby_agg = raise_not_implemented_error("groupby_agg") + read_text = raise_not_implemented_error("read_text") + to_orc = raise_not_implemented_error("to_orc") +else: + from .core import DataFrame, Index, Series + from .groupby import groupby_agg + from .io import read_text, to_orc -try: - from .io import read_parquet -except ImportError: - pass __all__ = [ "DataFrame", "Series", + "Index", "from_cudf", "from_dask_dataframe", "concat", "from_delayed", ] + if not hasattr(cudf.DataFrame, "mean"): cudf.DataFrame.mean = None del cudf diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 317c45ba582..c7b4a1c4c6a 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -627,13 +627,68 @@ def read_csv(*args, **kwargs): @staticmethod def read_hdf(*args, **kwargs): - from dask_cudf import from_dask_dataframe - # HDF5 reader not yet implemented in cudf warnings.warn( "read_hdf is not yet implemented in cudf/dask_cudf. " "Moving to cudf from pandas. Expect poor performance!" ) - return from_dask_dataframe( - _default_backend(dd.read_hdf, *args, **kwargs) + return _default_backend(dd.read_hdf, *args, **kwargs).to_backend( + "cudf" + ) + + +# Define "cudf" backend entrypoint for dask-expr +class CudfDXBackendEntrypoint(DataFrameBackendEntrypoint): + """Backend-entrypoint class for Dask-Expressions + + This class is registered under the name "cudf" for the + ``dask-expr.dataframe.backends`` entrypoint in ``setup.cfg``. + Dask-DataFrame will use the methods defined in this class + in place of ``dask_expr.`` when the + "dataframe.backend" configuration is set to "cudf": + + Examples + -------- + >>> import dask + >>> import dask_expr + >>> with dask.config.set({"dataframe.backend": "cudf"}): + ... ddf = dx.from_dict({"a": range(10)}) + >>> type(ddf._meta) + + """ + + @classmethod + def to_backend_dispatch(cls): + return CudfBackendEntrypoint.to_backend_dispatch() + + @classmethod + def to_backend(cls, *args, **kwargs): + return CudfBackendEntrypoint.to_backend(*args, **kwargs) + + @staticmethod + def from_dict( + data, + npartitions, + orient="columns", + dtype=None, + columns=None, + constructor=cudf.DataFrame, + ): + import dask_expr as dx + + return _default_backend( + dx.from_dict, + data, + npartitions=npartitions, + orient=orient, + dtype=dtype, + columns=columns, + constructor=constructor, ) + + +# Import/register cudf-specific classes for dask-expr +try: + import dask_cudf.expr # noqa: F401 +except ImportError: + pass diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index b051b21790e..bfe58531a73 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -685,18 +685,27 @@ def reduction( @_dask_cudf_nvtx_annotate def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): + from dask_cudf import QUERY_PLANNING_ON + if isinstance(getattr(data, "index", None), cudf.MultiIndex): raise NotImplementedError( "dask_cudf does not support MultiIndex Dataframes." ) - name = name or ("from_cudf-" + tokenize(data, npartitions or chunksize)) + # Dask-expr doesn't support the `name` argument + name = {} + if not QUERY_PLANNING_ON: + name = { + "name": name + or ("from_cudf-" + tokenize(data, npartitions or chunksize)) + } + return dd.from_pandas( data, npartitions=npartitions, chunksize=chunksize, sort=sort, - name=name, + **name, ) @@ -711,7 +720,10 @@ def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): rather than pandas objects.\n """ ) - + textwrap.dedent(dd.from_pandas.__doc__) + # TODO: `dd.from_pandas.__doc__` is empty when + # `DASK_DATAFRAME__QUERY_PLANNING=True` + # since dask-expr does not provide a docstring for from_pandas. + + textwrap.dedent(dd.from_pandas.__doc__ or "") ) diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py new file mode 100644 index 00000000000..c36dd0abcb9 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask import config + +# Check if dask-dataframe is using dask-expr. +# For dask>=2024.3.0, a null value will default to True +QUERY_PLANNING_ON = config.get("dataframe.query-planning", None) is not False + +# Register custom expressions and collections +try: + import dask_cudf.expr._collection + import dask_cudf.expr._expr + +except ImportError as err: + if QUERY_PLANNING_ON: + # Dask *should* raise an error before this. + # However, we can still raise here to be certain. + raise RuntimeError( + "Failed to register the 'cudf' backend for dask-expr." + " Please make sure you have dask-expr installed.\n" + f"Error Message: {err}" + ) diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py new file mode 100644 index 00000000000..b2f92aeddda --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -0,0 +1,110 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from functools import cached_property + +from dask_expr import ( + DataFrame as DXDataFrame, + FrameBase, + Index as DXIndex, + Series as DXSeries, + get_collection_type, +) +from dask_expr._collection import new_collection +from dask_expr._util import _raise_if_object_series + +from dask import config +from dask.dataframe.core import is_dataframe_like + +import cudf + +## +## Custom collection classes +## + + +# VarMixin can be removed if cudf#15179 is addressed. +# See: https://github.com/rapidsai/cudf/issues/15179 +class VarMixin: + def var( + self, + axis=0, + skipna=True, + ddof=1, + numeric_only=False, + split_every=False, + **kwargs, + ): + _raise_if_object_series(self, "var") + axis = self._validate_axis(axis) + self._meta.var(axis=axis, skipna=skipna, numeric_only=numeric_only) + frame = self + if is_dataframe_like(self._meta) and numeric_only: + # Convert to pandas - cudf does something weird here + index = self._meta.to_pandas().var(numeric_only=True).index + frame = frame[list(index)] + return new_collection( + frame.expr.var( + axis, skipna, ddof, numeric_only, split_every=split_every + ) + ) + + +class DataFrame(VarMixin, DXDataFrame): + @classmethod + def from_dict(cls, *args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return DXDataFrame.from_dict(*args, **kwargs) + + def groupby( + self, + by, + group_keys=True, + sort=None, + observed=None, + dropna=None, + **kwargs, + ): + from dask_cudf.expr._groupby import GroupBy + + if isinstance(by, FrameBase) and not isinstance(by, DXSeries): + raise ValueError( + f"`by` must be a column name or list of columns, got {by}." + ) + + return GroupBy( + self, + by, + group_keys=group_keys, + sort=sort, + observed=observed, + dropna=dropna, + **kwargs, + ) + + +class Series(VarMixin, DXSeries): + def groupby(self, by, **kwargs): + from dask_cudf.expr._groupby import SeriesGroupBy + + return SeriesGroupBy(self, by, **kwargs) + + @cached_property + def list(self): + from dask_cudf.accessors import ListMethods + + return ListMethods(self) + + @cached_property + def struct(self): + from dask_cudf.accessors import StructMethods + + return StructMethods(self) + + +class Index(DXIndex): + pass # Same as pandas (for now) + + +get_collection_type.register(cudf.DataFrame, lambda _: DataFrame) +get_collection_type.register(cudf.Series, lambda _: Series) +get_collection_type.register(cudf.BaseIndex, lambda _: Index) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py new file mode 100644 index 00000000000..cbe7a71cb73 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -0,0 +1,58 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask_expr._cumulative import CumulativeBlockwise, TakeLast +from dask_expr._reductions import Var + +## +## Custom expression patching +## + + +# This can be removed after cudf#15176 is addressed. +# See: https://github.com/rapidsai/cudf/issues/15176 +class PatchCumulativeBlockwise(CumulativeBlockwise): + @property + def _args(self) -> list: + return self.operands[:1] + + @property + def _kwargs(self) -> dict: + # Must pass axis and skipna as kwargs in cudf + return {"axis": self.axis, "skipna": self.skipna} + + +CumulativeBlockwise._args = PatchCumulativeBlockwise._args +CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs + + +# This can be removed if squeeze support is added to cudf, +# or if squeeze is removed from the dask-expr logic. +# See: https://github.com/rapidsai/cudf/issues/15177 +def _takelast(a, skipna=True): + if not len(a): + return a + if skipna: + a = a.bfill() + # Cannot use `squeeze` with cudf + return a.tail(n=1).iloc[0] + + +TakeLast.operation = staticmethod(_takelast) + + +# This patch accounts for differences between +# numpy and cupy behavior. It may make sense +# to move this logic upstream. +_dx_reduction_aggregate = Var.reduction_aggregate + + +def _reduction_aggregate(*args, **kwargs): + result = _dx_reduction_aggregate(*args, **kwargs) + if result.ndim == 0: + # cupy will sometimes produce a 0d array, and + # we need to convert it to a scalar. + return result.item() + return result + + +Var.reduction_aggregate = staticmethod(_reduction_aggregate) diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py new file mode 100644 index 00000000000..7f275151f75 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -0,0 +1,48 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask_expr._groupby import ( + GroupBy as DXGroupBy, + SeriesGroupBy as DXSeriesGroupBy, +) +from dask_expr._util import is_scalar + +## +## Custom groupby classes +## + +# TODO: These classes are mostly a work-around for missing +# `observed=False` support. +# See: https://github.com/rapidsai/cudf/issues/15173 + + +class GroupBy(DXGroupBy): + def __init__(self, *args, observed=None, **kwargs): + observed = observed if observed is not None else True + super().__init__(*args, observed=observed, **kwargs) + + def __getitem__(self, key): + if is_scalar(key): + return SeriesGroupBy( + self.obj, + by=self.by, + slice=key, + sort=self.sort, + dropna=self.dropna, + observed=self.observed, + ) + g = GroupBy( + self.obj, + by=self.by, + slice=key, + sort=self.sort, + dropna=self.dropna, + observed=self.observed, + group_keys=self.group_keys, + ) + return g + + +class SeriesGroupBy(DXSeriesGroupBy): + def __init__(self, *args, observed=None, **kwargs): + observed = observed if observed is not None else True + super().__init__(*args, observed=observed, **kwargs) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index 5e06832ed94..a2b1d7fc114 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -10,6 +10,10 @@ from dask.utils import tmpfile import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() def test_read_json_backend_dispatch(tmp_path): diff --git a/python/dask_cudf/dask_cudf/io/tests/test_orc.py b/python/dask_cudf/dask_cudf/io/tests/test_orc.py index c2be75e8ddd..8ccb7a7bfe7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_orc.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_orc.py @@ -12,6 +12,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() cur_dir = os.path.dirname(__file__) sample_orc = os.path.join(cur_dir, "data/orc/sample.orc") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 5e4ea578101..de2a735b2ce 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -15,6 +15,7 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr # Check if create_metadata_file is supported by # the current dask.dataframe version @@ -71,7 +72,7 @@ def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file): ddf2 = dask_cudf.read_parquet( files, columns="y", calculate_divisions=divisions ) - dd.assert_eq(ddf[["y"]], ddf2, check_divisions=divisions) + dd.assert_eq(ddf["y"], ddf2, check_divisions=divisions) # Now include metadata ddf2 = dask_cudf.read_parquet(tmpdir, calculate_divisions=divisions) @@ -87,7 +88,7 @@ def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file): ddf2 = dask_cudf.read_parquet( tmpdir, columns="y", calculate_divisions=divisions ) - dd.assert_eq(ddf[["y"]], ddf2, check_divisions=divisions) + dd.assert_eq(ddf["y"], ddf2, check_divisions=divisions) def test_roundtrip_from_dask_index_false(tmpdir): @@ -184,6 +185,7 @@ def test_dask_timeseries_from_dask(tmpdir, index, divisions): ) +@xfail_dask_expr("Categorical column support") @pytest.mark.parametrize("index", [False, None]) @pytest.mark.parametrize("divisions", [False, True]) def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): @@ -292,7 +294,11 @@ def test_filters_at_row_group_level(tmpdir): assert a.npartitions == 1 assert (a.shape[0] == 1).compute() - ddf.to_parquet(tmp_path, engine="pyarrow", row_group_size=1) + # Overwrite=True can be removed for dask-expr>=0.4.1 + # See: https://github.com/dask-contrib/dask-expr/issues/800 + ddf.to_parquet( + tmp_path, engine="pyarrow", row_group_size=1, overwrite=True + ) b = dask_cudf.read_parquet( tmp_path, filters=[("x", "==", 1)], split_row_groups=True @@ -436,6 +442,7 @@ def test_create_metadata_file(tmpdir, partition_on): dd.assert_eq(ddf1, ddf2) +@xfail_dask_expr("dtypes are inconsistent") @need_create_meta def test_create_metadata_file_inconsistent_schema(tmpdir): # NOTE: This test demonstrates that the CudfEngine @@ -516,15 +523,19 @@ def test_cudf_list_struct_write(tmpdir): dd.assert_eq(df, new_ddf) +@skip_dask_expr("Not necessary in dask-expr") def test_check_file_size(tmpdir): # Test simple file-size check to help warn users # of upstream change to `split_row_groups` default fn = str(tmpdir.join("test.parquet")) cudf.DataFrame({"a": np.arange(1000)}).to_parquet(fn) with pytest.warns(match="large parquet file"): - dask_cudf.read_parquet(fn, check_file_size=1).compute() + # Need to use `dask_cudf.io` path + # TODO: Remove outdated `check_file_size` functionality + dask_cudf.io.read_parquet(fn, check_file_size=1).compute() +@xfail_dask_expr("HivePartitioning cannot be hashed") def test_null_partition(tmpdir): import pyarrow as pa from pyarrow.dataset import HivePartitioning @@ -554,11 +565,10 @@ def test_nullable_schema_mismatch(tmpdir): path1 = str(tmpdir.join("test.1.parquet")) cudf.DataFrame.from_dict({"a": [1, 2, 3]}).to_parquet(path0) cudf.DataFrame.from_dict({"a": [4, 5, None]}).to_parquet(path1) - with dask.config.set({"dataframe.backend": "cudf"}): - ddf = dd.read_parquet( - [path0, path1], split_row_groups=2, aggregate_files=True - ) - expect = pd.read_parquet([path0, path1]) + ddf = dask_cudf.read_parquet( + [path0, path1], split_row_groups=2, aggregate_files=True + ) + expect = pd.read_parquet([path0, path1]) dd.assert_eq(ddf, expect, check_index=False) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 7614ea38d6a..f4a6fabdb60 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. import os import socket @@ -10,6 +10,10 @@ import pytest import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_text.py b/python/dask_cudf/dask_cudf/io/tests/test_text.py index a14eec1fea9..d3dcd386d0d 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_text.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_text.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import os @@ -9,6 +9,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() cur_dir = os.path.dirname(__file__) text_file = os.path.join(cur_dir, "data/text/sample.pgn") diff --git a/python/dask_cudf/dask_cudf/tests/test_accessor.py b/python/dask_cudf/dask_cudf/tests/test_accessor.py index 8c9ce45df59..ebb8e4be187 100644 --- a/python/dask_cudf/dask_cudf/tests/test_accessor.py +++ b/python/dask_cudf/dask_cudf/tests/test_accessor.py @@ -12,6 +12,7 @@ from cudf.testing._utils import assert_eq, does_not_raise import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr ############################################################################# # Datetime Accessor # @@ -110,6 +111,7 @@ def test_categorical_accessor_initialization2(data): dsr.cat +@xfail_dask_expr("TODO: Unexplained dask-expr failure") @pytest.mark.parametrize("data", [data_cat_1()]) def test_categorical_basic(data): cat = data.copy() @@ -201,10 +203,11 @@ def test_categorical_compare_unordered(data): dsr < dsr +@xfail_dask_expr("TODO: Unexplained dask-expr failure") @pytest.mark.parametrize("data", [data_cat_3()]) def test_categorical_compare_ordered(data): - cat1 = data[0] - cat2 = data[1] + cat1 = data[0].copy() + cat2 = data[1].copy() pdsr1 = pd.Series(cat1) pdsr2 = pd.Series(cat2) sr1 = Series(cat1) @@ -271,6 +274,7 @@ def test_categorical_categories(): ) +@xfail_dask_expr("TODO: Unexplained dask-expr failure") def test_categorical_as_known(): df = dask_cudf.from_cudf(DataFrame({"col_1": [0, 1, 2, 3]}), npartitions=2) df["col_1"] = df["col_1"].astype("category") diff --git a/python/dask_cudf/dask_cudf/tests/test_applymap.py b/python/dask_cudf/dask_cudf/tests/test_applymap.py index 929f00ec296..d84235481c3 100644 --- a/python/dask_cudf/dask_cudf/tests/test_applymap.py +++ b/python/dask_cudf/dask_cudf/tests/test_applymap.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import pytest from pandas import NA @@ -24,6 +24,6 @@ def test_applymap_basic(func, has_na): dpdf = dd.from_pandas(pdf, npartitions=dgdf.npartitions) - expect = dpdf.applymap(func) - got = dgdf.applymap(func) + expect = dpdf.map(func) + got = dgdf.map(func) dd.assert_eq(expect, got, check_dtype=False) diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index ecad2220ba5..8a2f3414fd1 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -15,6 +15,7 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr def test_from_dict_backend_dispatch(): @@ -83,7 +84,7 @@ def test_to_backend_kwargs(): gser_null.to_backend("pandas", bad_arg=True) -def test_from_cudf(): +def test_from_pandas(): np.random.seed(0) df = pd.DataFrame( @@ -95,16 +96,16 @@ def test_from_cudf(): gdf = cudf.DataFrame.from_pandas(df) - # Test simple around to/from dask + # Test simple around to/from cudf ingested = dd.from_pandas(gdf, npartitions=2) dd.assert_eq(ingested, df) - # Test conversion to dask.dataframe - ddf = ingested.to_dask_dataframe() + # Test conversion back to pandas + ddf = ingested.to_backend("pandas") dd.assert_eq(ddf, df) -def test_from_cudf_multiindex_raises(): +def test_from_pandas_multiindex_raises(): df = cudf.DataFrame({"x": list("abc"), "y": [1, 2, 3], "z": [1, 2, 3]}) with pytest.raises(NotImplementedError): @@ -112,7 +113,7 @@ def test_from_cudf_multiindex_raises(): dask_cudf.from_cudf(df.set_index(["x", "y"])) -def test_from_cudf_with_generic_idx(): +def test_from_pandas_with_generic_idx(): cdf = cudf.DataFrame( { "a": list(range(20)), @@ -187,22 +188,8 @@ def test_head(): dd.assert_eq(dgf.head(), df.head()) -def test_from_dask_dataframe(): - np.random.seed(0) - df = pd.DataFrame( - {"x": np.random.randint(0, 5, size=20), "y": np.random.normal(size=20)} - ) - ddf = dd.from_pandas(df, npartitions=2) - dgdf = ddf.map_partitions(cudf.from_pandas) - got = dgdf.compute().to_pandas() - expect = df - - dd.assert_eq(got, expect) - - @pytest.mark.parametrize("nelem", [10, 200, 1333]) -@pytest.mark.parametrize("divisions", [None, "quantile"]) -def test_set_index(nelem, divisions): +def test_set_index(nelem): with dask.config.set(scheduler="single-threaded"): np.random.seed(0) # Use unique index range as the sort may not be stable-ordering @@ -212,14 +199,15 @@ def test_set_index(nelem, divisions): {"x": x, "y": np.random.randint(0, nelem, size=nelem)} ) ddf = dd.from_pandas(df, npartitions=2) - dgdf = ddf.map_partitions(cudf.from_pandas) + ddf2 = ddf.to_backend("cudf") expect = ddf.set_index("x") - got = dgdf.set_index("x", divisions=divisions) + got = ddf2.set_index("x") dd.assert_eq(expect, got, check_index=False, check_divisions=False) +@xfail_dask_expr("missing support for divisions='quantile'") @pytest.mark.parametrize("by", ["a", "b"]) @pytest.mark.parametrize("nelem", [10, 500]) @pytest.mark.parametrize("nparts", [1, 10]) @@ -269,7 +257,6 @@ def test_set_index_2(nelem): assert_frame_equal_by_index_group(expect, got) -@pytest.mark.xfail(reason="dask's index name '__dask_cudf.index' is correct") def test_set_index_w_series(): with dask.config.set(scheduler="single-threaded"): nelem = 20 @@ -349,7 +336,8 @@ def test_assign(): newcol = dd.from_pandas(cudf.Series(pdcol), npartitions=dgf.npartitions) got = dgf.assign(z=newcol) - dd.assert_eq(got.loc[:, ["x", "y"]], df) + # Using `loc[:, ["x", "y"]]` was broken for dask-expr 0.4.0 + dd.assert_eq(got[["x", "y"]], df) np.testing.assert_array_equal(got["z"].compute().values_host, pdcol) @@ -400,6 +388,7 @@ def test_setitem_scalar_datetime(): np.testing.assert_array_equal(got["z"], df["z"]) +@skip_dask_expr("Not relevant for dask-expr") @pytest.mark.parametrize( "func", [ @@ -756,13 +745,13 @@ def test_dataframe_assign_col(): ddf = dask_cudf.from_cudf(df, npartitions=4) ddf["fold"] = 0 ddf["fold"] = ddf["fold"].map_partitions( - lambda cudf_df: cp.random.randint(0, 4, len(cudf_df)) + lambda cudf_df: cudf.Series(cp.random.randint(0, 4, len(cudf_df))) ) pddf = dd.from_pandas(pdf, npartitions=4) pddf["fold"] = 0 pddf["fold"] = pddf["fold"].map_partitions( - lambda p_df: np.random.randint(0, 4, len(p_df)) + lambda p_df: pd.Series(np.random.randint(0, 4, len(p_df))) ) dd.assert_eq(ddf[0], pddf[0]) @@ -787,6 +776,7 @@ def test_dataframe_set_index(): assert_eq(ddf.compute(), pddf.compute()) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_series_describe(): random.seed(0) sr = cudf.datasets.randomdata(20)["x"] @@ -802,6 +792,7 @@ def test_series_describe(): ) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_dataframe_describe(): random.seed(0) df = cudf.datasets.randomdata(20) @@ -815,6 +806,7 @@ def test_dataframe_describe(): ) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_zero_std_describe(): num = 84886781 df = cudf.DataFrame( @@ -858,15 +850,6 @@ def test_index_map_partitions(): def test_merging_categorical_columns(): - try: - from dask.dataframe.dispatch import ( # noqa: F401 - union_categoricals_dispatch, - ) - except ImportError: - pytest.skip( - "need a version of dask that has union_categoricals_dispatch" - ) - df_1 = cudf.DataFrame( {"id_1": [0, 1, 2, 3], "cat_col": ["a", "b", "f", "f"]} ) @@ -882,6 +865,7 @@ def test_merging_categorical_columns(): ddf_2 = dask_cudf.from_cudf(df_2, npartitions=2) ddf_2 = dd.categorical.categorize(ddf_2, columns=["cat_col"]) + expected = cudf.DataFrame( { "id_1": [2, 3], @@ -894,15 +878,11 @@ def test_merging_categorical_columns(): "id_2": [113, 113], } ) - dd.assert_eq(ddf_1.merge(ddf_2), expected) + with pytest.warns(UserWarning, match="mismatch"): + dd.assert_eq(ddf_1.merge(ddf_2), expected) def test_correct_meta(): - try: - from dask.dataframe.dispatch import make_meta_obj # noqa: F401 - except ImportError: - pytest.skip("need make_meta_obj to be preset") - # Need these local imports in this specific order. # For context: https://github.com/rapidsai/cudf/issues/7946 import pandas as pd diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 30251b88dea..3bb3e3b0bb8 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -12,6 +12,17 @@ import dask_cudf from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized +from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr + +# XFAIL "collect" tests for now +agg_params = [agg for agg in OPTIMIZED_AGGS if agg != "collect"] +if QUERY_PLANNING_ON: + agg_params.append( + # TODO: "collect" not supported with dask-expr yet + pytest.param("collect", marks=pytest.mark.xfail) + ) +else: + agg_params.append("collect") def assert_cudf_groupby_layers(ddf): @@ -46,48 +57,42 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +@pytest.mark.parametrize("aggregation", agg_params) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) - gdf_grouped = gdf.groupby("xx") - ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx") + gdf_grouped = gdf.groupby("xx", dropna=True) + ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby( + "xx", dropna=True + ) if series: - gdf_grouped = gdf_grouped.xx - ddf_grouped = ddf_grouped.xx + gdf_grouped = gdf_grouped.x + ddf_grouped = ddf_grouped.x check_dtype = aggregation != "count" expect = getattr(gdf_grouped, aggregation)() actual = getattr(ddf_grouped, aggregation)() - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) dd.assert_eq(expect, actual, check_dtype=check_dtype) - expect = gdf_grouped.agg({"xx": aggregation}) - actual = ddf_grouped.agg({"xx": aggregation}) + if not series: + expect = gdf_grouped.agg({"x": aggregation}) + actual = ddf_grouped.agg({"x": aggregation}) - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) + dd.assert_eq(expect, actual, check_dtype=check_dtype) # TODO: explore adding support with `.agg()` @pytest.mark.parametrize("series", [True, False]) -@pytest.mark.parametrize( - "aggregation", - [ - "cumsum", - pytest.param( - "cumcount", - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/13390" - ), - ), - ], -) +@pytest.mark.parametrize("aggregation", ["cumsum", "cumcount"]) def test_groupby_cumulative(aggregation, pdf, series): gdf = cudf.DataFrame.from_pandas(pdf) ddf = dask_cudf.from_cudf(gdf, npartitions=5) @@ -105,7 +110,7 @@ def test_groupby_cumulative(aggregation, pdf, series): dd.assert_eq(a, b) -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +@pytest.mark.parametrize("aggregation", agg_params) @pytest.mark.parametrize( "func", [ @@ -119,7 +124,6 @@ def test_groupby_cumulative(aggregation, pdf, series): ) def test_groupby_agg(func, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) - ddf = dask_cudf.from_cudf(gdf, npartitions=5) actual = func(ddf, aggregation) @@ -127,11 +131,12 @@ def test_groupby_agg(func, aggregation, pdf): check_dtype = aggregation != "count" - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) - # groupby.agg should add an explicit getitem layer - # to improve/enable column projection - assert hlg_layer(actual.dask, "getitem") + # groupby.agg should add an explicit getitem layer + # to improve/enable column projection + assert hlg_layer(actual.dask, "getitem") dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype) @@ -574,6 +579,7 @@ def test_groupby_categorical_key(): dd.assert_eq(expect, got) +@xfail_dask_expr("as_index not supported in dask-expr") @pytest.mark.parametrize("as_index", [True, False]) @pytest.mark.parametrize("split_out", ["use_dask_default", 1, 2]) @pytest.mark.parametrize("split_every", [False, 4]) @@ -662,6 +668,7 @@ def test_groupby_agg_params(npartitions, split_every, split_out, as_index): dd.assert_eq(gf, pf) +@xfail_dask_expr("Newer dask-expr version needed") @pytest.mark.parametrize( "aggregations", [(sum, "sum"), (max, "max"), (min, "min")] ) @@ -700,6 +707,7 @@ def test_is_supported(arg, supported): assert _aggs_optimized(arg, OPTIMIZED_AGGS) is supported +@xfail_dask_expr("Fails on older versions of dask-expr") def test_groupby_unique_lists(): df = pd.DataFrame({"a": [0, 0, 0, 1, 1, 1], "b": [10, 10, 10, 7, 8, 9]}) gdf = cudf.from_pandas(df) @@ -746,6 +754,7 @@ def test_groupby_first_last(data, agg): ) +@xfail_dask_expr("Co-alignment check fails in dask-expr") def test_groupby_with_list_of_series(): df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]}) gdf = dask_cudf.from_cudf(df, npartitions=2) @@ -760,6 +769,7 @@ def test_groupby_with_list_of_series(): ) +@xfail_dask_expr("Nested renamer not supported in dask-expr") @pytest.mark.parametrize( "func", [ @@ -812,12 +822,12 @@ def test_groupby_all_columns(func): ) ddf = dd.from_pandas(pdf, npartitions=5) - gddf = ddf.map_partitions(cudf.from_pandas) + gddf = ddf.to_backend("cudf") expect = func(ddf) actual = func(gddf) - dd.assert_eq(expect, actual) + dd.assert_eq(expect, actual, check_names=not QUERY_PLANNING_ON) def test_groupby_shuffle(): @@ -855,13 +865,14 @@ def test_groupby_shuffle(): got = gddf.groupby("a", sort=False).agg(spec, split_out=2) dd.assert_eq(expect, got.compute().sort_index()) - # Sorted aggregation fails with split_out>1 when shuffle is False - # (sort=True, split_out=2, shuffle_method=False) - with pytest.raises(ValueError): - gddf.groupby("a", sort=True).agg( - spec, shuffle_method=False, split_out=2 - ) + if not QUERY_PLANNING_ON: + # Sorted aggregation fails with split_out>1 when shuffle is False + # (sort=True, split_out=2, shuffle_method=False) + with pytest.raises(ValueError): + gddf.groupby("a", sort=True).agg( + spec, shuffle_method=False, split_out=2 + ) - # Check shuffle kwarg deprecation - with pytest.warns(match="'shuffle' keyword is deprecated"): - gddf.groupby("a", sort=True).agg(spec, shuffle=False) + # Check shuffle kwarg deprecation + with pytest.warns(match="'shuffle' keyword is deprecated"): + gddf.groupby("a", sort=True).agg(spec, shuffle=False) diff --git a/python/dask_cudf/dask_cudf/tests/test_join.py b/python/dask_cudf/dask_cudf/tests/test_join.py index eb500ad2462..42ecc130298 100644 --- a/python/dask_cudf/dask_cudf/tests/test_join.py +++ b/python/dask_cudf/dask_cudf/tests/test_join.py @@ -163,7 +163,7 @@ def test_merge_left( } ) - expect = left.merge(right, on=("x", "y"), how=how) + expect = left.merge(right, on=["x", "y"], how=how) def normalize(df): return ( @@ -176,7 +176,7 @@ def normalize(df): left = dask_cudf.from_cudf(left, chunksize=chunksize) right = dask_cudf.from_cudf(right, chunksize=chunksize) - result = left.merge(right, on=("x", "y"), how=how).compute( + result = left.merge(right, on=["x", "y"], how=how).compute( scheduler="single-threaded" ) diff --git a/python/dask_cudf/dask_cudf/tests/test_onehot.py b/python/dask_cudf/dask_cudf/tests/test_onehot.py index 6453d843467..96646f85f74 100644 --- a/python/dask_cudf/dask_cudf/tests/test_onehot.py +++ b/python/dask_cudf/dask_cudf/tests/test_onehot.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. import pandas as pd import pytest @@ -8,6 +8,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr + +# No dask-expr support +pytestmark = xfail_dask_expr("limited get_dummy support in dask-expr + cudf") def test_get_dummies_cat(): diff --git a/python/dask_cudf/dask_cudf/tests/test_reductions.py b/python/dask_cudf/dask_cudf/tests/test_reductions.py index 8688f830dcb..c3056f2607c 100644 --- a/python/dask_cudf/dask_cudf/tests/test_reductions.py +++ b/python/dask_cudf/dask_cudf/tests/test_reductions.py @@ -68,7 +68,7 @@ def test_series_reduce(reducer): ) def test_rowwise_reductions(data, op): gddf = dask_cudf.from_cudf(data, npartitions=10) - pddf = gddf.to_dask_dataframe() + pddf = gddf.to_backend("pandas") with dask.config.set({"dataframe.convert-string": False}): if op in ("var", "std"): diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 8cf621da1bf..9184ad996ad 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,10 +10,26 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) -@pytest.mark.parametrize("by", ["a", "b", "c", "d", ["a", "b"], ["c", "d"]]) +@pytest.mark.parametrize( + "by", + [ + "a", + "b", + "c", + pytest.param( + "d", + marks=xfail_dask_expr( + "Dask-expr fails to sort by categorical column." + ), + ), + ["a", "b"], + ["c", "d"], + ], +) @pytest.mark.parametrize("nelem", [10, 500]) @pytest.mark.parametrize("nparts", [1, 10]) def test_sort_values(nelem, nparts, by, ascending): @@ -56,6 +72,7 @@ def test_sort_repartition(): dd.assert_eq(len(new_ddf), len(ddf)) +@xfail_dask_expr("dask-expr code path fails with nulls") @pytest.mark.parametrize("na_position", ["first", "last"]) @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) @@ -117,10 +134,6 @@ def test_sort_values_empty_string(by): def test_disk_shuffle(): - try: - from dask.dataframe.dispatch import partd_encode_dispatch # noqa: F401 - except ImportError: - pytest.skip("need a version of dask that has partd_encode_dispatch") df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15}) ddf = dd.from_pandas(df, npartitions=4) got = dd.DataFrame.shuffle(ddf, "a", shuffle_method="disk") diff --git a/python/dask_cudf/dask_cudf/tests/utils.py b/python/dask_cudf/dask_cudf/tests/utils.py index 88a2116fb0a..e838b8d63bc 100644 --- a/python/dask_cudf/dask_cudf/tests/utils.py +++ b/python/dask_cudf/dask_cudf/tests/utils.py @@ -1,12 +1,15 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import numpy as np import pandas as pd +import pytest import dask.dataframe as dd import cudf +from dask_cudf.expr import QUERY_PLANNING_ON + def _make_random_frame(nelem, npartitions=2, include_na=False): df = pd.DataFrame( @@ -19,3 +22,14 @@ def _make_random_frame(nelem, npartitions=2, include_na=False): gdf = cudf.DataFrame.from_pandas(df) dgf = dd.from_pandas(gdf, npartitions=npartitions) return df, dgf + + +_default_reason = "Not compatible with dask-expr" + + +def skip_dask_expr(reason=_default_reason): + return pytest.mark.skipif(QUERY_PLANNING_ON, reason=reason) + + +def xfail_dask_expr(reason=_default_reason): + return pytest.mark.xfail(QUERY_PLANNING_ON, reason=reason) diff --git a/python/dask_cudf/pyproject.toml b/python/dask_cudf/pyproject.toml index 4ecfc4f3f85..21aaa17a6c7 100644 --- a/python/dask_cudf/pyproject.toml +++ b/python/dask_cudf/pyproject.toml @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. [build-system] build-backend = "setuptools.build_meta" @@ -39,6 +39,8 @@ classifiers = [ [project.entry-points."dask.dataframe.backends"] cudf = "dask_cudf.backends:CudfBackendEntrypoint" +[project.entry-points."dask_expr.dataframe.backends"] +cudf = "dask_cudf.backends:CudfDXBackendEntrypoint" [project.optional-dependencies] test = [