-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
passes the generic autotests
- Loading branch information
Showing
5 changed files
with
214 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
"""Create futures resolved by polling with a provided function.""" | ||
from concurrent.futures import Executor, Future | ||
from threading import RLock, Thread, Event | ||
import logging | ||
|
||
_LOG = logging.getLogger('PollExecutor') | ||
|
||
__pdoc__ = {} | ||
__pdoc__['PollExecutor.shutdown'] = None | ||
__pdoc__['PollExecutor.map'] = None | ||
|
||
|
||
class _PollFuture(Future): | ||
def __init__(self, delegate, executor): | ||
super(_PollFuture, self).__init__() | ||
self._delegate = delegate | ||
self._executor = executor | ||
self._me_lock = RLock() | ||
self._delegate.add_done_callback(self._delegate_resolved) | ||
|
||
def _delegate_resolved(self, delegate): | ||
assert delegate is self._delegate, \ | ||
"BUG: called with %s, expected %s" % (delegate, self._delegate) | ||
|
||
if delegate.exception(): | ||
self.set_exception(delegate.exception()) | ||
else: | ||
self._executor._register_poll(self, self._delegate) | ||
|
||
def _clear_delegate(self): | ||
with self._me_lock: | ||
self._delegate = None | ||
|
||
def running(self): | ||
with self._me_lock: | ||
if self._delegate: | ||
return self._delegate.running() | ||
# If delegate is removed, we're now polling or done. | ||
return False | ||
|
||
def cancel(self): | ||
with self._me_lock: | ||
if self.cancelled(): | ||
return True | ||
if self._delegate and not self._delegate.cancel(): | ||
return False | ||
self._executor._deregister_poll(self) | ||
out = super(_PollFuture, self).cancel() | ||
if out: | ||
self.set_running_or_notify_cancel() | ||
return out | ||
|
||
|
||
class PollDescriptor(object): | ||
"""A `PollDescriptor` represents an unresolved `Future`. | ||
The poll function used by `more_executors.poll.PollExecutor` will be | ||
invoked with a list of `PollDescriptor` objects.""" | ||
|
||
def __init__(self, result, yield_result, yield_exception): | ||
"""Do not construct instances of `PollDescriptor` directly.""" | ||
|
||
self.result = result | ||
"""The result from the delegate executor's future, which should | ||
be used to drive the poll.""" | ||
|
||
self.yield_result = yield_result | ||
"""The poll function can call this function to make the future | ||
yield the given result.""" | ||
|
||
self.yield_exception = yield_exception | ||
"""The poll function can call this function to make the future | ||
raise the given exception.""" | ||
|
||
|
||
class PollExecutor(Executor): | ||
"""Instances of `PollExecutor` submit callables to a delegate `Executor` | ||
and resolve the returned futures via a provided poll function. | ||
The poll function has the following semantics: | ||
- It's called with a single argument, a list of `more_executors.poll.PollDescriptor` objects. | ||
- If the poll function can determine that a particular future should be | ||
completed, either successfully or in error, it should call the `yield_result` | ||
or `yield_exception` methods on the appropriate `PollDescriptor`. | ||
- If the poll function raises an exception, all futures depending on that poll | ||
will fail with that exception. The poll function will be retried later. | ||
- If the poll function returns an int or float, it is used as the delay | ||
in seconds until the next poll. | ||
""" | ||
|
||
def __init__(self, delegate, poll_fn, default_interval=5.0): | ||
"""Create a new executor. | ||
- `delegate`: `Executor` instance to which callables will be submitted | ||
- `poll_fn`: a polling function used to decide when futures should be resolved; | ||
see the class documentation for the required behavior from this function | ||
- `default_interval`: default interval between polls (in seconds) | ||
""" | ||
self._delegate = delegate | ||
self._default_interval = default_interval | ||
self._poll_fn = poll_fn | ||
self._poll_descriptors = [] | ||
self._poll_event = Event() | ||
self._poll_thread = Thread(name='PollExecutor', target=self._poll_loop) | ||
self._poll_thread.daemon = True | ||
self._shutdown = False | ||
self._lock = RLock() | ||
|
||
self._poll_thread.start() | ||
|
||
def submit(self, fn, *args, **kwargs): | ||
"""Submit a callable. | ||
Returns a future which will be resolved via the poll function.""" | ||
delegate_future = self._delegate.submit(fn, *args, **kwargs) | ||
out = _PollFuture(delegate_future, self) | ||
out.add_done_callback(self._deregister_poll) | ||
return out | ||
|
||
def _register_poll(self, future, delegate_future): | ||
descriptor = PollDescriptor( | ||
delegate_future.result(), | ||
future.set_result, | ||
future.set_exception) | ||
with self._lock: | ||
future._clear_delegate() | ||
self._poll_descriptors.append((future, descriptor)) | ||
self._poll_event.set() | ||
|
||
def _deregister_poll(self, future): | ||
with self._lock: | ||
self._poll_descriptors = [(f, d) | ||
for (f, d) in self._poll_descriptors | ||
if f is not future] | ||
|
||
def _run_poll_fn(self): | ||
with self._lock: | ||
descriptors = [d for (_, d) in self._poll_descriptors] | ||
self._poll_event.clear() | ||
|
||
try: | ||
return self._poll_fn(descriptors) | ||
except Exception as e: | ||
_LOG.debug("Poll function failed: %s", e) | ||
# If poll function fails, then every future | ||
# depending on the poll also immediately fails. | ||
[d.yield_exception(e) for d in descriptors] | ||
|
||
def _poll_loop(self): | ||
while not self._shutdown: | ||
_LOG.debug("Polling...") | ||
|
||
next_sleep = self._run_poll_fn() | ||
if not (isinstance(next_sleep, int) or isinstance(next_sleep, float)): | ||
next_sleep = self._default_interval | ||
|
||
_LOG.debug("Sleeping...") | ||
self._poll_event.wait(next_sleep) | ||
|
||
def shutdown(self, wait=True): | ||
self._shutdown = True | ||
self._poll_event.set() | ||
self._delegate.shutdown(wait) | ||
if wait: | ||
_LOG.debug("Join poll thread...") | ||
self._poll_thread.join() | ||
_LOG.debug("Joined poll thread.") | ||
|
||
|
||
|
||
# Example: | ||
# POST https://myservice/entity/1/publish | ||
# POST https://myservice/entity/2/publish | ||
# ... | ||
# | ||
# Return task IDs: | ||
# 100 | ||
# 101 | ||
# | ||
# Poll needs to do: | ||
# POST https://myservice/task , '{"id": {"$in": [100, 101]}}' | ||
# Response: [{"id": 100, "state": running"}, {"id": 101, "state": "complete"}] | ||
# | ||
# What info does poll need? | ||
# - The returned values of every future which has not yet been completed | ||
# - A function which can be called to set the future to completed | ||
# | ||
# poll_fn([ | ||
# | ||
# ]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters