Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Apr 12, 2024
1 parent be26a1a commit 40b5569
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 52 deletions.
15 changes: 11 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, NPartitions
from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -1549,7 +1549,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
# the "standard" partitioning. Knowing the standard partitioning scheme
# we are able to compute new row lengths.
new_lengths = get_length_list(
axis_len=len(row_idx), num_splits=ordered_rows.shape[0]
axis_len=len(row_idx),
num_splits=ordered_rows.shape[0],
min_block_size=MinPartitionSize.get(),
)
else:
# If the frame's partitioning was preserved then
Expand Down Expand Up @@ -1585,7 +1587,9 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
# the "standard" partitioning. Knowing the standard partitioning scheme
# we are able to compute new column widths.
new_widths = get_length_list(
axis_len=len(col_idx), num_splits=ordered_cols.shape[1]
axis_len=len(col_idx),
num_splits=ordered_cols.shape[1],
min_block_size=MinPartitionSize.get(),
)
else:
# If the frame's partitioning was preserved then
Expand Down Expand Up @@ -3500,7 +3504,9 @@ def broadcast_apply_full_axis(
if kw["row_lengths"] is None and is_index_materialized:
if axis == 0:
kw["row_lengths"] = get_length_list(
axis_len=len(new_index), num_splits=new_partitions.shape[0]
axis_len=len(new_index),
num_splits=new_partitions.shape[0],
min_block_size=MinPartitionSize.get(),
)
elif axis == 1:
if self._row_lengths_cache is not None and len(new_index) == sum(
Expand All @@ -3512,6 +3518,7 @@ def broadcast_apply_full_axis(
kw["column_widths"] = get_length_list(
axis_len=len(new_columns),
num_splits=new_partitions.shape[1],
min_block_size=MinPartitionSize.get(),
)
elif axis == 0:
if self._column_widths_cache is not None and len(
Expand Down
13 changes: 10 additions & 3 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ def deploy_axis_func(
num_splits,
maintain_partitioning,
*partitions,
min_block_size,
lengths=None,
min_block_size=None,
manual_partition=False,
return_generator=False,
):
Expand All @@ -416,6 +416,8 @@ def deploy_axis_func(
If False, create a new partition layout.
*partitions : iterable
All partitions that make up the full axis (row or column).
min_block_size : int
Minimum number of rows/columns in a single split.
lengths : list, optional
The list of lengths to shuffle the object.
manual_partition : bool, default: False
Expand Down Expand Up @@ -477,12 +479,12 @@ def deploy_axis_func(
axis,
num_splits,
result,
min_block_size,
lengths,
min_block_size=min_block_size,
)
else:
return split_result_of_axis_func_pandas(

Check warning on line 486 in modin/core/dataframe/pandas/partitioning/axis_partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/axis_partition.py#L486

Added line #L486 was not covered by tests
axis, num_splits, result, lengths, min_block_size=min_block_size
axis, num_splits, result, min_block_size, lengths
)

@classmethod
Expand All @@ -496,6 +498,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
min_block_size,
return_generator=False,
):
"""
Expand All @@ -520,6 +523,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
min_block_size : int
Minimum number of rows/columns in a single split.
return_generator : bool, default: False
Return a generator from the function, set to `True` for Ray backend
as Ray remote functions can return Generators.
Expand Down Expand Up @@ -566,12 +571,14 @@ def deploy_func_between_two_axis_partitions(
axis,
num_splits,
result,
min_block_size,
)
else:
return split_result_of_axis_func_pandas(
axis,
num_splits,
result,
min_block_size,
)

@classmethod
Expand Down
6 changes: 4 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from modin.config import (
BenchmarkMode,
Engine,
MinPartitionSize,
NPartitions,
PersistentPickle,
ProgressBar,
Expand Down Expand Up @@ -890,8 +891,9 @@ def from_pandas(cls, df, return_dims=False):
A NumPy array with partitions (with dimensions or not).
"""
num_splits = NPartitions.get()
row_chunksize = compute_chunksize(df.shape[0], num_splits)
col_chunksize = compute_chunksize(df.shape[1], num_splits)
min_block_size = MinPartitionSize.get()
row_chunksize = compute_chunksize(df.shape[0], num_splits, min_block_size)
col_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)

bar_format = (
"{l_bar}{bar}{r_bar}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import numpy as np
import ray

from modin.config import GpuCount
from modin.config import GpuCount, MinPartitionSize
from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.generic.partitioning import (
GenericRayDataframePartitionManager,
Expand Down Expand Up @@ -125,7 +125,9 @@ def from_pandas(cls, df, return_dims=False):
num_splits = GpuCount.get()
put_func = cls._partition_class.put
# For now, we default to row partitioning
pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
pandas_dfs = split_result_of_axis_func_pandas(
0, num_splits, df, min_block_size=MinPartitionSize.get()
)
keys = [
put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
for i in range(num_splits)
Expand Down
5 changes: 3 additions & 2 deletions modin/core/io/column_stores/column_store_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def build_index(cls, partition_ids):
row_lengths : list
List with lengths of index chunks.
"""
num_partitions = NPartitions.get()
index_len = (
0 if len(partition_ids) == 0 else cls.materialize(partition_ids[-2][0])
)
Expand All @@ -130,7 +129,9 @@ def build_index(cls, partition_ids):
else:
index = index_len
index_len = len(index)
index_chunksize = compute_chunksize(index_len, num_partitions)
num_partitions = NPartitions.get()
min_block_size = MinPartitionSize.get()
index_chunksize = compute_chunksize(index_len, num_partitions, min_block_size)

Check warning on line 134 in modin/core/io/column_stores/column_store_dispatcher.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/column_stores/column_store_dispatcher.py#L132-L134

Added lines #L132 - L134 were not covered by tests
if index_chunksize > index_len:
row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)]
else:
Expand Down
8 changes: 6 additions & 2 deletions modin/core/io/column_stores/parquet_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths):
for i in range(num_splits):
new_parts[offset + i].append(split[i])

new_row_lengths.extend(get_length_list(part_len, num_splits))
new_row_lengths.extend(

Check warning on line 725 in modin/core/io/column_stores/parquet_dispatcher.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/column_stores/parquet_dispatcher.py#L725

Added line #L725 was not covered by tests
get_length_list(part_len, num_splits, MinPartitionSize.get())
)

remote_parts = np.array(new_parts)
row_lengths = new_row_lengths
Expand All @@ -746,7 +748,9 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths):
for row_parts in remote_parts
]
)
column_widths = get_length_list(sum(column_widths), desired_col_nparts)
column_widths = get_length_list(

Check warning on line 751 in modin/core/io/column_stores/parquet_dispatcher.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/column_stores/parquet_dispatcher.py#L751

Added line #L751 was not covered by tests
sum(column_widths), desired_col_nparts, MinPartitionSize.get()
)

return remote_parts, row_lengths, column_widths

Expand Down
5 changes: 3 additions & 2 deletions modin/core/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pandas.core.dtypes.common import is_list_like
from pandas.io.common import stringify_path

from modin.config import NPartitions
from modin.config import MinPartitionSize, NPartitions
from modin.core.io.file_dispatcher import FileDispatcher, OpenFile
from modin.core.io.text.utils import CustomNewlineIterator
from modin.core.storage_formats.pandas.utils import compute_chunksize
Expand Down Expand Up @@ -571,7 +571,8 @@ def _define_metadata(
"""
# This is the number of splits for the columns
num_splits = min(len(column_names) or 1, NPartitions.get())
column_chunksize = compute_chunksize(df.shape[1], num_splits)
min_block_size = MinPartitionSize.get()
column_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size)

Check warning on line 575 in modin/core/io/text/text_file_dispatcher.py

View check run for this annotation

Codecov / codecov/patch

modin/core/io/text/text_file_dispatcher.py#L574-L575

Added lines #L574 - L575 were not covered by tests
if column_chunksize > len(column_names):
column_widths = [len(column_names)]
# This prevents us from unnecessarily serializing a bunch of empty
Expand Down
5 changes: 4 additions & 1 deletion modin/core/storage_formats/cudf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pandas.core.dtypes.concat import union_categoricals
from pandas.io.common import infer_compression

from modin.config import MinPartitionSize

Check warning on line 23 in modin/core/storage_formats/cudf/parser.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/cudf/parser.py#L23

Added line #L23 was not covered by tests
from modin.core.execution.ray.implementations.cudf_on_ray.partitioning.partition_manager import (
GPU_MANAGERS,
)
Expand All @@ -39,7 +40,9 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
Returns:
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
splits = split_result_of_axis_func_pandas(
axis, num_splits, df, min_block_size=MinPartitionSize.get()
)
if not isinstance(splits, list):
splits = [splits]
return splits
Expand Down
5 changes: 4 additions & 1 deletion modin/core/storage_formats/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from pandas.io.common import infer_compression
from pandas.util._decorators import doc

from modin.config import MinPartitionSize
from modin.core.io.file_dispatcher import OpenFile
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.db_conn import ModinDatabaseConnection
Expand Down Expand Up @@ -113,7 +114,9 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
list
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
splits = split_result_of_axis_func_pandas(
axis, num_splits, df, min_block_size=MinPartitionSize.get()
)
if not isinstance(splits, list):
splits = [splits]
return splits
Expand Down
Loading

0 comments on commit 40b5569

Please sign in to comment.