
Refactor dask_cudf groupby to use apply_concat_apply #11571

Merged
8 commits merged into rapidsai:branch-22.10 on Sep 6, 2022

Conversation

@rjzamora (Member) commented Aug 19, 2022

Dask-cudf currently maintains specialized groupby-aggregation code that is faster for cudf-based data than the upstream (dask.dataframe) code path. However, the custom implementation does not take advantage of Dask's apply_concat_apply function, even though the tree-reduction aspect of the algorithm is the same.

This PR refactors the dask_cudf groupby-aggregation code to use apply_concat_apply. This reduces the amount of code we will need to maintain in cudf, and should improve graph optimizations (like fusion).
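
For illustration, here is a minimal sketch, not the actual PR implementation, of how a groupby mean can be expressed as a tree reduction with apply_concat_apply. The pandas-based chunk/combine/aggregate helpers, column names, and toy data are assumptions for the example; the real dask_cudf code covers many aggregations and options.

import pandas as pd
import dask.dataframe as dd
from dask.dataframe.core import apply_concat_apply

def chunk(df):
    # Per-partition partial aggregation: sum and count per group.
    g = df.groupby("key")["x"]
    return pd.DataFrame({"sum": g.sum(), "count": g.count()})

def combine(df):
    # `df` is the concatenation of several partial results;
    # merge them by re-aggregating on the group index.
    return df.groupby(level=0).sum()

def aggregate(df):
    # Final reduction: turn (sum, count) into the mean.
    df = df.groupby(level=0).sum()
    return pd.DataFrame({"mean": df["sum"] / df["count"]})

ddf = dd.from_pandas(
    pd.DataFrame({"key": [1, 2, 1, 2, 1], "x": [1.0, 2.0, 3.0, 4.0, 5.0]}),
    npartitions=2,
)
result = apply_concat_apply(
    [ddf],
    chunk=chunk,
    combine=combine,
    aggregate=aggregate,
    split_every=8,  # tree-reduction width
    meta=pd.DataFrame({"mean": pd.Series(dtype="float64")}),
)
print(result.compute())

Because the reduction is expressed through apply_concat_apply, Dask builds the chunk/combine/aggregate tree itself, which is what lets the groupby layer participate in standard graph optimizations.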

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora added the 2 - In Progress, tech debt, and non-breaking labels Aug 19, 2022
@github-actions bot added the Python (Affects Python cuDF API) label Aug 19, 2022
@rjzamora added the dask and improvement labels Aug 19, 2022
@rjzamora (Member Author):

@charlesbluca - This PR will change the HLG layer(s) produced for groupby aggregations. Do you expect this to break anything here or in dask-sql?

@charlesbluca (Member):

I wouldn't expect it to change anything here. I might need to make some changes to #10853, but I don't expect those to be too extensive.

I can pull this PR in with dask-sql to see if anything ends up breaking there.

@codecov bot commented Aug 19, 2022

Codecov Report

❗ No coverage uploaded for pull request base (branch-22.10@cc15765).
Patch has no changes to coverable lines.

Additional details and impacted files
@@               Coverage Diff               @@
##             branch-22.10   #11571   +/-   ##
===============================================
  Coverage                ?   86.39%           
===============================================
  Files                   ?      145           
  Lines                   ?    22963           
  Branches                ?        0           
===============================================
  Hits                    ?    19840           
  Misses                  ?     3123           
  Partials                ?        0           


@charlesbluca (Member):

I was able to check this PR against the dask-sql groupby tests. The only failing test is test_groupby_split_every, which asserts the exact number of keys in the HLG. We've since refactored this test in the datafusion branch to be less prone to upstream breakage, and it should be easy to incorporate those changes into main, so things should be good here 🙂
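
For context, here is a sketch of the kind of graph introspection such a test relies on. The toy frame and split_every value are assumptions for the example; the exact layer names and key counts vary across Dask versions, which is what makes assertions on them brittle.

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"key": range(10), "x": range(10)}),
    npartitions=4,
)
result = ddf.groupby("key").x.sum(split_every=2)

graph = result.__dask_graph__()  # a HighLevelGraph
print(list(graph.layers))  # named layers (the tree-reduction stages appear here)
print(len(graph))          # total number of task keys, the quantity the brittle test asserted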

@rjzamora (Member Author):

> Was able to check this PR against the dask-sql groupby tests

Thank you for taking the time to test this with dask-sql, @charlesbluca - that is super helpful!

@rjzamora marked this pull request as ready for review August 23, 2022 15:18
@rjzamora requested a review from a team as a code owner August 23, 2022 15:18
@caryr35 added this to PR-WIP in v22.10 Release via automation Aug 25, 2022
@caryr35 moved this from PR-WIP to PR-Needs review in v22.10 Release Aug 25, 2022
@rjzamora added the 3 - Ready for Review and 4 - Needs Dask Reviewer labels and removed the 2 - In Progress label Aug 31, 2022
@rjzamora (Member Author):

Update: I used the proposed local_cudf_groupby.py benchmark in rapidsai/dask-cuda#979 to compare this PR against branch-22.10. The results confirm that this PR should not have a significant effect on performance (though a slight improvement is possible):

This PR

Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.001
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 3.05 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
594.12 ms                 | 3.76 GiB/s
469.67 ms                 | 4.76 GiB/s
450.69 ms                 | 4.96 GiB/s
================================================================================
Throughput                | 4.43 GiB/s +/- 329.85 MiB/s
Bandwidth                 | 145.60 MiB/s +/- 20.04 MiB/s
Wall clock                | 504.82 ms +/- 63.61 ms
================================================================================
Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.01
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 30.52 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
1.61 s                    | 1.39 GiB/s
1.29 s                    | 1.74 GiB/s
1.35 s                    | 1.66 GiB/s
================================================================================
Throughput                | 1.58 GiB/s +/- 92.59 MiB/s
Bandwidth                 | 181.52 MiB/s +/- 16.37 MiB/s
Wall clock                | 1.41 s +/- 140.13 ms
================================================================================
Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.1
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 305.16 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
6.18 s                    | 370.31 MiB/s
6.56 s                    | 348.73 MiB/s
6.59 s                    | 347.12 MiB/s
================================================================================
Throughput                | 355.08 MiB/s +/- 5.98 MiB/s
Bandwidth                 | 168.96 MiB/s +/- 13.96 MiB/s
Wall clock                | 6.45 s +/- 187.88 ms
================================================================================

branch-22.10

Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.001
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 3.05 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
668.32 ms                 | 3.34 GiB/s
556.43 ms                 | 4.02 GiB/s
510.05 ms                 | 4.38 GiB/s
================================================================================
Throughput                | 3.87 GiB/s +/- 262.52 MiB/s
Bandwidth                 | 175.48 MiB/s +/- 15.17 MiB/s
Wall clock                | 578.27 ms +/- 66.43 ms
================================================================================
Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.01
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 30.52 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
1.56 s                    | 1.43 GiB/s
1.36 s                    | 1.64 GiB/s
1.41 s                    | 1.59 GiB/s
================================================================================
Throughput                | 1.55 GiB/s +/- 55.12 MiB/s
Bandwidth                 | 189.05 MiB/s +/- 12.99 MiB/s
Wall clock                | 1.44 s +/- 86.88 ms
================================================================================
Groupby benchmark
--------------------------------------------------------------------------------
Use shuffle               | False
Output partitions         | 1
Input partitions          | 100
Sort Groups               | False
Rows-per-chunk            | 1000000
Unique-group ratio        | 0.1
Protocol                  | tcp
Device(s)                 | 0,1,2,3
Tree-reduction width      | 8
RMM Pool                  | True
Worker thread(s)          | 1
Data processed            | 2.24 GiB
Output size               | 305.16 MiB
Number of workers         | 4
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
6.19 s                    | 369.61 MiB/s
7.06 s                    | 323.97 MiB/s
6.67 s                    | 343.37 MiB/s
================================================================================
Throughput                | 344.65 MiB/s +/- 10.68 MiB/s
Bandwidth                 | 152.58 MiB/s +/- 19.05 MiB/s
Wall clock                | 6.64 s +/- 356.58 ms
================================================================================

v22.10 Release automation moved this from PR-Needs review to PR-Reviewer approved Aug 31, 2022
@wence- (Contributor) left a comment:

Some minor nits

Review threads (all resolved):
  • python/dask_cudf/dask_cudf/groupby.py (7 threads, 3 on outdated code)
  • python/dask_cudf/dask_cudf/tests/test_groupby.py (1 thread, on outdated code)
@rjzamora added the 5 - Ready to Merge label and removed the 3 - Ready for Review and 4 - Needs Dask Reviewer labels Sep 6, 2022
@rjzamora (Member Author) commented Sep 6, 2022

@gpucibot merge

@rapids-bot bot merged commit 1742a4d into rapidsai:branch-22.10 Sep 6, 2022
v22.10 Release automation moved this from PR-Reviewer approved to Done Sep 6, 2022
@rjzamora deleted the groupby-aca-update branch September 6, 2022 15:13