Skip to content

Commit

Permalink
Introduce cancel_fn to PollExecutor
Browse files Browse the repository at this point in the history
PollExecutor is designed for polling remote tasks, so it makes
sense that it should also be able to cancel those tasks when a
future is cancelled.
  • Loading branch information
rohanpm committed Mar 16, 2018
1 parent 2621d71 commit ab9fb45
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 16 deletions.
11 changes: 7 additions & 4 deletions more_executors/_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ def with_map(cls, executor, fn):
return cls.wrap(MapExecutor(executor, fn))

@classmethod
def with_poll(cls, executor, fn):
def with_poll(cls, executor, fn, cancel_fn=None, default_interval=5.0):
"""Wrap an executor in a `more_executors.poll.PollExecutor`.
Submitted callables will have their output passed into the poll function.
See the class documentation for more information on the poll and cancel
functions.
- `fn`: a function used for polling results. See the class documentation for
expected behavior from this function."""
return cls.wrap(PollExecutor(executor, fn))
- `fn`: a function used for polling results.
- `cancel_fn`: a function called when a future is cancelled.
- `default_interval`: default interval between polls, in seconds."""
return cls.wrap(PollExecutor(executor, fn, cancel_fn, default_interval))
75 changes: 70 additions & 5 deletions more_executors/poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def cancel(self):
return True
if self._delegate and not self._delegate.cancel():
return False
if not self._executor._run_cancel_fn(self):
return False
self._executor._deregister_poll(self)
out = super(_PollFuture, self).cancel()
if out:
Expand All @@ -67,7 +69,8 @@ class PollDescriptor(object):
"""A `PollDescriptor` represents an unresolved `Future`.
The poll function used by `more_executors.poll.PollExecutor` will be
invoked with a list of `PollDescriptor` objects."""
invoked with a list of `PollDescriptor` objects.
"""

def __init__(self, result, yield_result, yield_exception):
"""Do not construct instances of `PollDescriptor` directly."""
Expand All @@ -89,6 +92,11 @@ class PollExecutor(Executor):
"""Instances of `PollExecutor` submit callables to a delegate `Executor`
and resolve the returned futures via a provided poll function.
A cancel function may also be provided to perform additional processing
when a returned future is cancelled.
### **Poll function**
The poll function has the following semantics:
- It's called with a single argument, a list of zero or more
Expand All @@ -104,6 +112,21 @@ class PollExecutor(Executor):
- If the poll function returns an int or float, it is used as the delay
in seconds until the next poll.
### **Cancel function**
The cancel function has the following semantics:
- It's called when cancel is requested on a future returned by this executor,
if and only if the future is currently in the list of futures being polled,
i.e. it will not be called if the delegate future has not yet completed.
- It's called with a single argument: the value returned by the delegate future.
- It should return `True` if the future can be cancelled.
- If the cancel function raises an exception, the future's cancel method will
return `False` and a message will be logged.
### **Example**
Consider a web service which executes tasks asynchronously, with an API like this:
Expand All @@ -112,6 +135,7 @@ class PollExecutor(Executor):
* Returns an identifier for a task, such as `{"task_id": 123}`
* To get status of a single task: `GET https://myservice/tasks/:id`
* Returns task status, such as `{"task_id": 123, "state": "finished"}`
* To cancel a single task: `DELETE https://myservice/tasks/:id`
* To search for status of multiple tasks:
`POST https://myservice/tasks/search {"task_id": [123, 456, ...]}`
* Returns array of task statuses, such as
Expand Down Expand Up @@ -163,7 +187,21 @@ def poll_tasks(poll_descriptors):
elif state == 'error':
d.yield_exception(TaskFailed())
With the poll function in place, futures could be created like this:
To ensure that calling `future.cancel()` also cancels tasks on the remote service,
we can also provide a cancel function:
def cancel_task(task):
task_id = task['task_id']
# Attempt to DELETE this task
response = requests.delete(
'https://myservice/tasks/%s' % task_id)
# Succeeded only if response was OK.
# Otherwise, we may have been too late to cancel.
return response.ok
With the poll and cancel functions in place, futures could be created like this:
def publish(object_id):
response = requests.post(
Expand All @@ -175,7 +213,7 @@ def publish(object_id):
# and poll for the returned tasks using our poll function.
executor = Executors.\\
threadpool(max_workers=4).\\
with_poll(poll_fn)
with_poll(poll_tasks, cancel_task)
futures = [executor.submit(publish, x)
for x in (10, 20, 30)]
Expand All @@ -185,17 +223,20 @@ def publish(object_id):
# ...
"""

