Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Fix df/series.{apply, map_chunk} when some chunks output empty data (#2434) #2437

Merged
merged 1 commit into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions mars/dataframe/base/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ...utils import enter_current_session, quiet_stdio
from ..operands import DataFrameOperandMixin, DataFrameOperand
from ..utils import build_df, build_series, parse_index, validate_axis, \
validate_output_types, make_dtypes, make_dtype
validate_output_types, make_dtypes, make_dtype, build_empty_df, build_empty_series


class ApplyOperand(DataFrameOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -86,13 +86,22 @@ def kwds(self):
@enter_current_session
def execute(cls, ctx, op):
    """Apply ``op.func`` to this operand's input chunk and store the result.

    NOTE(review): the ``cls`` parameter suggests a ``@classmethod`` decorator
    sits directly above this hunk; it is not visible here -- confirm.

    Parameters
    ----------
    ctx : mapping
        Chunk storage indexed by chunk key. The input is read from
        ``ctx[op.inputs[0].key]`` and the result is written to
        ``ctx[op.outputs[0].key]``.
    op : ApplyOperand
        Operand supplying ``func``, ``args``, ``kwds`` and the
        pandas ``apply`` options (``axis``, ``raw``, ``result_type`` for
        DataFrame input; ``convert_dtype`` otherwise).

    Returns
    -------
    None
        The result is only stored into ``ctx``.
    """
    input_data = ctx[op.inputs[0].key]
    out = op.outputs[0]

    # A zero-length input chunk is not passed to ``func`` at all: an empty
    # object is built from the output chunk's declared dtypes / dtype and
    # name instead. Presumably this keeps the chunk consistent with the
    # declared output schema -- confirm against the linked issue (#2434).
    if len(input_data) == 0:
        if op.output_types[0] == OutputType.dataframe:
            ctx[out.key] = build_empty_df(out.dtypes)
        else:
            ctx[out.key] = build_empty_series(out.dtype, name=out.name)
        return

    if isinstance(input_data, pd.DataFrame):
        result = input_data.apply(
            op.func, axis=op.axis, raw=op.raw, result_type=op.result_type,
            args=op.args, **op.kwds)
    else:
        # Non-DataFrame input takes the Series-style ``apply`` signature.
        result = input_data.apply(
            op.func, convert_dtype=op.convert_dtype, args=op.args, **op.kwds)

    # Store once, through the same ``out`` handle used by the empty-input
    # branch (the earlier ``ctx[op.outputs[0].key] = result`` was a
    # superseded duplicate of this store).
    ctx[out.key] = result

@classmethod
def _tile_df(cls, op):
Expand Down Expand Up @@ -285,7 +294,6 @@ def _call_series(self, series, dtypes=None, dtype=None, name=None, index=None):
else:
raise TypeError('Cannot determine dtypes, '
'please specify `dtypes` as argument')

columns_value = parse_index(dtypes.index, store_data=True)

return self.new_dataframe([series], shape=(series.shape[0], len(dtypes)),
Expand Down
17 changes: 13 additions & 4 deletions mars/dataframe/base/map_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ...utils import enter_current_session, has_unknown_shape, quiet_stdio
from ..operands import DataFrameOperand, DataFrameOperandMixin, OutputType
from ..utils import build_df, build_empty_df, build_series, parse_index, \
validate_output_types
validate_output_types, build_empty_series


class DataFrameMapChunk(DataFrameOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -175,12 +175,21 @@ def tile(cls, op: "DataFrameMapChunk"):
@enter_current_session
def execute(cls, ctx, op: "DataFrameMapChunk"):
    """Run ``op.func`` on this operand's input chunk and store the result.

    NOTE(review): the ``cls`` parameter suggests a ``@classmethod`` decorator
    sits directly above this hunk; it is not visible here -- confirm.

    Parameters
    ----------
    ctx : mapping
        Chunk storage indexed by chunk key. The input is read from
        ``ctx[op.input.key]`` and the result is written to
        ``ctx[op.outputs[0].key]``.
    op : DataFrameMapChunk
        Operand supplying ``func``, ``args``, ``kwargs``,
        ``with_chunk_index`` and ``output_types``.

    Raises
    ------
    ValueError
        If the input chunk has zero length and the declared output type is
        neither dataframe nor series.
    """
    inp = ctx[op.input.key]
    out = op.outputs[0]

    # A zero-length input chunk is not passed to ``func``: an empty object is
    # built from the output chunk's declared dtypes / dtype and name instead.
    if len(inp) == 0:
        output_type = op.output_types[0]
        if output_type == OutputType.dataframe:
            ctx[out.key] = build_empty_df(out.dtypes)
        elif output_type == OutputType.series:
            ctx[out.key] = build_empty_series(out.dtype, name=out.name)
        else:
            raise ValueError('Chunk can not be empty except for dataframe/series.')
        return

    # Copy before injecting ``chunk_index`` so the operand's own ``kwargs``
    # dict is never mutated by execution.
    kwargs = dict(op.kwargs or {})
    if op.with_chunk_index:
        kwargs['chunk_index'] = out.index

    # ``func`` is invoked exactly once per chunk; the superseded
    # ``out_chunk``-based call duplicated this one.
    ctx[out.key] = op.func(inp, *op.args, **kwargs)


def map_chunk(df_or_series, func, args=(), **kwargs):
Expand Down
30 changes: 30 additions & 0 deletions mars/dataframe/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,36 @@ def test_to_frame_or_series(setup):
pd.testing.assert_series_equal(raw.to_series(name='new_name'), result)


def test_to_frame_or_series_apply(setup):
    """Exercise apply / groupby-apply / map_chunk on data with an empty part.

    Each input is built by appending an empty DataFrame or Series to a
    non-empty one (presumably so that one chunk is empty -- confirm against
    the chunking behaviour of ``append``).
    """
    def row_to_cd(row):
        # Every row maps to the same two-field Series labelled 'c' and 'd'.
        return pd.Series([1, 2], index=['c', 'd'])

    def copy_col2_to_col3(df):
        df['col3'] = df['col2']
        return df

    def apply_row_to_cd(chunk_df):
        return chunk_df.apply(row_to_cd, axis=1)

    def to_str(v):
        return str(v)

    def apply_float(chunk_series):
        return chunk_series.apply(lambda x: float(x))

    def check_dtype(s):
        assert s.dtypes == np.float64
        return s

    # --- DataFrame cases ---
    filled_df = DataFrame(pd.DataFrame([[0, 1], [2, 3]], columns=['col1', 'col2']))
    empty_df = DataFrame(pd.DataFrame(columns=['col1', 'col2']))
    mixed_df = filled_df.append(empty_df)

    applied = mixed_df.apply(row_to_cd, axis=1).to_pandas()
    assert applied.columns.tolist() == ['c', 'd']

    grouped = mixed_df.groupby(['col1']).apply(copy_col2_to_col3).to_pandas()
    assert grouped.columns.tolist() == ['col1', 'col2', 'col3']

    mapped = mixed_df.map_chunk(apply_row_to_cd).to_pandas()
    assert mapped.columns.tolist() == ['c', 'd']

    # --- Series cases ---
    filled_ser = Series(pd.Series(data={'a': 1, 'b': 2, 'c': 3}, index=['a', 'b', 'c']))
    empty_ser = Series(pd.Series(dtype=np.int64))
    mixed_ser = filled_ser.append(empty_ser)

    as_str = mixed_ser.apply(to_str).execute()
    assert as_str.dtype == object

    as_float = mixed_ser.map_chunk(apply_float).execute()
    # The dtype assertion runs inside ``check_dtype`` when the second
    # ``map_chunk`` is executed.
    as_float.map_chunk(check_dtype).execute()


def test_assign(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame({"A": rs.rand(10), "B": rs.rand(10)})
Expand Down