
[MRG+1] Refactored Parallel backends #306

Closed
wants to merge 6 commits

Conversation

NielsZeilemaker (Author) commented Jan 25, 2016

  • The sequential, threadpool, and multiprocessing backends are now
    refactored into separate classes.
  • Fixed tests accordingly

Some context: I am planning to create a YARN backend to allow you to run a Python process in containers spawned by YARN. It's going to be a separate plugin, so that this project doesn't pick up all the hdfs/yarn dependencies. My end goal is to be able to run scikit-learn on YARN.

scikit-learn/scikit-learn#6223


from .format_stack import format_exc, format_outer_frames
from .logger import Logger, short_format_time
from .my_exceptions import TransportableException, _mk_exception
from .disk import memstr_to_kbytes
from ._backends import MultiProcessingBackend, ThreadingBackend, SequentialBackend

aabadie (Contributor) commented Jan 26, 2016

This line is too long

class MultiProcessingBackend(ThreadingBackend):

    def effective_n_jobs(self, n_jobs):
        """ Determine the number of jobs which are going to run in parallel. Will

aabadie (Contributor) commented Jan 26, 2016

This line should be split.

            ideal_batch_size = int(
                old_batch_size * MIN_IDEAL_BATCH_DURATION / batch_duration)
            # Multiply by two to limit oscillations between min and max.
            self._effective_batch_size = batch_size = max(2 * ideal_batch_size, 1)

aabadie (Contributor) commented Jan 26, 2016

Line too long.

        self.parallel.print_progress()
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()


###############################################################################
def set_parallel_backend(name, cls):

aabadie (Contributor) commented Jan 26, 2016

Maybe this function should be called 'register_parallel_backend' to better reflect what it does. I think it would be worth adding some checks on the registered class (just to be sure it's a valid parallel backend).
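
A minimal sketch of that suggestion (the registry name VALID_BACKENDS and the duck-typing check are assumptions for illustration, not the PR's actual code):

    VALID_BACKENDS = {}

    def register_parallel_backend(name, cls):
        """Register a backend class under `name` after a basic sanity check."""
        if not callable(getattr(cls, 'effective_n_jobs', None)):
            raise ValueError('%r does not look like a valid parallel backend'
                             % (cls,))
        VALID_BACKENDS[name] = cls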

NielsZeilemaker (Author) commented Jan 26, 2016

I could do a check for

issubclass(cls, SequentialBackend) 

or add another class which defines the interface by raising NotImplementedError.
Naming-wise, whatever you guys prefer.
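
A rough sketch of the second option (ParallelBackendBase is a hypothetical name, and the method set is guessed from the backends discussed in this PR):

    class ParallelBackendBase(object):
        """Defines the backend interface; concrete backends override these."""

        def effective_n_jobs(self, n_jobs):
            """Return the number of workers that will actually be used."""
            raise NotImplementedError()

        def apply_async(self, batch, callback=None):
            """Schedule a batch to run and return an async result."""
            raise NotImplementedError()

Registration could then check issubclass(cls, ParallelBackendBase) instead of duck typing.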

aabadie (Contributor) commented Jan 26, 2016

I agree with using issubclass. I'm now wondering if we want to accept overriding an already registered backend. Maybe @GaelVaroquaux has an idea on this?

NielsZeilemaker (Author) commented Jan 26, 2016

Maybe we could add a force argument, which prevents you from overriding an already registered backend by mistake?

GaelVaroquaux (Member) commented Jan 26, 2016

Maybe we could add a force argument, which prevents you from overriding an
already registered backend by mistake?

I'd rather not: it forces the user to be aware of too many things.

What I think would be good is to change the default value for the
"backend" keyword argument in joblib.Parallel from "multiprocessing" to
"default". Hence, when the user doesn't specify a backend, it the global
"set_backend" function can override the default. But if the user is
explicit about the backend, that choice is kept.
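
A sketch of the dispatch Gaël describes, reusing the hypothetical VALID_BACKENDS registry from above (heavily simplified, for illustration only):

    class Parallel(object):
        def __init__(self, n_jobs=1, backend="default"):
            # backend="default" resolves through the registry, so a global
            # register_parallel_backend("default", SomeBackend) retargets
            # every caller that did not pick a backend; an explicit name
            # such as backend="threading" is looked up directly and is
            # never overridden.
            try:
                self._backend_cls = VALID_BACKENDS[backend]
            except KeyError:
                raise ValueError("Invalid backend: %s" % backend)
            self.n_jobs = n_jobs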

NielsZeilemaker (Author) commented Jan 26, 2016

So basically:

  • be able to overwrite the default backend
  • add new backends which are not already registered

The "multiprocessing" to "default" change was is already implemented.

NielsZeilemaker (Author) commented Jan 26, 2016

retest this please

GaelVaroquaux (Member) commented Jan 26, 2016

A few early comments:

  • It would be good to have a context manager to set the backend in a
    local context
  • The file _backends.py should probably be called _parallel_backends.py

Thanks a lot for this contribution, it will be useful!

aabadie (Contributor) commented Jan 26, 2016

It also seems that the test results on appveyor are not stable, see https://ci.appveyor.com/project/joblib-ci/joblib/build/1.0.445/job/7li1lxc290hb640x

Maybe @ogrisel has an idea.

register_parallel_backend("default", TestParallelBackend)
assert_equal(VALID_BACKENDS["default"], TestParallelBackend)
register_parallel_backend("default", VALID_BACKENDS["multiprocessing"])

aabadie (Contributor) commented Jan 26, 2016

Sorry, I was not clear. I thought of adding another test function for the new TestParallelBackend class, and of adding, in the initial test_overwrite_default_backend, an assert to verify the 'default' backend has correctly been updated. Something like this:

def test_overwrite_default_backend():
    register_parallel_backend("default", VALID_BACKENDS["multiprocessing"])
    assert_equal(VALID_BACKENDS["default"], VALID_BACKENDS["multiprocessing"])

def test_register_parallel_backend():
    register_parallel_backend("unit-testing", TestParallelBackend)
    assert_true("unit-testing" in VALID_BACKENDS)
    assert_equal(VALID_BACKENDS["unit-testing"], TestParallelBackend)

Maybe this is a bit overkill though.

NielsZeilemaker (Author) commented Jan 26, 2016

@GaelVaroquaux I'm not sure what you mean by having a context manager. Could you give an example?
Furthermore, I'll rebase and squash all commits after the pull request is approved by you guys.

GaelVaroquaux (Member) commented Jan 26, 2016

NielsZeilemaker (Author) commented Jan 27, 2016

@GaelVaroquaux I like the idea of a context manager. However, I feel it would be an extension to this pull request, and therefore a separate one.

The context manager would then look something like:

from contextlib import contextmanager

@contextmanager
def parallel_backend(cls):
    old_backend = VALID_BACKENDS['default']
    register_parallel_backend('default', cls)
    yield
    register_parallel_backend('default', old_backend)

    def __init__(self, parallel):
        self.parallel = parallel

    def effective_n_jobs(self, n_jobs):

GaelVaroquaux (Member) commented Jan 27, 2016

It would be good to have one-liner docstrings on all these methods: it would help other developers understand the philosophy/purpose behind them.

lesteve (Member) commented Jan 27, 2016

It would be good to have a context manager to set the backend in a local context

I forgot to comment on this: I did not get the point of being able to set the default backend in a local context, but maybe I am missing something. Can you not explicitly pass backend=whatever when you create your Parallel object?
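
For reference, the explicit form lesteve refers to, using joblib's documented API:

    from math import sqrt
    from joblib import Parallel, delayed

    # The caller picks the backend directly on the Parallel object.
    results = Parallel(n_jobs=2, backend="threading")(
        delayed(sqrt)(i ** 2) for i in range(10))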

GaelVaroquaux (Member) commented Jan 27, 2016

When you are using a blackbox algorithm / object on which you have no control. E.g. a scikit-learn GridSearchCV.

    def effective_n_jobs(self, n_jobs):
        """ Determine the number of jobs which are going to run in parallel """
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')

GaelVaroquaux (Member) commented Jan 27, 2016

Shouldn't this error be raised in all backends?

            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(mp.cpu_count() + 1 + n_jobs, 1)

GaelVaroquaux (Member) commented Jan 27, 2016

We should really implement a helper function to compute this, and expose it in the joblib public API (the __init__). The reason is that application code sometimes relies on such a value, for instance to split the data into batches for each worker.
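
A sketch of such a helper, lifted from the logic in the hunk above (exposing it in joblib's __init__ is the proposal here, not the state of the code):

    import multiprocessing as mp

    def effective_n_jobs(n_jobs=-1):
        """Translate an n_jobs parameter into an actual worker count."""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        if n_jobs < 0:
            # e.g. n_jobs=-1 uses all CPUs, n_jobs=-2 all but one, ...
            n_jobs = max(mp.cpu_count() + 1 + n_jobs, 1)
        return n_jobs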

NielsZeilemaker (Author) commented Jan 27, 2016

I'm not sure about that. Depending on the backend, n_jobs == -1 could have a different meaning, so it should be computed in the backend class you're going to use to distribute the work.

E.g. in the YARN context, we're not going to limit ourselves to the number of locally available cores.

GaelVaroquaux (Member) commented via email, Jan 27, 2016

NielsZeilemaker (Author) commented Jan 27, 2016

I'll implement a public effective_n_jobs method in Parallel which needs a backend and n_jobs argument. This will then construct a backend and call the effective_n_jobs method of the newly constructed backend.

GaelVaroquaux (Member) commented Jan 27, 2016

I'll implement a pulbic effective_n_jobs method in parallel which needs a
backend and n_jobs argument.

I'd rather have this as a function. If you look at how it's used e.g. in scikit-learn, it makes more sense to have a function, rather than instantiating a Parallel object:

https://github.com/scikit-learn/scikit-learn/blob/317dea8a05b0087f83318f318d112976e90566ff/sklearn/decomposition/dict_learning.py#L473
https://github.com/scikit-learn/scikit-learn/blob/691e39d9a91e57b3da4c03668c186f1d07d2b466/sklearn/metrics/pairwise.py#L1057

By the way, interestingly one of those two code snippets is wrong :).
Which tells us that it is easy to get this wrong.

NielsZeilemaker (Author) commented Jan 27, 2016

I'm going to be a bit bold, and say that I believe both of them are wrong 😄

In both snippets, n_jobs is only passed to Parallel, which can handle n_jobs == -1 without a problem, so there is no need to determine it beforehand.

    def get_exceptions(self):
        # We are using multiprocessing, we also want to capture
        # KeyboardInterrupts
        from .parallel import WorkerInterrupt

lesteve (Member) commented Jan 27, 2016

It feels like this goes in the wrong direction. The parallel module should use _parallel_backends, but not the other way around. Maybe the best thing to do is to move WorkerInterrupt to my_exceptions.

GaelVaroquaux (Member) commented Jan 27, 2016

I agree with the comment of @lesteve that the exceptions are probably not located in the right module.

NielsZeilemaker (Author) commented Jan 27, 2016

I was a bit worried that moving this object from parallel to my_exceptions would change the API, so I left it there. The same goes for SafeFunction.
I could move them if you like?

GaelVaroquaux (Member) commented Jan 27, 2016

I was a bit worried that moving this object from parallel to my_exceptions
would change the api.

As far as I am concerned, that's internal. People importing and using this object directly are using undocumented joblib features. Do you see valid important use cases for such usage?

NielsZeilemaker (Author) commented Jan 27, 2016

No, I don't. I'll move both objects then, the SafeFunction and WorkerInterrupt.

lesteve (Member) commented Jan 27, 2016

No, I don't. I'll move both objects then, the SafeFunction and WorkerInterrupt.

It feels like SafeFunction can be moved to _parallel_backends, since that is the only place it is being used.


from .format_stack import format_exc, format_outer_frames
from .logger import Logger, short_format_time
from .my_exceptions import TransportableException, _mk_exception
from .disk import memstr_to_kbytes
from ._parallel_backends import (MultiProcessingBackend, ThreadingBackend,
                                 SequentialBackend)

lesteve (Member) commented Jan 27, 2016

This looks like a PEP8 violation to me. You should spend some time configuring your editor to run flake8, in order to highlight this kind of problem while you are editing.

lesteve (Member) commented Jan 27, 2016

I forgot to comment on this: I did not get the point of being able to set the
default backend in a local context but maybe I am missing something. Can you
not pass explicitly backend=whatever when you create your Parallel object?

When you are using a blackbox algorithm / object on which you have no control. E.g. a scikit-learn GridSearchCV.

OK fair point.
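
To illustrate the black-box case (parallel_backend is the context manager sketched earlier in this thread, and black_box stands in for third-party code such as GridSearchCV that builds its own Parallel object):

    from joblib import Parallel, delayed

    def black_box(data):
        # Third-party code: the caller has no way to pass backend= here.
        return Parallel(n_jobs=2)(delayed(len)(chunk) for chunk in data)

    # Redirect the default backend only for the duration of the call.
    with parallel_backend(ThreadingBackend):
        counts = black_box([[1, 2], [3, 4, 5]])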

NielsZeilemaker (Author) commented Jan 27, 2016

Still, those implementations would use the "default" backend, so overwriting the default backend in that case would suffice.

aabadie (Contributor) commented Jan 27, 2016

The context manager would then look something like:
....

The local default backend switch should be protected with try/finally, to ensure the old default backend is correctly reset in case of a failure.
Something like this:

from contextlib import contextmanager

@contextmanager
def parallel_backend(cls):
    old_backend = VALID_BACKENDS['default']
    try:
        register_parallel_backend('default', cls)
        yield
    finally:
        register_parallel_backend('default', old_backend)

I'm not super fond of using the same function to override the default backend and to register new backends. Maybe the default backend should be set using an explicit function, set_default_parallel_backend?
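
That explicit setter would essentially be a thin wrapper (set_default_parallel_backend is a hypothetical name, not part of the final API):

    def set_default_parallel_backend(cls):
        # Only touches the 'default' entry; genuinely new backends would
        # still go through register_parallel_backend under their own name.
        register_parallel_backend('default', cls)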

ogrisel (Contributor) commented Mar 2, 2016

Initialize is the method where you need to set up things. And indeed the effective_n_jobs method is called before it. You should think of the effective_n_jobs method as being a sanity check of the n_jobs parameter. E.g. the existing backends use it to convert n_jobs < 0 into an actual number of workers.

But initialize might be used to set up network connections with the backend infrastructure, and computing effective_n_jobs might not be possible before calling initialize. We probably need to re-think our use of effective_n_jobs and maybe deprecate it.

I think the only valid use case is to make it possible for caller code to know how many workers are going to be used in subsequent parallel code, so as to split the data to process into a reasonable number of tasks (typically effective_n_jobs or twice that) and thus mitigate dispatching / scheduling overhead while still using all the requested workers (see the sketch after the list below).

In practice we do not need this feature as badly as we used to in scikit-learn because:

  • random forests now use the low-overhead threading backend, where it's fine to fit one tree at a time
  • other multiprocessing-based workloads are protected by the auto-batching feature, which makes sure that very short tasks are automatically grouped and never cause a big scheduling overload.
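
The splitting pattern described above might look like this (effective_n_jobs is the helper sketched earlier in the thread; the factor of two is the heuristic mentioned above):

    import numpy as np
    from joblib import Parallel, delayed

    data = np.arange(100000)
    # Pre-chunk into ~2x the number of workers to amortize dispatch
    # overhead while keeping all requested workers busy.
    n_tasks = 2 * effective_n_jobs(-1)
    chunks = np.array_split(data, n_tasks)
    partial_sums = Parallel(n_jobs=-1)(delayed(np.sum)(c) for c in chunks)
    total = sum(partial_sums)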

NielsZeilemaker (Author) commented Mar 2, 2016

I actually implemented a public effective_n_jobs method upon the request of @GaelVaroquaux. I think it should just be used as a best-effort sanity check, i.e. without the need to initialize the backend.

I agree that exposing effective_n_jobs might not be needed, as the backend can auto-batch smaller jobs to reduce the overhead.

ogrisel (Contributor) commented Mar 2, 2016

Maybe we can add an alias for threading which allows it to be overwritten with another backend.

This feels too hackish / specific to me.

ogrisel (Contributor) commented Mar 2, 2016

FYI I am working on the default backend / context manager refactoring.

NielsZeilemaker (Author) commented Mar 2, 2016

I think that the programmer designing the parallel job can determine whether a job must run on the threading backend or whether it would merely be nice to run on it. So why not give them this possibility?

ogrisel (Contributor) commented Mar 2, 2016

This is what the backend argument of Parallel class is for.

NielsZeilemaker (Author) commented Mar 3, 2016

That's what I meant: let them pass backend="prefer_threading" or backend="threading" to distinguish between "should use threading" and "must use threading".

ogrisel (Contributor) commented Mar 3, 2016

I implemented the context manager for switching the default backend in NielsZeilemaker#2. I will now take a deeper look at @mrocklin's prototype, and in particular at the effective_n_jobs / initialize and backend parameter issues.

Context manager to change the default backend

NielsZeilemaker (Author) commented Mar 3, 2016

@ogrisel thanks for the pull request, I've merged it. By the way, my backend is available at https://github.com/NielsZeilemaker/yarnpool/blob/master/yarnbackend.py.

ogrisel (Contributor) commented Mar 7, 2016

@NielsZeilemaker I issued a new PR to further simplify the API and make it more flexible (by registering any callable as a factory). I will do more experiments to adapt @mrocklin's PoC backend for distributed with this new API.

Your own backend will need to be adapted to remove the parallel instance from the constructor and to implement the configure method with the new arguments.

More simplification refactoring

ogrisel (Contributor) commented Mar 14, 2016

Merged as #320. Thanks again @NielsZeilemaker!

@ogrisel ogrisel closed this Mar 14, 2016
@ogrisel ogrisel removed the 'need Review' label Mar 14, 2016
@minrk minrk mentioned this pull request Mar 15, 2016

minrk commented Mar 15, 2016

This is really great! I added a prototype for IPython parallel over at ipython/ipyparallel#122, and it seems really easy, so kudos!

The one thing I'm not familiar enough with to really decide on is tuning the batching. The AutoBatchingMixin is slick, but I'm not sure how to get the most out of IPython Parallel with these APIs. To use IPython Parallel really efficiently, many more jobs than engines should be queued, so they can be making their way across the network while the engines are working on early tasks. Perhaps I should return a large number from effective_n_jobs?

Another question: IPython has its own mechanisms for batching more efficiently than submitting one chunk at a time, especially with the DirectView API. It seems like, to leverage that, I would need an apply_batch_async, not just the single-call apply_async. Could there be a mechanism for the backend to fire N jobs in a single call, defaulting to mapping over apply_async?
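
One way such a hook could default to the single-call path (apply_batch_async is minrk's hypothetical name, not an existing joblib API):

    class BatchSubmitMixin(object):
        """Backends with a native multi-submit (e.g. IPython's DirectView)
        would override apply_batch_async; this fallback maps apply_async."""

        def apply_batch_async(self, batches, callback=None):
            return [self.apply_async(batch, callback=callback)
                    for batch in batches]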

GaelVaroquaux (Member) commented Mar 15, 2016

NielsZeilemaker (Author) commented Mar 15, 2016

Great news indeed. I already went ahead and started improving dask/knit to support dynamic allocation of workers on YARN, see dask/knit#51.

@minrk I think the AutoBatchingMixin will mostly help to reduce the overhead of scheduling many small jobs, so if you have jobs which take more than 2 seconds to complete, it shouldn't do anything.
Increasing effective_n_jobs won't do much, as the Parallel class itself doesn't do much with that information.

In order to define your own batches, you can use the BatchedCalls object. And I guess we could modify the dispatch_one_batch method to not add another BatchedCalls wrapper if the batch_size is equal to 1.
https://github.com/joblib/joblib/blob/master/joblib/parallel.py#L573
@ogrisel, any comments on this?

ogrisel (Contributor) commented Mar 15, 2016

+1, I think the scheduling overhead of IPython Parallel is of the same order of magnitude as that of multiprocessing (on a local network), so I would advise re-using the same magic constants.
