Skip to content

Commit

Permalink
calling to_pandas in remote function
Browse files Browse the repository at this point in the history
Signed-off-by: arunjose696 <arunjose696@gmail.com>
  • Loading branch information
arunjose696 committed Feb 5, 2024
1 parent 23ec81a commit 07522fb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 15 deletions.
24 changes: 13 additions & 11 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2703,25 +2703,27 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe":
partitions, new_index, new_columns, row_lengths, column_widths
)

def force_materialization(self) -> "PandasDataframe":
def to_pandas_in_remote_function(self) -> "PandasDataframe":
"""
Materialize axis partitions into a single partition.
Applies the identity function first across rows, and thereafter across columns
to get a df single partition.
Create a PandasFrame with single partition from the partitions of the current dataframe.
Returns
-------
PandasDataframe
An PandasDataframe containing only a single partition.
"""
row_partitions = self._partition_mgr_cls.row_partitions(self._partitions)
col_partition = self._partition_mgr_cls.column_partitions(
np.asarray([row_partitions]).T
partition_with_to_pandas_data = [
self._partition_mgr_cls.convert_partitions_to_pandas_in_remote_func(
self._partitions
)
]
partitions = np.array(partition_with_to_pandas_data).reshape(1, -1)

result = self.__constructor__(
partitions, self.index, columns=self.columns, dtypes=self.dtypes
)
new_frame = np.array([col_partition[0].apply(lambda df: df, num_splits=1)])
single_partition_df = self.__constructor__(new_frame)
return single_partition_df
result.synchronize_labels()
return result

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis(
Expand Down
24 changes: 24 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,30 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs):
preprocessed_func = cls.preprocess_func(func)
return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]

@classmethod
def convert_partitions_to_pandas_in_remote_func(cls, partitions):
"""
Call to_pandas for the given NumPy array of partitions.
For a NumPy array of partitions create a single partition
that has the data for all partitions.
Parameters
----------
partitions : np.ndarray
The partitions which have to be converted to a single partition.
Returns
-------
PandasDataframePartition
A PandasDataframePartition object which holds the data of all partitions.
"""

def call_to_pandas(data, partitions):
return cls.to_pandas(partitions)

return partitions.flat[0].apply(call_to_pandas, partitions=partitions)

@classmethod
@wait_computations_if_benchmark_mode
def apply_func_to_select_indices(
Expand Down
9 changes: 5 additions & 4 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ def merge(self, right, **kwargs):
left_index = kwargs.get("left_index", False)
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)
right = self.__constructor__(right._modin_frame.force_materialization())
right_to_broadcast = right._modin_frame.to_pandas_in_remote_function()

if how in ["left", "inner"] and left_index is False and right_index is False:
kwargs["sort"] = False

Expand Down Expand Up @@ -614,13 +615,12 @@ def map_func(
.set_index(list(left_renamer.values()))
)
new_dtypes = ModinDtypes.concat([left_dtypes, right_dtypes])

new_self = self.__constructor__(
self._modin_frame.broadcast_apply_full_axis(
axis=1,
func=map_func,
enumerate_partitions=how == "left",
other=right._modin_frame,
other=right_to_broadcast,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
keep_partitioning=False,
Expand Down Expand Up @@ -681,6 +681,7 @@ def join(self, right, **kwargs):
on = kwargs.get("on", None)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
right_to_broadcast = right._modin_frame.to_pandas_in_remote_function()

if how in ["left", "inner"]:

Expand All @@ -697,7 +698,7 @@ def map_func(left, right, kwargs=kwargs): # pragma: no cover
num_splits=merge_partitioning(
self._modin_frame, right._modin_frame, axis=1
),
other=right._modin_frame,
other=right_to_broadcast,
)
)
return new_self.sort_rows_by_column_values(on) if sort else new_self
Expand Down

0 comments on commit 07522fb

Please sign in to comment.