def __init__(self, delegate, poll_fn, default_interval=5.0):
def __init__(self, delegate, poll_fn, cancel_fn=None, default_interval=5.0):
"""Create a new executor.
- `delegate`: `Executor` instance to which callables will be submitted
- `poll_fn`: a polling function used to decide when futures should be resolved;
see the class documentation for the required behavior from this function
see the class documentation for more information
- `cancel_fn`: a cancel function invoked when cancellation of a future is requested;
see the class documentation for more information
- `default_interval`: default interval between polls (in seconds)
"""
self._delegate = delegate
self._default_interval = default_interval
self._poll_fn = poll_fn
self._cancel_fn = cancel_fn
self._poll_descriptors = []
self._poll_event = Event()
self._poll_thread = Thread(name='PollExecutor', target=self._poll_loop)
Expand Down Expand Up @@ -230,6 +271,30 @@ def _deregister_poll(self, future):
for (f, d) in self._poll_descriptors
if f is not future]

def _run_cancel_fn(self, future):
    """Consult the cancel function about cancelling `future`.

    Returns a true value when nothing vetoes the cancel: either no cancel
    function was configured, or `future` has no entry in the list of poll
    descriptors.  Otherwise returns whatever the cancel function returns
    for the descriptor's result, or False (after logging the exception)
    if the cancel function raises.
    """
    if not self._cancel_fn:
        # Without a cancel function there is nothing that could veto.
        return True

    # Collect every descriptor registered for exactly this future object.
    matches = []
    for candidate, poll_descriptor in self._poll_descriptors:
        if candidate is future:
            matches.append(poll_descriptor)

    if not matches:
        # The future is unknown to the poll list (already done, or polling
        # has not started yet), so the cancel goes ahead unopposed.
        return True

    assert len(matches) == 1, "Too many poll descriptors for %s" % future

    found = matches[0]

    try:
        verdict = self._cancel_fn(found.result)
    except Exception:
        _LOG.exception("Exception during cancel on %s/%s", future, found.result)
        return False
    else:
        return verdict

def _run_poll_fn(self):
with self._lock:
descriptors = [d for (_, d) in self._poll_descriptors]
Expand Down
3 changes: 2 additions & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
PyHamcrest
pytest
pytest >= 3.4; python_version >= '2.7'
pytest; python_version < '2.7'
mock; python_version < '3.3'
18 changes: 17 additions & 1 deletion tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,25 @@ def retry_map_executor():
return Executors.thread_pool().with_map(lambda x: x).with_retry(RetryPolicy())


def random_cancel(_value):
    """Cancel function for the poll executor with a randomly chosen outcome.

    Intended for the stress test: whatever this function does (allow the
    cancel, refuse it, or blow up), the futures must still honour the
    invariants of the Future API.  The argument is ignored.
    """
    roll = randint(0, 300)
    if roll >= 200:
        # Upper band: simulate a cancel function that fails outright.
        raise RuntimeError('simulated error from cancel')
    # Lower band allows the cancel, middle band refuses it.
    return roll < 100


@fixture
def poll_executor():
return Executors.thread_pool().with_poll(lambda ds: [d.yield_result(d.result) for d in ds])
return Executors.\
thread_pool().\
with_poll(lambda ds: [d.yield_result(d.result) for d in ds],
random_cancel)


@fixture(params=['threadpool', 'retry', 'map', 'retry_map', 'map_retry', 'poll'])
Expand Down
79 changes: 74 additions & 5 deletions tests/test_poll.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
from pytest import fixture
from hamcrest import assert_that, equal_to, calling, raises, has_length
from hamcrest import assert_that, equal_to, calling, raises, has_length, has_item, contains, \
matches_regexp
from functools import partial
from six.moves.queue import Queue
from threading import Event
import sys
import logging

from more_executors.poll import PollExecutor

Expand All @@ -15,6 +18,14 @@ def executor():
return ThreadPoolExecutor()


if sys.version_info[:2] < (2, 7):
    # This Python is too old for pytest's caplog (the test requirements
    # only ask for pytest >= 3.4 on Python >= 2.7), so provide a null
    # caplog; tests check its truthiness and skip their log assertions.
    #
    # The comparison must use (major, minor).  A one-element slice such as
    # (2,) sorts before (2, 7), which would also replace the real caplog on
    # Python 2.7 and silently skip the log assertions there.
    @fixture
    def caplog():
        """Null stand-in for pytest's caplog fixture; always None."""
        return None


