Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#6879: Convert the right DF to single partition before broadcasting in query_compiler.merge #6880

Merged
merged 16 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,35 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe":
partitions, new_index, new_columns, row_lengths, column_widths
)

def combine(self) -> "PandasDataframe":
    """
    Create a single partition PandasDataframe from the partitions of the current dataframe.

    Returns
    -------
    PandasDataframe
        A single partition PandasDataframe.
    """
    # Collapse the whole 2D partition grid into one partition.
    combined_partitions = self._partition_mgr_cls.combine(self._partitions)
    # Cached lengths/widths collapse to a single total; propagate them only
    # when the caches are actually populated.
    row_lengths = (
        None
        if self._row_lengths_cache is None
        else [sum(self._row_lengths_cache)]
    )
    column_widths = (
        None
        if self._column_widths_cache is None
        else [sum(self._column_widths_cache)]
    )
    combined = self.__constructor__(
        combined_partitions,
        index=self.copy_index_cache(),
        columns=self.copy_columns_cache(),
        row_lengths=row_lengths,
        column_widths=column_widths,
        dtypes=self.copy_dtypes_cache(),
    )
    combined.synchronize_labels()
    return combined

@lazy_metadata_decorator(apply_axis="both")
def apply_full_axis(
self,
Expand Down
83 changes: 38 additions & 45 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
PersistentPickle,
ProgressBar,
)
from modin.core.dataframe.pandas.utils import concatenate
from modin.core.dataframe.pandas.utils import create_dataframe_from_partition_data
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger
Expand Down Expand Up @@ -781,50 +781,7 @@
A pandas DataFrame
"""
retrieved_objects = cls.get_objects_from_partitions(partitions.flatten())
if all(
isinstance(obj, (pandas.DataFrame, pandas.Series))
for obj in retrieved_objects
):
height, width, *_ = tuple(partitions.shape) + (0,)
# restore 2d array
objs = iter(retrieved_objects)
retrieved_objects = [
[next(objs) for _ in range(width)] for __ in range(height)
]
else:
# Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables.
# This implementation comes from the fact that calling `partition.get`
# function is not always equivalent to `partition.to_pandas`.
retrieved_objects = [
[obj.to_pandas() for obj in part] for part in partitions
]
if all(
isinstance(part, pandas.Series) for row in retrieved_objects for part in row
):
axis = 0
elif all(
isinstance(part, pandas.DataFrame)
for row in retrieved_objects
for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in retrieved_objects
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)
return create_dataframe_from_partition_data(retrieved_objects, partitions.shape)

@classmethod
def to_numpy(cls, partitions, **kwargs):
Expand Down Expand Up @@ -1141,6 +1098,42 @@
preprocessed_func = cls.preprocess_func(func)
return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]

@classmethod
def combine(cls, partitions):
    """
    Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition.

    Parameters
    ----------
    partitions : np.ndarray
        The partitions which have to be converted to a single partition.

    Returns
    -------
    np.ndarray
        A NumPy 2D array of a single partition.
    """

    def to_pandas_remote(data, partition_shape, *partition_data):
        """Copy of ``cls.to_pandas()`` method adapted for a remote function."""
        return create_dataframe_from_partition_data(
            partition_data, partition_shape, called_from_remote=True
        )

    remote_fn = cls.preprocess_func(to_pandas_remote)
    shape = partitions.shape
    # Materialize virtual partitions (if the backend supports it) so every
    # entry exposes a concrete block reference below.
    flat_parts = [
        part.force_materialization() if hasattr(part, "force_materialization") else part
        for part in partitions.flatten()
    ]
    block_refs = [part.list_of_blocks[0] for part in flat_parts]
    # Run the concatenation remotely on the first partition, passing every
    # block reference so the data never round-trips through the driver.
    combined_partition = partitions.flat[0].apply(remote_fn, shape, *block_refs)
    return np.array([combined_partition]).reshape(1, -1)

@classmethod
@wait_computations_if_benchmark_mode
def apply_func_to_select_indices(
Expand Down
68 changes: 65 additions & 3 deletions modin/core/dataframe/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import pandas
from pandas.api.types import union_categoricals

from modin.error_message import ErrorMessage

def concatenate(dfs):

def concatenate(dfs, make_copy=True):
"""
Concatenate pandas DataFrames with saving 'category' dtype.

