Skip to content

Commit

Permalink
removing duplicated code
Browse files Browse the repository at this point in the history
Signed-off-by: arunjose696 <arunjose696@gmail.com>
  • Loading branch information
arunjose696 committed Feb 12, 2024
1 parent 12826e0 commit ff5639d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 91 deletions.
96 changes: 5 additions & 91 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
PersistentPickle,
ProgressBar,
)
from modin.core.dataframe.pandas.utils import concatenate
from modin.core.dataframe.pandas.utils import (
create_dataframe_from_partition_data,
)
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger
Expand Down Expand Up @@ -781,50 +783,7 @@ def to_pandas(cls, partitions):
A pandas DataFrame
"""
retrieved_objects = cls.get_objects_from_partitions(partitions.flatten())
if all(
isinstance(obj, (pandas.DataFrame, pandas.Series))
for obj in retrieved_objects
):
height, width, *_ = tuple(partitions.shape) + (0,)
# restore 2d array
objs = iter(retrieved_objects)
retrieved_objects = [
[next(objs) for _ in range(width)] for __ in range(height)
]
else:
# Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables.
# This implementation comes from the fact that calling `partition.get`
# function is not always equivalent to `partition.to_pandas`.
retrieved_objects = [
[obj.to_pandas() for obj in part] for part in partitions
]
if all(
isinstance(part, pandas.Series) for row in retrieved_objects for part in row
):
axis = 0
elif all(
isinstance(part, pandas.DataFrame)
for row in retrieved_objects
for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in retrieved_objects
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)
create_dataframe_from_partition_data(retrieved_objects, partitions.shape)

@classmethod
def to_numpy(cls, partitions, **kwargs):
Expand Down Expand Up @@ -1159,52 +1118,7 @@ def combine(cls, partitions):

def to_pandas_remote(data, partition_shape, *partition_data):
"""Copy of ``cls.to_pandas()`` method adapted for a remote function."""
if all(
isinstance(obj, (pandas.DataFrame, pandas.Series))
for obj in partition_data
):
height, width, *_ = tuple(partition_shape) + (0,)
# restore 2d array
objs = iter(partition_data)
partition_data = [
[next(objs) for _ in range(width)] for __ in range(height)
]
else:
# Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables.
# This implementation comes from the fact that calling `partition.get`
# function is not always equivalent to `partition.to_pandas`.
partition_data = [
[obj.to_pandas() for obj in part] for part in partition_data
]
if all(
isinstance(part, pandas.Series)
for row in partition_data
for part in row
):
axis = 0
elif all(
isinstance(part, pandas.DataFrame)
for row in partition_data
for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in partition_data
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)
return create_dataframe_from_partition_data(partition_data, partition_shape)

preprocessed_func = cls.preprocess_func(to_pandas_remote)
partition_shape = partitions.shape
Expand Down
52 changes: 52 additions & 0 deletions modin/core/dataframe/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import pandas
from pandas.api.types import union_categoricals
from modin.error_message import ErrorMessage


def concatenate(dfs):
Expand Down Expand Up @@ -65,3 +66,54 @@ def concatenate(dfs):
# so do it explicitly
return dfs[0].copy()
return pandas.concat(dfs, copy=True)


def create_dataframe_from_partition_data(partition_data, partition_shape):
"""
Convert partition data of multiple dataframes to a single dataframe.
Parameters
----------
partition_data : list
List of pandas DataFrames/Object references holding pandas DataFrames.
Returns
-------
pandas.DataFrame
A pandas DataFrame.
"""
if all(
isinstance(obj, (pandas.DataFrame, pandas.Series)) for obj in partition_data
):
height, width, *_ = tuple(partition_shape) + (0,)
# restore 2d array
objs = iter(partition_data)
partition_data = [[next(objs) for _ in range(width)] for __ in range(height)]
else:
# Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables.
# This implementation comes from the fact that calling `partition.get`
# function is not always equivalent to `partition.to_pandas`.
partition_data = [[obj.to_pandas() for obj in part] for part in partition_data]
if all(isinstance(part, pandas.Series) for row in partition_data for part in row):
axis = 0
elif all(
isinstance(part, pandas.DataFrame) for row in partition_data for part in row
):
axis = 1
else:
ErrorMessage.catch_bugs_and_request_email(True)

def is_part_empty(part):
return part.empty and (
not isinstance(part, pandas.DataFrame) or (len(part.columns) == 0)
)

df_rows = [
pandas.concat([part for part in row], axis=axis, copy=False)
for row in partition_data
if not all(is_part_empty(part) for part in row)
]
if len(df_rows) == 0:
return pandas.DataFrame()
else:
return concatenate(df_rows)

0 comments on commit ff5639d

Please sign in to comment.