FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420

Merged
Commits (30 total; diff shown from 26 commits)
28cdc19  FEAT-#4419: Extend virtual partitioning API to pandas on Dask (RehanSD, Apr 25, 2022)
15b0072  Format code (RehanSD, Apr 25, 2022)
ae4eb6c  Fix list_of_blocks add list_of_ips (RehanSD, Apr 26, 2022)
026bd14  Fix documentation (RehanSD, Apr 26, 2022)
fa377a8  Fix docs (RehanSD, Apr 26, 2022)
b9d68f7  Update modin/core/execution/dask/implementations/pandas_on_dask/parti… (RehanSD, May 2, 2022)
17a9c65  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 13, 2022)
348272d  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 15, 2022)
6784b4b  Add comments to virtual partition, and update drain call queue to acc… (RehanSD, Jun 15, 2022)
4efd83b  lint (RehanSD, Jun 15, 2022)
af4edbc  copy rebalance_partitions implementation to dask virtual partitions (RehanSD, Jun 15, 2022)
0ac6de3  Fix rebalance_partitions, update groupby test, and add test for virtu… (RehanSD, Jun 16, 2022)
8c44eb1  Add name to release notes (RehanSD, Jun 16, 2022)
a5af1ff  lint (RehanSD, Jun 16, 2022)
971e00a  Refactor to reduce code redundancy (RehanSD, Jun 16, 2022)
87cad7c  fix docs (RehanSD, Jun 16, 2022)
0de19cd  Apply suggestions from code review (RehanSD, Jun 16, 2022)
4f5dd50  update tests (RehanSD, Jun 16, 2022)
f5d3eb1  Merge branch 'rehan/virtual_partitioning_dask' of https://github.com/… (RehanSD, Jun 16, 2022)
5e748cd  Fix typo in tests (RehanSD, Jun 16, 2022)
0cc25c4  Add more comprehensive test (RehanSD, Jun 16, 2022)
43296b2  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jun 17, 2022)
66c4360  Resolve col width issue (RehanSD, Jun 19, 2022)
9f48f0d  lint (RehanSD, Jun 19, 2022)
8dab0d1  flake8 (RehanSD, Jun 19, 2022)
2d9f150  Resolve col width issue without hurting perf (RehanSD, Jun 20, 2022)
9a945e8  Move concat into testr (RehanSD, Jun 27, 2022)
5b322ee  Address review comments (RehanSD, Jul 12, 2022)
a761182  Merge remote-tracking branch 'upstream/master' into rehan/virtual_par… (RehanSD, Jul 12, 2022)
6a9855c  fix typos (RehanSD, Jul 12, 2022)
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ specifically for the `PandasOnDask` execution.

* :doc:`PandasOnDaskDataframe <dataframe>`
* :doc:`PandasOnDaskDataframePartition <partitioning/partition>`
* :doc:`PandasOnDaskDataframeAxisPartition <partitioning/axis_partition>`
* :doc:`PandasOnDaskDataframeVirtualPartition <partitioning/axis_partition>`
* :doc:`PandasOnDaskDataframePartitionManager <partitioning/partition_manager>`

.. toctree::
@@ -80,4 +80,4 @@ the user query to execute it on Dask workers. Then, the :py:class:`~modin.core.e
that will be written into the file in parallel in Dask workers.

.. note::
Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
@@ -1,14 +1,14 @@
PandasOnDaskDataframeAxisPartition
""""""""""""""""""""""""""""""""""
PandasOnDaskDataframeVirtualPartition
"""""""""""""""""""""""""""""""""""""

The class is the specific implementation of :py:class:`~modin.core.dataframe.pandas.partitioning.virtual_partition.PandasDataframeAxisPartition`,
The class is the specific implementation of :py:class:`~modin.core.dataframe.pandas.partitioning.virtual_partition.PandasOnDaskDataframeVirtualPartition`,
providing the API to perform operations on an axis (column or row) partition using Dask as the execution engine.
The axis partition is a wrapper over a list of block partitions that are stored in this class.

Public API
----------

.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeAxisPartition
.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeVirtualPartition
:members:

PandasOnDaskDataframeColumnPartition
2 changes: 2 additions & 0 deletions docs/release_notes/release_notes-0.16.0.rst
@@ -32,10 +32,12 @@ Key Features and Updates
* Dependencies
*
* New Features
* FEAT-#4419: Extend virtual partitioning API to pandas on Dask (#4420)

Contributors
------------
@mvashishtha
@NickCrews
@prutskov
@vnlitvinov
@RehanSD
119 changes: 114 additions & 5 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -27,7 +27,7 @@
from modin.error_message import ErrorMessage
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.core.dataframe.pandas.utils import concatenate
from modin.config import NPartitions, ProgressBar, BenchmarkMode
from modin.config import NPartitions, ProgressBar, BenchmarkMode, Engine, StorageFormat

import os

@@ -615,11 +615,15 @@ def concat(cls, axis, left_parts, right_parts):
to_concat = (
[left_parts] + right_parts if left_parts.size != 0 else right_parts
)
return (
result = (
np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts
)
else:
return np.append(left_parts, right_parts, axis=axis)
result = np.append(left_parts, right_parts, axis=axis)
if axis == 0:
return cls.rebalance_partitions(result)
else:
return result

@classmethod
def to_pandas(cls, partitions):
@@ -1292,7 +1296,15 @@ def finalize(cls, partitions):
@classmethod
def rebalance_partitions(cls, partitions):
"""
Return the provided array of partitions without rebalancing it.
Rebalance a 2-d array of partitions if we are using PandasOnRay or PandasOnDask executions.

For all other executions, the partitions are returned unchanged.

Rebalance the partitions by building a new array
of partitions out of the original ones so that:

- If all partitions have a length, each new partition has roughly the same number of rows.
- Otherwise, each new partition spans roughly the same number of old partitions.

Parameters
----------
@@ -1302,6 +1314,103 @@
Returns
-------
np.ndarray
The same 2-d array.
A NumPy array with the same partitions, or with new, rebalanced partitions,
depending on the execution engine and storage format.
"""
if Engine.get() in ["Ray", "Dask"] and StorageFormat.get() == "Pandas":
# Rebalancing partitions is currently only implemented for PandasOnRay and PandasOnDask.
# We rebalance when the ratio of the number of existing partitions to
# the ideal number of partitions is larger than this threshold. The
# threshold is a heuristic that may need to be tuned for performance.
max_excess_of_num_partitions = 1.5
num_existing_partitions = partitions.shape[0]
ideal_num_new_partitions = NPartitions.get()
Collaborator: qq: ideal_num_new_partitions doesn't account for the total number of vCPUs in a cluster, right?

Collaborator (author): You can override NPartitions.get() to do so. It just asks Ray how many cores there are, and returns that, so as long as your cluster is fully initialized when modin is initialized, and you set IsRayCluster correctly, it should.

if (
num_existing_partitions
<= ideal_num_new_partitions * max_excess_of_num_partitions
):
return partitions
# If any partition has an unknown length, give each axis partition
# roughly the same number of row partitions. We use `_length_cache` here
# to avoid materializing any unmaterialized lengths.
if any(
partition._length_cache is None
for row in partitions
for partition in row
):
# We need each partition to go into an axis partition, but the
# number of axis partitions may not evenly divide the number of
# partitions.
chunk_size = compute_chunksize(
num_existing_partitions, ideal_num_new_partitions, min_block_size=1
)
return np.array(
[
cls.column_partitions(
partitions[i : i + chunk_size],
full_axis=False,
)
for i in range(
0,
num_existing_partitions,
chunk_size,
)
]
)

# If we know the number of rows in every partition, then we should try
# instead to give each new partition roughly the same number of rows.
new_partitions = []
# `start` is the index of the first existing partition that we want to
# put into the current new partition.
start = 0
total_rows = sum(part.length() for part in partitions[:, 0])
ideal_partition_size = compute_chunksize(
total_rows, ideal_num_new_partitions, min_block_size=1
)
for _ in range(ideal_num_new_partitions):
# We might pick up old partitions too quickly and exhaust all of them.
if start >= len(partitions):
break
Comment on lines +1373 to +1374

Collaborator: I don't understand when this would happen?

Collaborator (author): The comment above kind of explains, but basically, if we have very, very small partitions, we may want to coalesce more than we actually have - e.g. if we have rebalanced all but the last three partitions, and those three alone are not enough to make a new partition, we would run off the end of the list before we've satisfied the min constraint for the length of the new partition, and hit this case.

# `stop` is the index of the last existing partition so far that we
# want to put into the current new partition.
stop = start
partition_size = partitions[start][0].length()
Collaborator: is length a blocking call? Do we have to materialize things in memory to get this value or is it stored as metadata?

Collaborator (author): We try to cache it, but it will block if it is not cached. If it is blocking, it submits a remote task that gets the length of the object, and only materializes the length in memory on the main node.

# Add existing partitions into the current new partition until the
# number of rows in the new partition hits `ideal_partition_size`.
while stop < len(partitions) and partition_size < ideal_partition_size:
stop += 1
if stop < len(partitions):
partition_size += partitions[stop][0].length()
# If the new partition is larger than we want, split the last
# current partition that it contains into two partitions, where
# the first partition has just enough rows to make the current
# new partition have length `ideal_partition_size`, and the second
# partition has the remainder.
if partition_size > ideal_partition_size * max_excess_of_num_partitions:
new_last_partition_size = ideal_partition_size - sum(
row[0].length() for row in partitions[start:stop]
)
partitions = np.insert(
partitions,
stop + 1,
[
obj.mask(slice(new_last_partition_size, None), slice(None))
for obj in partitions[stop]
],
0,
)
partitions[stop, :] = [
obj.mask(slice(None, new_last_partition_size), slice(None))
for obj in partitions[stop]
]
partition_size = ideal_partition_size
new_partitions.append(
cls.column_partitions(
(partitions[start : stop + 1]),
full_axis=partition_size == total_rows,
)
)
start = stop + 1
return np.array(new_partitions)
return partitions
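The grouping heuristic in the diff above can be sketched in isolation. The following is a simplified, hypothetical model: plain integer row counts stand in for Modin's partition objects, `compute_chunksize` is approximated by ceiling division, and only the grouping of old-partition indices is returned, not real virtual partitions.

```python
import math


def rebalance_row_counts(row_counts, ideal_num_new_partitions,
                         max_excess_of_num_partitions=1.5):
    """Group per-partition row counts into new partitions of roughly
    equal total row count. A toy model of the PR's heuristic."""
    num_existing = len(row_counts)
    # Below the threshold, rebalancing is not worth the cost: keep as-is.
    if num_existing <= ideal_num_new_partitions * max_excess_of_num_partitions:
        return [[i] for i in range(num_existing)]
    total_rows = sum(row_counts)
    ideal_size = math.ceil(total_rows / ideal_num_new_partitions)
    groups, start = [], 0
    for _ in range(ideal_num_new_partitions):
        if start >= num_existing:
            break  # very small partitions: we exhausted the old ones early
        # Greedily absorb old partitions until the new one is big enough.
        stop, size = start, row_counts[start]
        while stop < num_existing - 1 and size < ideal_size:
            stop += 1
            size += row_counts[stop]
        groups.append(list(range(start, stop + 1)))
        start = stop + 1
    return groups


# 8 small partitions of 10 rows each, targeting 2 new partitions:
print(rebalance_row_counts([10] * 8, 2))  # → [[0, 1, 2, 3], [4, 5, 6, 7]]
```

The real implementation additionally splits the last absorbed partition with `mask` when a group overshoots `ideal_partition_size`; this sketch only shows the greedy grouping and the early-exit that the review thread discusses.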
@@ -15,6 +15,7 @@

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskDataframe(PandasDataframe):
Expand All @@ -41,22 +42,63 @@ class PandasOnDaskDataframe(PandasDataframe):

_partition_mgr_cls = PandasOnDaskDataframePartitionManager

def _get_partition_size_along_axis(self, partition, axis=0):
Collaborator: Per #4494, we are currently calculating partition shapes serially in other places than the one you changed here, e.g. here for Ray, in Ray virtual partitioning length, and in the same place length for the new dask virtual partition class. The dask blocking means that this function actually blocks on inner partitions if the Modin frame consists of virtual partitions that are themselves made of virtual partitions.

In my opinion, we should parallelize getting all the partition shapes correctly in a separate fix for #4494.

Collaborator: Ah, I'm wrong about this function blocking on inner partitions, because it doesn't call length or width. So I think it's correct. Still, this part seems outside the scope of this PR. I'd rather worry about this fix in a separate PR for #4494.

Collaborator (author): I think it makes sense to include here, since the original code to get length + width in parallel breaks when applied to axis partitions, so I'll need to fix that code anyway, and if I'm doing that, I may as well fix the code to just do it all in parallel, right?

Collaborator: If the original code was broken, I think it's okay to include the partial fix for #4494 here.

"""
Compute the length along the specified axis of the specified partition.

Parameters
----------
partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition``
The partition whose size to compute.
axis : int, default: 0
The axis along which to compute size.

Returns
-------
list
A list of lengths along the specified axis that sum to the overall length of the partition
along the specified axis.

Notes
-----
This utility function is used to ensure that computation occurs asynchronously across all partitions
whether the partitions are virtual or physical partitions.
"""
if isinstance(partition, self._partition_mgr_cls._partition_class):
return [
partition.apply(
lambda df: len(df) if not axis else len(df.columns)
)._data
]
elif partition.axis == axis:
return [
ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data
for ptn in partition.list_of_partitions_to_combine
]
return [
partition.list_of_partitions_to_combine[0]
.apply(lambda df: len(df) if not axis else (len(df.columns)))
._data
]

@property
def _row_lengths(self):
"""
Compute the row partitions lengths if they are not cached.
Compute ther row partitions lengths if they are not cached.
Collaborator (suggested change):
- Compute ther row partitions lengths if they are not cached.
+ Compute the row partition lengths if they are not cached.

Collaborator: not fixed

Returns
-------
list
A list of row partitions lengths.
"""
if self._row_lengths_cache is None:
self._row_lengths_cache = (
self._partition_mgr_cls.get_objects_from_partitions(
[obj.apply(lambda df: len(df)) for obj in self._partitions.T[0]]
)
row_lengths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=0)
for obj in self._partitions.T[0]
]
)
self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list]
return self._row_lengths_cache

@property
@@ -70,12 +112,13 @@ def _column_widths(self):
A list of column partitions widths.
"""
if self._column_widths_cache is None:
self._column_widths_cache = (
self._partition_mgr_cls.get_objects_from_partitions(
[
obj.apply(lambda df: len(df.columns))
for obj in self._partitions[0]
]
)
col_widths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=1)
for obj in self._partitions[0]
]
)
self._column_widths_cache = [
sum(width_list) for width_list in col_widths_list
]
return self._column_widths_cache
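The caching pattern in this diff, submit one small deferred computation per (possibly virtual) partition, materialize everything at once, then sum the per-block results, can be illustrated with a stand-in wrapper. This is a hypothetical sketch: `FakeWrapper` is a trivial synchronous stub for `DaskWrapper.materialize`, and nested Python lists stand in for partition blocks.

```python
class FakeWrapper:
    """Synchronous stand-in for DaskWrapper: 'futures' are zero-arg thunks."""

    @classmethod
    def materialize(cls, nested):
        # The real wrapper gathers Dask futures in one round trip;
        # here we just call each thunk.
        return [[thunk() for thunk in inner] for inner in nested]


def column_widths(partitions_row, wrapper=FakeWrapper):
    # One deferred width computation per partition; a virtual partition
    # contributes one entry per block it is composed of.
    futures = [
        [lambda block=block: len(block[0]) for block in part]
        for part in partitions_row
    ]
    widths_list = wrapper.materialize(futures)
    # Sum per-block widths so each entry is one partition's total width.
    return [sum(width_list) for width_list in widths_list]


# Two "virtual" partitions: one made of two blocks (3 + 2 columns wide),
# one made of a single 4-column block. Blocks are lists of rows.
parts = [
    [[[1, 2, 3]], [[4, 5]]],
    [[[6, 7, 8, 9]]],
]
print(column_widths(parts))  # → [5, 4]
```

The key property this models is that no result is pulled back until all the per-partition tasks have been submitted, which is why the diff replaces the earlier per-object `get_objects_from_partitions` calls with a single `DaskWrapper.materialize` over the whole list.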