diff --git a/rpyc/core/async_.py b/rpyc/core/async_.py index 07d819b7..336c8bbe 100644 --- a/rpyc/core/async_.py +++ b/rpyc/core/async_.py @@ -6,7 +6,7 @@ # Magic borrowed from helpers.py.. SERVE_INTERVAL = 0.0 -SLEEP_INTERVAL = 0.1 +SLEEP_INTERVAL = 0.001 class AsyncResult(object): @@ -53,7 +53,7 @@ def wait(self): # the reply for our seq is served. The callback is this class # so __call__ sets our obj and _is_ready to true. while not (self._is_ready or self.expired): - if self._conn.acquire_recvlock(timeout=SLEEP_INTERVAL, wait_for_lock=False): + if self._conn.acquire_recvlock(timeout=SLEEP_INTERVAL, wait_for_lock=True): # Now that we can satisfy the precondition our _is_ready state is static... # Serve a connection if another thread didn't already handle this results callback... try: diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py index 2b317c4f..a7a243cf 100644 --- a/rpyc/core/protocol.py +++ b/rpyc/core/protocol.py @@ -487,9 +487,11 @@ def _serve_bound(self, timeout, wait_for_lock): remote_thread_id, message = this_thread._deque.popleft() if len(this_thread._deque) == 0: this_thread._event.clear() - elif self._recvlock._is_owned(): # enter pool + elif not self.acquire_recvlock(wait_for_lock=False): # enter pool self._thread_pool.append(this_thread) wait_for_event = True + else: + self.release_recvlock() if message_available: # just process this_thread._remote_thread_id = remote_thread_id @@ -505,13 +507,14 @@ def _serve_bound(self, timeout, wait_for_lock): if isinstance(exception, EOFError): self.close() # sends close async request + self.release_recvlock() with self._lock: for thread in self._thread_pool: thread._event.set() break raise - finally: + else: self.release_recvlock() else: while True: @@ -529,8 +532,9 @@ def _serve_bound(self, timeout, wait_for_lock): else: this_thread._event.clear() - if self._recvlock._is_owned(): # another thread was faster + if not self.acquire_recvlock(wait_for_lock=False): # another thread was faster continue + self.release_recvlock() self._thread_pool.remove(this_thread) # leave pool, as we aren't waiting break else: # timeout @@ -730,8 +734,10 @@ def async_request(self, handler, *args, **kwargs): # serving timeout = kwargs.pop("timeout", None) if kwargs: raise TypeError("got unexpected keyword argument(s) {list(kwargs.keys()}") - res = AsyncResult(self, timeout) + res = AsyncResult(self) self._async_request(handler, args, res) + if timeout is not None: + res.set_expiry(timeout) return res @property diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..9d9b39e6 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,17 @@ +import sys +import rpyc +from pathlib import Path + +def load_tests(loader, standard_tests, pattern): + # Hook rpyc logger, unittest verbosity, and system paths + #rpyc.core.DEFAULT_CONFIG['logger'] = rpyc.lib.setup_logger() + rpyc_tests_path = Path(__file__).absolute().parent + rpyc_path = rpyc_tests_path.parent + for p in [str(rpyc_path), str(rpyc_tests_path)]: + if p not in sys.path: + sys.path.insert(0, p) + + # Discover on tests and add paths + tests = loader.discover(start_dir=rpyc_tests_path, pattern=pattern, top_level_dir=rpyc_path) + standard_tests.addTests(tests) + return standard_tests