Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#6935: Fix Merge failed when right operand is an empty dataframe #6941

Merged
merged 5 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3348,10 +3348,21 @@ def broadcast_apply_full_axis(
PandasDataframe
New Modin DataFrame.
"""

def get_partitions(df):
"""Deal with the corner case if the "other" dataframe has no partitions."""
if df._partitions.size > 0:
return df._partitions
else:
empty_partition = self._partition_mgr_cls.create_partition_from_data(
pandas.DataFrame(index=df.index, columns=df.columns)
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make this as a part of combine function?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave out create_partition_from_data. What do you think?

cc @arunjose696, @anmyachev

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean leave it as is? Then this will only work for broadcast_apply_full function; if we want to use this approach with other functions, we will have to duplicate this logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to leave the create_partition_from_data function as is. The issue is indeed in broadcast_apply_full_axis as we dont send any data to remote function for empty partition dataframe, so I assume the issue would be better addressed in this function itself.

The combine function is called only in merge()/join() so even if we use the logic there to create a empty partition(with index and columns), the broadcast_apply_full_axis function would fail for other operations eg df.compare(df2),

To use this approach in other functions, the alternative I think would be to make get_partitions() a method of dataframe(and let it handle the case if partitions are empty) class would this be fine ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use this approach in other functions, the alternative I think would be to make get_partitions() a method of dataframe(and let it handle the case if partitions are empty) class would this be fine ?

This sounds good to me. We could do get_partitions (I would name it as _extract_partitions) as a method of the PandasDataframe class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
empty_partition = self._partition_mgr_cls.create_partition_from_data(
pandas.DataFrame(index=df.index, columns=df.columns)
)
return np.array([[self._partition_mgr_cls._partition_class.put(
pandas.DataFrame(index=df.index, columns=df.columns)
]])

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

committed this, for now but I think the readability would be better with using empty_partition or empty_data_partition as a variable and returning the variable. Is it a better practice to not create a variable and return directly as this would be slightly confusing?

Also wouldnt using self._partition_mgr_cls._partition_class be using the private attributes in a different class, wouldnt this be a bad practice?

return empty_partition
arunjose696 marked this conversation as resolved.
Show resolved Hide resolved

if other is not None:
if not isinstance(other, list):
other = [other]
other = [o._partitions for o in other] if len(other) else None
other = [get_partitions(o) for o in other] if len(other) else None

if apply_indices is not None:
numeric_indices = self.get_axis(axis ^ 1).get_indexer_for(apply_indices)
Expand Down
20 changes: 20 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ def preprocess_func(cls, map_func):

# END Abstract Methods

@classmethod
def create_partition_from_data(cls, data):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I keep this function as is or else we would have to use self._partition_mgr_cls._partition_class.put which would be calling a private function from a private attribute?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this method to create_partition_from_metadata?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have renamed and changed arguments to take in a metadata dict as the name would suggest it accepts metadata.

"""
Create NumPy array of partitions that wrapps the given data.

Parameters
----------
data : pandas.DataFrame or pandas.Series
Data that has to be wrapped in partition.

Returns
-------
np.ndarray
A NumPy 2D array of a single partition which contains the data.
"""
return np.array([[cls._partition_class.put(data)]])

@classmethod
def column_partitions(cls, partitions, full_axis=True):
"""
Expand Down Expand Up @@ -1120,6 +1137,9 @@ def to_pandas_remote(df, partition_shape, *dfs):
(df,) + dfs, partition_shape, called_from_remote=True
)

if partitions.size <= 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put this at the beggining of the function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return partitions

preprocessed_func = cls.preprocess_func(to_pandas_remote)
partition_shape = partitions.shape
partitions_flattened = partitions.flatten()
Expand Down
14 changes: 14 additions & 0 deletions modin/pandas/test/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,20 @@ def test_merge(test_data, test_data2):
modin_df.merge("Non-valid type")


def test_merge_empty():
data = np.random.uniform(0, 100, size=(2**6, 2**6))
pandas_df = pandas.DataFrame(data)
pandas_df2 = pandas_df.iloc[:0]
modin_df = pd.DataFrame(data)
modin_df_2 = modin_df.iloc[:0]
modin_result = pd.merge(
modin_df,
modin_df_2,
)
pandas_result = pandas.merge(pandas_df, pandas_df2)
df_equals(modin_result, pandas_result)
YarShev marked this conversation as resolved.
Show resolved Hide resolved


def test_merge_with_mi_columns():
modin_df1, pandas_df1 = create_test_dfs(
{
Expand Down