Commit bca17d3

remove return_type from map_partitions

mrocklin committed Aug 7, 2015
1 parent d462484 commit bca17d3
Showing 2 changed files with 18 additions and 51 deletions.
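For orientation, a minimal before/after sketch of the API change (a hypothetical workflow; ``ddf`` and the column names are illustrative, not taken from the diff):

    import pandas as pd
    import dask.dataframe as dd

    df = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]})
    ddf = dd.from_pandas(df, npartitions=2)

    # Before this commit the caller could (and sometimes had to) name the
    # result type explicitly:
    #     ddf.map_partitions(lambda df: df.x + df.y, 'z', return_type=dd.Series)

    # After this commit the return type is inferred from ``columns``:
    # a single name yields a Series, an iterable of names yields a DataFrame.
    s = ddf.map_partitions(lambda df: df.x + df.y, columns='z')     # dd.Series
    d2 = ddf.map_partitions(lambda df: df + 1, columns=['x', 'y'])  # dd.DataFrame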
51 changes: 16 additions & 35 deletions dask/dataframe/core.py
@@ -169,17 +169,15 @@ def drop_duplicates(self):
     def __len__(self):
         return reduction(self, len, np.sum).compute()

-    def map_partitions(self, func, columns=no_default, return_type=None):
+    def map_partitions(self, func, columns=no_default):
         """ Apply Python function on each DataFrame block
-        When using ``map_partitions`` you should provide both column
-        information, the name or list of column names of the output, as well as
-        the output type (``dd.DataFrame`` or ``dd.Series``).
+        When using ``map_partitions`` you should provide either the column
+        names (if the result is a DataFrame) or the name of the Series (if the
+        result is a Series). The output type will be determined by the type of
+        ``columns``.
-        If you provide the column information only then the return type will be
-        inferred from the number of names
-        >>> df.map_partitions(lambda df: df.x + 1, name='x') # doctest: +SKIP
+        >>> df.map_partitions(lambda df: df.x + 1, columns='x') # doctest: +SKIP
         >>> df.map_partitions(lambda df: df.head(), columns=df.columns) # doctest: +SKIP
@@ -189,15 +187,10 @@ def map_partitions(self, func, columns=no_default, return_type=None):
         column_info: tuple or string
             Column names or name of the output.
             Defaults to names of the input.
-        return_type: class, default None
-            Specify the class of the result. Be sure to pass correct class
-            because it will not be validated by the function.
-            If None, class will be inferred from column names or default to
-            the class of the input.
         """
         if columns == no_default:
             columns = self.column_info
-        return map_partitions(func, columns, self, return_type=return_type)
+        return map_partitions(func, columns, self)

     def random_split(self, p, seed=None):
         """ Pseudorandomly split dataframe into different pieces row-wise
@@ -1221,42 +1214,30 @@ def apply_concat_apply(args, chunk=None, aggregate=None, columns=None,
 aca = apply_concat_apply


-def map_partitions(func, column_info, *args, **kwargs):
+def map_partitions(func, columns, *args, **kwargs):
     """ Apply Python function on each DataFrame block
     column_info: tuple or string
         Column names or name of the output
     targets: list
         List of target DataFrame / Series.
-    return_type: class, default None
-        Specify the class of the result. Be sure to pass correct class
-        because it will not be validated by the function.
-        If None, class of 1st element of ``args`` will be used.
     """
     assert all(not isinstance(arg, _Frame) or
                arg.divisions == args[0].divisions for arg in args)

     token = kwargs.pop('token', None)

-    return_type = kwargs.pop('return_type', None)
-    if return_type is None:
-        if (isinstance(column_info, (str, unicode)) or not
-                isinstance(column_info, Iterable)):
-            return_type = Series
-        else:
-            return_type = type(args[0])
-
-    if (return_type == DataFrame and isinstance(column_info, (str, unicode)) or
-            return_type == Series and isinstance(column_info, (tuple, list, pd.Index))):
-        raise ValueError("Arguments to map_partitions are not consistent.\n"
-                         "Received columns=%s and return_type=%s" %
-                         (str(column_info), str(return_type)))

     if kwargs:
         raise ValueError("Keyword arguments not yet supported in map_partitions")

+    if (isinstance(columns, (str, unicode)) or not
+            isinstance(columns, Iterable)):
+        return_type = Series
+    else:
+        return_type = DataFrame

     token = ((token or func),
-             column_info,
+             columns,
              [arg._name if isinstance(arg, _Frame) else arg for arg in args])
@@ -1268,7 +1249,7 @@ def map_partitions(func, column_info, *args, **kwargs):
                for i in range(args[0].npartitions))

     dasks = [arg.dask for arg in args if isinstance(arg, _Frame)]
-    return return_type(merge(dsk, *dasks), name, column_info, args[0].divisions)
+    return return_type(merge(dsk, *dasks), name, columns, args[0].divisions)


 def categorize_block(df, categories):
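The inference rule that replaces ``return_type`` is small enough to restate on its own. A standalone sketch of the dispatch logic in modern Python 3 (the committed code also targets Python 2, hence its ``(str, unicode)`` check; ``infer_return_type`` and the string results are stand-ins for returning ``dd.Series``/``dd.DataFrame``):

    from collections.abc import Iterable

    def infer_return_type(columns):
        # A single name (or no name at all, e.g. None) means a Series;
        # any other iterable of names means a DataFrame.
        if isinstance(columns, str) or not isinstance(columns, Iterable):
            return 'Series'
        return 'DataFrame'

    assert infer_return_type('x') == 'Series'            # one column name
    assert infer_return_type(None) == 'Series'           # unnamed Series
    assert infer_return_type(['x', 'y']) == 'DataFrame'  # list of names
    assert infer_return_type(('x',)) == 'DataFrame'      # even a 1-tuple

Note that strings must be special-cased before the ``Iterable`` test because strings are themselves iterable.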
18 changes: 2 additions & 16 deletions dask/dataframe/tests/test_dataframe.py
@@ -393,7 +393,7 @@ def test_map_partitions_multi_argument():
 def test_map_partitions():
     assert eq(d.map_partitions(lambda df: df, columns=d.columns), full)

-    result = d.map_partitions(lambda df: df.sum(axis=1), 'a', return_type=dd.Series)
+    result = d.map_partitions(lambda df: df.sum(axis=1), 'a')
     assert eq(result, full.sum(axis=1))


@@ -425,8 +425,7 @@ def test_map_partitions_column_info():
     assert b.name == a.x.name
     assert eq(df.x, b)

-    b = dd.map_partitions(lambda df: df.x + df.y, None, a,
-                          return_type=dd.Series)
+    b = dd.map_partitions(lambda df: df.x + df.y, None, a)
     assert b.name == None
     assert isinstance(b, dd.Series)

@@ -452,19 +451,6 @@ def test_map_partitions_method_names():
     assert b.name == 'x'


-def test_map_partitions_return_type_and_names_agree():
-    df = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]})
-    a = dd.from_pandas(df, npartitions=2)
-    try:
-        a.map_partitions(lambda x: x, columns='zzzz', return_type=dd.DataFrame)
-    except ValueError as e:
-        assert 'zzzz' in str(e)
-    try:
-        a.map_partitions(lambda x: x, columns=['zzzz'], return_type=dd.Series)
-    except ValueError as e:
-        assert 'zzzz' in str(e)
-
-
 def test_drop_duplicates():
     assert eq(d.a.drop_duplicates(), full.a.drop_duplicates())
     assert eq(d.drop_duplicates(), full.drop_duplicates())
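For callers migrating across this commit, the intent formerly carried by ``return_type`` is now encoded entirely in ``columns``. A hedged sketch based on the test above (variable names are illustrative):

    import pandas as pd
    import dask.dataframe as dd

    a = dd.from_pandas(pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]}),
                       npartitions=2)

    # Old call site:
    #     dd.map_partitions(lambda df: df.x + df.y, None, a, return_type=dd.Series)
    # New call site: a non-iterable ``columns`` (here None) already implies a
    # Series result, so the keyword is redundant.
    b = dd.map_partitions(lambda df: df.x + df.y, None, a)
    assert isinstance(b, dd.Series)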
