Skip to content

Commit

Permalink
adding _extract_partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
arunjose696 committed Feb 22, 2024
1 parent b1d145f commit 11eb8ab
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
37 changes: 20 additions & 17 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3203,6 +3203,25 @@ def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all):
passed_len += len(internal)
return result_dict

def _extract_partitions(self):
    """
    Extract partitions if partitions are present.

    If partitions are empty, return a dummy partition with empty data but
    the index and columns of the current dataframe.

    Returns
    -------
    np.ndarray
        NumPy array with extracted partitions.
    """
    # Fast path: the frame already owns at least one partition.
    if self._partitions.size > 0:
        return self._partitions
    # Corner case: no partitions exist, so ask the partition manager to build
    # a placeholder from this frame's index and columns.
    # NOTE(review): Codecov reported this fallback line as not covered by
    # tests; an empty-frame test case would be worth adding.
    return self._partition_mgr_cls.create_partition_from_metadata(
        index=self.index, columns=self.columns
    )

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply_select_indices(
self,
Expand Down Expand Up @@ -3348,26 +3367,10 @@ def broadcast_apply_full_axis(
PandasDataframe
New Modin DataFrame.
"""

def get_partitions(df):
"""Deal with the corner case if the "other" dataframe has no partitions."""
if df._partitions.size > 0:
return df._partitions
else:
return np.array(
[
[
self._partition_mgr_cls._partition_class.put(
pandas.DataFrame(index=df.index, columns=df.columns)
)
]
]
)

if other is not None:
if not isinstance(other, list):
other = [other]
other = [get_partitions(o) for o in other] if len(other) else None
other = [o._extract_partitions() for o in other] if len(other) else None

if apply_indices is not None:
numeric_indices = self.get_axis(axis ^ 1).get_indexer_for(apply_indices)
Expand Down
17 changes: 9 additions & 8 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,22 @@ def preprocess_func(cls, map_func):
# END Abstract Methods

@classmethod
def create_partition_from_data(cls, data):
def create_partition_from_metadata(cls, **metadata):
"""
Create NumPy array of partitions that wrapps the given data.
Create NumPy array of partitions that holds an empty dataframe with given metadata.
Parameters
----------
data : pandas.DataFrame or pandas.Series
Data that has to be wrapped in partition.
**metadata : dict
Keyword arguments (e.g. ``index`` and ``columns``) passed to ``pandas.DataFrame`` to build the empty dataframe that is wrapped in a partition.
Returns
-------
np.ndarray
A NumPy 2D array of a single partition which contains an empty dataframe with the given metadata.
"""
return np.array([[cls._partition_class.put(data)]])
metadata_dataframe = pandas.DataFrame(**metadata)
return np.array([[cls._partition_class.put(metadata_dataframe)]])

Check warning on line 194 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L193-L194

Added lines #L193 - L194 were not covered by tests

@classmethod
def column_partitions(cls, partitions, full_axis=True):
Expand Down Expand Up @@ -1131,15 +1132,15 @@ def combine(cls, partitions):
A NumPy 2D array of a single partition.
"""

if partitions.size <= 1:
return partitions

def to_pandas_remote(df, partition_shape, *dfs):
"""Copy of ``cls.to_pandas()`` method adapted for a remote function."""
return create_pandas_df_from_partitions(
(df,) + dfs, partition_shape, called_from_remote=True
)

if partitions.size <= 1:
return partitions

preprocessed_func = cls.preprocess_func(to_pandas_remote)
partition_shape = partitions.shape
partitions_flattened = partitions.flatten()
Expand Down

0 comments on commit 11eb8ab

Please sign in to comment.