FEAT-#3111: Ensure relabeling Modin Frame does not lose partition shape (#3662)

Co-authored-by: Devin Petersohn <devin.petersohn@gmail.com>
Signed-off-by: Naren Krishna <naren@ponder.io>
naren-ponder and devin-petersohn committed Jan 10, 2022
1 parent ab2855b commit 3c740db
Showing 1 changed file with 145 additions and 5 deletions.
150 changes: 145 additions & 5 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -33,6 +33,98 @@
from modin.pandas.utils import is_full_grab_slice


def lazy_metadata_decorator(apply_axis=None, axis_arg=-1, transpose=False):
"""
Lazily propagate metadata for the ``PandasDataframe``.
This decorator first adds the minimum required reindexing operations
to each partition's queue of functions to be lazily applied for
each PandasDataframe in the arguments by applying the function
run_f_on_minimally_updated_metadata. The decorator also sets the
flags for deferred metadata synchronization on the function result
if necessary.
Parameters
----------
apply_axis : str, default: None
The axes on which to apply the reindexing operations to the `self._partitions` lazily.
Case None: No lazy metadata propagation.
Case "both": Add reindexing operations on both axes to partition queue.
Case "opposite": Add reindexing operations complementary to given axis.
Case "rows": Add reindexing operations on row axis to partition queue.
axis_arg : int, default: -1
The index or column axis.
transpose : bool, default: False
Boolean for if a transpose operation is being used.
Returns
-------
Wrapped Function.
"""

def decorator(f):
from functools import wraps

@wraps(f)
def run_f_on_minimally_updated_metadata(self, *args, **kwargs):
for obj in (
[self]
+ [o for o in args if isinstance(o, PandasDataframe)]
+ [v for v in kwargs.values() if isinstance(v, PandasDataframe)]
+ [
d
for o in args
if isinstance(o, list)
for d in o
if isinstance(d, PandasDataframe)
]
+ [
d
for _, o in kwargs.items()
if isinstance(o, list)
for d in o
if isinstance(d, PandasDataframe)
]
):
if apply_axis == "both":
if obj._deferred_index and obj._deferred_column:
obj._propagate_index_objs(axis=None)
elif obj._deferred_index:
obj._propagate_index_objs(axis=0)
elif obj._deferred_column:
obj._propagate_index_objs(axis=1)
elif apply_axis == "opposite":
if "axis" not in kwargs:
axis = args[axis_arg]
else:
axis = kwargs["axis"]
if axis == 0 and obj._deferred_column:
obj._propagate_index_objs(axis=1)
elif axis == 1 and obj._deferred_index:
obj._propagate_index_objs(axis=0)
elif apply_axis == "rows":
obj._propagate_index_objs(axis=0)
result = f(self, *args, **kwargs)
if apply_axis is None and not transpose:
result._deferred_index = self._deferred_index
result._deferred_column = self._deferred_column
elif apply_axis is None and transpose:
result._deferred_index = self._deferred_column
result._deferred_column = self._deferred_index
elif apply_axis == "opposite":
if axis == 0:
result._deferred_index = self._deferred_index
else:
result._deferred_column = self._deferred_column
elif apply_axis == "rows":
result._deferred_column = self._deferred_column
return result

return run_f_on_minimally_updated_metadata

return decorator
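
To make the decorator's contract concrete, here is a minimal sketch of how it is meant to be applied; the subclass and method names below are hypothetical placeholders and not part of this commit, only `lazy_metadata_decorator` and `PandasDataframe` come from this file:

class _SketchFrame(PandasDataframe):  # hypothetical, for illustration only
    @lazy_metadata_decorator(apply_axis="both")
    def full_axis_op(self, axis, func):
        # Any deferred row/column labels are pushed into the partitions
        # (via _propagate_index_objs) before this body runs.
        return self.apply_full_axis(axis, func)

    @lazy_metadata_decorator(apply_axis=None, transpose=True)
    def transposed(self):
        # No propagation here; because transpose=True, the result's
        # _deferred_index/_deferred_column flags are swapped relative to self.
        return self.transpose()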


class PandasDataframe(object):
"""
An abstract class that represents the parent class for any pandas storage format dataframe class.
@@ -59,6 +151,9 @@ class PandasDataframe(object):

_partition_mgr_cls = None
_query_compiler_cls = PandasQueryCompiler
# These properties flag whether metadata synchronization is currently deferred
_deferred_index = False
_deferred_column = False

@property
def __constructor__(self):
@@ -334,6 +429,24 @@ def _filter_empties(self):
self._row_lengths_cache = [r for r in self._row_lengths if r != 0]

def synchronize_labels(self, axis=None):
"""
Set the deferred axes variables for the ``PandasDataframe``.
Parameters
----------
axis : int, default: None
The deferred axis.
0 for the index, 1 for the columns.
"""
if axis is None:
self._deferred_index = True
self._deferred_column = True
elif axis == 0:
self._deferred_index = True
else:
self._deferred_column = True
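
A rough sketch of the intended lifecycle, assuming `frame` is an already-constructed PandasDataframe; the flag set here is consumed later by `_propagate_index_objs`, usually from inside `lazy_metadata_decorator`:

frame.synchronize_labels(axis=0)   # cheap: only sets frame._deferred_index = True
assert frame._deferred_index       # partitions have not been touched yet

# A method decorated with apply_axis="both" (e.g. `fold`) first calls
# frame._propagate_index_objs(axis=0), which queues the reindexing on every
# partition and clears the flag, and only then executes its own body.
result = frame.fold(axis=0, func=lambda df: df)
assert not frame._deferred_index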

def _propagate_index_objs(self, axis=None):
"""
Synchronize labels by lazily applying the index object for the specified `axis` to `self._partitions`.
@@ -375,6 +488,8 @@ def apply_idx_objs(df, idx, cols):
for i in range(len(self._partitions))
]
)
self._deferred_index = False
self._deferred_column = False
elif axis == 0:

def apply_idx_objs(df, idx):
@@ -394,6 +509,7 @@ def apply_idx_objs(df, idx):
for i in range(len(self._partitions))
]
)
self._deferred_index = False
elif axis == 1:

def apply_idx_objs(df, cols):
@@ -416,7 +532,9 @@ def apply_idx_objs(df, cols):
ErrorMessage.catch_bugs_and_request_email(
axis is not None and axis not in [0, 1]
)
self._deferred_column = False

@lazy_metadata_decorator(apply_axis=None)
def mask(
self,
row_indices=None,
@@ -615,6 +733,7 @@ def mask(
row_numeric_idx=new_row_order, col_numeric_idx=new_col_order
)

@lazy_metadata_decorator(apply_axis="rows")
def from_labels(self) -> "PandasDataframe":
"""
Convert the row labels to a column of data, inserted at the first position.
@@ -688,8 +807,8 @@ def from_labels_executor(df, **kwargs):
row_lengths=self._row_lengths_cache,
column_widths=new_column_widths,
)
# Propagate the new row labels to the all dataframe partitions
result.synchronize_labels(0)
# Set flag for propagating deferred row labels across dataframe partitions
result.synchronize_labels(axis=0)
return result
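
Sketched under the assumption that `modin_frame` is some PandasDataframe, the effect of this change is that the new row labels are only flagged for synchronization instead of being pushed into every partition right away:

result = modin_frame.from_labels()
# The new row labels exist only at the Modin frame level so far:
assert result._deferred_index
# The reindexing of row labels inside each partition is queued lazily the
# next time a decorated operation needs synchronized row labels.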

def to_labels(self, column_list: List[Hashable]) -> "PandasDataframe":
@@ -717,6 +836,7 @@ def to_labels(self, column_list: List[Hashable]) -> "PandasDataframe":
result.index = new_labels
return result

@lazy_metadata_decorator(apply_axis="both")
def _reorder_labels(self, row_numeric_idx=None, col_numeric_idx=None):
"""
Reorder the columns and/or rows in this DataFrame.
@@ -753,6 +873,7 @@ def _reorder_labels(self, row_numeric_idx=None, col_numeric_idx=None):
col_idx = self.columns
return self.__constructor__(ordered_cols, row_idx, col_idx)

@lazy_metadata_decorator(apply_axis=None)
def copy(self):
"""
Copy this object.
@@ -800,6 +921,7 @@ def combine_dtypes(cls, list_of_dtypes, column_names):
dtypes.index = column_names
return dtypes

@lazy_metadata_decorator(apply_axis="both")
def astype(self, col_dtypes):
"""
Convert the columns dtypes to given dtypes.
@@ -1145,7 +1267,6 @@ def make_reindexer(do_reindex: bool, frame_idx: int):
copy=True,
allow_dups=True,
)

return lambda df: df.reindex(joined_index, axis=axis)

return joined_index, make_reindexer
@@ -1231,6 +1352,7 @@ def _compute_map_reduce_metadata(self, axis, new_parts):
)
return result

@lazy_metadata_decorator(apply_axis="both")
def fold_reduce(self, axis, func):
"""
Apply function that reduces Frame Manager to series but requires knowledge of full axis.
@@ -1253,6 +1375,7 @@ def fold_reduce(self, axis, func):
)
return self._compute_map_reduce_metadata(axis, new_parts)

@lazy_metadata_decorator(apply_axis="opposite", axis_arg=0)
def map_reduce(self, axis, map_func, reduce_func=None):
"""
Apply function that will reduce the data to a pandas Series.
@@ -1284,6 +1407,7 @@ def map_reduce(self, axis, map_func, reduce_func=None):
)
return self._compute_map_reduce_metadata(axis, reduce_parts)

@lazy_metadata_decorator(apply_axis=None)
def map(self, func, dtypes=None):
"""
Perform a function that maps across the entire dataset.
@@ -1318,6 +1442,7 @@ def map(self, func, dtypes=None):
dtypes=dtypes,
)

@lazy_metadata_decorator(apply_axis="both")
def fold(self, axis, func):
"""
Perform a function across an entire axis.
@@ -1349,6 +1474,7 @@ def fold(self, axis, func):
self._column_widths,
)

@lazy_metadata_decorator(apply_axis="both")
def filter_full_axis(self, axis, func):
"""
Filter data based on the function provided along an entire axis.
@@ -1384,6 +1510,7 @@ def filter_full_axis(self, axis, func):
self.dtypes if axis == 0 else None,
)

@lazy_metadata_decorator(apply_axis="both")
def explode(self, axis, func):
"""
Explode list-like entries along an entire axis.
@@ -1412,6 +1539,7 @@ def explode(self, axis, func):
new_columns = self._compute_axis_labels(1, partitions)
return self.__constructor__(partitions, new_index, new_columns)

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis(
self,
axis,
@@ -1458,6 +1586,7 @@ def apply_full_axis(
other=None,
)

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis_select_indices(
self,
axis,
@@ -1519,6 +1648,7 @@ def apply_full_axis_select_indices(
new_columns = self.columns if axis == 0 else None
return self.__constructor__(new_partitions, new_index, new_columns, None, None)

@lazy_metadata_decorator(apply_axis="both")
def apply_select_indices(
self,
axis,
@@ -1625,6 +1755,7 @@ def apply_select_indices(
self._column_widths_cache,
)

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply(
self, axis, func, other, join_type="left", preserve_labels=True, dtypes=None
):
@@ -1720,6 +1851,7 @@ def get_len(part):
passed_len += len(internal)
return result_dict

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply_select_indices(
self,
axis,
@@ -1807,6 +1939,7 @@ def broadcast_apply_select_indices(

return self.__constructor__(new_partitions, *new_axes)

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply_full_axis(
self,
axis,
@@ -1891,9 +2024,9 @@ def broadcast_apply_full_axis(
dtypes,
)
if new_index is not None:
result.synchronize_labels(0)
result.synchronize_labels(axis=0)
if new_columns is not None:
result.synchronize_labels(1)
result.synchronize_labels(axis=1)
return result

def _copartition(self, axis, other, how, sort, force_repartition=False):
@@ -2007,6 +2140,7 @@ def get_axis_lengths(partitions, axis):
)
return reindexed_frames[0], reindexed_frames[1:], joined_index

@lazy_metadata_decorator(apply_axis="both")
def binary_op(self, op, right_frame, join_type="outer"):
"""
Perform an operation that requires joining with another Modin DataFrame.
@@ -2036,6 +2170,7 @@ def binary_op(self, op, right_frame, join_type="outer"):
new_columns = self.columns.join(right_frame.columns, how=join_type)
return self.__constructor__(new_frame, joined_index, new_columns, None, None)

@lazy_metadata_decorator(apply_axis="both")
def concat(self, axis, others, how, sort):
"""
Concatenate `self` with one or more other Modin DataFrames.
@@ -2104,6 +2239,7 @@ def concat(self, axis, others, how, sort):
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
)

@lazy_metadata_decorator(apply_axis="opposite", axis_arg=0)
def groupby_reduce(
self,
axis,
@@ -2142,6 +2278,8 @@ def groupby_reduce(
New Modin DataFrame.
"""
by_parts = by if by is None else by._partitions
if by is None:
self._propagate_index_objs(axis=0)

if apply_indices is not None:
numeric_indices = self.axes[axis ^ 1].get_indexer_for(apply_indices)
@@ -2244,6 +2382,7 @@ def _arrow_type_to_dtype(cls, arrow_type):
return np.dtype(res)
return res

@lazy_metadata_decorator(apply_axis="both")
def to_pandas(self):
"""
Convert this Modin DataFrame to a pandas DataFrame.
@@ -2281,6 +2420,7 @@ def to_numpy(self, **kwargs):
"""
return self._partition_mgr_cls.to_numpy(self._partitions, **kwargs)

@lazy_metadata_decorator(apply_axis=None, transpose=True)
def transpose(self):
"""
Transpose the index and columns of this Modin DataFrame.
