Skip to content

Commit

Permalink
Prepare changes for code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Retribution98 committed Apr 16, 2024
1 parent fcd37c2 commit c5d2ca7
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 180 deletions.
103 changes: 0 additions & 103 deletions check_perfomance.py

This file was deleted.

43 changes: 39 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2202,9 +2202,44 @@ def map(
PandasDataframe
A new dataframe.
"""
new_partitions = self._partition_mgr_cls.new_map(
self._partitions, lazy, func, func_args, func_kwargs
)
if self.num_parts <= 1.5 * CpuCount.get():
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
# old way
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
axis = (
1
if abs(self._partitions.shape[0] - CpuCount.get())
< abs(self._partitions.shape[1] - CpuCount.get())
else 0
)
column_splits = (
self._partitions.shape[0]
// (CpuCount.get() // self._partitions.shape[1])
if CpuCount.get() > self._partitions.shape[1]
else 1
)

if axis == 1 or column_splits <= 1:
# splitting by full_axis rows
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
func,
keep_partitioning=True,
map_func_args=func_args,
**func_kwargs if func_kwargs is not None else {},
)
else:
new_partitions = (
self._partition_mgr_cls.map_partitions_splitting_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def apply(
list
A list of `PandasDataframePartition` objects.
"""
if False and not self.full_axis:
if not self.full_axis:
# If this is not a full axis partition, it already contains a subset of
# the full axis, so we shouldn't split the result further.
num_splits = 1
Expand Down Expand Up @@ -298,7 +298,7 @@ def apply(
return result
else:
# If this is not a full axis partition, just take out the single split in the result.
return result
return result[0]

def split(
self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False
Expand Down
128 changes: 57 additions & 71 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

from modin.config import (
BenchmarkMode,
CpuCount,
Engine,
MinPartitionSize,
NPartitions,
Expand Down Expand Up @@ -751,82 +750,69 @@ def map_axis_partitions(
)

@classmethod
def new_map(cls, parts, lazy, func, func_args, func_kwargs):
map_fn = cls.lazy_map_partitions if lazy else cls.map_partitions
if os.environ["MY_STRATAGY"] == "1": # np.prod(parts.shape) <= CpuCount.get():
# old way
result_partitions = map_fn(parts, func, func_args, func_kwargs)
else:
axis = (
1
if abs(parts.shape[0] - CpuCount.get())
< abs(parts.shape[1] - CpuCount.get())
else 0
)
column_splits = (
parts.shape[0] // (CpuCount.get() // parts.shape[1])
if CpuCount.get() > parts.shape[1]
else 1
)
def map_partitions_splitting_by_column(
cls,
partitions,
column_splits,
map_func,
map_func_args=None,
map_func_kwargs=None,
):
"""
Combine several blocks by column into one virtual partition and apply 'map_func' to them.
if os.environ["MY_STRATAGY"] == "2": # axis == 1 or column_splits <= 1:
# previous way
result_partitions = cls.map_axis_partitions(
axis,
parts,
func,
keep_partitioning=True,
map_func_args=func_args,
**func_kwargs if func_kwargs is not None else {},
)
elif os.environ["MY_STRATAGY"] == "3": # just else
# it is a trick using only for check perfomance
# column_splits <= 1 is not expected in final version
if column_splits < 1:
column_splits = 1
Parameters
----------
partitions : NumPy 2D array
Partitions of Modin Frame.
column_splits : int
The number of splits by column.
map_func : callable
Function to apply.
map_func_args : iterable, optional
Positional arguments for the 'map_func'.
map_func_kwargs : dict, optional
Keyword arguments for the 'map_func'.
new_partitions = np.array(
[
cls.column_partitions(
parts[i : i + column_splits],
full_axis=False,
)
for i in range(
0,
parts.shape[0],
column_splits,
)
]
Returns
-------
NumPy array
An array of new partitions for Modin Frame.
"""
new_partitions = np.array(
[
cls.column_partitions(
partitions[i : i + column_splits],
# full_axis=False,
)
for i in range(
0,
partitions.shape[0],
column_splits,
)
preprocessed_map_func = cls.preprocess_func(func)

# In some cases we can better split the parts,
# but we must change the metadata in the modin frame
# num_splits = math.ceil(NPartitions.get() / column_splits)
kw = {
"num_splits": column_splits,
}
result_partitions = np.concatenate(
]
)
preprocessed_map_func = cls.preprocess_func(map_func)
kw = {
"num_splits": column_splits,
}
return np.concatenate(
[
np.stack(
[
np.stack(
[
part.apply(
preprocessed_map_func,
*func_args if func_args is not None else (),
**kw,
**func_kwargs if func_kwargs is not None else {},
)
for part in row_of_parts
],
axis=-1,
part.apply(
preprocessed_map_func,
*map_func_args if map_func_args is not None else (),
**kw,
**map_func_kwargs if map_func_kwargs is not None else {},
)
for row_of_parts in new_partitions
]
for part in row_of_parts
],
axis=-1,
)
else:
raise ValueError("Inccorect MY_STRATAGY")

return result_partitions
for row_of_parts in new_partitions
]
)

@classmethod
def concat(cls, axis, left_parts, right_parts):
Expand Down

0 comments on commit c5d2ca7

Please sign in to comment.