FEAT-#7004: use generators when returning from _deploy_ray_func remote function. (#7005)

Signed-off-by: arunjose696 <arunjose696@gmail.com>
arunjose696 committed Mar 13, 2024
1 parent 2ed2d49 commit eb740b9
Showing 3 changed files with 104 additions and 39 deletions.
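
The commit switches `_deploy_ray_func` from returning one large list to yielding items one by one, so Ray can serialize each result into the object store as soon as it is produced instead of holding the whole list in worker memory. As a minimal standalone sketch of the Ray generator-task behavior this relies on (illustrative only, not Modin code; assumes a Ray version with generator-task support):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def produce_parts(n):
    # Each yielded value is serialized into the object store as soon as
    # it is produced, so the worker never holds the full result list.
    for i in range(n):
        yield i * 10

# With a static num_returns, Ray maps the n yields to n separate ObjectRefs.
refs = produce_parts.options(num_returns=4).remote(4)
print(ray.get(list(refs)))  # [0, 10, 20, 30]
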
41 changes: 34 additions & 7 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -21,7 +21,10 @@
from modin.core.dataframe.base.partitioning.axis_partition import (
BaseDataframeAxisPartition,
)
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.core.storage_formats.pandas.utils import (
generate_result_of_axis_func_pandas,
split_result_of_axis_func_pandas,
)

from .partition import PandasDataframePartition

@@ -388,6 +391,7 @@ def deploy_axis_func(
*partitions,
lengths=None,
manual_partition=False,
return_generator=False,
):
"""
Deploy a function along a full axis.
@@ -413,11 +417,14 @@
The list of lengths to shuffle the object.
manual_partition : bool, default: False
If True, partition the result with `lengths`.
return_generator : bool, default: False
    Return a generator from the function. Set to `True` for the Ray
    backend, as Ray remote functions can return generators.

Returns
-------
list | Generator
    A list or generator of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
with warnings.catch_warnings():
@@ -451,7 +458,12 @@
lengths = [len(part.columns) for part in partitions]
if sum(lengths) != len(result.columns):
lengths = None
if return_generator:
    return generate_result_of_axis_func_pandas(
        axis, num_splits, result, lengths
    )
else:
    return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)

@classmethod
def deploy_func_between_two_axis_partitions(
@@ -464,6 +476,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
return_generator=False,
):
"""
Deploy a function along a full axis between two data sets.
@@ -487,11 +500,14 @@
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
return_generator : bool, default: False
    Return a generator from the function. Set to `True` for the Ray
    backend, as Ray remote functions can return generators.

Returns
-------
list | Generator
    A list or generator of pandas DataFrames.
"""
lt_frame = pandas.concat(partitions[:len_of_left], axis=axis, copy=False)

@@ -510,7 +526,18 @@
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
if return_generator:
    return generate_result_of_axis_func_pandas(
        axis,
        num_splits,
        result,
    )
else:
    return split_result_of_axis_func_pandas(
        axis,
        num_splits,
        result,
    )

@classmethod
def drain(cls, df: pandas.DataFrame, call_queue: list):
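
For illustration, the new `return_generator` flag can be exercised directly on the pandas axis-partition class. A hypothetical usage sketch, assuming the full positional signature `(axis, func, f_args, f_kwargs, num_splits, maintain_partitioning, *partitions)` from Modin's source:

import pandas as pd

from modin.core.dataframe.pandas.partitioning.axis_partition import (
    PandasDataframeAxisPartition,
)

parts = [pd.DataFrame({"a": range(4)}), pd.DataFrame({"a": range(4, 8)})]

# return_generator=True makes the splits come back lazily; the Ray
# virtual partition passes True so splits can be yielded out of the
# remote task one at a time instead of as one materialized list.
gen = PandasDataframeAxisPartition.deploy_axis_func(
    0,                  # axis
    lambda df: df * 2,  # func applied to the concatenated frame
    (),                 # f_args
    {},                 # f_kwargs
    2,                  # num_splits
    True,               # maintain_partitioning
    *parts,
    return_generator=True,
)
for chunk in gen:
    print(chunk.shape)  # two (4, 1) splits, mirroring the inputs
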
20 changes: 13 additions & 7 deletions modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
@@ -189,6 +189,7 @@ def deploy_axis_func(
f_kwargs=f_kwargs,
manual_partition=manual_partition,
lengths=lengths,
return_generator=True,
)

@classmethod
@@ -244,6 +245,7 @@ def deploy_func_between_two_axis_partitions(
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
return_generator=True,
)

def wait(self):
@@ -320,12 +322,16 @@ def _deploy_ray_func(
f_args = positional_args[:f_len_args]
deploy_args = positional_args[f_len_args:]
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs)

if not extract_metadata:
    for item in result:
        yield item
else:
    ip = get_node_ip_address()
    for r in result:
        if isinstance(r, pandas.DataFrame):
            for item in [r, len(r), len(r.columns), ip]:
                yield item
        else:
            for item in [r, None, None, ip]:
                yield item
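
With `extract_metadata` set, the generator flattens each result frame into four consecutive items: the frame, its row count, its column count, and the worker's IP address. A hypothetical consumer-side sketch (`regroup` is not a Modin helper) showing how such a flat stream maps back to per-frame tuples:

from itertools import islice

def regroup(flat_stream, stride=4):
    # Hypothetical helper: turn [df0, len0, width0, ip0, df1, ...] back
    # into (df, length, width, ip) tuples, stride items at a time.
    it = iter(flat_stream)
    while True:
        group = list(islice(it, stride))
        if not group:
            return
        yield tuple(group)

# e.g. a stream for two frames has 8 items and regroups into 2 tuples.
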
82 changes: 57 additions & 25 deletions modin/core/storage_formats/pandas/utils.py
@@ -84,34 +84,66 @@ def split_result_of_axis_func_pandas(
list of pandas.DataFrames
    Split DataFrame represented by a list of frames.
"""
return list(
generate_result_of_axis_func_pandas(
axis, num_splits, result, length_list, min_block_size
)
)


def generate_result_of_axis_func_pandas(
axis, num_splits, result, length_list=None, min_block_size=None
):
"""
Generate pandas DataFrames evenly based on the provided number of splits.

Parameters
----------
axis : {0, 1}
    Axis to split across. 0 means index axis while 1 means column axis.
num_splits : int
    Number of splits to separate the DataFrame into.
    This parameter is ignored if `length_list` is specified.
result : pandas.DataFrame
    DataFrame to split.
length_list : list of ints, optional
    List of slice lengths to split DataFrame into. This is used to
    return the DataFrame to its original partitioning schema.
min_block_size : int, optional
    Minimum number of rows/columns in a single split.
    If not specified, the value is assumed equal to ``MinPartitionSize``.

Yields
------
pandas.DataFrame
    Generates `num_splits` DataFrames as a result of the axis function.
"""
if num_splits == 1:
    yield result
else:
    if length_list is None:
        length_list = get_length_list(
            result.shape[axis], num_splits, min_block_size
        )
    # Inserting the first "zero" to properly compute cumsum indexing slices
    length_list = np.insert(length_list, obj=0, values=[0])
    sums = np.cumsum(length_list)
    axis = 0 if isinstance(result, pandas.Series) else axis

    for i in range(len(sums) - 1):
        # We do this to restore block partitioning
        if axis == 0:
            chunk = result.iloc[sums[i] : sums[i + 1]]
        else:
            chunk = result.iloc[:, sums[i] : sums[i + 1]]

        # Sliced MultiIndex still stores all encoded values of the original index, explicitly
        # asking it to drop unused values in order to save memory.
        if isinstance(chunk.axes[axis], pandas.MultiIndex):
            chunk = chunk.set_axis(
                chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
            )
        yield chunk
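
A short usage sketch of the new generator; the import path is the one shown in this diff, and `length_list` is passed explicitly so the split sizes are deterministic:

import pandas as pd

from modin.core.storage_formats.pandas.utils import (
    generate_result_of_axis_func_pandas,
)

df = pd.DataFrame({"a": range(10), "b": range(10)})

# Nothing is sliced until the generator is iterated; each chunk can be
# consumed and dropped before the next one is produced.
for chunk in generate_result_of_axis_func_pandas(0, 3, df, length_list=[4, 4, 2]):
    print(len(chunk))  # 4, then 4, then 2
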


def get_length_list(axis_len: int, num_splits: int, min_block_size=None) -> list:
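
The diff truncates `get_length_list`; purely as an illustration of its role (this is not Modin's implementation, which also honors ``MinPartitionSize``), an even-split helper might look like:

import math

def get_length_list_sketch(axis_len: int, num_splits: int) -> list:
    # Illustrative only: near-even chunk lengths summing to axis_len.
    chunksize = math.ceil(axis_len / num_splits)
    return [
        min(chunksize, max(0, axis_len - i * chunksize))
        for i in range(num_splits)
    ]

print(get_length_list_sketch(10, 3))  # [4, 4, 2]
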
