Skip to content

Commit

Permalink
Replaced hard coded value 0 with UNBOUND_THREAD_ID to lower cognitive…
Browse files Browse the repository at this point in the history
… load. Removed some accidental complexity in _serve_bound around local_thread_id cases and commented the cases/issues
  • Loading branch information
comrumino committed Feb 17, 2023
1 parent 1cea5ba commit 64ff98d
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class PingError(Exception):
pass


UNBOUND_THREAD_ID = 0 # Used when the message is being sent but the thread is not bound yet.
DEFAULT_CONFIG = dict(
# ATTRIBUTES
allow_safe_attrs=True,
Expand Down Expand Up @@ -171,7 +172,9 @@ def __init__(self, root, channel, config={}):
self._send_queue = []
self._local_root = root
self._closed = False
# Settings for bind_threads
self._bind_threads = self._config['bind_threads']
self._threads = None
if self._bind_threads:
self._lock = threading.Lock()
self._threads = {}
Expand Down Expand Up @@ -266,7 +269,7 @@ def _send(self, msg, seq, args): # IO
else:
this_thread._occupation_count -= 1
if this_thread._occupation_count == 0:
this_thread._remote_thread_id = 0
this_thread._remote_thread_id = UNBOUND_THREAD_ID
# GC might run while sending data
# if so, a BaseNetref.__del__ might be called
# BaseNetref.__del__ must call asyncreq,
Expand Down Expand Up @@ -399,7 +402,7 @@ def _dispatch(self, data): # serving---dispatch?
this_thread = self._get_thread()
this_thread._occupation_count -= 1
if this_thread._occupation_count == 0:
this_thread._remote_thread_id = 0
this_thread._remote_thread_id = UNBOUND_THREAD_ID
if msg == consts.MSG_REPLY:
obj = self._unbox(args)
self._seq_request_callback(msg, seq, False, obj)
Expand Down Expand Up @@ -554,30 +557,29 @@ def _serve_bound(self, timeout, wait_for_lock):

this = False

if local_thread_id == 0: # root request
if this_thread._occupation_count == 0: # this
this = True

else: # other
new = False

with self._lock:
for thread in self._thread_pool:
if thread._occupation_count == 0 and not thread._event.is_set():
thread._deque.append((remote_thread_id, message))
thread._event.set()
break
if local_thread_id == UNBOUND_THREAD_ID and this_thread._occupation_count != 0:
# Message is not meant for this thread. Use a thread that is not occupied or have the pool create a new one.
# TODO: reusing threads may be problematic if occupation being zero is wrong...
new = False
with self._lock:
for thread in self._thread_pool:
if thread._occupation_count == 0 and not thread._event.is_set():
thread._deque.append((remote_thread_id, message))
thread._event.set()
break

else:
new = True
else:
new = True

if new:
self._thread_pool_executor.submit(self._serve_temporary, remote_thread_id, message)
if new:
self._thread_pool_executor.submit(self._serve_temporary, remote_thread_id, message)

elif local_thread_id == this_thread.id:
elif local_thread_id in {UNBOUND_THREAD_ID, this_thread.id}:
# Of course, the message is for this thread if equal. When id is UNBOUND_THREAD_ID,
# we deduce that occupation count is 0 from the previous if condition. So, set this True.
this = True

else: # sub request
else:
# Otherwise, message was meant for another thread.
thread = self._get_thread(id=local_thread_id)
with self._lock:
thread._deque.append((remote_thread_id, message))
Expand Down Expand Up @@ -924,7 +926,7 @@ def __init__(self, id):

self.id = id

self._remote_thread_id = 0
self._remote_thread_id = UNBOUND_THREAD_ID
self._occupation_count = 0
self._event = threading.Event()
self._deque = collections.deque()

0 comments on commit 64ff98d

Please sign in to comment.