Skip to content

Commit

Permalink
Reverted _is_ready to bool to remove accidental complexity; changed r…
Browse files Browse the repository at this point in the history
…ecvlock to rlock since it is used in reentrant functions; added releases as needed to adjust for reentrant acquires
  • Loading branch information
comrumino committed May 18, 2022
1 parent 29255e3 commit 234f253
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
27 changes: 16 additions & 11 deletions rpyc/core/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class AsyncResult(object):

def __init__(self, conn):
self._conn = conn
self._is_ready = Event()
self._is_ready = False
self._is_exc = None
self._obj = None
self._callbacks = []
self._ttl = Timeout(None)

def __repr__(self):
if self._is_ready.is_set():
if self._is_ready:
state = "ready"
elif self._is_exc:
state = "error"
Expand All @@ -35,7 +35,7 @@ def __call__(self, is_exc, obj):
return
self._is_exc = is_exc
self._obj = obj
self._is_ready.set()
self._is_ready = True
for cb in self._callbacks:
cb(self)
del self._callbacks[:]
Expand All @@ -44,9 +44,14 @@ def wait(self):
"""Waits for the result to arrive. If the AsyncResult object has an
expiry set, and the result did not arrive within that timeout,
an :class:`AsyncResultTimeout` exception is raised"""
while not self._is_ready.is_set() and not self._ttl.expired():
self._conn.serve(self._ttl)
if not self._is_ready.is_set():
while not (self._is_ready or self.expired):
# Serve the connection since we are not ready. Suppose
# the reply for our seq is served. The callback is this class
# so __call__ sets our obj and _is_ready to true.
self._conn.serve(self._ttl, wait_for_lock=False)

This comment has been minimized.

Copy link
@notEvil

notEvil May 18, 2022

setting wait_for_lock to False means that the thread is in a busy loop if another thread is holding the recv lock.

This comment has been minimized.

Copy link
@comrumino

comrumino May 18, 2022

Author Collaborator

Ah thank for pointing this out. It is cruft from when I was testing and observing.


# Check if we timed out before result was ready
if not self._is_ready:
raise AsyncResultTimeout("result expired")

def add_callback(self, func):
Expand All @@ -57,7 +62,7 @@ def add_callback(self, func):
:param func: the callback function to add
"""
if self._is_ready.is_set():
if self._is_ready:
func(self)
else:
self._callbacks.append(func)
Expand All @@ -73,12 +78,12 @@ def set_expiry(self, timeout):
@property
def ready(self):
"""Indicates whether the result has arrived"""
if self._is_ready.is_set():
if self._is_ready:
return True
if self._ttl.expired():
if self.expired:
return False
self._conn.poll_all()
return self._is_ready.is_set()
return self._is_ready

@property
def error(self):
Expand All @@ -88,7 +93,7 @@ def error(self):
@property
def expired(self):
"""Indicates whether the AsyncResult has expired"""
return not self._is_ready.is_set() and self._ttl.expired()
return not self._is_ready and self._ttl.expired()

@property
def value(self):
Expand Down
23 changes: 18 additions & 5 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time # noqa: F401
import gc # noqa: F401

from threading import Lock, Condition
from threading import Lock, Condition, RLock
from rpyc.lib import spawn, Timeout, get_methods, get_id_pack
from rpyc.lib.compat import pickle, next, maxint, select_error, acquire_lock # noqa: F401
from rpyc.lib.colls import WeakValueDict, RefCountingColl
Expand Down Expand Up @@ -145,7 +145,7 @@ def __init__(self, root, channel, config={}):
self._HANDLERS = self._request_handlers()
self._channel = channel
self._seqcounter = itertools.count()
self._recvlock = Lock()
self._recvlock = RLock()
self._sendlock = Lock()
self._recv_event = Condition()
self._request_callbacks = {}
Expand Down Expand Up @@ -380,14 +380,20 @@ def serve(self, timeout=1, wait_for_lock=True): # serving
"""
timeout = Timeout(timeout)
with self._recv_event:
# Exit early if we cannot acquire the recvlock
if not self._recvlock.acquire(False):
return wait_for_lock and self._recv_event.wait(timeout.timeleft())
if wait_for_lock:
# Wait condition for recvlock release; recvlock is not underlying lock for condition
return self._recv_event.wait(timeout.timeleft())
else:
return False
# Assume the receive rlock is acquired and incremented
try:
data = self._channel.poll(timeout) and self._channel.recv()
if not data:
return False
except EOFError:
self.close()
self.close() # sends close async request
raise
finally:
self._recvlock.release()
Expand Down Expand Up @@ -470,7 +476,14 @@ def sync_request(self, handler, *args): # serving
:returns: the result of the request
"""
timeout = self._config["sync_request_timeout"]
return self.async_request(handler, *args, timeout=timeout).value
# The recv rlock is acquired prior to invoking the request.
# AsyncResult will be constructed and it is possible GIL switches
# threads before AsyncResult.wait is invoked. So, using an rlock
# we acquire once here and once inside of wait which invokes serve
self._recvlock.acquire()
value = self.async_request(handler, *args, timeout=timeout).value
self._recvlock.release()
return value

def _async_request(self, handler, args=(), callback=(lambda a, b: None)): # serving
seq = self._get_seq_id()
Expand Down

0 comments on commit 234f253

Please sign in to comment.