Skip to content

Commit

Permalink
Fixing partitioning offset (#151)
Browse files Browse the repository at this point in the history
* Fixing partitioning offset

* Formatting

* Move import

* Fixing cases where number of partitions changes

* Fixing series issue

* Add docs
  • Loading branch information
devin-petersohn committed Oct 11, 2018
1 parent 8eb0965 commit d2c5e7a
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions modin/data_management/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas
import ray

Expand Down Expand Up @@ -143,18 +144,27 @@ class RayRowPartition(RayAxisPartition):
axis = 1


def split_result_of_axis_func_pandas(axis, num_splits, result):
def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None):
"""Split the Pandas result evenly based on the provided number of splits.
Args:
axis: The axis to split across.
num_splits: The number of even splits to create.
result: The result of the computation. This should be a Pandas
DataFrame.
length_list: The list of lengths to split this DataFrame into. This is used to
return the DataFrame to its original partitioning schema.
Returns:
A list of Pandas DataFrames.
"""
if length_list is not None:
length_list.insert(0, 0)
sums = np.cumsum(length_list)
if axis == 0:
return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
else:
return [result.iloc[:, sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
# We do this to restore block partitioning
if axis == 0 or type(result) is pandas.Series:
chunksize = compute_chunksize(len(result), num_splits)
Expand Down Expand Up @@ -186,7 +196,18 @@ def deploy_ray_axis_func(axis, func, num_splits, kwargs, *partitions):
"""
dataframe = pandas.concat(partitions, axis=axis, copy=False)
result = func(dataframe, **kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)
if num_splits != len(partitions) or isinstance(result, pandas.Series):
lengths = None
else:
if axis == 0:
lengths = [len(part) for part in partitions]
if sum(lengths) != len(result):
lengths = None
else:
lengths = [len(part.columns) for part in partitions]
if sum(lengths) != len(result.columns):
lengths = None
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)


@ray.remote
Expand Down

0 comments on commit d2c5e7a

Please sign in to comment.