Skip to content

Commit

Permalink
Added bind_threads parameter definition and docstrings to functions i…
Browse files Browse the repository at this point in the history
…ntroduced for bind_threads logic
  • Loading branch information
comrumino committed Oct 24, 2022
1 parent 656998f commit c86fa84
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PingError(Exception):
sync_request_timeout=30,
before_closed=None,
close_catchall=False,
bind_threads=os.environ.get('RPYC_BIND_THREADS', '0') != '0',
bind_threads=os.environ.get('RPYC_BIND_THREADS') == 'true',
)
"""
The default configuration dictionary of the protocol. You can override these parameters
Expand Down Expand Up @@ -125,6 +125,9 @@ class PingError(Exception):
do no have this configuration option set.
``sync_request_timeout`` ``30`` Default timeout for waiting results
``bind_threads`` ``False`` Whether to restrict request/reply by thread (experimental).
The default value is False. Setting the environment variable
`RPYC_BIND_THREADS` to `"true"` will enable this feature.
======================================= ================ =====================================================
"""

Expand Down Expand Up @@ -407,8 +410,7 @@ def serve(self, timeout=1, wait_for_lock=True): # serving
might trigger multiple (nested) requests, thus this function may be
reentrant.
:returns: ``True`` if a request or reply were received, ``False``
otherwise.
:returns: ``True`` if a request or reply were received, ``False`` otherwise.
"""
timeout = Timeout(timeout)
if self._bind_threads:
Expand Down Expand Up @@ -445,6 +447,15 @@ def serve(self, timeout=1, wait_for_lock=True): # serving
return False

def _serve_bound(self, timeout, wait_for_lock):
"""Serves messages like `serve` with the added benefit of making request/reply thread bound.
- Experimental functionality `RPYC_BIND_THREADS`
The first 8 bytes indicate the sending thread ID and intended recipient ID. When the recipient
thread ID is not the thread that received the data, the remote thread ID and message are appended
to the intended threads `_deque` and `_event` is set.
:returns: ``True`` if a request or reply were received, ``False`` otherwise.
"""
this_thread = self._get_thread()
wait = False

Expand Down Expand Up @@ -583,6 +594,11 @@ def _serve_bound(self, timeout, wait_for_lock):
return True

def _serve_temporary(self, remote_thread_id, message):
"""Callable that is used to schedule serve as a new thread
- Experimental functionality `RPYC_BIND_THREADS`
:returns: None
"""
thread = self._get_thread()
thread._deque.append((remote_thread_id, message))
thread._event.set()
Expand All @@ -602,6 +618,11 @@ def _serve_temporary(self, remote_thread_id, message):
pass

def _get_thread(self, id=None):
"""Get internal thread information for current thread for ID, when None use current thread.
- Experimental functionality `RPYC_BIND_THREADS`
:returns: _Thread
"""
if id is None:
id = threading.get_ident()

Expand Down Expand Up @@ -891,6 +912,7 @@ def _handle_oldslicing(self, obj, attempt, fallback, start, stop, args): # requ


class _Thread:
"""Internal thread information for the RPYC protocol used for thread binding."""
def __init__(self, id):
super().__init__()

Expand Down

0 comments on commit c86fa84

Please sign in to comment.