Skip to content

Commit

Permalink
Prepare changes for code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Retribution98 committed Apr 11, 2024
1 parent 583d0e4 commit b761492
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 180 deletions.
103 changes: 0 additions & 103 deletions check_perfomance.py

This file was deleted.

43 changes: 39 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2171,9 +2171,44 @@ def map(
PandasDataframe
A new dataframe.
"""
new_partitions = self._partition_mgr_cls.new_map(
self._partitions, lazy, func, func_args, func_kwargs
)
if self.num_parts <= 1.5 * CpuCount.get():
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
# old way
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
axis = (
1
if abs(self._partitions.shape[0] - CpuCount.get())
< abs(self._partitions.shape[1] - CpuCount.get())
else 0
)
column_splits = (
self._partitions.shape[0]
// (CpuCount.get() // self._partitions.shape[1])
if CpuCount.get() > self._partitions.shape[1]
else 1
)

if axis == 1 or column_splits <= 1:
# splitting by full_axis rows
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
func,
keep_partitioning=True,
map_func_args=func_args,
**func_kwargs if func_kwargs is not None else {},
)
else:
new_partitions = (
self._partition_mgr_cls.map_partitions_splitting_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def apply(
list
A list of `PandasDataframePartition` objects.
"""
if False and not self.full_axis:
if not self.full_axis:
# If this is not a full axis partition, it already contains a subset of
# the full axis, so we shouldn't split the result further.
num_splits = 1
Expand Down Expand Up @@ -295,7 +295,7 @@ def apply(
return result
else:
# If this is not a full axis partition, just take out the single split in the result.
return result
return result[0]

def split(
self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False
Expand Down
128 changes: 57 additions & 71 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

from modin.config import (
BenchmarkMode,
CpuCount,
Engine,
NPartitions,
PersistentPickle,
Expand Down Expand Up @@ -750,82 +749,69 @@ def map_axis_partitions(
)

@classmethod
def new_map(cls, parts, lazy, func, func_args, func_kwargs):
map_fn = cls.lazy_map_partitions if lazy else cls.map_partitions
if os.environ["MY_STRATAGY"] == "1": # np.prod(parts.shape) <= CpuCount.get():
# old way
result_partitions = map_fn(parts, func, func_args, func_kwargs)
else:
axis = (
1
if abs(parts.shape[0] - CpuCount.get())
< abs(parts.shape[1] - CpuCount.get())
else 0
)
column_splits = (
parts.shape[0] // (CpuCount.get() // parts.shape[1])
if CpuCount.get() > parts.shape[1]
else 1
)
def map_partitions_splitting_by_column(
cls,
partitions,
column_splits,
map_func,
map_func_args=None,
map_func_kwargs=None,
):
"""
Combine several blocks by column into one virtual partition and apply “map_funk” to them.
if os.environ["MY_STRATAGY"] == "2": # axis == 1 or column_splits <= 1:
# previous way
result_partitions = cls.map_axis_partitions(
axis,
parts,
func,
keep_partitioning=True,
map_func_args=func_args,
**func_kwargs if func_kwargs is not None else {},
)
elif os.environ["MY_STRATAGY"] == "3": # just else
# it is a trick using only for check perfomance
# column_splits <= 1 is not expected in final version
if column_splits < 1:
column_splits = 1
Parameters
----------
partitions : NumPy 2D array
Partitions of Modin Frame.
column_splits : int
The number of splits by column.
map_func : callable
Function to apply.
func_args : iterable, optional
Positional arguments for the 'map_func'.
func_kwargs : dict, optional
Keyword arguments for the 'map_func'.
new_partitions = np.array(
[
cls.column_partitions(
parts[i : i + column_splits],
full_axis=False,
)
for i in range(
0,
parts.shape[0],
column_splits,
)
]
Returns
-------
NumPy array
An array of new partitions for Modin Frame.
"""
new_partitions = np.array(
[
cls.column_partitions(
partitions[i : i + column_splits],
# full_axis=False,
)
for i in range(
0,
partitions.shape[0],
column_splits,
)
preprocessed_map_func = cls.preprocess_func(func)

# In some cases we can better split the parts,
# but we must change the metadata in the modin frame
# num_splits = math.ceil(NPartitions.get() / column_splits)
kw = {
"num_splits": column_splits,
}
result_partitions = np.concatenate(
]
)
preprocessed_map_func = cls.preprocess_func(map_func)
kw = {
"num_splits": column_splits,
}
return np.concatenate(
[
np.stack(
[
np.stack(
[
part.apply(
preprocessed_map_func,
*func_args if func_args is not None else (),
**kw,
**func_kwargs if func_kwargs is not None else {},
)
for part in row_of_parts
],
axis=-1,
part.apply(
preprocessed_map_func,
*map_func_args if map_func_args is not None else (),
**kw,
**map_func_kwargs if map_func_kwargs is not None else {},
)
for row_of_parts in new_partitions
]
for part in row_of_parts
],
axis=-1,
)
else:
raise ValueError("Inccorect MY_STRATAGY")

return result_partitions
for row_of_parts in new_partitions
]
)

@classmethod
def concat(cls, axis, left_parts, right_parts):
Expand Down

0 comments on commit b761492

Please sign in to comment.