FIX-#7170: Don't use MinPartitionSize configuration variable in remote context #7177

Merged · 8 commits · Apr 15, 2024
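The fix applies one pattern throughout the diff below: read MinPartitionSize on the driver and ship the value to remote tasks as an explicit min_block_size argument, since a worker process cannot be assumed to see the driver's configuration. A minimal sketch of the hazard and of the pattern, not taken from the PR (CONFIG, split_bad and split_good are hypothetical):

import ray

ray.init()

CONFIG = {"min_partition_size": 32}  # hypothetical driver-side setting


@ray.remote
def split_bad(n_rows):
    # Reads the config inside the remote task: the worker sees whatever
    # snapshot of CONFIG was shipped with the function, not necessarily
    # the driver's current value.
    return max(n_rows // 4, CONFIG["min_partition_size"])


@ray.remote
def split_good(n_rows, min_block_size):
    # The driver resolves the setting and ships it as a plain argument.
    return max(n_rows // 4, min_block_size)


CONFIG["min_partition_size"] = 64  # driver reconfigures after the fact
print(ray.get(split_bad.remote(100)))  # may still print 32 on a worker
print(ray.get(split_good.remote(100, CONFIG["min_partition_size"])))  # 64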
15 changes: 11 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -28,7 +28,7 @@
 from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
 from pandas.core.indexes.api import Index, RangeIndex
 
-from modin.config import Engine, IsRayCluster, NPartitions
+from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
 from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
 from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
 from modin.core.dataframe.pandas.dataframe.utils import (
@@ -1549,7 +1549,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
                 # the "standard" partitioning. Knowing the standard partitioning scheme
                 # we are able to compute new row lengths.
                 new_lengths = get_length_list(
-                    axis_len=len(row_idx), num_splits=ordered_rows.shape[0]
+                    axis_len=len(row_idx),
+                    num_splits=ordered_rows.shape[0],
+                    min_block_size=MinPartitionSize.get(),
                 )
             else:
                 # If the frame's partitioning was preserved then
@@ -1585,7 +1587,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
                 # the "standard" partitioning. Knowing the standard partitioning scheme
                 # we are able to compute new column widths.
                 new_widths = get_length_list(
-                    axis_len=len(col_idx), num_splits=ordered_cols.shape[1]
+                    axis_len=len(col_idx),
+                    num_splits=ordered_cols.shape[1],
+                    min_block_size=MinPartitionSize.get(),
                 )
             else:
                 # If the frame's partitioning was preserved then
@@ -3500,7 +3504,9 @@ def broadcast_apply_full_axis(
             if kw["row_lengths"] is None and is_index_materialized:
                 if axis == 0:
                     kw["row_lengths"] = get_length_list(
-                        axis_len=len(new_index), num_splits=new_partitions.shape[0]
+                        axis_len=len(new_index),
+                        num_splits=new_partitions.shape[0],
+                        min_block_size=MinPartitionSize.get(),
                     )
                 elif axis == 1:
                     if self._row_lengths_cache is not None and len(new_index) == sum(
@@ -3512,6 +3518,7 @@
                     kw["column_widths"] = get_length_list(
                         axis_len=len(new_columns),
                         num_splits=new_partitions.shape[1],
+                        min_block_size=MinPartitionSize.get(),
                     )
                 elif axis == 0:
                     if self._column_widths_cache is not None and len(
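get_length_list now receives the minimum block size from its caller instead of looking it up itself. A hypothetical re-implementation to illustrate the assumed semantics — the real helper lives in modin.core.dataframe.pandas.dataframe.utils, and the exact rounding here is an assumption:

import math


def get_length_list_sketch(axis_len, num_splits, min_block_size):
    # Assumed behavior: chunk size is an even ceiling split, floored at
    # min_block_size; trailing chunks shrink to zero once rows run out.
    chunksize = max(math.ceil(axis_len / num_splits), min_block_size)
    return [
        chunksize
        if (i + 1) * chunksize <= axis_len
        else max(0, axis_len - i * chunksize)
        for i in range(num_splits)
    ]


print(get_length_list_sketch(10, 4, 1))   # [3, 3, 3, 1]
print(get_length_list_sketch(10, 4, 32))  # [10, 0, 0, 0]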
23 changes: 20 additions & 3 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -18,6 +18,7 @@
 import numpy as np
 import pandas
 
+from modin.config import MinPartitionSize
 from modin.core.dataframe.base.partitioning.axis_partition import (
     BaseDataframeAxisPartition,
 )
@@ -276,6 +277,7 @@ def apply(
                             for part in axis_partition.list_of_blocks
                         ]
                     ),
+                    min_block_size=MinPartitionSize.get(),
                 )
             )
         result = self._wrap_partitions(
@@ -287,8 +289,9 @@
                 num_splits,
                 maintain_partitioning,
                 *self.list_of_blocks,
-                manual_partition=manual_partition,
+                min_block_size=MinPartitionSize.get(),
                 lengths=lengths,
+                manual_partition=manual_partition,
             )
         )
         if self.full_axis:
@@ -391,6 +394,7 @@ def deploy_axis_func(
         num_splits,
         maintain_partitioning,
         *partitions,
+        min_block_size,
        lengths=None,
        manual_partition=False,
        return_generator=False,
@@ -415,6 +419,8 @@
             If False, create a new partition layout.
         *partitions : iterable
             All partitions that make up the full axis (row or column).
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
         lengths : list, optional
             The list of lengths to shuffle the object.
         manual_partition : bool, default: False
@@ -473,10 +479,16 @@ def deploy_axis_func(
                 lengths = None
         if return_generator:
             return generate_result_of_axis_func_pandas(
-                axis, num_splits, result, lengths
+                axis,
+                num_splits,
+                result,
+                min_block_size,
+                lengths,
             )
         else:
-            return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
+            return split_result_of_axis_func_pandas(
+                axis, num_splits, result, min_block_size, lengths
+            )
 
     @classmethod
     def deploy_func_between_two_axis_partitions(
@@ -489,6 +501,7 @@ def deploy_func_between_two_axis_partitions(
         len_of_left,
         other_shape,
         *partitions,
+        min_block_size,
         return_generator=False,
     ):
         """
@@ -513,6 +526,8 @@
             (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
         *partitions : iterable
             All partitions that make up the full axis (row or column) for both data sets.
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
         return_generator : bool, default: False
             Return a generator from the function, set to `True` for Ray backend
             as Ray remote functions can return Generators.
@@ -559,12 +574,14 @@
                 axis,
                 num_splits,
                 result,
+                min_block_size,
             )
         else:
             return split_result_of_axis_func_pandas(
                 axis,
                 num_splits,
                 result,
+                min_block_size,
             )
 
     @classmethod
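Note where the new parameter lands in deploy_axis_func and deploy_func_between_two_axis_partitions: after *partitions, which makes it keyword-only, so it can never be mistaken for a trailing partition object. A toy model of that signature shape (deploy_axis_func_sketch is hypothetical):

def deploy_axis_func_sketch(axis, func, num_splits, *partitions,
                            min_block_size, lengths=None):
    # Everything positional after num_splits is collected into partitions;
    # min_block_size can only arrive by keyword.
    print(f"{len(partitions)} blocks, min_block_size={min_block_size}")


deploy_axis_func_sketch(0, sum, 4, "block0", "block1", min_block_size=32)
# deploy_axis_func_sketch(0, sum, 4, "block0", 32) would raise TypeError:
# 32 would be swallowed into partitions and min_block_size left unset.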
modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -30,6 +30,7 @@
 from modin.config import (
     BenchmarkMode,
     Engine,
+    MinPartitionSize,
     NPartitions,
     PersistentPickle,
     ProgressBar,
@@ -890,8 +891,9 @@ def from_pandas(cls, df, return_dims=False):
             A NumPy array with partitions (with dimensions or not).
         """
         num_splits = NPartitions.get()
-        row_chunksize = compute_chunksize(df.shape[0], num_splits)
-        col_chunksize = compute_chunksize(df.shape[1], num_splits)
+        min_block_size = MinPartitionSize.get()
+        row_chunksize = compute_chunksize(df.shape[0], num_splits, min_block_size)
+        col_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)
 
         bar_format = (
             "{l_bar}{bar}{r_bar}"
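compute_chunksize likewise gains an explicit third argument. A sketch of the assumed behavior — an even ceiling split floored at min_block_size (compute_chunksize_sketch is illustrative, not the library function):

import math


def compute_chunksize_sketch(axis_len, num_splits, min_block_size):
    # Assumed behavior: never cut chunks smaller than min_block_size.
    return max(math.ceil(axis_len / num_splits), min_block_size)


# Under these assumptions, from_pandas above would cut a 100x10 frame with
# NPartitions=4 and MinPartitionSize=32 into 32-row and 32-column chunks,
# i.e. fewer, larger partitions than an even 4x4 grid.
print(compute_chunksize_sketch(100, 4, 32))  # 32
print(compute_chunksize_sketch(10, 4, 32))   # 32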
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
@@ -112,6 +112,7 @@ def deploy_axis_func(
         num_splits,
         maintain_partitioning,
         *partitions,
+        min_block_size,
         lengths=None,
         manual_partition=False,
     ):
@@ -135,6 +136,8 @@
             If False, create a new partition layout.
         *partitions : iterable
             All partitions that make up the full axis (row or column).
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
         lengths : iterable, default: None
             The list of lengths to shuffle the partition into.
         manual_partition : bool, default: False
@@ -159,6 +162,7 @@
                 *partitions,
             ),
             f_kwargs={
+                "min_block_size": min_block_size,
                 "lengths": lengths,
                 "manual_partition": manual_partition,
             },
@@ -177,6 +181,7 @@
         len_of_left,
         other_shape,
         *partitions,
+        min_block_size,
     ):
         """
         Deploy a function along a full axis between two data sets.
@@ -200,6 +205,8 @@
             (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
         *partitions : iterable
             All partitions that make up the full axis (row or column) for both data sets.
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
 
         Returns
         -------
@@ -219,6 +226,9 @@
                 other_shape,
                 *partitions,
             ),
+            f_kwargs={
+                "min_block_size": min_block_size,
+            },
             num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
             pure=False,
         )
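On the Dask backend the resolved value rides to the worker inside f_kwargs rather than being looked up there. A standalone sketch of that transport (apply_along_axis and the toy callable are hypothetical; pure=False mirrors the call above and tells Dask not to cache the result by argument hash):

from distributed import Client


def apply_along_axis(func, *partitions, min_block_size, lengths=None):
    # Runs on the worker; no configuration lookup happens here.
    return func(partitions, min_block_size)


client = Client(processes=False)  # in-process cluster for the example
future = client.submit(
    apply_along_axis,
    lambda parts, mbs: (len(parts), mbs),
    "block0",
    "block1",
    min_block_size=32,  # resolved on the driver, as MinPartitionSize.get() is above
    pure=False,
)
print(future.result())  # (2, 32)
client.close()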
modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py
@@ -16,7 +16,7 @@
 import numpy as np
 import ray
 
-from modin.config import GpuCount
+from modin.config import GpuCount, MinPartitionSize
 from modin.core.execution.ray.common import RayWrapper
 from modin.core.execution.ray.generic.partitioning import (
     GenericRayDataframePartitionManager,
@@ -125,7 +125,9 @@ def from_pandas(cls, df, return_dims=False):
         num_splits = GpuCount.get()
         put_func = cls._partition_class.put
         # For now, we default to row partitioning
-        pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
+        pandas_dfs = split_result_of_axis_func_pandas(
+            0, num_splits, df, min_block_size=MinPartitionSize.get()
+        )
         keys = [
             put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
             for i in range(num_splits)
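Call sites such as this one now spell the block size out. A hedged usage sketch — the import path is my assumption, and the printed lengths assume the ceiling-then-floor chunking sketched earlier:

import pandas

# Assumed location of the helper; it may live elsewhere in modin.
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas

df = pandas.DataFrame({"a": range(100)})
chunks = split_result_of_axis_func_pandas(0, 4, df, min_block_size=32)
print([len(c) for c in chunks])  # expected [32, 32, 32, 4], not [25, 25, 25, 25]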
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
@@ -137,6 +137,7 @@ def deploy_axis_func(
         num_splits,
         maintain_partitioning,
         *partitions,
+        min_block_size,
         lengths=None,
         manual_partition=False,
         max_retries=None,
@@ -161,6 +162,8 @@
             If False, create a new partition layout.
         *partitions : iterable
             All partitions that make up the full axis (row or column).
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
         lengths : list, optional
             The list of lengths to shuffle the object.
         manual_partition : bool, default: False
@@ -188,6 +191,7 @@
             f_len_args=len(f_args),
             f_kwargs=f_kwargs,
             manual_partition=manual_partition,
+            min_block_size=min_block_size,
             lengths=lengths,
             return_generator=True,
         )
@@ -203,6 +207,7 @@
         len_of_left,
         other_shape,
         *partitions,
+        min_block_size,
     ):
         """
         Deploy a function along a full axis between two data sets.
@@ -226,6 +231,8 @@
             (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
         *partitions : iterable
             All partitions that make up the full axis (row or column) for both data sets.
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
 
         Returns
         -------
@@ -245,6 +252,7 @@
             f_to_deploy=func,
             f_len_args=len(f_args),
             f_kwargs=f_kwargs,
+            min_block_size=min_block_size,
             return_generator=True,
         )
 
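Both deploy calls keep return_generator=True; as the base-class docstring above notes, Ray remote functions can return generators. A standalone sketch of that Ray feature (not Modin code):

import ray

ray.init()


@ray.remote(num_returns=3)
def split_into_three(seq):
    # Each yielded chunk becomes its own object reference on the driver.
    third = len(seq) // 3
    for i in range(3):
        yield seq[i * third:(i + 1) * third]


refs = split_into_three.remote(list(range(9)))
print(ray.get(refs))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]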
modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py
@@ -138,6 +138,7 @@ def deploy_axis_func(
         num_splits,
         maintain_partitioning,
         *partitions,
+        min_block_size,
         lengths=None,
         manual_partition=False,
         max_retries=None,
@@ -162,6 +163,8 @@
             If False, create a new partition layout.
         *partitions : iterable
             All partitions that make up the full axis (row or column).
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
         lengths : list, optional
             The list of lengths to shuffle the object.
         manual_partition : bool, default: False
@@ -188,6 +191,7 @@
             maintain_partitioning,
             *partitions,
             manual_partition=manual_partition,
+            min_block_size=min_block_size,
             lengths=lengths,
         )
 
@@ -202,6 +206,7 @@
         len_of_left,
         other_shape,
         *partitions,
+        min_block_size,
     ):
         """
         Deploy a function along a full axis between two data sets.
@@ -225,6 +230,8 @@
             (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
         *partitions : iterable
             All partitions that make up the full axis (row or column) for both data sets.
+        min_block_size : int
+            Minimum number of rows/columns in a single split.
 
         Returns
         -------
@@ -243,6 +250,7 @@
             len_of_left,
             other_shape,
             *partitions,
+            min_block_size=min_block_size,
         )
 
     def wait(self):
5 changes: 3 additions & 2 deletions modin/core/io/column_stores/column_store_dispatcher.py
@@ -121,7 +121,6 @@ def build_index(cls, partition_ids):
         row_lengths : list
             List with lengths of index chunks.
         """
-        num_partitions = NPartitions.get()
         index_len = (
             0 if len(partition_ids) == 0 else cls.materialize(partition_ids[-2][0])
         )
@@ -130,7 +129,9 @@
         else:
             index = index_len
             index_len = len(index)
-        index_chunksize = compute_chunksize(index_len, num_partitions)
+        num_partitions = NPartitions.get()
+        min_block_size = MinPartitionSize.get()
+        index_chunksize = compute_chunksize(index_len, num_partitions, min_block_size)
         if index_chunksize > index_len:
             row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)]
         else:
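The visible branch covers the case where the chunk size, now floored at MinPartitionSize, exceeds the index length: every row lands in the first partition and the rest stay empty. A worked instance (the values and the ceiling-division line are illustrative):

index_len, num_partitions, min_block_size = 10, 4, 32
index_chunksize = max(-(-index_len // num_partitions), min_block_size)  # ceil(10/4) -> 3, floored to 32
if index_chunksize > index_len:
    row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)]
print(row_lengths)  # [10, 0, 0, 0]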
8 changes: 6 additions & 2 deletions modin/core/io/column_stores/parquet_dispatcher.py
@@ -722,7 +722,9 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths):
             for i in range(num_splits):
                 new_parts[offset + i].append(split[i])
 
-            new_row_lengths.extend(get_length_list(part_len, num_splits))
+            new_row_lengths.extend(
+                get_length_list(part_len, num_splits, MinPartitionSize.get())
+            )
 
         remote_parts = np.array(new_parts)
         row_lengths = new_row_lengths
@@ -746,7 +748,9 @@
                 for row_parts in remote_parts
             ]
         )
-        column_widths = get_length_list(sum(column_widths), desired_col_nparts)
+        column_widths = get_length_list(
+            sum(column_widths), desired_col_nparts, MinPartitionSize.get()
+        )
 
         return remote_parts, row_lengths, column_widths
 
5 changes: 3 additions & 2 deletions modin/core/io/text/text_file_dispatcher.py
@@ -30,7 +30,7 @@
 from pandas.core.dtypes.common import is_list_like
 from pandas.io.common import stringify_path
 
-from modin.config import NPartitions
+from modin.config import MinPartitionSize, NPartitions
 from modin.core.io.file_dispatcher import FileDispatcher, OpenFile
 from modin.core.io.text.utils import CustomNewlineIterator
 from modin.core.storage_formats.pandas.utils import compute_chunksize
@@ -571,7 +571,8 @@ def _define_metadata(
         """
         # This is the number of splits for the columns
         num_splits = min(len(column_names) or 1, NPartitions.get())
-        column_chunksize = compute_chunksize(df.shape[1], num_splits)
+        min_block_size = MinPartitionSize.get()
+        column_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)
         if column_chunksize > len(column_names):
             column_widths = [len(column_names)]
             # This prevents us from unnecessarily serializing a bunch of empty
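One detail worth noticing in _define_metadata: len(column_names) or 1 keeps at least one column split even for a frame with no columns (the values here are illustrative):

column_names = []
npartitions = 4  # stand-in for NPartitions.get()
num_splits = min(len(column_names) or 1, npartitions)
print(num_splits)  # 1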