diff --git a/mars/dataframe/base/apply.py b/mars/dataframe/base/apply.py index 09dc96580d..6df98c8212 100644 --- a/mars/dataframe/base/apply.py +++ b/mars/dataframe/base/apply.py @@ -26,7 +26,7 @@ from ...utils import enter_current_session from ..operands import DataFrameOperandMixin, DataFrameOperand from ..utils import build_df, build_series, parse_index, validate_axis, \ - validate_output_types, quiet_stdio + validate_output_types, quiet_stdio, make_dtypes, make_dtype class ApplyOperand(DataFrameOperand, DataFrameOperandMixin): @@ -196,7 +196,7 @@ def tile(cls, op): else: return cls._tile_series(op) - def _infer_df_func_returns(self, df, dtypes, index): + def _infer_df_func_returns(self, df, dtypes, dtype=None, name=None, index=None): if isinstance(self._func, np.ufunc): output_type, new_dtypes, index_value, new_elementwise = \ OutputType.dataframe, None, 'inherit', True @@ -219,7 +219,7 @@ def _infer_df_func_returns(self, df, dtypes, index): new_dtypes = new_dtypes or infer_df.dtypes else: output_type = output_type or OutputType.series - new_dtypes = new_dtypes or infer_df.dtype + new_dtypes = (name or infer_df.name, dtype or infer_df.dtype) new_elementwise = False if new_elementwise is None else new_elementwise except: # noqa: E722 # nosec pass @@ -230,8 +230,11 @@ def _infer_df_func_returns(self, df, dtypes, index): self._elementwise = new_elementwise if self._elementwise is None else self._elementwise return dtypes, index_value - def _call_dataframe(self, df, dtypes=None, index=None): - dtypes, index_value = self._infer_df_func_returns(df, dtypes, index) + def _call_dataframe(self, df, dtypes=None, dtype=None, name=None, index=None): + # for backward compatibility + dtype = dtype if dtype is not None else dtypes + dtypes, index_value = self._infer_df_func_returns( + df, dtypes, dtype=dtype, name=name, index=index) if index_value is None: index_value = parse_index(None, (df.key, df.index_value.key)) for arg, desc in zip((self.output_types, dtypes), ('output_types', 'dtypes')): @@ -259,9 +262,12 @@ def _call_dataframe(self, df, dtypes=None, index=None): return self.new_dataframe([df], shape=shape, dtypes=dtypes, index_value=df.index_value, columns_value=parse_index(dtypes.index)) else: - return self.new_series([df], shape=shape, dtype=dtypes, index_value=index_value) + name, dtype = dtypes + return self.new_series([df], shape=shape, name=name, dtype=dtype, index_value=index_value) - def _call_series(self, series, dtype=None, index=None): + def _call_series(self, series, dtypes=None, dtype=None, name=None, index=None): + # for backward compatibility + dtype = dtype if dtype is not None else dtypes if self._convert_dtype: try: test_series = build_series(series, size=2, name=series.name) @@ -280,13 +286,12 @@ def _call_series(self, series, dtype=None, index=None): index_value = parse_index(None, series) if output_type == OutputType.dataframe: - if dtype is not None: - dtypes = dtype - elif infer_series is not None and infer_series.ndim == 2: - dtypes = infer_series.dtypes - else: - raise TypeError('Cannot determine dtypes, ' - 'please specify `dtypes` as argument') + if dtypes is None: + if infer_series is not None and infer_series.ndim == 2: + dtypes = infer_series.dtypes + else: + raise TypeError('Cannot determine dtypes, ' + 'please specify `dtypes` as argument') columns_value = parse_index(dtypes.index, store_data=True) @@ -297,30 +302,197 @@ def _call_series(self, series, dtype=None, index=None): if dtype is None and infer_series is not None and infer_series.ndim == 1: dtype = infer_series.dtype else: - dtype = np.dtype(object) + dtype = dtype if dtype is not None else np.dtype(object) if infer_series is not None and infer_series.ndim == 1: - name = infer_series.name - else: - name = None + name = name or infer_series.name return self.new_series([series], dtype=dtype, shape=series.shape, index_value=index_value, name=name) else: - dtype, name = np.dtype('object'), None + dtype = dtype if dtype is not None else np.dtype('object') return self.new_series([series], dtype=dtype, shape=series.shape, index_value=series.index_value, name=name) - def __call__(self, df_or_series, dtypes=None, index=None): + def __call__(self, df_or_series, dtypes=None, dtype=None, name=None, index=None): axis = getattr(self, 'axis', None) or 0 + dtypes = make_dtypes(dtypes) + dtype = make_dtype(dtype) self._axis = validate_axis(axis, df_or_series) if df_or_series.op.output_types[0] == OutputType.dataframe: return self._call_dataframe(df_or_series, dtypes=dtypes, index=index) else: - return self._call_series(df_or_series, dtype=dtypes, index=index) + return self._call_series(df_or_series, dtypes=dtypes, dtype=dtype, name=name, index=index) def df_apply(df, func, axis=0, raw=False, result_type=None, args=(), dtypes=None, - output_type=None, index=None, elementwise=None, **kwds): + dtype=None, name=None, output_type=None, index=None, elementwise=None, + **kwds): + """ + Apply a function along an axis of the DataFrame. + + Objects passed to the function are Series objects whose index is + either the DataFrame's index (``axis=0``) or the DataFrame's columns + (``axis=1``). By default (``result_type=None``), the final return type + is inferred from the return type of the applied function. Otherwise, + it depends on the `result_type` argument. + + Parameters + ---------- + func : function + Function to apply to each column or row. + axis : {0 or 'index', 1 or 'columns'}, default 0 + Axis along which the function is applied: + + * 0 or 'index': apply function to each column. + * 1 or 'columns': apply function to each row. + + raw : bool, default False + Determines if row or column is passed as a Series or ndarray object: + + * ``False`` : passes each row or column as a Series to the + function. + * ``True`` : the passed function will receive ndarray objects + instead. + If you are just applying a NumPy reduction function this will + achieve much better performance. + + result_type : {'expand', 'reduce', 'broadcast', None}, default None + These only act when ``axis=1`` (columns): + + * 'expand' : list-like results will be turned into columns. + * 'reduce' : returns a Series if possible rather than expanding + list-like results. This is the opposite of 'expand'. + * 'broadcast' : results will be broadcast to the original shape + of the DataFrame, the original index and columns will be + retained. + + The default behaviour (None) depends on the return value of the + applied function: list-like results will be returned as a Series + of those. However if the apply function returns a Series these + are expanded to columns. + + output_type : {'dataframe', 'series'}, default None + Specify type of returned object. See `Notes` for more details. + + dtypes : Series, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + dtype : numpy.dtype, default None + Specify dtype of returned Series. See `Notes` for more details. + + name : str, default None + Specify name of returned Series. See `Notes` for more details. + + index : Index, default None + Specify index of returned object. See `Notes` for more details. + + elementwise : bool, default False + Specify whether ``func`` is an elementwise function: + + * ``False`` : The function is not elementwise. Mars will try + concatenating chunks in rows (when ``axis=0``) or in columns + (when ``axis=1``) and then apply ``func`` onto the concatenated + chunk. The concatenation step can cause extra latency. + * ``True`` : The function is elementwise. Mars will apply + ``func`` to original chunks. This will not introduce extra + concatenation step and reduce overhead. + + args : tuple + Positional arguments to pass to `func` in addition to the + array/series. + + **kwds + Additional keyword arguments to pass as keywords arguments to + `func`. + + Returns + ------- + Series or DataFrame + Result of applying ``func`` along the given axis of the + DataFrame. + + See Also + -------- + DataFrame.applymap: For elementwise operations. + DataFrame.aggregate: Only perform aggregating type operations. + DataFrame.transform: Only perform transforming type operations. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock DataFrame, and the apply call may + fail. When this happens, you need to specify the type of apply call + (DataFrame or Series) in output_type. + + * For DataFrame output, you need to specify a list or a pandas Series + as ``dtypes`` of output DataFrame. ``index`` of output can also be + specified. + * For Series output, you need to specify ``dtype`` and ``name`` of + output Series. + + Examples + -------- + >>> import numpy as np + >>> import mars.tensor as mt + >>> import mars.dataframe as md + >>> df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B']) + >>> df.execute() + A B + 0 4 9 + 1 4 9 + 2 4 9 + + Using a reducing function on either axis + + >>> df.apply(np.sum, axis=0).execute() + A 12 + B 27 + dtype: int64 + + >>> df.apply(np.sum, axis=1).execute() + 0 13 + 1 13 + 2 13 + dtype: int64 + + Returning a list-like will result in a Series + + >>> df.apply(lambda x: [1, 2], axis=1).execute() + 0 [1, 2] + 1 [1, 2] + 2 [1, 2] + dtype: object + + Passing ``result_type='expand'`` will expand list-like results + to columns of a Dataframe + + >>> df.apply(lambda x: [1, 2], axis=1, result_type='expand').execute() + 0 1 + 0 1 2 + 1 1 2 + 2 1 2 + + Returning a Series inside the function is similar to passing + ``result_type='expand'``. The resulting column names + will be the Series index. + + >>> df.apply(lambda x: md.Series([1, 2], index=['foo', 'bar']), axis=1).execute() + foo bar + 0 1 2 + 1 1 2 + 2 1 2 + + Passing ``result_type='broadcast'`` will ensure the same shape + result, whether list-like or scalar is returned by the function, + and broadcast it along the axis. The resulting column names will + be the originals. + + >>> df.apply(lambda x: [1, 2], axis=1, result_type='broadcast').execute() + A B + 0 1 2 + 1 1 2 + 2 1 2 + """ if isinstance(func, (list, dict)): return df.aggregate(func) @@ -340,11 +512,132 @@ def df_apply(df, func, axis=0, raw=False, result_type=None, args=(), dtypes=None op = ApplyOperand(func=func, axis=axis, raw=raw, result_type=result_type, args=args, kwds=kwds, output_type=output_type, elementwise=elementwise) - return op(df, dtypes=dtypes, index=index) - - -def series_apply(series, func, convert_dtype=True, output_type=None, - args=(), index=None, **kwds): + return op(df, dtypes=dtypes, dtype=dtype, name=name, index=index) + + +def series_apply(series, func, convert_dtype=True, output_type=None, args=(), + dtypes=None, dtype=None, name=None, index=None, **kwds): + """ + Invoke function on values of Series. + + Can be ufunc (a NumPy function that applies to the entire Series) + or a Python function that only works on single values. + + Parameters + ---------- + func : function + Python function or NumPy ufunc to apply. + + convert_dtype : bool, default True + Try to find better dtype for elementwise function results. If + False, leave as dtype=object. + + output_type : {'dataframe', 'series'}, default None + Specify type of returned object. See `Notes` for more details. + + dtypes : Series, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + dtype : numpy.dtype, default None + Specify dtype of returned Series. See `Notes` for more details. + + name : str, default None + Specify name of returned Series. See `Notes` for more details. + + index : Index, default None + Specify index of returned object. See `Notes` for more details. + + args : tuple + Positional arguments passed to func after the series value. + + **kwds + Additional keyword arguments passed to func. + + Returns + ------- + Series or DataFrame + If func returns a Series object the result will be a DataFrame. + + See Also + -------- + Series.map: For element-wise operations. + Series.agg: Only perform aggregating type operations. + Series.transform: Only perform transforming type operations. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock Series, and the apply call may fail. + When this happens, you need to specify the type of apply call + (DataFrame or Series) in output_type. + + * For DataFrame output, you need to specify a list or a pandas Series + as ``dtypes`` of output DataFrame. ``index`` of output can also be + specified. + * For Series output, you need to specify ``dtype`` and ``name`` of + output Series. + + Examples + -------- + Create a series with typical summer temperatures for each city. + + >>> import mars.tensor as mt + >>> import mars.dataframe as md + >>> s = md.Series([20, 21, 12], + ... index=['London', 'New York', 'Helsinki']) + >>> s.execute() + London 20 + New York 21 + Helsinki 12 + dtype: int64 + + Square the values by defining a function and passing it as an + argument to ``apply()``. + + >>> def square(x): + ... return x ** 2 + >>> s.apply(square).execute() + London 400 + New York 441 + Helsinki 144 + dtype: int64 + + Square the values by passing an anonymous function as an + argument to ``apply()``. + + >>> s.apply(lambda x: x ** 2).execute() + London 400 + New York 441 + Helsinki 144 + dtype: int64 + + Define a custom function that needs additional positional + arguments and pass these additional arguments using the + ``args`` keyword. + + >>> def subtract_custom_value(x, custom_value): + ... return x - custom_value + + >>> s.apply(subtract_custom_value, args=(5,)).execute() + London 15 + New York 16 + Helsinki 7 + dtype: int64 + + Define a custom function that takes keyword arguments + and pass these arguments to ``apply``. + + >>> def add_custom_values(x, **kwargs): + ... for month in kwargs: + ... x += kwargs[month] + ... return x + + >>> s.apply(add_custom_values, june=30, july=20, august=25).execute() + London 95 + New York 96 + Helsinki 87 + dtype: int64 + """ if isinstance(func, (list, dict)): return series.aggregate(func) @@ -364,8 +657,7 @@ def series_apply(series, func, convert_dtype=True, output_type=None, output_types = validate_output_types( output_type=output_type, output_types=output_types, object_type=object_type) output_type = output_types[0] if output_types else OutputType.series - dtypes = kwds.pop('dtypes', kwds.pop('dtype', None)) op = ApplyOperand(func=func, convert_dtype=convert_dtype, args=args, kwds=kwds, output_type=output_type) - return op(series, dtypes=dtypes, index=index) + return op(series, dtypes=dtypes, dtype=dtype, name=name, index=index) diff --git a/mars/dataframe/base/tests/test_base.py b/mars/dataframe/base/tests/test_base.py index e9fbf2f989..50c366bb84 100644 --- a/mars/dataframe/base/tests/test_base.py +++ b/mars/dataframe/base/tests/test_base.py @@ -188,6 +188,20 @@ def testDataFrameApply(self): df = from_pandas_df(df_raw, chunk_size=5) + def df_func_with_err(v): + assert len(v) > 2 + return v.sort_values() + + with self.assertRaises(TypeError): + df.apply(df_func_with_err) + + r = df.apply(df_func_with_err, output_type='dataframe', + dtypes=df_raw.dtypes) + self.assertEqual(r.shape, (np.nan, df.shape[-1])) + self.assertEqual(r.op._op_type_, opcodes.APPLY) + self.assertEqual(r.op.output_types[0], OutputType.dataframe) + self.assertFalse(r.op.elementwise) + r = df.apply('ffill') self.assertEqual(r.op._op_type_, opcodes.FILL_NA) @@ -351,6 +365,23 @@ def rename_fn(f, new_name): options.chunk_store_limit = 20 # DATAFRAME CASES + + # test transform with infer failure + def transform_df_with_err(v): + assert len(v) > 2 + return v.sort_values() + + with self.assertRaises(TypeError): + df.transform(transform_df_with_err) + + r = df.transform(transform_df_with_err, dtypes=df_raw.dtypes).tiles() + self.assertEqual(r.shape, df.shape) + self.assertEqual(r.op._op_type_, opcodes.TRANSFORM) + self.assertEqual(r.op.output_types[0], OutputType.dataframe) + self.assertEqual(r.chunks[0].shape, (df.shape[0], 20 // df.shape[0])) + self.assertEqual(r.chunks[0].inputs[0].shape[0], df_raw.shape[0]) + self.assertEqual(r.chunks[0].inputs[0].op._op_type_, opcodes.CONCATENATE) + # test transform scenarios on data frames r = df.transform(lambda x: list(range(len(x)))).tiles() self.assertTrue(all(v == np.dtype('int64') for v in r.dtypes)) diff --git a/mars/dataframe/base/transform.py b/mars/dataframe/base/transform.py index 9357fc5ba8..c9ae0615a0 100644 --- a/mars/dataframe/base/transform.py +++ b/mars/dataframe/base/transform.py @@ -24,7 +24,7 @@ from ..core import DATAFRAME_CHUNK_TYPE, DATAFRAME_TYPE from ..operands import DataFrameOperandMixin, DataFrameOperand from ..utils import build_df, build_series, validate_axis, \ - parse_index, filter_dtypes_by_index, quiet_stdio + parse_index, filter_dtypes_by_index, make_dtypes, quiet_stdio class TransformOperand(DataFrameOperand, DataFrameOperandMixin): @@ -41,10 +41,10 @@ class TransformOperand(DataFrameOperand, DataFrameOperandMixin): _call_agg = BoolField('call_agg') def __init__(self, func=None, axis=None, convert_dtype=None, args=None, kwds=None, - call_agg=None, output_types=None, tileable_op_key=None, **kw): + call_agg=None, output_types=None, tileable_op_key=None, memory_scale=None, **kw): super().__init__(_func=func, _axis=axis, _convert_dtype=convert_dtype, _args=args, _kwds=kwds, _call_agg=call_agg, _output_types=output_types, - _tileable_op_key=tileable_op_key, **kw) + _tileable_op_key=tileable_op_key, _memory_scale=memory_scale, **kw) @property def func(self): @@ -211,10 +211,10 @@ def _infer_df_func_returns(self, df, dtypes): is_df = isinstance(infer_df, pd.DataFrame) if is_df: - new_dtypes = dtypes or infer_df.dtypes + new_dtypes = make_dtypes(dtypes) if dtypes is not None else infer_df.dtypes self.output_types = [OutputType.dataframe] else: - new_dtypes = dtypes or (infer_df.name, infer_df.dtype) + new_dtypes = dtypes if dtypes is not None else (infer_df.name, infer_df.dtype) self.output_types = [OutputType.series] return new_dtypes @@ -225,11 +225,6 @@ def __call__(self, df, dtypes=None, index=None): dtypes = self._infer_df_func_returns(df, dtypes) - for arg, desc in zip((self.output_types, dtypes), ('output_types', 'dtypes')): - if arg is None: - raise TypeError(f'Cannot determine {desc} by calculating with enumerate data, ' - 'please specify it as arguments') - if self.output_types[0] == OutputType.dataframe: new_shape = list(df.shape) new_index_value = df.index_value @@ -257,12 +252,173 @@ def __call__(self, df, dtypes=None, index=None): def df_transform(df, func, axis=0, *args, dtypes=None, **kwargs): + """ + Call ``func`` on self producing a DataFrame with transformed values. + + Produced DataFrame will have same axis length as self. + + Parameters + ---------- + func : function, str, list or dict + Function to use for transforming the data. If a function, must either + work when passed a DataFrame or when passed to DataFrame.apply. + + Accepted combinations are: + + - function + - string function name + - list of functions and/or function names, e.g. ``[np.exp. 'sqrt']`` + - dict of axis labels -> functions, function names or list of such. + axis : {0 or 'index', 1 or 'columns'}, default 0 + If 0 or 'index': apply function to each column. + If 1 or 'columns': apply function to each row. + + dtypes : Series, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + *args + Positional arguments to pass to `func`. + **kwargs + Keyword arguments to pass to `func`. + + Returns + ------- + DataFrame + A DataFrame that must have the same length as self. + + Raises + ------ + ValueError : If the returned DataFrame has a different length than self. + + See Also + -------- + DataFrame.agg : Only perform aggregating type operations. + DataFrame.apply : Invoke function on a DataFrame. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock DataFrame and the apply call may + fail. When this happens, you need to specify a list or a pandas + Series as ``dtypes`` of output DataFrame. + + Examples + -------- + >>> import mars.tensor as mt + >>> import mars.dataframe as md + >>> df = md.DataFrame({'A': range(3), 'B': range(1, 4)}) + >>> df.execute() + A B + 0 0 1 + 1 1 2 + 2 2 3 + >>> df.transform(lambda x: x + 1).execute() + A B + 0 1 2 + 1 2 3 + 2 3 4 + + Even though the resulting DataFrame must have the same length as the + input DataFrame, it is possible to provide several input functions: + + >>> s = md.Series(range(3)) + >>> s.execute() + 0 0 + 1 1 + 2 2 + dtype: int64 + >>> s.transform([mt.sqrt, mt.exp]).execute() + sqrt exp + 0 0.000000 1.000000 + 1 1.000000 2.718282 + 2 1.414214 7.389056 + """ op = TransformOperand(func=func, axis=axis, args=args, kwds=kwargs, output_types=[OutputType.dataframe], call_agg=kwargs.pop('_call_agg', False)) return op(df, dtypes=dtypes) def series_transform(series, func, convert_dtype=True, axis=0, *args, dtype=None, **kwargs): + """ + Call ``func`` on self producing a Series with transformed values. + + Produced Series will have same axis length as self. + + Parameters + ---------- + func : function, str, list or dict + Function to use for transforming the data. If a function, must either + work when passed a Series or when passed to Series.apply. + + Accepted combinations are: + + - function + - string function name + - list of functions and/or function names, e.g. ``[np.exp. 'sqrt']`` + - dict of axis labels -> functions, function names or list of such. + axis : {0 or 'index'} + Parameter needed for compatibility with DataFrame. + + dtype : numpy.dtype, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + *args + Positional arguments to pass to `func`. + **kwargs + Keyword arguments to pass to `func`. + + Returns + ------- + Series + A Series that must have the same length as self. + + Raises + ------ + ValueError : If the returned Series has a different length than self. + + See Also + -------- + Series.agg : Only perform aggregating type operations. + Series.apply : Invoke function on a Series. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock Series, and the transform call may + fail. When this happens, you need to specify ``dtype`` of output + Series. + + Examples + -------- + >>> import mars.tensor as mt + >>> import mars.dataframe as md + >>> df = md.DataFrame({'A': range(3), 'B': range(1, 4)}) + >>> df.execute() + A B + 0 0 1 + 1 1 2 + 2 2 3 + >>> df.transform(lambda x: x + 1).execute() + A B + 0 1 2 + 1 2 3 + 2 3 4 + + Even though the resulting Series must have the same length as the + input Series, it is possible to provide several input functions: + + >>> s = md.Series(range(3)) + >>> s.execute() + 0 0 + 1 1 + 2 2 + dtype: int64 + >>> s.transform([mt.sqrt, mt.exp]).execute() + sqrt exp + 0 0.000000 1.000000 + 1 1.000000 2.718282 + 2 1.414214 7.389056 + """ op = TransformOperand(func=func, axis=axis, convert_dtype=convert_dtype, args=args, kwds=kwargs, output_types=[OutputType.series], call_agg=kwargs.pop('_call_agg', False)) dtypes = (series.name, dtype) if dtype is not None else None diff --git a/mars/dataframe/groupby/apply.py b/mars/dataframe/groupby/apply.py index 02a48c74fa..1c5180e82e 100644 --- a/mars/dataframe/groupby/apply.py +++ b/mars/dataframe/groupby/apply.py @@ -16,13 +16,13 @@ import pandas as pd from ... import opcodes -from ...core import OutputType, get_output_types +from ...core import OutputType from ...custom_log import redirect_custom_log from ...serialize import TupleField, DictField, FunctionField, StringField from ...utils import enter_current_session from ..operands import DataFrameOperandMixin, DataFrameOperand -from ..utils import build_df, build_empty_df, build_series, build_empty_series, \ - parse_index, validate_output_types, quiet_stdio +from ..utils import build_empty_df, build_empty_series, parse_index, \ + validate_output_types, make_dtypes, make_dtype, quiet_stdio class GroupByApply(DataFrameOperand, DataFrameOperandMixin): @@ -109,41 +109,27 @@ def tile(cls, op): kw['nsplits'] = ((np.nan,) * len(chunks),) return new_op.new_tileables([in_groupby], **kw) - def _infer_df_func_returns(self, in_groupby, in_df, dtypes, index): + def _infer_df_func_returns(self, in_groupby, in_df, dtypes, dtype=None, + name=None, index=None): index_value, output_type, new_dtypes = None, None, None try: - if in_df.op.output_types[0] == OutputType.dataframe: - test_df = build_df(in_df, size=2) - else: - test_df = build_series(in_df, size=2, name=in_df.name) - - selection = getattr(in_groupby.op, 'selection', None) - if selection: - test_df = test_df[selection] - - with np.errstate(all='ignore'): - infer_df = self.func(test_df, *self.args, **self.kwds) + infer_df = in_groupby.op.build_mock_groupby().apply(self.func, *self.args, **self.kwds) # todo return proper index when sort=True is implemented index_value = parse_index(None, in_df.key, self.func) - if infer_df is None: - output_type = get_output_types(in_df)[0] - index_value = parse_index(pd.Index([], dtype=np.object)) - if output_type == OutputType.dataframe: - new_dtypes = pd.Series([], index=pd.Index([])) - else: - new_dtypes = (None, np.dtype('O')) - elif isinstance(infer_df, pd.DataFrame): + # for backward compatibility + dtype = dtype if dtype is not None else dtypes + if isinstance(infer_df, pd.DataFrame): output_type = output_type or OutputType.dataframe new_dtypes = new_dtypes or infer_df.dtypes elif isinstance(infer_df, pd.Series): output_type = output_type or OutputType.series - new_dtypes = new_dtypes or (infer_df.name, infer_df.dtype) + new_dtypes = new_dtypes or (name or infer_df.name, dtype or infer_df.dtype) else: output_type = OutputType.series - new_dtypes = (None, pd.Series(infer_df).dtype) + new_dtypes = (name, dtype or pd.Series(infer_df).dtype) except: # noqa: E722 # nosec pass @@ -152,13 +138,14 @@ def _infer_df_func_returns(self, in_groupby, in_df, dtypes, index): index_value = index_value if index is None else parse_index(index) return dtypes, index_value - def __call__(self, groupby, dtypes=None, index=None): + def __call__(self, groupby, dtypes=None, dtype=None, name=None, index=None): in_df = groupby while in_df.op.output_types[0] not in (OutputType.dataframe, OutputType.series): in_df = in_df.inputs[0] with quiet_stdio(): - dtypes, index_value = self._infer_df_func_returns(groupby, in_df, dtypes, index) + dtypes, index_value = self._infer_df_func_returns( + groupby, in_df, dtypes, dtype=dtype, name=name, index=index) if index_value is None: index_value = parse_index(None, (in_df.key, in_df.index_value.key)) for arg, desc in zip((self.output_types, dtypes), ('output_types', 'dtypes')): @@ -171,21 +158,87 @@ def __call__(self, groupby, dtypes=None, index=None): return self.new_dataframe([groupby], shape=new_shape, dtypes=dtypes, index_value=index_value, columns_value=parse_index(dtypes.index, store_data=True)) else: - name, dtype = dtypes + name = name or dtypes[0] + dtype = dtype or dtypes[1] new_shape = (np.nan,) return self.new_series([groupby], name=name, shape=new_shape, dtype=dtype, index_value=index_value) -def groupby_apply(groupby, func, *args, dtypes=None, index=None, output_type=None, **kwargs): - # todo this can be done with sort_index implemented - if not groupby.op.groupby_params.get('as_index', True): - raise NotImplementedError('apply when set_index == False is not supported') - +def groupby_apply(groupby, func, *args, output_type=None, dtypes=None, dtype=None, + name=None, index=None, **kwargs): + """ + Apply function `func` group-wise and combine the results together. + + The function passed to `apply` must take a dataframe as its first + argument and return a DataFrame, Series or scalar. `apply` will + then take care of combining the results back together into a single + dataframe or series. `apply` is therefore a highly flexible + grouping method. + + While `apply` is a very flexible method, its downside is that + using it can be quite a bit slower than using more specific methods + like `agg` or `transform`. Pandas offers a wide range of method that will + be much faster than using `apply` for their specific purposes, so try to + use them before reaching for `apply`. + + Parameters + ---------- + func : callable + A callable that takes a dataframe as its first argument, and + returns a dataframe, a series or a scalar. In addition the + callable may take positional and keyword arguments. + + output_type : {'dataframe', 'series'}, default None + Specify type of returned object. See `Notes` for more details. + + dtypes : Series, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + dtype : numpy.dtype, default None + Specify dtype of returned Series. See `Notes` for more details. + + name : str, default None + Specify name of returned Series. See `Notes` for more details. + + index : Index, default None + Specify index of returned object. See `Notes` for more details. + + args, kwargs : tuple and dict + Optional positional and keyword arguments to pass to `func`. + + Returns + ------- + applied : Series or DataFrame + + See Also + -------- + pipe : Apply function to the full GroupBy object instead of to each + group. + aggregate : Apply aggregate function to the GroupBy object. + transform : Apply function column-by-column to the GroupBy object. + Series.apply : Apply a function to a Series. + DataFrame.apply : Apply a function to each row or column of a DataFrame. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock grouped object, and the apply call + may fail. When this happens, you need to specify the type of apply + call (DataFrame or Series) in output_type. + + * For DataFrame output, you need to specify a list or a pandas Series + as ``dtypes`` of output DataFrame. ``index`` of output can also be + specified. + * For Series output, you need to specify ``dtype`` and ``name`` of + output Series. + """ output_types = kwargs.pop('output_types', None) object_type = kwargs.pop('object_type', None) output_types = validate_output_types( output_types=output_types, output_type=output_type, object_type=object_type) + dtypes = make_dtypes(dtypes) + dtype = make_dtype(dtype) op = GroupByApply(func=func, args=args, kwds=kwargs, output_types=output_types) - return op(groupby, dtypes=dtypes, index=index) + return op(groupby, dtypes=dtypes, dtype=dtype, name=name, index=index) diff --git a/mars/dataframe/groupby/core.py b/mars/dataframe/groupby/core.py index 865be20bad..3917ec2bfb 100644 --- a/mars/dataframe/groupby/core.py +++ b/mars/dataframe/groupby/core.py @@ -97,14 +97,19 @@ def groupby_params(self): def build_mock_groupby(self, **kwargs): in_df = self.inputs[0] if self.is_dataframe_obj: - empty_df = build_df(in_df, size=1) + mock_obj = pd.concat([ + build_df(in_df, size=2, fill_value=1), + build_df(in_df, size=2, fill_value=2), + ]) obj_dtypes = in_df.dtypes[in_df.dtypes == np.dtype('O')] - empty_df[obj_dtypes.index] = 'O' + mock_obj[obj_dtypes.index] = mock_obj[obj_dtypes.index].radd('O') else: + mock_obj = pd.concat([ + build_series(in_df, size=2, fill_value=1, name=in_df.name), + build_series(in_df, size=2, fill_value=2, name=in_df.name), + ]) if in_df.dtype == np.dtype('O'): - empty_df = pd.Series('O', index=pd.RangeIndex(2), name=in_df.name, dtype=np.dtype('O')) - else: - empty_df = build_series(in_df, size=1, name=in_df.name) + mock_obj = mock_obj.radd('O') new_kw = self.groupby_params new_kw.update(kwargs) @@ -114,11 +119,16 @@ def build_mock_groupby(self, **kwargs): new_by = [] for v in new_kw['by']: if isinstance(v, (Base, Entity)): - new_by.append(build_series(v, size=1, name=v.name)) + build_fun = build_df if v.ndim == 2 else build_series + mock_by = pd.concat([ + build_fun(v, size=2, fill_value=1, name=v.name), + build_fun(v, size=2, fill_value=2, name=v.name), + ]) + new_by.append(mock_by) else: new_by.append(v) new_kw['by'] = new_by - return empty_df.groupby(**new_kw) + return mock_obj.groupby(**new_kw) def _set_inputs(self, inputs): super()._set_inputs(inputs) diff --git a/mars/dataframe/groupby/cum.py b/mars/dataframe/groupby/cum.py index 4974a9788b..858df7a31c 100644 --- a/mars/dataframe/groupby/cum.py +++ b/mars/dataframe/groupby/cum.py @@ -45,13 +45,13 @@ def ascending(self) -> bool: return self._ascending def _calc_out_dtypes(self, in_groupby): - empty_groupby = in_groupby.op.build_mock_groupby() + mock_groupby = in_groupby.op.build_mock_groupby() func_name = getattr(self, '_func_name') if func_name == 'cumcount': - result_df = empty_groupby.cumcount(ascending=self.ascending) + result_df = mock_groupby.cumcount(ascending=self.ascending) else: - result_df = getattr(empty_groupby, func_name)(axis=self.axis) + result_df = getattr(mock_groupby, func_name)(axis=self.axis) if isinstance(result_df, pd.DataFrame): self.output_types = [OutputType.dataframe] diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py index ee0253ace1..2dbf6b28e5 100644 --- a/mars/dataframe/groupby/tests/test_groupby.py +++ b/mars/dataframe/groupby/tests/test_groupby.py @@ -134,11 +134,29 @@ def testGroupByApply(self): def apply_df(df): return df.sort_index() + def apply_df_with_error(df): + assert len(df) > 2 + return df.sort_index() + def apply_series(s): return s.sort_index() mdf = md.DataFrame(df1, chunk_size=3) + with self.assertRaises(TypeError): + mdf.groupby('b').apply(apply_df_with_error) + + applied = mdf.groupby('b').apply( + apply_df_with_error, output_type='dataframe', + dtypes=df1.dtypes).tiles() + pd.testing.assert_series_equal(applied.dtypes, df1.dtypes) + self.assertEqual(applied.shape, (np.nan, 3)) + self.assertEqual(applied.op._op_type_, opcodes.APPLY) + self.assertEqual(applied.op.output_types[0], OutputType.dataframe) + self.assertEqual(len(applied.chunks), 3) + self.assertEqual(applied.chunks[0].shape, (np.nan, 3)) + pd.testing.assert_series_equal(applied.chunks[0].dtypes, df1.dtypes) + applied = mdf.groupby('b').apply(apply_df).tiles() pd.testing.assert_series_equal(applied.dtypes, df1.dtypes) self.assertEqual(applied.shape, (np.nan, 3)) @@ -191,11 +209,28 @@ def testGroupByTransform(self): def transform_df(df): return df.sort_index() + def transform_df_with_err(df): + assert len(df) > 2 + return df.sort_index() + mdf = md.DataFrame(df1, chunk_size=3) with self.assertRaises(TypeError): mdf.groupby('b').transform(['cummax', 'cumcount']) + with self.assertRaises(TypeError): + mdf.groupby('b').transform(transform_df_with_err) + + r = mdf.groupby('b').transform(transform_df_with_err, + dtypes=df1.dtypes.drop('b')).tiles() + self.assertListEqual(r.dtypes.index.tolist(), list('acdef')) + self.assertEqual(r.shape, (9, 5)) + self.assertEqual(r.op._op_type_, opcodes.TRANSFORM) + self.assertEqual(r.op.output_types[0], OutputType.dataframe) + self.assertEqual(len(r.chunks), 3) + self.assertEqual(r.chunks[0].shape, (np.nan, 5)) + self.assertListEqual(r.chunks[0].dtypes.index.tolist(), list('acdef')) + r = mdf.groupby('b').transform(transform_df).tiles() self.assertListEqual(r.dtypes.index.tolist(), list('acdef')) self.assertEqual(r.shape, (9, 5)) diff --git a/mars/dataframe/groupby/tests/test_groupby_execution.py b/mars/dataframe/groupby/tests/test_groupby_execution.py index 150da3a3a0..5139597de6 100644 --- a/mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/mars/dataframe/groupby/tests/test_groupby_execution.py @@ -385,11 +385,14 @@ def testGroupByApply(self): 'b': [1, 3, 4, 5, 6, 5, 4, 4, 4], 'c': list('aabaaddce')}) - def apply_df(df): + def apply_df(df, ret_series=False): df = df.sort_index() df.a += df.b if len(df.index) > 0: - df = df.iloc[:-1, :] + if not ret_series: + df = df.iloc[:-1, :] + else: + df = df.iloc[-1, :] return df def apply_series(s, truncate=True): @@ -408,6 +411,10 @@ def apply_series(s, truncate=True): pd.testing.assert_frame_equal(self.executor.execute_dataframe(applied, concat=True)[0].sort_index(), df1.groupby('b').apply(apply_df).sort_index()) + applied = mdf.groupby('b').apply(apply_df, ret_series=True) + pd.testing.assert_frame_equal(self.executor.execute_dataframe(applied, concat=True)[0].sort_index(), + df1.groupby('b').apply(apply_df, ret_series=True).sort_index()) + applied = mdf.groupby('b').apply(lambda df: df.a, output_type='series') pd.testing.assert_series_equal(self.executor.execute_dataframe(applied, concat=True)[0].sort_index(), df1.groupby('b').apply(lambda df: df.a).sort_index()) @@ -428,7 +435,7 @@ def apply_series(s, truncate=True): series1.groupby(lambda x: x % 3).apply(apply_series).sort_index()) sindex2 = pd.MultiIndex.from_arrays([list(range(9)), list('ABCDEFGHI')]) - series2 = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3], index=sindex2) + series2 = pd.Series(list('CDECEDABC'), index=sindex2) ms2 = md.Series(series2, chunk_size=3) applied = ms2.groupby(lambda x: x[0] % 3).apply(apply_series) diff --git a/mars/dataframe/groupby/transform.py b/mars/dataframe/groupby/transform.py index 6dd7c78bf7..f033465fe5 100644 --- a/mars/dataframe/groupby/transform.py +++ b/mars/dataframe/groupby/transform.py @@ -69,12 +69,12 @@ def _infer_df_func_returns(self, in_groupby, dtypes, index): if in_groupby.op.output_types[0] == OutputType.dataframe_groupby else [OutputType.series] try: - empty_groupby = in_groupby.op.build_mock_groupby() + mock_groupby = in_groupby.op.build_mock_groupby() with np.errstate(all='ignore'), quiet_stdio(): if self.call_agg: - infer_df = empty_groupby.agg(self.func, *self.args, **self.kwds) + infer_df = mock_groupby.agg(self.func, *self.args, **self.kwds) else: - infer_df = empty_groupby.transform(self.func, *self.args, **self.kwds) + infer_df = mock_groupby.transform(self.func, *self.args, **self.kwds) # todo return proper index when sort=True is implemented index_value = parse_index(None, in_groupby.key, self.func) @@ -93,12 +93,15 @@ def _infer_df_func_returns(self, in_groupby, dtypes, index): index_value = index_value if index is None else parse_index(index) return dtypes, index_value - def __call__(self, groupby, dtypes=None, index=None): + def __call__(self, groupby, dtypes=None, dtype=None, name=None, index=None): in_df = groupby.inputs[0] + if dtypes is None and dtype is not None: + dtypes = (name, dtype) dtypes, index_value = self._infer_df_func_returns(groupby, dtypes, index) - for arg, desc in zip((self.output_types, dtypes, index_value), - ('output_types', 'dtypes', 'index')): + if index_value is None: + index_value = parse_index(None, (in_df.key, in_df.index_value.key)) + for arg, desc in zip((self.output_types, dtypes), ('output_types', 'dtypes')): if arg is None: raise TypeError(f'Cannot determine {desc} by calculating with enumerate data, ' 'please specify it as arguments') @@ -169,15 +172,107 @@ def execute(cls, ctx, op): ctx[op.outputs[0].key] = result -def groupby_transform(groupby, func, *args, dtypes=None, index=None, output_types=None, **kwargs): - # todo this can be done with sort_index implemented - if not groupby.op.groupby_params.get('as_index', True): - raise NotImplementedError('transform when set_index == False is not supported') - +def groupby_transform(groupby, f, *args, dtypes=None, dtype=None, name=None, index=None, + output_types=None, **kwargs): + """ + Call function producing a like-indexed DataFrame on each group and + return a DataFrame having the same indexes as the original object + filled with the transformed values + + Parameters + ---------- + f : function + Function to apply to each group. + + dtypes : Series, default None + Specify dtypes of returned DataFrames. See `Notes` for more details. + + dtype : numpy.dtype, default None + Specify dtype of returned Series. See `Notes` for more details. + + name : str, default None + Specify name of returned Series. See `Notes` for more details. + + *args + Positional arguments to pass to func + + **kwargs + Keyword arguments to be passed into func. + + Returns + ------- + DataFrame + + See Also + -------- + DataFrame.groupby.apply + DataFrame.groupby.aggregate + DataFrame.transform + + Notes + ----- + Each group is endowed the attribute 'name' in case you need to know + which group you are working on. + + The current implementation imposes three requirements on f: + + * f must return a value that either has the same shape as the input + subframe or can be broadcast to the shape of the input subframe. + For example, if `f` returns a scalar it will be broadcast to have the + same shape as the input subframe. + * if this is a DataFrame, f must support application column-by-column + in the subframe. If f also supports application to the entire subframe, + then a fast path is used starting from the second chunk. + * f must not mutate groups. Mutation is not supported and may + produce unexpected results. + + Notes + ----- + When deciding output dtypes and shape of the return value, Mars will + try applying ``func`` onto a mock grouped object, and the transform call + may fail. + + * For DataFrame output, you need to specify a list or a pandas Series + as ``dtypes`` of output DataFrame. ``index`` of output can also be + specified. + * For Series output, you need to specify ``dtype`` and ``name`` of + output Series. + + Examples + -------- + + >>> import mars.dataframe as md + >>> df = md.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', + ... 'foo', 'bar'], + ... 'B' : ['one', 'one', 'two', 'three', + ... 'two', 'two'], + ... 'C' : [1, 5, 5, 2, 5, 5], + ... 'D' : [2.0, 5., 8., 1., 2., 9.]}) + >>> grouped = df.groupby('A') + >>> grouped.transform(lambda x: (x - x.mean()) / x.std()).execute() + C D + 0 -1.154701 -0.577350 + 1 0.577350 0.000000 + 2 0.577350 1.154701 + 3 -1.154701 -1.000000 + 4 0.577350 -0.577350 + 5 0.577350 1.000000 + + Broadcast result of the transformation + + >>> grouped.transform(lambda x: x.max() - x.min()).execute() + C D + 0 4 6.0 + 1 3 8.0 + 2 4 6.0 + 3 3 8.0 + 4 4 6.0 + 5 3 8.0 + """ call_agg = kwargs.pop('_call_agg', False) - if not call_agg and isinstance(func, (dict, list)): - raise TypeError(f'Does not support transform with {type(func)}') + if not call_agg and isinstance(f, (dict, list)): + raise TypeError(f'Does not support transform with {type(f)}') - op = GroupByTransform(func=func, args=args, kwds=kwargs, output_types=output_types, + op = GroupByTransform(func=f, args=args, kwds=kwargs, output_types=output_types, call_agg=call_agg) - return op(groupby, dtypes=dtypes, index=index) + return op(groupby, dtypes=dtypes, dtype=dtype, name=name, index=index) diff --git a/mars/dataframe/tests/test_utils.py b/mars/dataframe/tests/test_utils.py index 3c6d43a2d4..4e38d4e1f2 100644 --- a/mars/dataframe/tests/test_utils.py +++ b/mars/dataframe/tests/test_utils.py @@ -15,6 +15,7 @@ import operator import sys import unittest +from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor from numbers import Integral @@ -26,7 +27,7 @@ from mars.dataframe.core import IndexValue from mars.dataframe.utils import decide_dataframe_chunk_sizes, decide_series_chunk_size, \ split_monotonic_index_min_max, build_split_idx_to_origin_idx, parse_index, filter_index_value, \ - infer_dtypes, infer_index_value, validate_axis, fetch_corner_data, quiet_stdio + infer_dtypes, infer_index_value, validate_axis, fetch_corner_data, make_dtypes, quiet_stdio from mars.session import new_session @@ -415,6 +416,19 @@ def testFetchDataFrameCornerData(self): pdf.to_string(max_rows=max_rows, min_rows=min_rows), f'failed when row == {row}') + def testMakeDtypes(self): + s = make_dtypes([int, float, np.dtype(int)]) + pd.testing.assert_series_equal(s, pd.Series([np.dtype(int), np.dtype(float), np.dtype(int)])) + + s = make_dtypes(OrderedDict([('a', int), ('b', float), ('c', np.dtype(int))])) + pd.testing.assert_series_equal(s, pd.Series([np.dtype(int), np.dtype(float), np.dtype(int)], + index=list('abc'))) + + s = make_dtypes(pd.Series([int, float, np.dtype(int)])) + pd.testing.assert_series_equal(s, pd.Series([np.dtype(int), np.dtype(float), np.dtype(int)])) + + self.assertIsNone(make_dtypes(None)) + def testQuietStdio(self): old_stdout, old_stderr = sys.stdout, sys.stderr diff --git a/mars/dataframe/utils.py b/mars/dataframe/utils.py index 940fc8f19f..b648d8598a 100644 --- a/mars/dataframe/utils.py +++ b/mars/dataframe/utils.py @@ -23,6 +23,7 @@ import numpy as np import pandas as pd from pandas.api.types import is_string_dtype +from pandas.api.extensions import ExtensionDtype from pandas.core.dtypes.cast import find_common_type try: import pyarrow as pa @@ -1021,6 +1022,23 @@ def to_arrow_dtypes(dtypes, test_df=None): return new_dtypes +def make_dtype(dtype): + if isinstance(dtype, (np.dtype, ExtensionDtype)): + return dtype + return np.dtype(dtype) if dtype is not None else None + + +def make_dtypes(dtypes): + if dtypes is None: + return None + if not isinstance(dtypes, pd.Series): + if isinstance(dtypes, dict): + dtypes = pd.Series(dtypes.values(), index=dtypes.keys()) + else: + dtypes = pd.Series(dtypes) + return dtypes.apply(make_dtype) + + _io_quiet_local = threading.local() _io_quiet_lock = threading.Lock() diff --git a/mars/deploy/local/tests/test_cluster.py b/mars/deploy/local/tests/test_cluster.py index 7f4f9f8793..102071070a 100644 --- a/mars/deploy/local/tests/test_cluster.py +++ b/mars/deploy/local/tests/test_cluster.py @@ -582,7 +582,7 @@ def wrapped(*arg, **kwargs): func = lambda x: x[1] + 1 r2 = df.apply(execute_with_session_check(func, session.session_id), - dtypes=np.dtype('int'), axis=1).to_pandas() + dtype=np.dtype('int'), axis=1).to_pandas() expected2 = raw.apply(func, axis=1) pd.testing.assert_series_equal(r2, expected2) diff --git a/mars/operands.py b/mars/operands.py index 93d8a7fe99..f34d866127 100644 --- a/mars/operands.py +++ b/mars/operands.py @@ -96,6 +96,8 @@ class Operand(AttributeAsDictKey, metaclass=OperandMetaclass): on_serialize=OutputType.serialize_list, on_deserialize=OutputType.deserialize_list) + _memory_scale = Int32Field('memory_scale') + _stage = Int32Field('stage', on_serialize=lambda s: s.value if s is not None else s, on_deserialize=lambda n: OperandStage(n) if n is not None else n) @@ -192,6 +194,10 @@ def pure_depends(self): return [False] * len(self.inputs or ()) return val + @property + def memory_scale(self): + return getattr(self, '_memory_scale', None) or 1 + @property def stage(self) -> Union[None, "OperandStage"]: return getattr(self, '_stage', None) @@ -521,7 +527,7 @@ def estimate_size(cls, ctx, op): max_sparse_size = np.nan if not np.isnan(max_sparse_size): store_size = min(store_size, max_sparse_size) - ctx[out.key] = (store_size, exec_size // len(outputs)) + ctx[out.key] = (store_size, exec_size * op.memory_scale // len(outputs)) @classmethod def concat_tileable_chunks(cls, tileable):