Skip to content

Commit

Permalink
WIP: PollExecutor
Browse files Browse the repository at this point in the history
passes the generic autotests
  • Loading branch information
rohanpm committed Mar 13, 2018
1 parent 193b24f commit 9238a0f
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 2 deletions.
2 changes: 1 addition & 1 deletion more_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
This documentation was built from an unknown revision.
"""

__all__ = ['map', 'retry', 'Executors']
__all__ = ['map', 'retry', 'poll', 'Executors']

from more_executors._executors import Executors
12 changes: 12 additions & 0 deletions more_executors/_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from more_executors.map import MapExecutor
from more_executors.retry import RetryExecutor, ExceptionRetryPolicy
from more_executors.poll import PollExecutor


class Executors(object):
Expand All @@ -21,6 +22,7 @@ class Executors(object):
_WRAP_METHODS = [
'with_retry',
'with_map',
'with_poll',
]

@classmethod
Expand Down Expand Up @@ -65,3 +67,13 @@ def with_map(cls, executor, fn):
- `fn`: a function used to transform each output value from the executor"""
return cls.wrap(MapExecutor(executor, fn))

@classmethod
def with_poll(cls, executor, fn):
"""Wrap an executor in a `more_executors.poll.PollExecutor`.
Submitted callables will have their output passed into the poll function.
- `fn`: a function used for polling results. See the class documentation for
expected behavior from this function."""
return cls.wrap(PollExecutor(executor, fn))
194 changes: 194 additions & 0 deletions more_executors/poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Create futures resolved by polling with a provided function."""
from concurrent.futures import Executor, Future
from threading import RLock, Thread, Event
import logging

_LOG = logging.getLogger('PollExecutor')

__pdoc__ = {}
__pdoc__['PollExecutor.shutdown'] = None
__pdoc__['PollExecutor.map'] = None


class _PollFuture(Future):
def __init__(self, delegate, executor):
super(_PollFuture, self).__init__()
self._delegate = delegate
self._executor = executor
self._me_lock = RLock()
self._delegate.add_done_callback(self._delegate_resolved)

def _delegate_resolved(self, delegate):
assert delegate is self._delegate, \
"BUG: called with %s, expected %s" % (delegate, self._delegate)

if delegate.exception():
self.set_exception(delegate.exception())
else:
self._executor._register_poll(self, self._delegate)

def _clear_delegate(self):
with self._me_lock:
self._delegate = None

def running(self):
with self._me_lock:
if self._delegate:
return self._delegate.running()
# If delegate is removed, we're now polling or done.
return False

def cancel(self):
with self._me_lock:
if self.cancelled():
return True
if self._delegate and not self._delegate.cancel():
return False
self._executor._deregister_poll(self)
out = super(_PollFuture, self).cancel()
if out:
self.set_running_or_notify_cancel()
return out


class PollDescriptor(object):
"""A `PollDescriptor` represents an unresolved `Future`.
The poll function used by `more_executors.poll.PollExecutor` will be
invoked with a list of `PollDescriptor` objects."""

def __init__(self, result, yield_result, yield_exception):
"""Do not construct instances of `PollDescriptor` directly."""

self.result = result
"""The result from the delegate executor's future, which should
be used to drive the poll."""

self.yield_result = yield_result
"""The poll function can call this function to make the future
yield the given result."""

self.yield_exception = yield_exception
"""The poll function can call this function to make the future
raise the given exception."""


class PollExecutor(Executor):
"""Instances of `PollExecutor` submit callables to a delegate `Executor`
and resolve the returned futures via a provided poll function.
The poll function has the following semantics:
- It's called with a single argument, a list of `more_executors.poll.PollDescriptor` objects.
- If the poll function can determine that a particular future should be
completed, either successfully or in error, it should call the `yield_result`
or `yield_exception` methods on the appropriate `PollDescriptor`.
- If the poll function raises an exception, all futures depending on that poll
will fail with that exception. The poll function will be retried later.
- If the poll function returns an int or float, it is used as the delay
in seconds until the next poll.
"""

def __init__(self, delegate, poll_fn, default_interval=5.0):
"""Create a new executor.
- `delegate`: `Executor` instance to which callables will be submitted
- `poll_fn`: a polling function used to decide when futures should be resolved;
see the class documentation for the required behavior from this function
- `default_interval`: default interval between polls (in seconds)
"""
self._delegate = delegate
self._default_interval = default_interval
self._poll_fn = poll_fn
self._poll_descriptors = []
self._poll_event = Event()
self._poll_thread = Thread(name='PollExecutor', target=self._poll_loop)
self._poll_thread.daemon = True
self._shutdown = False
self._lock = RLock()

self._poll_thread.start()

def submit(self, fn, *args, **kwargs):
"""Submit a callable.
Returns a future which will be resolved via the poll function."""
delegate_future = self._delegate.submit(fn, *args, **kwargs)
out = _PollFuture(delegate_future, self)
out.add_done_callback(self._deregister_poll)
return out

def _register_poll(self, future, delegate_future):
descriptor = PollDescriptor(
delegate_future.result(),
future.set_result,
future.set_exception)
with self._lock:
future._clear_delegate()
self._poll_descriptors.append((future, descriptor))
self._poll_event.set()

def _deregister_poll(self, future):
with self._lock:
self._poll_descriptors = [(f, d)
for (f, d) in self._poll_descriptors
if f is not future]

def _run_poll_fn(self):
with self._lock:
descriptors = [d for (_, d) in self._poll_descriptors]
self._poll_event.clear()

try:
return self._poll_fn(descriptors)
except Exception as e:
_LOG.debug("Poll function failed: %s", e)
# If poll function fails, then every future
# depending on the poll also immediately fails.
[d.yield_exception(e) for d in descriptors]

def _poll_loop(self):
while not self._shutdown:
_LOG.debug("Polling...")

next_sleep = self._run_poll_fn()
if not (isinstance(next_sleep, int) or isinstance(next_sleep, float)):
next_sleep = self._default_interval

_LOG.debug("Sleeping...")
self._poll_event.wait(next_sleep)

def shutdown(self, wait=True):
self._shutdown = True
self._poll_event.set()
self._delegate.shutdown(wait)
if wait:
_LOG.debug("Join poll thread...")
self._poll_thread.join()
_LOG.debug("Joined poll thread.")



# Example:
# POST https://myservice/entity/1/publish
# POST https://myservice/entity/2/publish
# ...
#
# Return task IDs:
# 100
# 101
#
# Poll needs to do:
# POST https://myservice/task , '{"id": {"$in": [100, 101]}}'
# Response: [{"id": 100, "state": running"}, {"id": 101, "state": "complete"}]
#
# What info does poll need?
# - The returned values of every future which has not yet been completed
# - A function which can be called to set the future to completed
#
# poll_fn([
#
# ])
7 changes: 6 additions & 1 deletion tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ def retry_map_executor():
return Executors.thread_pool().with_map(lambda x: x).with_retry(RetryPolicy())


@fixture(params=['threadpool', 'retry', 'map', 'retry_map', 'map_retry'])
@fixture
def poll_executor():
return Executors.thread_pool().with_poll(lambda ds: [d.yield_result(d.result) for d in ds])


@fixture(params=['threadpool', 'retry', 'map', 'retry_map', 'map_retry', 'poll'])
def any_executor(request):
ex = request.getfixturevalue(request.param + '_executor')
yield ex
Expand Down

0 comments on commit 9238a0f

Please sign in to comment.