diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py
index 9854d87a..0bfcc4b7 100644
--- a/rpyc/core/protocol.py
+++ b/rpyc/core/protocol.py
@@ -153,6 +153,7 @@ def __init__(self, service, channel, config = {}, _lazy = False):
         self._proxy_cache = WeakValueDict()
         self._netref_classes_cache = {}
         self._remote_root = None
+        self._send_queue = []
         self._local_root = service(weakref.proxy(self))
         if not _lazy:
             self._init_service()
@@ -241,15 +242,30 @@ def _send(self, msg, seq, args):
         # if so, a BaseNetref.__del__ might be called
         # BaseNetref.__del__ must call asyncreq,
         # which will cause a deadlock
-        is_gc_enabled = gc.isenabled()
-        gc.disable()
-        self._sendlock.acquire()
-        try:
-            self._channel.send(data)
-        finally:
-            self._sendlock.release()
-        if is_gc_enabled:
-            gc.enable()
+        # Solution:
+        # Add the current request to a queue and let the thread that currently
+        # holds the sendlock send it when it's done with its current job.
+        # NOTE: Atomic list operations should be thread safe,
+        # please call me out if they are not on all implementations!
+        self._send_queue.append(data)
+        # It is crucial to check the queue each time AFTER releasing the lock:
+        while self._send_queue:
+            if not self._sendlock.acquire(False):
+                # Another thread holds the lock. It will send the data after
+                # it's done with its current job. We can safely return.
+                return
+            try:
+                # Can happen if another consumer was scheduled in between
+                # `while` and `acquire`:
+                if not self._send_queue:
+                    # Must `continue` to ensure that `send_queue` is checked
+                    # after releasing the lock! (in case another producer is
+                    # scheduled before `release`)
+                    continue
+                data = self._send_queue.pop(0)
+                self._channel.send(data)
+            finally:
+                self._sendlock.release()
 
     def _send_request(self, seq, handler, args):
         self._send(consts.MSG_REQUEST, seq, (handler, self._box(args)))
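
The patch replaces the old "disable GC while holding the sendlock" workaround with a non-blocking queue-drain pattern: every sender appends to a shared list, then tries `acquire(False)`; whichever thread holds the lock is responsible for draining everything in the queue, including items appended by threads that gave up. Below is a minimal, self-contained sketch of that pattern outside the rpyc code base, so it can be exercised in isolation. The names (`SendQueue`, `channel_send`) are illustrative only and not part of rpyc's API; it assumes, as the patch's NOTE does, that `list.append` and `list.pop` are atomic (true in CPython due to the GIL).

```python
import threading

class SendQueue:
    """Illustrative sketch of the lock-free-producer / locked-drainer
    pattern used in the patch above. Not rpyc code."""

    def __init__(self, channel_send):
        self._send_queue = []               # appended to by any thread
        self._sendlock = threading.Lock()
        self._channel_send = channel_send   # stands in for channel.send

    def send(self, data):
        self._send_queue.append(data)
        # Re-check the queue each time AFTER releasing the lock, so an item
        # appended by another thread just before our release is not stranded.
        while self._send_queue:
            if not self._sendlock.acquire(False):
                # Another thread holds the lock; it will drain the queue
                # (including our item) before letting go, so we may return.
                return
            try:
                if not self._send_queue:
                    # Another drainer emptied the queue between our `while`
                    # check and `acquire`; continue so the queue is checked
                    # again after the lock is released.
                    continue
                self._channel_send(self._send_queue.pop(0))
            finally:
                self._sendlock.release()

if __name__ == "__main__":
    sent = []
    q = SendQueue(sent.append)
    threads = [threading.Thread(target=q.send, args=(i,)) for i in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert sorted(sent) == list(range(100))  # nothing lost, nothing duplicated
```

The `acquire(False)` call is what removes the deadlock the old code guarded against with `gc.disable()`: a `BaseNetref.__del__` triggered mid-send can no longer block on a lock its own thread already holds, because senders never wait for the lock at all.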