Skip to content

Commit

Permalink
Rewrote demos of async usage to display issues such as those found in #…
Browse files Browse the repository at this point in the history
  • Loading branch information
comrumino committed May 20, 2022
1 parent dbb49f2 commit 6d4cf10
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 25 deletions.
70 changes: 52 additions & 18 deletions demos/async_client/client.py
Expand Up @@ -3,31 +3,65 @@
Additional context: https://github.com/tomerfiliba-org/rpyc/issues/491#issuecomment-1131843406
"""
import rpyc
import logging
import threading
import time
import rpyc


def async_example(connection):
t0 = time.time()
print(f"Running async example...")
_async_function = rpyc.async_(connection.root.function)
res = _async_function(threading.Event())
print(f"Created async result after {time.time()-t0}s")
value = res.value
print(f"Value returned after {time.time()-t0}s: {value}")
print()
logger = rpyc.setup_logger(namespace='client')
rpyc.core.protocol.DEFAULT_CONFIG['logger'] = logger


def async_example(connection, event):
_async_function = rpyc.async_(connection.root.function) # create async proxy
# The server will call event.wait which will block this thread. To process
# the set message from the server we need a background thread. A background
# thread ensures that we have a thread that is not blocked.
#
# But wait! Since the communication is symmetric, the server side could
# be blocked if you are not careful. It needs responses from the client
#
# The perils of trying to thread a single connection...
# - the thread the receives the message from the server to wait is blocked
# - which thread is blocked is VERY hard to guarantee
#
# THIS IS NOT HE PREFERRED WAY FOR MUTABLE TYPES...
# - threading a connection might be okay to do for immutable types depending on context

bgsrv = rpyc.BgServingThread(connection)
ares = _async_function(event, block_server_thread=False)
value = ares.value
event.clear()
logger.info('Running buggy blocking example...')
ares = _async_function(event, block_server_thread=True)
value = ares.value
event.clear()
bgsrv.stop()

def synchronous_example(connection):

def how_to_block_main_thread(connection, event):
"""Example of how to block the main thread of a client"""
t0 = time.time()
print(f"Running synchronous example...")
value = connection.root.function(threading.Event())
print(f"Value returned after {time.time()-t0}s: {value}")
print()
logger.debug("Running example that blocks main thread of client...")
value = connection.root.function(event, call_set=True)
logger.debug(f"Value returned after {time.time()-t0}s: {value}")


class Event:
def __init__(self):
self._evnt = threading.Event()

def __getattr__(self, name):
if name in ('wait', 'set', 'clear'):
logging.info(f'Event.__getattr__({name})')
return getattr(self._evnt, name)


if __name__ == "__main__":
connection = rpyc.connect("localhost", 18812, config=dict(allow_public_attrs=True))
async_example(connection)
synchronous_example(connection)
logger.info('Printed from main thread')
connection = rpyc.connect("localhost", 18812, config=dict(allow_all_attrs=True))
event = Event()
async_example(connection, event)
event.clear()
# how_to_block_main_thread_example(connection, event)
32 changes: 25 additions & 7 deletions demos/async_client/server.py
Expand Up @@ -3,18 +3,36 @@
Additional context: https://github.com/tomerfiliba-org/rpyc/issues/491#issuecomment-1131843406
"""
import logging
import time
import rpyc
import threading
import time


logger = rpyc.setup_logger(namespace='server')
rpyc.core.protocol.DEFAULT_CONFIG['logger'] = logger


class Service(rpyc.Service):
def exposed_function(self, event):
threading.Thread(target=event.wait).start()
time.sleep(1)
threading.Thread(target=event.set).start()
return 'silly sleeps on server threads'
def exposed_fetch_value(self):
return self._value

def exposed_function(self, client_event, block_server_thread=False):
if block_server_thread:
# For some reason
_wait = lambda : getattr(client_event, 'wait')() # delays attr proxy behavior
_set = lambda : getattr(client_event, 'set')() # delays attr proxy behavior
else:
_wait = rpyc.async_(client_event.wait) # amortize proxy behavior
_set = rpyc.async_(client_event.set) # amortize proxy behavior
_wait()
logger.debug('Client messaged to wait for now...')
for i in (1, 2):
logger.debug(f'Pretending to do task {i}')
time.sleep(0.2)
self._value = 6465616462656566 # ''.join([hex(ord(c))[2:] for c in 'deadbeef'])
_set()
logger.debug('Client event set, it may resume...')

if __name__ == "__main__":
rpyc.ThreadedServer(Service(), hostname="localhost", port=18812).start()
rpyc.ThreadedServer(service=Service, hostname="localhost", port=18812).start()

0 comments on commit 6d4cf10

Please sign in to comment.