Asynchronous output variation of Parallel.__call__ #79

Closed
duckworthd opened this issue Aug 27, 2013 · 5 comments


@duckworthd

Parallel.retrieve ensures that order is maintained when Parallel.__call__ is called on an iterable. Rather than returning a list of results in the same order as the input, I propose a generator-based version of Parallel.__call__ that yields output as it is ready without ensuring order.

>>> workers = Parallel(n_jobs=8)
>>> workers(delayed(square)(i) for i in range(10))     # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> workers.async(delayed(square)(i) for i in range(10))   # <generator object _____ at 0x______>

Thoughts? Issues?

@GaelVaroquaux
Member

> Parallel.retrieve ensures that order is maintained when Parallel.__call__ is
> called on an iterable. Rather than returning a list of results in the same
> order as the input, I propose a generator-based version of Parallel.__call__
> that yields output as it is ready without ensuring order.
>
> workers = Parallel(n_jobs=8)
> workers(delayed(square)(i) for i in range(10))     # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
> workers.async(delayed(square)(i) for i in range(10))   # <generator object _____ at 0x______>
>
> Thoughts? Issues?

It would be fantastic, but I suspect it would be a bit tricky to
implement. I somehow fear deadlocks, as there is a lock shared between
the dispatch queue and the retrieval code.

However, I am very open to a pull request to discuss these issues based
on code rather than gut feelings.

The way I would suggest doing it is with a keyword argument 'async' to
Parallel.

Thanks heaps for proposing this, and sorry for my slow reply.
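
For concreteness, the keyword-argument form suggested above might look like the sketch below. This is purely hypothetical: no `async` argument exists in joblib's Parallel, and `async` later became a reserved word in Python 3.7, so a real implementation would need another name.

# Hypothetical sketch only: the 'async' keyword argument is not a real
# joblib API, and 'async' is a reserved word in Python >= 3.7.
from math import sqrt
from joblib import Parallel, delayed

# Default behaviour: block until everything finishes, results in input order.
ordered = Parallel(n_jobs=8)(delayed(sqrt)(i) for i in range(10))

# Proposed behaviour: return a generator immediately, yielding each result
# as soon as its job completes, in completion order.
for result in Parallel(n_jobs=8, async=True)(delayed(sqrt)(i) for i in range(10)):
    print(result)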

@duckworthd
Author

Hi Gael,

Below is a very barebones implementation (with example) of an asynchronous version of Parallel.__call__. I don't have the eye for catching deadlocks that you may have, so would you mind taking a look? In particular, there is one TODO I'm not quite sure how to solve and another where more polling is happening than I'd like. Suggestions appreciated.

import multiprocessing
import Queue


class Parallel(object):
  def __init__(self, n_jobs=None):
    self.pool = multiprocessing.Pool(n_jobs)

  def _dispatch(self, iterator, calls):
    n_calls = 0   # number of outputs to expect (could be unbounded)
    for func, args, kwargs in calls:
      n_calls += 1

      # asynchronously execute the function; the callback puts its result
      # onto the iterator's queue when the function returns
      self.pool.apply_async(func, args, kwargs, callback=iterator.put)

    # tell the result iterator how many results to expect
    iterator.length(n_calls)

  def __call__(self, calls):
    result_iterator = ResultIterator()

    # asynchronously put tasks to the iterator
    self._dispatch(result_iterator, calls)   # TODO this should be done async

    # return iterator holding task outputs
    return result_iterator


class ResultIterator(object):
  """An iterator whose length will be known later"""

  def __init__(self, *args, **kwargs):
    self._queue    = multiprocessing.Queue()
    self._returned = 0
    self._length   = float('inf')

  def put(self, result):
    """Add a new result to the queue"""
    self._queue.put(result)

  def length(self, length):
    """Record the total number of results to expect"""
    self._length = length

  def __iter__(self):
    while self._returned < self._length:
      # TODO polling with a timeout avoids a hot spin, but it still burns
      # more resources than a sentinel-based approach would
      try:
        yield self._queue.get(timeout=0.1)
        self._returned += 1
      except Queue.Empty:
        # still waiting for the next result
        continue


def wait(n):
  import time
  print "enter wait(%f)" % n
  time.sleep(n)
  print "leave wait(%f)" % n
  return n


if __name__ == '__main__':
  from joblib import delayed
  import random; random.seed(0)

  args    = list(range(4))
  random.shuffle(args)
  results = Parallel()(delayed(wait)(arg) for arg in args)
  for i, result in enumerate(results):
    print 'returned: (%d) %f' % (i, result)

@GaelVaroquaux
Member

Hi David,

Unfortunately, this barebones implementation does not give us much of
the functionality that is valuable in joblib, in particular error
management and on-the-fly dispatching of jobs. Those features are what
lead to having two threads in the dispatching code, and hence the
potential deadlocks.

I'd much prefer that you submit a patch against the existing code, so
that we can have a look at it and see whether it raises potential
issues such as deadlocks.

Cheers

@duckworthd
Author

@GaelVaroquaux I've been toying with capturing exceptions, but I'm having trouble re-raising them later. It seems that the callback argument to Pool.apply_async(..., callback) is invoked in a separate thread (not a process, an actual thread!). Are you familiar with a way of capturing exceptions thrown in other threads and re-raising them in the main thread asynchronously?
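
One workaround (a sketch of a common pattern, not joblib's actual mechanism; all names below are my own illustration) sidesteps the callback-thread problem entirely: catch the exception inside the worker process, ship it back as an ordinary return value, and re-raise it in whichever thread pulls the result off the queue.

import sys
import traceback


class ExceptionWrapper(object):
  """Carries an exception (plus its formatted traceback) back as a value."""

  def __init__(self, exc):
    self.exc = exc
    # format the traceback here, in the worker, since traceback objects
    # themselves cannot be pickled across process boundaries
    self.traceback = traceback.format_exc()


def safe_call(func, args, kwargs):
  """Run func in the worker, converting any exception into a return value."""
  try:
    return func(*args, **kwargs)
  except Exception as exc:
    return ExceptionWrapper(exc)


def reraise_if_error(result):
  """Unwrap a result in the consuming thread, re-raising captured errors."""
  if isinstance(result, ExceptionWrapper):
    sys.stderr.write(result.traceback)
    raise result.exc
  return result


# Hooking into the barebones Parallel above: _dispatch would submit
#   self.pool.apply_async(safe_call, (func, args, kwargs), callback=iterator.put)
# and ResultIterator.__iter__ would yield reraise_if_error(...) on each item,
# so errors surface wherever the generator is consumed.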

@fcharras
Contributor

Done in #1393 and #1463
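
For anyone arriving here today: assuming those PRs landed as the `return_as` parameter in recent joblib releases ("generator" for ordered streaming, "generator_unordered" for completion-order streaming, which is what this issue asked for), usage looks roughly like this:

from math import sqrt
from joblib import Parallel, delayed

# Results stream back as they become ready, but still in submission order.
ordered = Parallel(n_jobs=2, return_as="generator")(
    delayed(sqrt)(i ** 2) for i in range(10))

# Each result is yielded as soon as its task completes, in completion order.
unordered = Parallel(n_jobs=2, return_as="generator_unordered")(
    delayed(sqrt)(i ** 2) for i in range(10))

print(list(ordered))      # [0.0, 1.0, 2.0, ..., 9.0]
print(sorted(unordered))  # same values; arrival order is not guaranteed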