def poll_tasks(tasks, poll_descriptors):
for descriptor in poll_descriptors:
task_id = descriptor.result
Expand All @@ -29,7 +40,7 @@ def test_basic_poll(executor):
task_id_queue = Queue()
tasks = {}
poll_fn = partial(poll_tasks, tasks)
poll_executor = PollExecutor(executor, poll_fn, 0.01)
poll_executor = PollExecutor(executor, poll_fn, default_interval=0.01)

def make_task(x):
return '%s-%s' % (x, task_id_queue.get(True))
Expand Down Expand Up @@ -66,6 +77,64 @@ def make_task(x):
assert_that(not futures[2].done())


def test_cancel_fn(executor, caplog):
    """Check that the cancel function can allow, veto, or fail a cancel.

    Three futures are moved into poll mode; the prefix of each task ID
    decides whether the cancel function returns True, returns False, or
    raises.  Only the first future may be cancelled, and the raised
    exception must be logged (verified only when caplog is available).
    """
    ids = Queue()
    registry = {}

    def cancel_fn(task):
        # The task ID prefix selects the simulated outcome.
        if task.startswith('cancel-true-'):
            return True
        if task.startswith('cancel-false-'):
            return False
        raise RuntimeError('simulated cancel error')

    poll_executor = PollExecutor(
        executor,
        partial(poll_tasks, registry),
        cancel_fn,
        default_interval=0.01)

    def make_task(prefix):
        # Blocks until the test releases an ID, then acknowledges it so
        # that ids.join() can observe that the ID was consumed.
        suffix = ids.get(True)
        ids.task_done()
        return '%s-%s' % (prefix, suffix)

    prefixes = ['cancel-true', 'cancel-false', 'cancel-error']
    futures = [poll_executor.submit(make_task, prefix) for prefix in prefixes]
    allowed, vetoed, failing = futures

    # Nothing can progress while the queue of IDs is still empty.
    assert_that(not any(f.done() for f in futures))

    # Release one ID per task so that every task can be created.
    for suffix in ('x', 'y', 'z'):
        ids.put(suffix)

    # Block until every ID has been consumed, i.e. all tasks were created
    # and the futures are moving into poll mode.
    ids.join()

    # make_task has definitely returned in each thread once no future
    # reports itself as running any more.
    assert_soon(lambda: assert_that(not any(f.running() for f in futures)))

    # Repeat the cancel attempts: calling cancel several times must give
    # the same answers as calling it once.
    for _ in range(2):
        # The cancel function agrees, so this cancel is allowed.
        assert_that(allowed.cancel())

        # The cancel function refuses, so this cancel is not allowed.
        assert_that(not vetoed.cancel())

        # The cancel function raises, so this cancel is not allowed.
        assert_that(not failing.cancel())

    # The raising cancel function must have produced an error log record.
    if caplog:
        assert_that(caplog.record_tuples, has_item(
            contains('PollExecutor',
                     logging.ERROR,
                     matches_regexp(r'Exception during cancel .*/cancel-error'))))


def test_cancel_during_poll(executor):
task_ran = Event()
poll_ran = Event()
Expand All @@ -81,7 +150,7 @@ def fn():
task_ran.set()
return 123

poll_executor = PollExecutor(executor, poll_fn, 0.01)
poll_executor = PollExecutor(executor, poll_fn, default_interval=0.01)
future = poll_executor.submit(fn)

# It shouldn't finish yet.
Expand Down Expand Up @@ -135,7 +204,7 @@ def poll_fn(descriptors):
else:
descriptor.yield_exception(RuntimeError('fail'))

poll_executor = PollExecutor(executor, poll_fn, 0.01)
poll_executor = PollExecutor(executor, poll_fn, default_interval=0.01)
futures = [poll_executor.submit(lambda x: x, x) for x in ('pass', 'fail')]

# Wait until both futures move to polling mode.
Expand Down Expand Up @@ -186,7 +255,7 @@ def poll_fn(descriptors):
raise RuntimeError("simulated poll error")
return poll_tasks(tasks, descriptors)

poll_executor = PollExecutor(executor, poll_fn, 0.01)
poll_executor = PollExecutor(executor, poll_fn, default_interval=0.01)

def make_task(x):
return '%s-%s' % (x, task_id_queue.get(True))
Expand Down

0 comments on commit ab9fb45

Please sign in to comment.