
ENH: parallel support in .apply #13111

Open
jreback opened this Issue May 7, 2016 · 7 comments

jreback (Contributor) commented May 7, 2016

xref #5751

Questions from SO.
mrocklin's nice example of using .apply

So here is an example of how to do a parallel apply using dask. This could be baked into .apply() in pandas by the following signature enhancement:

current:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                args=(), **kwds)

proposed:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                engine=None, chunksize=None, args=(), **kwds)

where engine='dask' (or 'numba' at some point) would be valid options.
chunksize would map directly to npartitions and default to the number of cores if not specified.
Further, engine could be allowed to be a meta object like Dask(scheduler='multiprocessing') to support other options one would commonly pass (chunksize could also move inside that object instead of being a separate keyword).
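A rough sketch of what that dispatch could look like (purely illustrative: the engine= / chunksize= keywords do not exist in pandas, and parallel_apply is a hypothetical helper, not an API):

import os
import dask.dataframe as dd

def parallel_apply(df, func, axis=0, engine=None, chunksize=None, **kwds):
    # hypothetical helper sketching the proposed engine= / chunksize= dispatch
    if engine is None:
        return df.apply(func, axis=axis, **kwds)
    if engine == 'dask':
        # chunksize maps to npartitions, defaulting to the number of cores
        npartitions = chunksize or os.cpu_count()
        ddf = dd.from_pandas(df, npartitions=npartitions, sort=False)
        # note: dask only supports row-wise apply, so axis=1 is expected here
        return ddf.apply(func, axis=axis, **kwds).compute()
    raise ValueError("unknown engine: {!r}".format(engine))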

impl and timings:

from functools import partial
import pandas as pd
import dask
import dask.dataframe as dd
from dask import threaded, multiprocessing
from time import sleep

pd.__version__
dask.__version__

def make_frame(N):
    return pd.DataFrame({'A' : range(N)})
def slow_func(x):
    sleep(0.5)
    return x
df = make_frame(40)

# reg apply
def f1(df):
    return df.apply(slow_func, axis=1)
# dask apply
def f2(df, get):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    return ddf.apply(slow_func, columns=df.columns, axis=1).compute(get=get)

f1 = partial(f1, df)
f2_threaded = partial(f2, df, threaded.get)
f2_multi = partial(f2, df, multiprocessing.get)

result1 = f1()
result2 = f2_threaded()
result3 = f2_multi()
In [18]: result1.equals(result2)
Out[18]: True

In [19]: result1.equals(result3)
Out[19]: True

In [20]: %timeit -n 1 -r 1 f1()
1 loop, best of 1: 20.6 s per loop

In [21]: %timeit -n 1 -r 1 f2_threaded()
1 loop, best of 1: 3.03 s per loop

In [22]: %timeit -n 1 -r 1 f2_multi()
1 loop, best of 1: 3.07 s per loop
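For readers on a newer dask: the get= / columns= keywords above are the older spellings. A sketch of the equivalent code, assuming a dask version that accepts scheduler= in compute() and meta= in apply() (reusing df and slow_func from above):

def f2_modern(df, scheduler):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    # meta= describes the output structure (replaces the older columns=)
    return ddf.apply(slow_func, axis=1, meta=df).compute(scheduler=scheduler)

result4 = f2_modern(df, 'threads')      # threaded scheduler
result5 = f2_modern(df, 'processes')    # multiprocessing scheduler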

Now for some caveats.

People often want to parallelize a poor implementation. Generally you should proceed through the following steps first:

  • get your problem correct; optimizing incorrect results is useless
  • profile, profile, profile. This is always the first thing to do
  • use built-in pandas / numpy vectorized routines
  • use cython or numba on the user-defined function
  • .apply is always the last choice
  • if it's still not enough, parallelization.

You always want to make code simpler, not more complex. It's hard to know a priori where the bottlenecks are. People think .apply is some magical thing; it's NOT, it's JUST A FOR LOOP. The problem is that people tend to throw in the kitchen sink, which is a terrible idea.
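As a toy illustration (not from the original post) of why the vectorized route should come before .apply, and why .apply really is just a loop:

import numpy as np
import pandas as pd

frame = pd.DataFrame({'A': np.random.rand(100000)})

# .apply with axis=1 is a Python-level loop over every row
looped = frame.apply(lambda row: row['A'] * 2 + 1, axis=1)

# the vectorized equivalent is a single numpy operation over the column
vectorized = frame['A'] * 2 + 1

assert np.allclose(looped, vectorized)  # same result, typically orders of magnitude faster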

Ok my 2c about optimizing things.

In order for parallelization to actually matter, the function you are computing should take a non-trivial amount of time relative to things like:

  • iteration costs of the loop
  • serialization time (especially if using multiprocessing / distributed computing)
  • does the function release the GIL? If not, threading will probably not help much
  • development resources (your time)

If these criteria are met, then sure give it a try.

I think providing pandas a first-class way to parallelize things, even though people will just naively use it, is probably not a bad thing.

Further extensions to this are: to_dask() (return a dask.dataframe to the user directly), and engine='dask' syntax for .groupby()

mcg1969 commented May 7, 2016

I too worry about premature parallelization, but I'm not sure how you prevent that. I think I'd rather try and find ways to encourage Numba compilation (and if there are missing Numba features preventing that from being effective, address them). At least then you could engage target=parallel and get multicore apply.
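For context, a minimal sketch of that numba route (illustrative only; assumes a numeric, element-wise UDF, since target='parallel' requires explicit signatures):

import numpy as np
import pandas as pd
from numba import vectorize

@vectorize(['float64(float64)'], target='parallel')
def udf(x):
    # compiled element-wise function, executed across cores by numba
    return x * 2.0 + 1.0

s = pd.Series(np.random.rand(1000000))
out = pd.Series(udf(s.values), index=s.index)  # multicore "apply" without a Python loop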

mrocklin (Contributor) commented May 7, 2016

As a reminder, here is the recipe for a simple convert-apply-convert with minimal overhead.

for dask > 0.8.2
out = dd.from_pandas(df, npartitions=..., sort=False, name='x').apply(udf).compute()

@jorisvandenbossche jorisvandenbossche modified the milestones: Next Major Release, 0.19.0 Aug 13, 2016

rtkaleta (Contributor) commented Mar 3, 2017

For those also wondering, the dd above is from import dask.dataframe as dd.

You must also pass axis=1 in the call to apply because at present dask only supports applying functions to each row. Consider specifying an appropriate meta too.
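Putting those pieces together, the full recipe looks roughly like this (a sketch; udf is your row-wise function, and meta=df simply reuses the input frame to describe the output structure):

out = (dd.from_pandas(df, npartitions=8, sort=False, name='x')
         .apply(udf, axis=1, meta=df)
         .compute())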

enlighter commented May 9, 2018

What kind of effort would be required exactly to implement this feature?

I think if someone gives a fairly detailed answer to that question, it should help all concerned people in the community.

TomAugspurger (Contributor) commented May 9, 2018

@enlighter does the dask-based solution work for you? Properly supporting this in pandas would be a decent amount of effort. Dask already takes care of much of that.
