Skip to content

Commit

Permalink
Fix df/series.{apply, map_chunk} when some chunk output empty data (m…
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang committed Sep 7, 2021
1 parent 7c43c8d commit 7221874
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 7 deletions.
14 changes: 11 additions & 3 deletions mars/dataframe/base/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ...utils import enter_current_session, quiet_stdio
from ..operands import DataFrameOperandMixin, DataFrameOperand
from ..utils import build_df, build_series, parse_index, validate_axis, \
validate_output_types, make_dtypes, make_dtype
validate_output_types, make_dtypes, make_dtype, build_empty_df, build_empty_series


class ApplyOperand(DataFrameOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -86,13 +86,22 @@ def kwds(self):
@enter_current_session
def execute(cls, ctx, op):
    """Run the apply operand on one input chunk and store the result.

    The input chunk is read from ``ctx`` under the key of ``op.inputs[0]``
    and the result is written under the key of ``op.outputs[0]``.

    Parameters
    ----------
    ctx
        Mapping from chunk keys to chunk data.
    op
        The apply operand being executed; supplies ``func``, ``args``,
        ``kwds`` and the pandas ``apply`` options.
    """
    input_data = ctx[op.inputs[0].key]
    out = op.outputs[0]

    if len(input_data) == 0:
        # An empty chunk is never handed to ``op.func``: the output is
        # built directly from the declared metadata of the output chunk
        # so it agrees with what non-empty chunks produce.
        # NOTE(review): presumably pandas ``apply`` on empty data cannot
        # infer the user function's output columns/dtype -- confirm.
        if op.output_types[0] == OutputType.dataframe:
            ctx[out.key] = build_empty_df(out.dtypes)
        else:
            ctx[out.key] = build_empty_series(out.dtype, name=out.name)
        return

    if isinstance(input_data, pd.DataFrame):
        result = input_data.apply(
            op.func, axis=op.axis, raw=op.raw,
            result_type=op.result_type, args=op.args, **op.kwds)
    else:
        # Anything that is not a DataFrame takes the Series-style call.
        result = input_data.apply(
            op.func, convert_dtype=op.convert_dtype, args=op.args,
            **op.kwds)

    # Store the result exactly once, under the output chunk's key.
    ctx[out.key] = result

@classmethod
def _tile_df(cls, op):
Expand Down Expand Up @@ -285,7 +294,6 @@ def _call_series(self, series, dtypes=None, dtype=None, name=None, index=None):
else:
raise TypeError('Cannot determine dtypes, '
'please specify `dtypes` as argument')

columns_value = parse_index(dtypes.index, store_data=True)

return self.new_dataframe([series], shape=(series.shape[0], len(dtypes)),
Expand Down
17 changes: 13 additions & 4 deletions mars/dataframe/base/map_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ...utils import enter_current_session, has_unknown_shape, quiet_stdio
from ..operands import DataFrameOperand, DataFrameOperandMixin, OutputType
from ..utils import build_df, build_empty_df, build_series, parse_index, \
validate_output_types
validate_output_types, build_empty_series


class DataFrameMapChunk(DataFrameOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -175,12 +175,21 @@ def tile(cls, op: "DataFrameMapChunk"):
@enter_current_session
def execute(cls, ctx, op: "DataFrameMapChunk"):
    """Apply ``op.func`` to one input chunk and store the result.

    The input chunk is read from ``ctx`` under ``op.input.key`` and the
    result is written under the key of ``op.outputs[0]``.

    Parameters
    ----------
    ctx
        Mapping from chunk keys to chunk data.
    op
        The map-chunk operand being executed.

    Raises
    ------
    ValueError
        If the input chunk is empty and the declared output type is
        neither dataframe nor series.
    """
    inp = ctx[op.input.key]
    out = op.outputs[0]

    if len(inp) == 0:
        # An empty chunk is never handed to ``op.func``: the output is
        # built directly from the declared metadata of the output chunk.
        output_type = op.output_types[0]
        if output_type == OutputType.dataframe:
            ctx[out.key] = build_empty_df(out.dtypes)
        elif output_type == OutputType.series:
            ctx[out.key] = build_empty_series(out.dtype, name=out.name)
        else:
            raise ValueError('Chunk can not be empty except for dataframe/series.')
        return

    # Work on a copy so that adding ``chunk_index`` below does not leak
    # into the operand's own ``kwargs`` dict.
    kwargs = dict(op.kwargs or {})
    if op.with_chunk_index:
        kwargs['chunk_index'] = out.index

    # Invoke the user function exactly once per chunk.
    ctx[out.key] = op.func(inp, *op.args, **kwargs)


def map_chunk(df_or_series, func, args=(), **kwargs):
Expand Down
30 changes: 30 additions & 0 deletions mars/dataframe/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,36 @@ def test_to_frame_or_series(setup):
pd.testing.assert_series_equal(raw.to_series(name='new_name'), result)


def test_to_frame_or_series_apply(setup):
    """Check apply / groupby.apply / map_chunk on data with an empty part.

    Each input is built by appending an empty DataFrame (or Series) to a
    small non-empty one, and the executed results are checked for the
    expected columns or dtype.
    """
    def make_row(row):
        # Every row maps to the same two-element series with labels c, d.
        return pd.Series([1, 2], index=['c', 'd'])

    def add_col3(df):
        df['col3'] = df['col2']
        return df

    def check_dtype(s):
        assert s.dtypes == np.float64
        return s

    # --- DataFrame cases -------------------------------------------------
    base_df = DataFrame(
        pd.DataFrame([[0, 1], [2, 3]], columns=['col1', 'col2']))
    empty_df = DataFrame(pd.DataFrame(columns=['col1', 'col2']))
    df_with_empty = base_df.append(empty_df)

    applied = df_with_empty.apply(make_row, axis=1).to_pandas()
    assert applied.columns.tolist() == ['c', 'd']

    grouped = df_with_empty.groupby(['col1']).apply(add_col3).to_pandas()
    assert grouped.columns.tolist() == ['col1', 'col2', 'col3']

    mapped = df_with_empty.map_chunk(
        lambda chunk_df: chunk_df.apply(make_row, axis=1)).to_pandas()
    assert mapped.columns.tolist() == ['c', 'd']

    # --- Series cases ----------------------------------------------------
    base_ser = Series(
        pd.Series(data={'a': 1, 'b': 2, 'c': 3}, index=['a', 'b', 'c']))
    empty_ser = Series(pd.Series(dtype=np.int64))
    ser_with_empty = base_ser.append(empty_ser)

    as_text = ser_with_empty.apply(lambda v: str(v)).execute()
    assert as_text.dtype == object

    as_float = ser_with_empty.map_chunk(
        lambda chunk_series: chunk_series.apply(lambda x: float(x))
    ).execute()
    # The dtype assertion runs inside ``check_dtype`` on each chunk.
    as_float.map_chunk(check_dtype).execute()


def test_assign(setup):
rs = np.random.RandomState(0)
raw = pd.DataFrame({"A": rs.rand(10), "B": rs.rand(10)})
Expand Down

0 comments on commit 7221874

Please sign in to comment.