Skip to content

Commit

Permalink
Resolved issues around thread pool events not triggering and busy loop
Browse files Browse the repository at this point in the history
in AsyncResult
  • Loading branch information
comrumino committed Mar 19, 2023
1 parent bcab6a0 commit a5085b3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
4 changes: 2 additions & 2 deletions rpyc/core/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

# Magic borrowed from helpers.py..
SERVE_INTERVAL = 0.0
SLEEP_INTERVAL = 0.1
SLEEP_INTERVAL = 0.001


class AsyncResult(object):
Expand Down Expand Up @@ -53,7 +53,7 @@ def wait(self):
# the reply for our seq is served. The callback is this class
# so __call__ sets our obj and _is_ready to true.
while not (self._is_ready or self.expired):
if self._conn.acquire_recvlock(timeout=SLEEP_INTERVAL, wait_for_lock=False):
if self._conn.acquire_recvlock(timeout=SLEEP_INTERVAL, wait_for_lock=True):
# Now that we can satisfy the precondition our _is_ready state is static...
# Serve a connection if another thread didn't already handle this results callback...
try:
Expand Down
14 changes: 10 additions & 4 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,11 @@ def _serve_bound(self, timeout, wait_for_lock):
remote_thread_id, message = this_thread._deque.popleft()
if len(this_thread._deque) == 0:
this_thread._event.clear()
elif self._recvlock._is_owned(): # enter pool
elif not self.acquire_recvlock(wait_for_lock=False): # enter pool
self._thread_pool.append(this_thread)
wait_for_event = True
else:
self.release_recvlock()

if message_available: # just process
this_thread._remote_thread_id = remote_thread_id
Expand All @@ -505,13 +507,14 @@ def _serve_bound(self, timeout, wait_for_lock):
if isinstance(exception, EOFError):
self.close() # sends close async request

self.release_recvlock()
with self._lock:
for thread in self._thread_pool:
thread._event.set()
break

raise
finally:
else:
self.release_recvlock()
else:
while True:
Expand All @@ -529,8 +532,9 @@ def _serve_bound(self, timeout, wait_for_lock):

else:
this_thread._event.clear()
if self._recvlock._is_owned(): # another thread was faster
if not self.acquire_recvlock(wait_for_lock=False): # another thread was faster
continue
self.release_recvlock()
self._thread_pool.remove(this_thread) # leave pool, as we aren't waiting
break
else: # timeout
Expand Down Expand Up @@ -730,8 +734,10 @@ def async_request(self, handler, *args, **kwargs): # serving
timeout = kwargs.pop("timeout", None)
if kwargs:
raise TypeError("got unexpected keyword argument(s) {list(kwargs.keys()}")
res = AsyncResult(self, timeout)
res = AsyncResult(self)
self._async_request(handler, args, res)
if timeout is not None:
res.set_expiry(timeout)
return res

@property
Expand Down
17 changes: 17 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import sys
import rpyc
from pathlib import Path

def load_tests(loader, standard_tests, pattern):
# Hook rpyc logger, unittest verbosity, and system paths
#rpyc.core.DEFAULT_CONFIG['logger'] = rpyc.lib.setup_logger()
rpyc_tests_path = Path(__file__).absolute().parent
rpyc_path = rpyc_tests_path.parent
for p in [str(rpyc_path), str(rpyc_tests_path)]:
if p not in sys.path:
sys.path.insert(0, p)

# Discover on tests and add paths
tests = loader.discover(start_dir=rpyc_tests_path, pattern=pattern, top_level_dir=rpyc_path)
standard_tests.addTests(tests)
return standard_tests

0 comments on commit a5085b3

Please sign in to comment.