PicklingError with dask.distributed joblib inside Executor.map #389

Closed
stsievert opened this Issue Sep 2, 2016 · 5 comments

stsievert commented Sep 2, 2016

In my use case, I often want to call a function repeatedly with different parameters (test_model below). Because these functions are a significant workload, I use distributed to run them on separate machines. Inside each of these functions, I would like to call another function (model_performance) many times to obtain an average.

The natural way to do this is to use distributed's joblib frontend to distribute these tasks across the machines. However, in distributed#465 we boiled it down to the case where the code containing the joblib call can't be pickled and throws a PicklingError (full stack trace for the example below).

It'd be nice to have the functionality below -- we found a way to make it work using only distributed, but it's not nearly as clean.

import distributed.joblib
import joblib
from distributed import Executor
import random

def model_performance(param):
    """ Returns 0/1 if model has failure/success respectively """
    # ...
    return random.choice([0, 1])

def test_model(param, n=10):
    """ calls model_performance n times to get average performance """
    # ideally: "with joblib.parallel_backend('distributed', ...):"
    y = joblib.Parallel()(joblib.delayed(model_performance)(param) for _ in range(n))
    return {'error': sum(y) / n, 'param': param}

e = Executor()
error = e.map(test_model, [1, 2, 3, 4])
error = e.gather(error)
error = list(error)
Member

GaelVaroquaux commented Sep 2, 2016

It seems that you forgot the traceback.

As far as I can tell, your code example does not allow one to reproduce the problem (moreover, it doesn't run, and even interpreting it to fix the obvious errors does not lead to a problem).

If I understand things correctly, the problem is unrelated to dask: you are trying to run joblib.Parallel on a function that does not pickle.

If that's the case, I am afraid that joblib.Parallel does not support functions that cannot be pickled. This is a limitation, but not a bug. You need to fix your function so that it pickles.

Could you either confirm my interpretation, or provide us with the traceback and a simple runnable example on which we can reproduce the problem?
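The limitation described here can be demonstrated with the standard library alone: pickle serializes a function by reference (module plus qualified name), so only functions importable from a well-defined module can be pickled. A minimal stdlib-only sketch (the function names are illustrative):

```python
import pickle

def importable(x):
    # Defined at module top level: pickle records only
    # "module + qualified name", so this round-trips.
    return x + 1

assert pickle.loads(pickle.dumps(importable))(1) == 2

def make_local():
    def inner(x):
        # Defined inside another function: no import path
        # reaches it, so pickling fails.
        return x
    return inner

failed = False
try:
    pickle.dumps(make_local())
except (pickle.PicklingError, AttributeError):
    failed = True
assert failed
```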

stsievert commented Sep 6, 2016

I've resolved those issues. Below I show (a) the normal functionality without distributed (which works as expected) and (b) the same with dask.distributed (which throws a PicklingError).

import distributed.joblib
import joblib
from distributed import Executor
import numpy as np

def model_performance(param):
    return np.random.choice([0, 1])

def test_model(param, n=100):
    """ calls model_performance n times to get average performance """
    # ideally: "with joblib.parallel_backend('distributed', ...):"
    y = joblib.Parallel()(joblib.delayed(model_performance)(param)
                          for _ in range(n))
    return {'error': sum(y) / n, 'param': param}

plain = list(map(test_model, [1, 2, 3, 4]))
print(plain)

e = Executor()
dist = e.map(test_model, [1, 2, 3, 4])
dist = e.gather(dist)

The traceback is the same as before:

Traceback (most recent call last):
  File "joblib_issue_v1.py", line 23, in <module>
    dist = e.gather(dist)
  File "/Users/scott/anaconda/lib/python3.5/site-packages/distributed/executor.py", line 828, in gather
    return sync(self.loop, self._gather, futures, errors=errors)
  File "/Users/scott/anaconda/lib/python3.5/site-packages/distributed/utils.py", line 116, in sync
    six.reraise(type(error[0]), error[0], traceback[0])
  File "/Users/scott/anaconda/lib/python3.5/site-packages/six.py", line 686, in reraise
    raise value
  File "/Users/scott/anaconda/lib/python3.5/site-packages/distributed/utils.py", line 102, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/Users/scott/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Users/scott/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/scott/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/scott/anaconda/lib/python3.5/site-packages/distributed/executor.py", line 747, in _gather
    d['traceback'])
  File "/Users/scott/anaconda/lib/python3.5/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "joblib_issue_v1.py", line 15, in test_model
    for _ in range(n))
  File "/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py", line 758, in __call__
    while self.dispatch_one_batch(iterator):
  File "/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py", line 603, in dispatch_one_batch
    tasks = BatchedCalls(itertools.islice(iterator, batch_size))
  File "/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py", line 127, in __init__
    self.items = list(iterator_slice)
  File "joblib_issue_v1.py", line 15, in <genexpr>
    for _ in range(n))
  File "/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py", line 183, in delayed
    pickle.dumps(function)
_pickle.PicklingError: Can't pickle <function model_performance at 0x1150e19d8>: it's not the same object as __main__.model_performance

Contributor

lesteve commented Sep 7, 2016

Does it work if test_model (edit: sorry, I meant model_performance) is in a separate module? If so, that would be an easy workaround.

From the dask issue mentioned above:

I recommend checking in with the Joblib developers and verifying that joblib can be run in multiple threads safely.

@ogrisel, @GaelVaroquaux have you encountered problems related to running joblib in multiple threads?
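The separate-module workaround can be sketched as follows; the module name model_utils.py is hypothetical, and the point is only that once model_performance lives in an importable module, pickle can serialize it by reference. This sketch simulates the layout by writing the module to a temporary directory:

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical module holding the worker function; in practice this
# would be a real file (e.g. model_utils.py) next to the main script.
MODULE_SOURCE = """\
import random

def model_performance(param):
    return random.choice([0, 1])
"""

tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, "model_utils.py"), "w") as f:
    f.write(MODULE_SOURCE)

sys.path.insert(0, tmpdir)
model_utils = importlib.import_module("model_utils")

# Because model_performance now has a clear import path, pickling
# succeeds: it is stored as "model_utils.model_performance".
blob = pickle.dumps(model_utils.model_performance)
assert pickle.loads(blob)(42) in (0, 1)
```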

stsievert commented Sep 8, 2016

Does it work if model_performance is in a separate module?

Yes, it works. However, doing this throws a warning:

/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py:540: UserWarning: Multiprocessing-backed parallel loops cannot be nested, setting n_jobs=1
**self._backend_args)

Note that this was run as in the case above, with the default Parallel. I'd rather avoid this: on my local machine with a simple test, I don't see any speed improvement.

Member

GaelVaroquaux commented Oct 1, 2016

The problem here is that the function cannot be pickled, as it is not defined in a module with a clear import path (this is how Python pickles functions; it is not something we can do much about).

I am closing this issue to avoid clogging the tracker.
