
Tasks are still run after being cancelled #33

Closed

robgolding opened this issue Jul 27, 2018 · 3 comments

robgolding commented Jul 27, 2018

Firstly, thank you for such a great library! I'm having great success integrating it, but I've come up against one problem: if you cancel a future that is not yet running via future.cancel(), it still gets run. Two things stand out as odd, though:

  1. The task_done callback is called when the future is cancelled, as expected. This means it is not called again when the task actually finishes running.
  2. The task is killed after a short time, meaning it does not run to completion if it takes longer than a couple of seconds.

Note that I am running this on Python 2.7, using the backported concurrent.futures library (https://pypi.org/project/futures/).

Here is a minimal reproduction case which I have been using to test:

import os
import time

from pebble import ProcessPool
from concurrent.futures import TimeoutError, CancelledError

def function():
    print('[{}] Sleeping'.format(os.getpid()))
    time.sleep(5)
    print('[{}] Done'.format(os.getpid()))


def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except CancelledError:
        print('Cancelled')
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    else:
        print('Function returned: {}'.format(result))

pool = ProcessPool(max_workers=1)
futures = []

for i in range(0, 10):
    future = pool.schedule(function)
    future.add_done_callback(task_done)
    futures.append(future)

time.sleep(2)

for future in futures:
    if not future.running() and not future.done():
        future.cancel()

pool.close()
pool.join()

The output is as follows:

(zapier)$ python demo.py
[1629] Sleeping
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
[1629] Done
Function returned: None
[1629] Sleeping
[1650] Sleeping
[1651] Sleeping
[1652] Sleeping
[1655] Sleeping
[1656] Sleeping
[1657] Sleeping
[1658] Sleeping
[1659] Sleeping

As you can see from the code, we're submitting 10 jobs to the pool, which has only a single worker. Each job waits for 5 seconds before returning. After 2 seconds, we cancel all the jobs which are not running or finished (of which there are 9, as the first job is still sleeping). At this point, all 9 task_done callbacks are fired as expected.

Then, once the first job finishes, we'd expect the program to exit, as pool.join() should return. Instead, the 9 jobs that were previously cancelled each start to run, one after the other. The timestamps aren't shown in my output above, but this happens very fast, so the jobs are being killed almost immediately after starting. You can see from the incrementing PID that the worker process is indeed being killed in each case.


Is there anything we can do here to work around this, or potentially fix the issue? I'm still quite new to the concurrent.futures library, so I may well be doing something wrong!

Thank you again for the great work here!

Edit: I've just confirmed that this happens on Python 3 as well, using the standard library concurrent.futures module.

noxdafox (Owner) commented Jul 27, 2018

Greetings,

the behaviour you are observing is expected and it's a limitation of the design.

Pebble is designed to keep the IPC overhead minimal, offering the same performance as similar solutions (concurrent.futures, multiprocessing, billiard, etc.).

To ship jobs to the worker processes, Pebble uses a simple pipe. Operating system pipes are implemented as buffers and provide interfaces very close to those of sockets.

To maximize performance, the ProcessPool submits as many jobs as fit within the pipe, ensuring there is always a job available for the workers (to prevent starvation). Once a job has been sent over the pipe, there is no way to intervene on it. You can think of it like a message over a socket: you cannot modify messages that are in transit.
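
As a minimal illustration of this point (using a plain multiprocessing.Pipe here rather than Pebble's internal channel), data written to a pipe cannot be recalled by the sender; the receiver sees it whenever it gets around to reading:

from multiprocessing import Pipe

# a one-way pipe standing in for the pool-to-worker channel
reader, writer = Pipe(duplex=False)

writer.send('job-1')
# deciding to "cancel" at this point changes nothing for data already
# sitting in the pipe buffer: the receiving end still gets it
print(reader.recv())  # prints 'job-1'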

When you cancel a scheduled job, one of three scenarios may occur.

  1. The job has not yet been sent over the pipe: it is dropped before being sent.
  2. The job is already in progress, with a worker processing it: the ProcessPool stops the worker.
  3. The job is travelling over the pipe: the first available worker will pick it up and start working on it. The ProcessPool then realizes that a job belonging to a cancelled future is in progress and stops the worker.

As the jobs in your example are very small, you are indeed experiencing the third scenario. Due to the concurrent nature of the problem, you will notice a small delay between the moment the worker starts the job and the moment it is actually cancelled.

Note that this is known and correctly handled by the ProcessPool logic. Once cancelled, the future runs all the subscribed callbacks only once and its state is correctly set. Trying to retrieve its result will always raise a CancelledError. From the application standpoint, the APIs behave correctly.
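
A minimal sketch of that guarantee, using only the standard concurrent.futures Future API on top of the ProcessPool: a pending future that is cancelled reports cancelled() as True and result() raises CancelledError, even if a worker briefly picks the job up.

import time
from concurrent.futures import CancelledError

from pebble import ProcessPool


def slow():
    # long enough to keep the single worker busy
    time.sleep(5)


pool = ProcessPool(max_workers=1)

first = pool.schedule(slow)    # occupies the only worker
second = pool.schedule(slow)   # still pending when cancelled
second.cancel()

print(second.cancelled())      # True
try:
    second.result()
except CancelledError:
    print('result() raised CancelledError, as expected')

pool.close()
pool.join()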

If stopping the worker from picking up a cancelled job is critical for your application, you can add simple logic to achieve it: dedicate a pipe to each job and use the future's callback to signal to the worker that the job should not be started.

import time
from multiprocessing import Pipe

from pebble import ProcessPool


def callback(future):
    # notify the worker over the job's dedicated pipe if the future was cancelled
    if future.cancelled():
        future.cancel_signal.send('cancel')


def function(canceled):
    # check if a cancel signal was delivered
    if canceled.poll(timeout=0):
        print("Canceled job!")
        return

    print("You won't see this if canceled")

    time.sleep(5)

    print("Neither you'll see this")


def main():
    futures = []
    pool = ProcessPool(max_workers=1)

    for _ in range(10):
        # one pipe per job: the read end travels to the worker with the job,
        # the write end stays on the future for the callback to use
        read, write = Pipe()

        future = pool.schedule(function, args=[read])
        future.cancel_signal = write
        future.add_done_callback(callback)

        futures.append(future)

    time.sleep(3)

    # cancel everything except the first two jobs
    for future in futures[2:]:
        future.cancel()

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Keep in mind that this adds IPC complexity and might cause your application to run out of file descriptors if you schedule lots of jobs to the pool.
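
A possible mitigation for the descriptor concern, assuming the cancel_signal attribute from the example above, is to close the parent's end of each pipe once its future is done; the done callback fires exactly once per future, so it is a convenient place for that:

def callback(future):
    # signal the worker only for cancelled futures...
    if future.cancelled():
        future.cancel_signal.send('cancel')
    # ...then release the parent's write end; any data already sent stays
    # buffered in the pipe for the worker to poll
    future.cancel_signal.close()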

robgolding (Author) commented Jul 27, 2018

Wow, thank you @noxdafox, that's really useful info. Your example actually fails for me with the following error (but I think this is due to the backported concurrent.futures module I am using):

Process Process-1:
Traceback (most recent call last):
  File "/Users/robgolding/.pyenv/versions/2.7.14/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/Users/robgolding/.pyenv/versions/2.7.14/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/robgolding/.pyenv/versions/zapier/lib/python2.7/site-packages/pebble/pool/process.py", line 390, in worker_process
    for task in worker_get_next_task(channel, params.max_tasks):
  File "/Users/robgolding/.pyenv/versions/zapier/lib/python2.7/site-packages/pebble/pool/process.py", line 405, in worker_get_next_task
    yield fetch_task(channel)
  File "/Users/robgolding/.pyenv/versions/zapier/lib/python2.7/site-packages/pebble/pool/process.py", line 411, in fetch_task
    return task_transaction(channel)
  File "/Users/robgolding/.pyenv/versions/zapier/lib/python2.7/site-packages/pebble/pool/process.py", line 420, in task_transaction
    task = channel.recv()
  File "/Users/robgolding/.pyenv/versions/zapier/lib/python2.7/site-packages/pebble/pool/channel.py", line 91, in recv
    return self.reader.recv()
TypeError: Required argument 'handle' (pos 1) not found

I'll see what I can do to get it working, but you've provided confirmation that this isn't actually a bug, so thanks again!

Edit: turns out it's a bug in Python's multiprocessing module which was fixed in Python 3: https://bugs.python.org/issue4892.

noxdafox (Owner) commented

Yes, that's a limitation of Python 2 I forgot about, sorry. I don't have a Python 2 dev environment at hand, so I could not try it.

Note that the above trick might also not work on Windows. Python's multiprocessing abstracts away several platform details, making it hard to predict cross-platform issues.
