Skip to content

Commit

Permalink
Moved AsyncResulting logic back into Connection and ...
Browse files Browse the repository at this point in the history
- replaced waiting with exposing recvlock
- deprecated recv_event b/c it didn't do much of anything
- split out some of serve_bound into _dispatch_to_bound_thread
- replaced _receiving with _recvlock._is_owned as it is slightly more
  robust
  • Loading branch information
comrumino committed Mar 18, 2023
1 parent 99c5abe commit bcab6a0
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 148 deletions.
32 changes: 21 additions & 11 deletions rpyc/core/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
from rpyc.lib.compat import TimeoutError as AsyncResultTimeout


# Magic borrowed from helpers.py..
SERVE_INTERVAL = 0.0
SLEEP_INTERVAL = 0.1


class AsyncResult(object):
"""*AsyncResult* represents a computation that occurs in the background and
will eventually have a result. Use the :attr:`value` property to access the
result (which will block if the result has not yet arrived).
"""
__slots__ = ["_conn", "_is_ready", "_is_exc", "_callbacks", "_obj", "_ttl"]

def __init__(self, conn):
def __init__(self, conn, timeout=None):
self._conn = conn
self._is_ready = False
self._is_exc = None
self._obj = None
self._callbacks = []
self._ttl = Timeout(None)
self._ttl = Timeout.lazy_init(timeout)

def __repr__(self):
if self._is_ready:
Expand All @@ -44,19 +49,24 @@ def wait(self):
"""Waits for the result to arrive. If the AsyncResult object has an
expiry set, and the result did not arrive within that timeout,
an :class:`AsyncResultTimeout` exception is raised"""
while self._waiting():
# Serve the connection since we are not ready. Suppose
# the reply for our seq is served. The callback is this class
# so __call__ sets our obj and _is_ready to true.
self._conn.serve(self._ttl, waiting=self._waiting)
# Serve the connection since we are not ready. Suppose
# the reply for our seq is served. The callback is this class
# so __call__ sets our obj and _is_ready to true.
while not (self._is_ready or self.expired):
if self._conn.acquire_recvlock(timeout=SLEEP_INTERVAL, wait_for_lock=False):
# Now that we can satisfy the precondition our _is_ready state is static...
# Serve a connection if another thread didn't already handle this results callback...
try:
if not self._is_ready:
# Who cares if it expired... one last attempt... worst case timeleft is 0
self._conn.serve(self._ttl)
finally:
self._conn.release_recvlock()

# Check if we timed out before result was ready
if not self._is_ready:
raise AsyncResultTimeout("result expired")

def _waiting(self):
return not (self._is_ready or self.expired)

def add_callback(self, func):
"""Adds a callback to be invoked when the result arrives. The callback
function takes a single argument, which is the current AsyncResult
Expand All @@ -76,7 +86,7 @@ def set_expiry(self, timeout):
:param timeout: the expiry time in seconds or ``None``
"""
self._ttl = Timeout(timeout)
self._ttl = Timeout.lazy_init(timeout)

@property
def ready(self):
Expand Down

0 comments on commit bcab6a0

Please sign in to comment.