Expand All @@ -28,6 +30,8 @@
----------
dfs : list
List of pandas DataFrames to concatenate.
make_copy : bool, default: True
YarShev marked this conversation as resolved.
Show resolved Hide resolved
Make explicit copy when creating dataframe.

Returns
-------
Expand Down Expand Up @@ -60,8 +64,66 @@
i, pandas.Categorical(df.iloc[:, i], categories=union.categories)
)
# `ValueError: buffer source array is read-only` if copy==False
if len(dfs) == 1:
if len(dfs) == 1 and make_copy:
# concat doesn't make a copy if len(dfs) == 1,
# so do it explicitly
return dfs[0].copy()
return pandas.concat(dfs, copy=True)
return pandas.concat(dfs, copy=make_copy)


def create_dataframe_from_partition_data(
    partition_data, partition_shape, called_from_remote=False
):
    """
    Convert partition data of multiple dataframes to a single dataframe.

    Parameters
    ----------
    partition_data : list
        List of pandas DataFrames or list of Object references holding pandas DataFrames.
    partition_shape : int or tuple
        Shape of the partitions NumPy array.
    called_from_remote : bool, default: False
        Flag used to check if explicit copy should be done in concat.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame.
    """
    if all(
        isinstance(obj, (pandas.DataFrame, pandas.Series)) for obj in partition_data
    ):
        # Rebuild the 2D grid from the flat list using the partition shape.
        height, width, *_ = tuple(partition_shape) + (0,)
        flat_iter = iter(partition_data)
        grid = [[next(flat_iter) for _ in range(width)] for _ in range(height)]
    else:
        # Partitions do not always contain pandas objects, for example, hdk uses
        # pyarrow tables. This implementation comes from the fact that calling
        # `partition.get` function is not always equivalent to `partition.to_pandas`.
        grid = [[obj.to_pandas() for obj in part] for part in partition_data]

    flat_parts = [part for row in grid for part in row]
    if all(isinstance(part, pandas.Series) for part in flat_parts):
        concat_axis = 0
    elif all(isinstance(part, pandas.DataFrame) for part in flat_parts):
        concat_axis = 1
    else:
        # Mixed Series/DataFrame grid indicates an internal bug.
        ErrorMessage.catch_bugs_and_request_email(True)

    def is_part_empty(part):
        # A DataFrame with columns but no rows still contributes its schema,
        # so it does not count as empty here.
        return part.empty and (
            not isinstance(part, pandas.DataFrame) or len(part.columns) == 0
        )

    df_rows = [
        pandas.concat(list(row), axis=concat_axis, copy=False)
        for row in grid
        if not all(is_part_empty(part) for part in row)
    ]
    if not df_rows:
        return pandas.DataFrame()
    # Skip the defensive copy when already running inside a remote task.
    return concatenate(df_rows, not called_from_remote)
6 changes: 4 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ def merge(self, right, **kwargs):
left_index = kwargs.get("left_index", False)
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)
right_to_broadcast = right._modin_frame.combine()

if how in ["left", "inner"] and left_index is False and right_index is False:
kwargs["sort"] = False
Expand Down Expand Up @@ -620,7 +621,7 @@ def map_func(
axis=1,
func=map_func,
enumerate_partitions=how == "left",
other=right._modin_frame,
other=right_to_broadcast,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
keep_partitioning=False,
Expand Down Expand Up @@ -681,6 +682,7 @@ def join(self, right, **kwargs):
on = kwargs.get("on", None)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
right_to_broadcast = right._modin_frame.combine()

if how in ["left", "inner"]:

Expand All @@ -697,7 +699,7 @@ def map_func(left, right, kwargs=kwargs): # pragma: no cover
num_splits=merge_partitioning(
self._modin_frame, right._modin_frame, axis=1
),
other=right._modin_frame,
other=right_to_broadcast,
)
)
return new_self.sort_rows_by_column_values(on) if sort else new_self
Expand Down