FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420

Merged

Commits (30 total; this diff view shows changes from 16 commits):
28cdc19  FEAT-#4419: Extend virtual partitioning API to pandas on Dask (RehanSD, Apr 25, 2022)
15b0072  Format code (RehanSD, Apr 25, 2022)
ae4eb6c  Fix list_of_blocks add list_of_ips (RehanSD, Apr 26, 2022)
026bd14  Fix documentation (RehanSD, Apr 26, 2022)
fa377a8  Fix docs (RehanSD, Apr 26, 2022)
b9d68f7  Update modin/core/execution/dask/implementations/pandas_on_dask/parti… (RehanSD, May 2, 2022)
17a9c65  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 13, 2022)
348272d  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 15, 2022)
6784b4b  Add comments to virtual partition, and update drain call queue to acc… (RehanSD, Jun 15, 2022)
4efd83b  lint (RehanSD, Jun 15, 2022)
af4edbc  copy rebalance_partitions implementation to dask virtual partitions (RehanSD, Jun 15, 2022)
0ac6de3  Fix rebalance_partitions, update groupby test, and add test for virtu… (RehanSD, Jun 16, 2022)
8c44eb1  Add name to release notes (RehanSD, Jun 16, 2022)
a5af1ff  lint (RehanSD, Jun 16, 2022)
971e00a  Refactor to reduce code redundancy (RehanSD, Jun 16, 2022)
87cad7c  fix docs (RehanSD, Jun 16, 2022)
0de19cd  Apply suggestions from code review (RehanSD, Jun 16, 2022)
4f5dd50  update tests (RehanSD, Jun 16, 2022)
f5d3eb1  Merge branch 'rehan/virtual_partitioning_dask' of https://github.com/… (RehanSD, Jun 16, 2022)
5e748cd  Fix typo in tests (RehanSD, Jun 16, 2022)
0cc25c4  Add more comprehensive test (RehanSD, Jun 16, 2022)
43296b2  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 17, 2022)
66c4360  Resolve col width issue (RehanSD, Jun 19, 2022)
9f48f0d  lint (RehanSD, Jun 19, 2022)
8dab0d1  flake8 (RehanSD, Jun 19, 2022)
2d9f150  Resolve col width issue without hurting perf (RehanSD, Jun 20, 2022)
9a945e8  Move concat into testr (RehanSD, Jun 27, 2022)
5b322ee  Address review comments (RehanSD, Jul 12, 2022)
a761182  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jul 12, 2022)
6a9855c  fix typos (RehanSD, Jul 12, 2022)
@@ -34,7 +34,7 @@ specifically for the `PandasOnDask` execution.

 * :doc:`PandasOnDaskDataframe <dataframe>`
 * :doc:`PandasOnDaskDataframePartition <partitioning/partition>`
-* :doc:`PandasOnDaskDataframeAxisPartition <partitioning/axis_partition>`
+* :doc:`PandasOnDaskDataframeVirtualPartition <partitioning/axis_partition>`
 * :doc:`PandasOnDaskDataframePartitionManager <partitioning/partition_manager>`

 .. toctree::
@@ -80,4 +80,4 @@ the user query to execute it on Dask workers. Then, the :py:class:`~modin.core.e…
 that will be written into the file in parallel in Dask workers.

 .. note::
-    Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
\ No newline at end of file
+    Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
@@ -1,14 +1,14 @@
-PandasOnDaskDataframeAxisPartition
-""""""""""""""""""""""""""""""""""
+PandasOnDaskDataframeVirtualPartition
+"""""""""""""""""""""""""""""""""""""

-The class is the specific implementation of :py:class:`~modin.core.dataframe.pandas.partitioning.virtual_partition.PandasDataframeAxisPartition`,
+The class is the specific implementation of :py:class:`~modin.core.dataframe.pandas.partitioning.virtual_partition.PandasOnDaskDataframeVirtualPartition`,
 providing the API to perform operations on an axis (column or row) partition using Dask as the execution engine.
 The axis partition is a wrapper over a list of block partitions that are stored in this class.

 Public API
 ----------

-.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeAxisPartition
+.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeVirtualPartition
    :members:

 PandasOnDaskDataframeColumnPartition
docs/release_notes/release_notes-0.16.0.rst (2 additions, 0 deletions)
@@ -29,8 +29,10 @@ Key Features and Updates
 * Dependencies
   *
 * New Features
+  * FEAT-#4419: Extend virtual partitioning API to pandas on Dask (#4420)

 Contributors
 ------------
 @mvashishtha
 @prutskov
+@RehanSD
modin/core/dataframe/pandas/partitioning/partition_manager.py (114 additions, 5 deletions)
@@ -27,7 +27,7 @@
 from modin.error_message import ErrorMessage
 from modin.core.storage_formats.pandas.utils import compute_chunksize
 from modin.core.dataframe.pandas.utils import concatenate
-from modin.config import NPartitions, ProgressBar, BenchmarkMode
+from modin.config import NPartitions, ProgressBar, BenchmarkMode, Engine, StorageFormat

 import os
@@ -615,11 +615,15 @@ def concat(cls, axis, left_parts, right_parts):
             to_concat = (
                 [left_parts] + right_parts if left_parts.size != 0 else right_parts
             )
-            return (
+            result = (
                 np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts
             )
         else:
-            return np.append(left_parts, right_parts, axis=axis)
+            result = np.append(left_parts, right_parts, axis=axis)
+        if axis == 0:
+            return cls.rebalance_partitions(result)
+        else:
+            return result

     @classmethod
     def to_pandas(cls, partitions):
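For context on the `axis == 0` branch above: row-wise concatenation is the operation that now routes its result through `rebalance_partitions`. A minimal usage sketch (the toy frames below are made up, not from the PR):

    # Row-wise concat on the Ray/Dask engines now returns rebalanced partitions.
    import modin.pandas as pd

    frames = [pd.DataFrame({"a": range(1_000)}) for _ in range(32)]
    df = pd.concat(frames)  # axis=0: internally ends in rebalance_partitions(result)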
@@ -1297,7 +1301,15 @@ def finalize(cls, partitions):
     @classmethod
     def rebalance_partitions(cls, partitions):
         """
-        Return the provided array of partitions without rebalancing it.
+        Rebalance a 2-d array of partitions if we are using PandasOnRay or PandasOnDask executions.
+
+        For all other executions, the partitions are returned unchanged.
+
+        Rebalance the partitions by building a new array
+        of partitions out of the original ones so that:
+
+        - If all partitions have a length, each new partition has roughly the same number of rows.
+        - Otherwise, each new partition spans roughly the same number of old partitions.

         Parameters
         ----------
@@ -1307,6 +1319,103 @@

         Returns
         -------
         np.ndarray
-            The same 2-d array.
+            A NumPy array with the same, or new, rebalanced, partitions, depending on the
+            execution engine and storage format.
         """
+        if Engine.get() in ["Ray", "Dask"] and StorageFormat.get() == "Pandas":
+            # Rebalancing partitions is currently only implemented for PandasOnRay and PandasOnDask.
+            # We rebalance when the ratio of the number of existing partitions to
+            # the ideal number of partitions is larger than this threshold. The
+            # threshold is a heuristic that may need to be tuned for performance.
+            max_excess_of_num_partitions = 1.5
+            num_existing_partitions = partitions.shape[0]
+            ideal_num_new_partitions = NPartitions.get()
[Review comment - Collaborator]
qq: ideal_num_new_partitions doesn't account for the total number of vCPUs in a cluster, right?

[Reply - RehanSD (Author)]
You can override NPartitions.get() to do so. It just asks Ray how many cores there are and returns that, so as long as your cluster is fully initialized when Modin is initialized, and you set IsRayCluster correctly, it should.
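A minimal sketch of the override mentioned in that reply, assuming a hypothetical cluster-wide vCPU count (`NPartitions.put` is the existing `modin.config` setter):

    import modin.config as cfg

    # Hypothetical: obtain this from your cluster manager rather than hardcoding.
    total_cluster_cpus = 64
    cfg.NPartitions.put(total_cluster_cpus)
    # NPartitions.get() (and thus ideal_num_new_partitions above) now returns 64.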

+            if (
+                num_existing_partitions
+                <= ideal_num_new_partitions * max_excess_of_num_partitions
+            ):
+                return partitions
+            # If any partition has an unknown length, give each axis partition
+            # roughly the same number of row partitions. We use `_length_cache` here
+            # to avoid materializing any unmaterialized lengths.
+            if any(
+                partition._length_cache is None
+                for row in partitions
+                for partition in row
+            ):
+                # We need each partition to go into an axis partition, but the
+                # number of axis partitions may not evenly divide the number of
+                # partitions.
+                chunk_size = compute_chunksize(
+                    num_existing_partitions, ideal_num_new_partitions, min_block_size=1
+                )
+                return np.array(
+                    [
+                        cls.column_partitions(
+                            partitions[i : i + chunk_size],
+                            full_axis=False,
+                        )
+                        for i in range(
+                            0,
+                            num_existing_partitions,
+                            chunk_size,
+                        )
+                    ]
+                )
+
+            # If we know the number of rows in every partition, then we should try
+            # instead to give each new partition roughly the same number of rows.
+            new_partitions = []
+            # `start` is the index of the first existing partition that we want to
+            # put into the current new partition.
+            start = 0
+            total_rows = sum(part.length() for part in partitions[:, 0])
+            ideal_partition_size = compute_chunksize(
+                total_rows, ideal_num_new_partitions, min_block_size=1
+            )
+            for _ in range(ideal_num_new_partitions):
+                # We might pick up old partitions too quickly and exhaust all of them.
+                if start >= len(partitions):
+                    break
[Review comment - Collaborator, on lines +1373 to +1374]
I don't understand when this would happen?

[Reply - RehanSD (Author)]
The comment above kind of explains it, but basically, if we have very, very small partitions, we may want to coalesce more than we actually have - e.g. if we have rebalanced all but the last three partitions, and those three alone are not enough to make a new partition, we would run off the end of the list before we've satisfied the minimum constraint for the length of the new partition, and hit this case.
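A small worked example of that early exit, with made-up sizes (with `min_block_size=1`, `compute_chunksize` here behaves like ceiling division):

    import math

    # 10 old row partitions of 5 rows each, ideal of 8 new partitions.
    old_partition_rows = [5] * 10
    ideal_num_new_partitions = 8
    total_rows = sum(old_partition_rows)                                     # 50
    ideal_partition_size = math.ceil(total_rows / ideal_num_new_partitions)  # 7
    # Each new partition absorbs old ones until it holds >= 7 rows, i.e. two
    # old partitions (10 rows) apiece. After 5 of the 8 planned iterations all
    # 10 old partitions are consumed, so `start >= len(partitions)` breaks.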

+                # `stop` is the index of the last existing partition so far that we
+                # want to put into the current new partition.
+                stop = start
+                partition_size = partitions[start][0].length()
[Review comment - Collaborator]
Is length a blocking call? Do we have to materialize things in memory to get this value, or is it stored as metadata?

[Reply - RehanSD (Author)]
We try to cache it, but it will block if it is not cached. If it is blocking, it submits a remote task that gets the length of the object, and only materializes the length in memory on the main node.
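A toy sketch of that caching behavior (illustrative only, not Modin's actual class; `submit_remote_len` is a hypothetical stand-in for the engine's real task-submission call):

    class BlockPartition:
        """Illustrative partition wrapper with a lazily filled length cache."""

        def __init__(self, data_ref):
            self.data_ref = data_ref    # handle to data living on a worker
            self._length_cache = None   # unknown until first requested

        def length(self):
            if self._length_cache is None:
                # Hypothetical blocking call: computes len() where the data
                # lives and ships back only the integer, not the data itself.
                self._length_cache = submit_remote_len(self.data_ref)
            return self._length_cache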

+                # Add existing partitions into the current new partition until the
+                # number of rows in the new partition hits `ideal_partition_size`.
+                while stop < len(partitions) and partition_size < ideal_partition_size:
+                    stop += 1
+                    if stop < len(partitions):
+                        partition_size += partitions[stop][0].length()
+                # If the new partition is larger than we want, split the last
+                # current partition that it contains into two partitions, where
+                # the first partition has just enough rows to make the current
+                # new partition have length `ideal_partition_size`, and the second
+                # partition has the remainder.
+                if partition_size > ideal_partition_size * max_excess_of_num_partitions:
+                    new_last_partition_size = ideal_partition_size - sum(
+                        row[0].length() for row in partitions[start:stop]
+                    )
+                    partitions = np.insert(
+                        partitions,
+                        stop + 1,
+                        [
+                            obj.mask(slice(new_last_partition_size, None), slice(None))
+                            for obj in partitions[stop]
+                        ],
+                        0,
+                    )
+                    partitions[stop, :] = [
+                        obj.mask(slice(None, new_last_partition_size), slice(None))
+                        for obj in partitions[stop]
+                    ]
+                    partition_size = ideal_partition_size
+                new_partitions.append(
+                    cls.column_partitions(
+                        (partitions[start : stop + 1]),
+                        full_axis=partition_size == total_rows,
+                    )
+                )
+                start = stop + 1
+            return np.array(new_partitions)
+        return partitions
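To make the rebalance trigger at the top of this function concrete, here is a quick sketch of the threshold arithmetic, assuming `NPartitions.get()` returns 16:

    ideal_num_new_partitions = 16       # assumed value of NPartitions.get()
    max_excess_of_num_partitions = 1.5  # cutoff is 16 * 1.5 == 24

    for num_existing_partitions in (20, 24, 25, 48):
        rebalance = (
            num_existing_partitions
            > ideal_num_new_partitions * max_excess_of_num_partitions
        )
        print(num_existing_partitions, "rebalance" if rebalance else "unchanged")
    # 20 unchanged, 24 unchanged, 25 rebalance, 48 rebalance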