From 5ad586d208f1d54c111365d873baa9c3a6ea0214 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Fri, 27 Aug 2021 06:51:52 -0700 Subject: [PATCH 1/9] add fast path for multi-column sorting --- dask_sql/physical/utils/sort.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 1170ba07c..31932577f 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -5,6 +5,15 @@ from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +def multi_col_sort( + df: dd.DataFrame, + sort_columns: List[str], + sort_ascending: List[bool], + sort_null_first: List[bool], +) -> dd.DataFrame: + + df = df.sort_values(sort_columns) + return df.persist() def apply_sort( df: dd.DataFrame, @@ -12,6 +21,19 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: + + # Try fast path for multi-column sorting before falling back to + # sort_partition_func. Tools like dask-cudf have a limited but fast + # multi-column sort implementation. We check if any sorting/null sorting + # is required. If so, we fall back to default sorting implementation + if any(sort_null_first) is False and all(sort_ascending) is True: + try: + return multi_col_sort(df, sort_columns, sort_ascending, + sort_null_first) + except NotImplementedError: + pass + + # Split the first column. We need to handle this one with set_index first_sort_column = sort_columns[0] first_sort_ascending = sort_ascending[0] From 8fc9a7ba2f744da80c7a63b300a149e0adcd5807 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Fri, 27 Aug 2021 07:26:50 -0700 Subject: [PATCH 2/9] lint --- dask_sql/physical/utils/sort.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 31932577f..6909b0998 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -5,6 +5,7 @@ from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column + def multi_col_sort( df: dd.DataFrame, sort_columns: List[str], @@ -15,6 +16,7 @@ def multi_col_sort( df = df.sort_values(sort_columns) return df.persist() + def apply_sort( df: dd.DataFrame, sort_columns: List[str], @@ -28,12 +30,10 @@ def apply_sort( # is required. If so, we fall back to default sorting implementation if any(sort_null_first) is False and all(sort_ascending) is True: try: - return multi_col_sort(df, sort_columns, sort_ascending, - sort_null_first) + return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) except NotImplementedError: pass - # Split the first column. We need to handle this one with set_index first_sort_column = sort_columns[0] first_sort_ascending = sort_ascending[0] From c86cdab09ffea861bd18134a2c12bfc67cfff7fa Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 31 Aug 2021 09:13:21 -0700 Subject: [PATCH 3/9] Prevent single column Dask dataframes from calling sort_values --- dask_sql/physical/utils/sort.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 6909b0998..9b6833597 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -1,6 +1,7 @@ from typing import List import dask.dataframe as dd +import dask_cudf import pandas as pd from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column @@ -28,10 +29,14 @@ def apply_sort( # sort_partition_func. Tools like dask-cudf have a limited but fast # multi-column sort implementation. We check if any sorting/null sorting # is required. If so, we fall back to default sorting implementation - if any(sort_null_first) is False and all(sort_ascending) is True: + if ( + isinstance(df, dask_cudf.DataFrame) + and all(sort_ascending) + and not any(sort_null_first) + ): try: return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) - except NotImplementedError: + except ValueError: pass # Split the first column. We need to handle this one with set_index From d321ca3e3fe100287f8c892111b018fef24119a8 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:01:46 -0700 Subject: [PATCH 4/9] Wrap dask_cudf import in try/except block --- dask_sql/physical/utils/sort.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 9b6833597..675e17f60 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -1,11 +1,15 @@ from typing import List import dask.dataframe as dd -import dask_cudf import pandas as pd from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +try: + import dask_cudf +except ImportError: + dask_cudf = None + def multi_col_sort( df: dd.DataFrame, @@ -30,7 +34,8 @@ def apply_sort( # multi-column sort implementation. We check if any sorting/null sorting # is required. If so, we fall back to default sorting implementation if ( - isinstance(df, dask_cudf.DataFrame) + dask_cudf is not None + and isinstance(df, dask_cudf.DataFrame) and all(sort_ascending) and not any(sort_null_first) ): From ed6522883dc48b42833edf3a8fabd2a60c4a3eca Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:09:36 -0700 Subject: [PATCH 5/9] Add test for fast multi column sort --- dask_sql/physical/utils/sort.py | 1 - tests/integration/fixtures.py | 8 ++++++++ tests/integration/test_dask_cudf.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_dask_cudf.py diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 675e17f60..e59e6540d 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -28,7 +28,6 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: - # Try fast path for multi-column sorting before falling back to # sort_partition_func. Tools like dask-cudf have a limited but fast # multi-column sort implementation. We check if any sorting/null sorting diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 4566d3690..8129aed61 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -9,6 +9,11 @@ from dask.distributed import Client from pandas.testing import assert_frame_equal +try: + import dask_cudf +except ImportError: + dask_cudf = None + @pytest.fixture() def timeseries_df(c): @@ -117,6 +122,9 @@ def c( for df_name, df in dfs.items(): dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) + if dask_cudf is not None: + cudf_df = dask_cudf.from_dask_dataframe(dask_df) + c.create_table("cudf_" + df_name, cudf_df) yield c diff --git a/tests/integration/test_dask_cudf.py b/tests/integration/test_dask_cudf.py new file mode 100644 index 000000000..add15f1ce --- /dev/null +++ b/tests/integration/test_dask_cudf.py @@ -0,0 +1,22 @@ +import pandas as pd +import pytest +from pandas.testing import assert_frame_equal + +pytest.importorskip("dask_cudf") + + +def test_cudf_order_by(c): + df = c.sql( + """ + SELECT + * + FROM cudf_user_table_1 + ORDER BY user_id + """ + ) + df = df.compute().to_pandas() + + expected_df = pd.DataFrame( + {"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]} + ).sort_values(by="user_id") + assert_frame_equal(df, expected_df) From 76eb2aa4313e49d743b2155dc90ad10865d1e19d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:39:45 -0700 Subject: [PATCH 6/9] Move multi_col_sort contents to apply_sort --- dask_sql/physical/utils/sort.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index e59e6540d..ae6a9f7f7 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -3,7 +3,7 @@ import dask.dataframe as dd import pandas as pd -from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +from dask_sql.utils import make_pickable_without_dask_sql try: import dask_cudf @@ -11,17 +11,6 @@ dask_cudf = None -def multi_col_sort( - df: dd.DataFrame, - sort_columns: List[str], - sort_ascending: List[bool], - sort_null_first: List[bool], -) -> dd.DataFrame: - - df = df.sort_values(sort_columns) - return df.persist() - - def apply_sort( df: dd.DataFrame, sort_columns: List[str], @@ -39,7 +28,8 @@ def apply_sort( and not any(sort_null_first) ): try: - return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) + df = df.sort_values(sort_columns) + return df.persist() except ValueError: pass From 927c6185751be62855299cd02e7ea5fc881a9608 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 10 Sep 2021 06:51:38 -0700 Subject: [PATCH 7/9] Ignore index for dask-cudf sorting --- dask_sql/physical/utils/sort.py | 2 +- tests/integration/test_dask_cudf.py | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index ae6a9f7f7..85075b0bb 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -28,7 +28,7 @@ def apply_sort( and not any(sort_null_first) ): try: - df = df.sort_values(sort_columns) + df = df.sort_values(sort_columns, ignore_index=True) return df.persist() except ValueError: pass diff --git a/tests/integration/test_dask_cudf.py b/tests/integration/test_dask_cudf.py index add15f1ce..d8ec7bb66 100644 --- a/tests/integration/test_dask_cudf.py +++ b/tests/integration/test_dask_cudf.py @@ -1,9 +1,9 @@ -import pandas as pd import pytest -from pandas.testing import assert_frame_equal pytest.importorskip("dask_cudf") +from cudf.testing._utils import assert_eq + def test_cudf_order_by(c): df = c.sql( @@ -13,10 +13,18 @@ def test_cudf_order_by(c): FROM cudf_user_table_1 ORDER BY user_id """ + ).compute() + + expected_df = ( + c.sql( + """ + SELECT + * + FROM cudf_user_table_1 + """ + ) + .sort_values(by="user_id", ignore_index=True) + .compute() ) - df = df.compute().to_pandas() - expected_df = pd.DataFrame( - {"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]} - ).sort_values(by="user_id") - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) From 963ad5e2f16a2303a92492c96423fe2b173818f4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 10 Sep 2021 06:55:42 -0700 Subject: [PATCH 8/9] Fix show tables test for cudf enabled fixture --- tests/integration/test_show.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index 2165699ca..aeb1af7b5 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -2,6 +2,11 @@ import pytest from pandas.testing import assert_frame_equal +try: + import dask_cudf +except ImportError: + dask_cudf = None + def test_schemas(c): df = c.sql("SHOW SCHEMAS") @@ -36,6 +41,27 @@ def test_tables(c): "string_table", "datetime_table", ] + if dask_cudf is None + else [ + "df_simple", + "cudf_df_simple", + "df", + "cudf_df", + "user_table_1", + "cudf_user_table_1", + "user_table_2", + "cudf_user_table_2", + "long_table", + "cudf_long_table", + "user_table_inf", + "cudf_user_table_inf", + "user_table_nan", + "cudf_user_table_nan", + "string_table", + "cudf_string_table", + "datetime_table", + "cudf_datetime_table", + ] } ) From 5fb3c41ab574cf44057d445981d5e7fc47660561 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 30 Sep 2021 08:59:31 -0700 Subject: [PATCH 9/9] Trigger CI