FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420

Merged
30 commits
28cdc19
FEAT-#4419: Extend virtual partitioning API to pandas on Dask
RehanSD Apr 25, 2022
15b0072
Format code
RehanSD Apr 25, 2022
ae4eb6c
Fix list_of_blocks add list_of_ips
RehanSD Apr 26, 2022
026bd14
Fix documentation
RehanSD Apr 26, 2022
fa377a8
Fix docs
RehanSD Apr 26, 2022
b9d68f7
Update modin/core/execution/dask/implementations/pandas_on_dask/parti…
RehanSD May 2, 2022
17a9c65
Merge remote-tracking branch 'upstream/master' into rehan/virtual_par…
RehanSD Jun 13, 2022
348272d
Merge remote-tracking branch 'upstream/master' into rehan/virtual_par…
RehanSD Jun 15, 2022
6784b4b
Add comments to virtual partition, and update drain call queue to acc…
RehanSD Jun 15, 2022
4efd83b
lint
RehanSD Jun 15, 2022
af4edbc
copy rebalance_partitions implementation to dask virtual partitions
RehanSD Jun 15, 2022
0ac6de3
Fix rebalance_partitions, update groupby test, and add test for virtu…
RehanSD Jun 16, 2022
8c44eb1
Add name to release notes
RehanSD Jun 16, 2022
a5af1ff
lint
RehanSD Jun 16, 2022
971e00a
Refactor to reduce code redundancy
RehanSD Jun 16, 2022
87cad7c
fix docs
RehanSD Jun 16, 2022
0de19cd
Apply suggestions from code review
RehanSD Jun 16, 2022
4f5dd50
update tests
RehanSD Jun 16, 2022
f5d3eb1
Merge branch 'rehan/virtual_partitioning_dask' of https://github.com/…
RehanSD Jun 16, 2022
5e748cd
Fix typo in tests
RehanSD Jun 16, 2022
0cc25c4
Add more comprehensive test
RehanSD Jun 16, 2022
43296b2
Merge remote-tracking branch 'upstream/master' into rehan/virtual_par…
RehanSD Jun 17, 2022
66c4360
Resolve col width issue
RehanSD Jun 19, 2022
9f48f0d
lint
RehanSD Jun 19, 2022
8dab0d1
flake8
RehanSD Jun 19, 2022
2d9f150
Resolve col width issue without hurting perf
RehanSD Jun 20, 2022
9a945e8
Move concat into testr
RehanSD Jun 27, 2022
5b322ee
Address review comments
RehanSD Jul 12, 2022
a761182
Merge remote-tracking branch 'upstream/master' into rehan/virtual_par…
RehanSD Jul 12, 2022
6a9855c
fix typos
RehanSD Jul 12, 2022
@@ -15,6 +15,8 @@

from distributed import Future
from distributed.utils import get_ip
from dask.distributed import wait

import pandas

from modin.core.dataframe.pandas.partitioning.axis_partition import (
@@ -24,7 +26,7 @@
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskDataframeAxisPartition(PandasDataframeAxisPartition):
class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition):
"""
The class implements the interface in ``PandasDataframeAxisPartition``.

@@ -38,21 +40,93 @@ class PandasOnDaskDataframeAxisPartition(PandasDataframeAxisPartition):
Whether or not the virtual partition encompasses the whole axis.
"""

def __init__(self, list_of_blocks, get_ip=False, full_axis=True):
if not full_axis:
raise NotImplementedError(
"Pandas on Dask execution requires full-axis partitions."
)
for obj in list_of_blocks:
obj.drain_call_queue()
# Unwrap from PandasDataframePartition object for ease of use
self.list_of_blocks = [obj.future for obj in list_of_blocks]
if get_ip:
self.list_of_ips = [obj._ip_cache for obj in list_of_blocks]

axis = None

def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None):
self.call_queue = call_queue or []
self.full_axis = full_axis
if isinstance(list_of_blocks, PandasOnDaskDataframePartition):
list_of_blocks = [list_of_blocks]
if not any(
isinstance(o, PandasOnDaskDataframeVirtualPartition) for o in list_of_blocks
):
self.list_of_partitions_to_combine = list_of_blocks
return

assert (
len(
set(
o.axis
for o in list_of_blocks
if isinstance(o, PandasOnDaskDataframeVirtualPartition)
)
)
== 1
)
if (
next(
o
for o in list_of_blocks
if isinstance(o, PandasOnDaskDataframeVirtualPartition)
).axis
== self.axis
):
new_list_of_blocks = []
for o in list_of_blocks:
new_list_of_blocks.extend(
o.list_of_partitions_to_combine
) if isinstance(
o, PandasOnDaskDataframeVirtualPartition
) else new_list_of_blocks.append(
o
)
self.list_of_partitions_to_combine = new_list_of_blocks
else:
self.list_of_partitions_to_combine = [
obj.force_materialization().list_of_partitions_to_combine[0]
if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
else obj
for obj in list_of_blocks
]

Collaborator: Nit: Instead of having two different loops here, can we combine them into one if/else case?

Collaborator: nit: maybe a better variable name than o?

partition_type = PandasOnDaskDataframePartition
instance_type = Future
Comment on lines 105 to 106:

Collaborator: This should be located near axis = None.

Collaborator (author): Good point - I can open a refactor PR for that if you think it warrants it, or I can just roll it in with another PR?

Collaborator: Either of these is fine with me.

@property
def list_of_blocks(self):
"""
Get the list of physical partition objects that compose this partition.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, but this means that our perspective of a partition might contain N dask partitions, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup - a virtual partition is just one or more block partitions (physical partitions) and an axis that they are aligned along.


Returns
-------
List
A list of ``distributed.Future``.
"""
# Defer draining call queue until we get the partitions
# TODO Look into draining call queue at the same time as the task
result = [None] * len(self.list_of_partitions_to_combine)
for idx, partition in enumerate(self.list_of_partitions_to_combine):
partition.drain_call_queue()
result[idx] = partition.future
return result
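The thread above describes a virtual partition as one or more block (physical) partitions plus the axis they align on. A minimal sketch of that idea, assuming a local distributed Client, import paths matching the truncated file path in the commit list, and a block-partition constructor that takes a distributed.Future as its first argument (none of which are shown in this diff):

import pandas
from distributed import Client

# Assumed import paths; adjust to the actual package layout.
from modin.core.execution.dask.implementations.pandas_on_dask.partitioning.partition import (
    PandasOnDaskDataframePartition,
)
from modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition import (
    PandasOnDaskDataframeColumnPartition,
)

client = Client(processes=False)

# Two "physical" block partitions, each backed by a distributed.Future of a DataFrame.
blocks = [
    PandasOnDaskDataframePartition(client.scatter(pandas.DataFrame({"a": range(3)}))),
    PandasOnDaskDataframePartition(client.scatter(pandas.DataFrame({"a": range(3, 6)}))),
]

# An axis-0 (column) virtual partition is just those blocks plus the axis they align on.
virtual = PandasOnDaskDataframeColumnPartition(blocks)
assert len(virtual.list_of_blocks) == 2  # two Dask futures behind one Modin partition

The sketches further below reuse client, blocks, and virtual, and assume the usual block-partition helpers (to_pandas, length, width) from the rest of the codebase.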

@property
def list_of_ips(self):
"""
Get the IPs holding the physical objects composing this partition.

Returns
-------
List
A list of IPs as ``distributed.Future`` or str.
"""
# Defer draining call queue until we get the ip address
result = [None] * len(self.list_of_partitions_to_combine)
for idx, partition in enumerate(self.list_of_partitions_to_combine):
partition.drain_call_queue()
result[idx] = partition._ip_cache
return result

@classmethod
def deploy_axis_func(
cls, axis, func, num_splits, kwargs, maintain_partitioning, *partitions
@@ -159,8 +233,193 @@ def _wrap_partitions(self, partitions):
for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
]

def apply(
self,
func,
num_splits=None,
other_axis_partition=None,
maintain_partitioning=True,
**kwargs
):
"""
Apply a function to this axis partition along full axis.

Parameters
----------
func : callable
The function to apply.
num_splits : int, default: None
The number of times to split the result object.
other_axis_partition : PandasDataframeAxisPartition, default: None
Another `PandasDataframeAxisPartition` object to be applied
to func. This is for operations that are between two data sets.
maintain_partitioning : bool, default: True
Whether to keep the partitioning in the same
orientation as it was previously or not. This is important because we may be
operating on an individual AxisPartition and not touching the rest.
In this case, we have to return the partitioning to its previous
orientation (the lengths will remain the same). This is ignored between
two axis partitions.
**kwargs : dict
Additional keyword arguments to be passed in `func`.

Returns
-------
list
A list of `PandasOnDaskDataframeVirtualPartition` objects.
"""
if not self.full_axis:
num_splits = 1
if len(self.call_queue) > 0:
self.drain_call_queue()
Comment on lines +297 to +298:

Collaborator: Why not just have the if check for this encapsulated within drain_call_queue?

Collaborator (author): Good question - tbh I don't think there's really any huge reason to do so. It does save an extra call on the stack, but that's an extremely minuscule effect (so definitely not why). I think it just boils down to the idea that the drain_call_queue method (conceptually) should only be called when we have a call queue?
result = super(PandasOnDaskDataframeVirtualPartition, self).apply(
func, num_splits, other_axis_partition, maintain_partitioning, **kwargs
)
if self.full_axis:
return result
else:
return result[0]
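A hypothetical usage of apply, continuing the sketch above. Because virtual was built with full_axis=True (the default), a list of new partitions comes back; the number of splits follows maintain_partitioning and is not pinned down here:

# The lambda runs over the blocks concatenated along the partition's axis.
doubled = virtual.apply(lambda df: df * 2)
frames = [part.to_pandas() for part in doubled]
assert all(int(frame["a"].iloc[0]) % 2 == 0 for frame in frames)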

def force_materialization(self, get_ip=False):
"""
Materialize partitions into a single partition.

Parameters
----------
get_ip : bool, default: False
Whether to get the node IP address of the single partition or not.

Returns
-------
PandasOnDaskDataframeVirtualPartition
An axis partition containing only a single materialized partition.
"""
materialized = super(
PandasOnDaskDataframeVirtualPartition, self
).force_materialization(get_ip=get_ip)
self.list_of_partitions_to_combine = materialized.list_of_partitions_to_combine
return materialized

def mask(self, row_indices, col_indices):
"""
Create (synchronously) a mask that extracts the indices provided.

Parameters
----------
row_indices : list-like, slice or label
The row labels for the rows to extract.
col_indices : list-like, slice or label
The column labels for the columns to extract.

Returns
-------
PandasOnDaskDataframeVirtualPartition
A new ``PandasOnDaskDataframeVirtualPartition`` object,
materialized.
"""
return (
self.force_materialization()
.list_of_partitions_to_combine[0]
.mask(row_indices, col_indices)
)
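Continuing the same hypothetical setup: mask first materializes the virtual partition into a single block and then masks that block, as the method above shows.

# Take the first two rows and every column of the materialized data.
masked = virtual.mask(row_indices=[0, 1], col_indices=slice(None))
assert len(masked.to_pandas()) == 2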

def to_pandas(self):
"""
Convert the data in this partition to a ``pandas.DataFrame``.

Returns
-------
pandas DataFrame.
"""
return self.force_materialization().list_of_partitions_to_combine[0].to_pandas()

_length_cache = None

def length(self):
Collaborator: Do we have a test that covers this function and width?

Collaborator (author): We don't really have any unit testing at the partition level. This PR is actually the first to add any testing that's supposed to cover that level. We could look into adding more testing, but I think that would belong in a separate PR.

"""
Get the length of this partition.

Returns
-------
int
The length of the partition.
"""
if self._length_cache is None:
if self.axis == 0:
self._length_cache = sum(
o.length() for o in self.list_of_partitions_to_combine
)
else:
self._length_cache = self.list_of_partitions_to_combine[0].length()
return self._length_cache

_width_cache = None

def width(self):
"""
Get the width of this partition.

Returns
-------
int
The width of the partition.
"""
if self._width_cache is None:
if self.axis == 1:
self._width_cache = sum(
o.width() for o in self.list_of_partitions_to_combine
)
else:
self._width_cache = self.list_of_partitions_to_combine[0].width()
return self._width_cache
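To make the caching logic above concrete, one more hypothetical continuation using the same blocks from the first sketch: for an axis-0 (column) partition the length sums the block lengths, while the width is taken from a single block.

col_part = PandasOnDaskDataframeColumnPartition(blocks)
assert col_part.length() == 3 + 3  # axis == 0, so block lengths are summed
assert col_part.width() == 1       # width comes from the first block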

def drain_call_queue(self):
"""Execute all operations stored in this partition's call queue."""

def drain(df):
for func, args, kwargs in self.call_queue:
df = func(df, *args, **kwargs)
return df

drained = super(PandasOnDaskDataframeVirtualPartition, self).apply(drain)
self.list_of_partitions_to_combine = drained
self.call_queue = []

def wait(self):
"""Wait completing computations on the object wrapped by the partition."""
self.drain_call_queue()
wait(self.list_of_blocks)

def add_to_apply_calls(self, func, *args, **kwargs):
"""
Add a function to the call queue.

Parameters
----------
func : callable
Function to be added to the call queue.
*args : iterable
Additional positional arguments to be passed in `func`.
**kwargs : dict
Additional keyword arguments to be passed in `func`.

Returns
-------
PandasOnDaskDataframeVirtualPartition
A new ``PandasOnDaskDataframeVirtualPartition`` object.

Notes
-----
The keyword arguments are sent as a dictionary.
"""
return type(self)(
self.list_of_partitions_to_combine,
full_axis=self.full_axis,
call_queue=self.call_queue + [(func, args, kwargs)],
)
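A last hypothetical continuation showing the lazy call queue discussed above: queueing work returns a new virtual partition without executing anything, and wait (like any access to list_of_blocks) drains the queue first.

# Nothing runs yet; the returned partition just carries the queued call.
lazy = col_part.add_to_apply_calls(lambda df: df * 10)
assert len(lazy.call_queue) == 1

# wait() drains the queue (executing the multiplication) and blocks on the futures.
lazy.wait()
assert lazy.call_queue == []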


class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeAxisPartition):
class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeVirtualPartition):
"""
The column partition implementation.

@@ -180,7 +439,7 @@ class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeAxisPartition):
axis = 0


class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeAxisPartition):
class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeVirtualPartition):
"""
The row partition implementation.
