Skip to content

Commit

Permalink
Fix docstrings and add a drain_call_queue comment.
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <mahesh@ponder.io>
  • Loading branch information
mvashishtha committed Aug 2, 2022
1 parent 81f3c5e commit 3a73fab
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
15 changes: 9 additions & 6 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@


class PandasDataframeAxisPartition(BaseDataframeAxisPartition):

block_partition_type = None
wait = None

"""
An abstract class created to simplify and consolidate the code for axis partitions that run pandas.
Because much of the code is similar, this allows us to reuse this code.
Parameters
----------
list_of_blocks : Union[list, block_partition_type]
Expand All @@ -46,6 +41,10 @@ class PandasDataframeAxisPartition(BaseDataframeAxisPartition):
A list of tuples (callable, args, kwargs) that contains deferred calls.
"""

block_partition_type = None
wait = None
axis = None

def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None):
if isinstance(list_of_blocks, self.block_partition_type):
list_of_blocks = [list_of_blocks]
Expand Down Expand Up @@ -342,7 +341,6 @@ def drain_call_queue(self, num_splits=None):
num_splits : int, default: None
The number of times to split the result object.
"""

# Copy the original call queue and set it to empty so we don't try to
# drain it again when we apply().
call_queue = self.call_queue
Expand All @@ -355,6 +353,11 @@ def drain(df):

drained = self.apply(drain, num_splits=num_splits)
if not self.full_axis:
# `apply` gives a partition instead of a list of splits
# when this is not a full-axis partition.
# TODO(https://github.com/modin-project/modin/issues/4757):
# stop making full_axis a property of the object and instead
# control the splitting behavior with an argument to `apply`.
drained = [drained]
self.list_of_partitions_to_combine = drained

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from distributed import Future
from distributed.utils import get_ip
from dask.distributed import wait as dask_wait
from dask.distributed import wait

import pandas

Expand All @@ -27,11 +27,25 @@


class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition):
"""
The class implements the interface in ``PandasDataframeAxisPartition``.
Parameters
----------
list_of_blocks : Union[list, PandasOnDaskDataframePartition]
List of ``PandasOnDaskDataframePartition`` and
``PandasOnDaskDataframeVirtualPartition`` objects, or a single
``PandasOnDaskDataframePartition``.
get_ip : bool, default: False
Whether to get node IP addresses of conforming partitions or not.
full_axis : bool, default: True
Whether or not the virtual partition encompasses the whole axis.
call_queue : list, optional
A list of tuples (callable, args, kwargs) that contains deferred calls.
"""

block_partition_type = PandasOnDaskDataframePartition
instance_type = Future
wait = dask_wait
axis = None

def wait(self):
"""Wait for completion of computations on the object wrapped by the partition."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):

block_partition_type = PandasOnRayDataframePartition
instance_type = ray.ObjectRef
wait = ray.wait
axis = None

@classmethod
def deploy_axis_func(
Expand Down

0 comments on commit 3a73fab

Please sign in to comment.