Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#4909: Properly implement map operator #5118

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/flow/modin/core/dataframe/pandas/dataframe.rst
Expand Up @@ -16,8 +16,8 @@ providing set of methods to perform operations on the internal data.

As mentioned above, ``PandasDataframe`` shouldn't work with stored partitions directly and
the responsibility for modifying partitions array has to lay on :doc:`partitioning/partition_manager`. For example, method
:meth:`~modin.core.dataframe.pandas.dataframe.dataframe.PandasDataframe.broadcast_apply_full_axis` redirects applying
function to :meth:`~PandasDataframePartitionManager.broadcast_axis_partitions` method.
:meth:`~modin.core.dataframe.pandas.dataframe.dataframe.PandasDataframe.map_full_axis` redirects applying
function to :meth:`~PandasDataframePartitionManager.map_partitions_full_axis` method.

``Modin PandasDataframe`` can be created from ``pandas.DataFrame``, ``pyarrow.Table``
(methods :meth:`~modin.core.dataframe.pandas.dataframe.dataframe.PandasDataframe.from_pandas`,
Expand Down
33 changes: 24 additions & 9 deletions modin/core/dataframe/algebra/binary.py
Expand Up @@ -44,7 +44,13 @@ def register(cls, func, join_type="outer", labels="replace"):
"""

def caller(
query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
query_compiler,
other,
broadcast=False,
*args,
dtypes=None,
copy_dtypes=False,
**kwargs
):
"""
Apply binary `func` to passed operands.
Expand All @@ -61,8 +67,14 @@ def caller(
at the query compiler level, so this parameter is a hint that passed from a high level API.
*args : args,
Arguments that will be passed to `func`.
dtypes : "copy" or None, default: None
Whether to keep old dtypes or infer new dtypes from data.
dtypes : pandas.Series or scalar type, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
If the argument is a scalar type, then that type is assigned to each result column.
copy_dtypes : bool, default False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
**kwargs : kwargs,
Arguments that will be passed to `func`.

Expand All @@ -76,21 +88,22 @@ def caller(
if broadcast:
assert (
len(other.columns) == 1
), "Invalid broadcast argument for `broadcast_apply`, too many columns: {}".format(
), "Invalid broadcast argument for `map` with broadcast, too many columns: {}".format(
len(other.columns)
)
# Transpose on `axis=1` because we always represent an individual
# column or row as a single-column Modin DataFrame
if axis == 1:
other = other.transpose()
return query_compiler.__constructor__(
query_compiler._modin_frame.broadcast_apply(
axis,
query_compiler._modin_frame.map(
lambda l, r: func(l, r.squeeze(), *args, **kwargs),
other._modin_frame,
axis=axis,
other=other._modin_frame,
join_type=join_type,
labels=labels,
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
)
else:
Expand All @@ -105,17 +118,19 @@ def caller(
# TODO: it's possible to chunk the `other` and broadcast them to partitions
# accordingly, in that way we will be able to use more efficient `._modin_frame.map()`
if isinstance(other, (dict, list, np.ndarray, pandas.Series)):
new_modin_frame = query_compiler._modin_frame.apply_full_axis(
axis,
new_modin_frame = query_compiler._modin_frame.map_full_axis(
lambda df: func(df, other, *args, **kwargs),
axis=axis,
new_index=query_compiler.index,
new_columns=query_compiler.columns,
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
else:
new_modin_frame = query_compiler._modin_frame.map(
lambda df: func(df, other, *args, **kwargs),
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
return query_compiler.__constructor__(new_modin_frame)

Expand Down
22 changes: 16 additions & 6 deletions modin/core/dataframe/base/dataframe/dataframe.py
Expand Up @@ -19,6 +19,9 @@

from abc import ABC, abstractmethod
from typing import List, Hashable, Optional, Callable, Union, Dict

import pandas

from modin.core.dataframe.base.dataframe.utils import Axis, JoinType


Expand Down Expand Up @@ -91,8 +94,10 @@ def filter_by_types(self, types: List[Hashable]) -> "ModinDataframe":
def map(
self,
function: Callable,
*,
axis: Optional[Union[int, Axis]] = None,
dtypes: Optional[str] = None,
dtypes: Optional[Union[pandas.Series, type]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, there was a discussion regarding limiting the usage of pandas entities in the base classes of Modin internals. Some executions may not require pandas at all and wouldn't like to deal with handling breaking changes introduced by some pandas updates.

May we define the dtypes type as something abstract like collections.abc.Mapping so every execution would use whatever container they like?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that makes senes. Is there some other generic container that would accept pandas.Series though? It seems like it's not a subclass of Mapping.

copy_dtypes: bool = False,
) -> "ModinDataframe":
"""
Apply a user-defined function row-wise if `axis`=0, column-wise if `axis`=1, and cell-wise if `axis` is None.
Expand All @@ -102,11 +107,14 @@ def map(
function : callable(row|col|cell) -> row|col|cell
The function to map across the dataframe.
axis : int or modin.core.dataframe.base.utils.Axis, optional
The axis to map over.
dtypes : str, optional
The axis to map over. If None, the map will be performed element-wise.
dtypes : pandas.Series or scalar type, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
Comment on lines +115 to +117
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we offer a copy_dtypes option only for the map operator but not for reduce and tree_reduce?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure, though my guess is that the frequently dimension-reducing nature of reduce/tree-reduce makes the argument less relevant for those cases. Here, I introduced copy_dtypes as a replacement for dtypes="copy", which is a little hacky.


Returns
-------
Expand Down Expand Up @@ -258,7 +266,8 @@ def reduce(
self,
axis: Union[int, Axis],
function: Callable,
dtypes: Optional[str] = None,
*,
dtypes: Optional[pandas.Series] = None,
) -> "ModinDataframe":
"""
Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton.
Expand All @@ -269,7 +278,7 @@ def reduce(
The axis to perform the reduce over.
function : callable(row|col) -> single value
The reduce function to apply to each column.
dtypes : str, optional
dtypes : pandas.Series, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Expand All @@ -291,7 +300,8 @@ def tree_reduce(
axis: Union[int, Axis],
map_func: Callable,
reduce_func: Optional[Callable] = None,
dtypes: Optional[str] = None,
*,
dtypes: Optional[pandas.Series] = None,
) -> "ModinDataframe":
"""
Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton using a tree-reduce computation pattern.
Expand Down
10 changes: 10 additions & 0 deletions modin/core/dataframe/base/dataframe/utils.py
Expand Up @@ -19,6 +19,12 @@
"""

from enum import Enum
import sys

if sys.version_info.minor < 8:
from typing_extensions import Literal
else:
from typing import Literal # type: ignore


class Axis(Enum): # noqa: PR01
Expand All @@ -36,6 +42,10 @@ class Axis(Enum): # noqa: PR01
CELL_WISE = None


AxisInt = Literal[0, 1]
"""Type for the two possible integer values of an axis argument (0 or 1)."""


Comment on lines +45 to +48
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? I mean, why would we extend internal dataframe API to also be able to accept AxisInt when we already have Axis enum?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of the codebase (mostly the query compiler) is written to call dataframe methods with a literal int rather than the Axis enum. I think it would be easier to re-wrap the axis with the enum from within dataframe methods (as is done now) than to go through and fix every instance where relevant dataframe methods are called to use the enum instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need this Axis enum then. I really don't like this mixing of Axis, AxisInt, and actual integers for an axis value. I think we should pick only one of the ways of interpreting an axis and then really stick to this, not introducing a variety of axis types in order to cover an existing zoo of value types.

class JoinType(Enum): # noqa: PR01
"""
An enum that represents the `join_type` argument provided to the algebra operators.
Expand Down