Skip to content

Commit

Permalink
Update modin/core/dataframe/pandas/partitioning/partition_manager.py
Browse files: browse the repository at this point in the history
Co-authored-by: Anatoly Myachev <anatoliimyachev@mail.com>
  • Loading branch information
arunjose696 and anmyachev committed Feb 7, 2024
1 parent 07522fb commit d6e62ba
Showing 1 changed file with 50 additions and 3 deletions.
53 changes: 50 additions & 3 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,11 +1166,58 @@ def convert_partitions_to_pandas_in_remote_func(cls, partitions):
PandasDataframePartition
A PandasDataframePartition object which holds the data of all partitions.
"""
cls.finalize(partitions)

def call_to_pandas(data, partitions):
return cls.to_pandas(partitions)
def convert_to_pandas(data,partition_shape, *partition_data):

if all(
isinstance(obj, (pandas.DataFrame, pandas.Series))
for obj in partition_data
):
height, width, *_ = tuple(partition_shape) + (0,)
# restore 2d array
objs = iter(partition_data)
partition_data = [
[next(objs) for _ in range(width)] for __ in range(height)
]
else:
# Partitions do not always contain pandas objects; for example, HDK uses pyarrow tables.
# This branch is needed because calling `partition.get` is not always
# equivalent to calling `partition.to_pandas`.
partition_data = [
[obj.to_pandas() for obj in part] for part in partitions
]
if all(
isinstance(part, pandas.Series) for row in partition_data for part in row
):
axis = 0
elif all(
isinstance(part, pandas.DataFrame)
for row in partition_data
for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

return partitions.flat[0].apply(call_to_pandas, partitions=partitions)
df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in partition_data
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)
preprocessed_func = cls.preprocess_func(convert_to_pandas)
getdata = lambda partition: partition._data
partition_data = list(np.vectorize(getdata)(partitions).flatten())
return partitions.flat[0].apply(preprocessed_func,partitions.shape, *partition_data)

@classmethod
@wait_computations_if_benchmark_mode
Expand Down

0 comments on commit d6e62ba

Please sign in to comment.