
Commit

Added demos/sharing client and server to provide a concrete example of how shared state between connections has some perils when using ThreadedServer. The README.md explains how to toggle between safe and unsafe behavior.
comrumino committed Dec 27, 2020
1 parent 53d450a commit c6c896d
Showing 3 changed files with 176 additions and 0 deletions.
2 changes: 2 additions & 0 deletions demos/sharing/README.md
# Shared State Server
When using the `ThreadedServer`, you may wish to share data between clients. Even though CPython's GIL prevents Python bytecode from running in parallel, an application using the `ThreadedServer` still interleaves requests across threads, so any shared state must be kept thread-safe (i.e. invariants are upheld, there are no data races, etc.). `sharing/server.py` defines a constant `THREAD_SAFE` that lets you toggle between the safe and unsafe function calls.
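As a minimal, RPyC-independent sketch of the hazard this demo illustrates (the `Counter` class below is hypothetical and not part of the demo), several threads performing an unsynchronized read-modify-write on shared state can lose updates:

import threading

class Counter:
    def __init__(self):
        self.value = 0

    def unsafe_increment(self):
        # read-modify-write without a lock: another thread may run between the read and the write
        current = self.value
        self.value = current + 1

counter = Counter()
threads = [threading.Thread(target=lambda: [counter.unsafe_increment() for _ in range(100000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# 4 threads x 100000 increments should yield 400000, but the total is often lower
# on CPython because interleaved increments overwrite each other
print(counter.value)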
99 changes: 99 additions & 0 deletions demos/sharing/client.py
#!/usr/bin/env python3
import time
import signal
import multiprocessing as mp
import pdb # noqa
import rpyc
import traceback


def echo_once():
start = time.time()
conn = rpyc.connect("localhost", 18861, config={"sync_request_timeout": None})
cdelta = time.time() - start
addr, port = conn._channel.stream.sock.getsockname()
fileno = conn.fileno()
start = time.time()
conn.root.echo("Echo")
edelta = time.time() - start
conn.close()
return cdelta, edelta, fileno, addr, port


def echo_forever(main_event):
try:
        count = 0
        _max = {'edelta': 0, 'cdelta': 0}
        # defaults reported in the exception message if echo_once fails before assignment
        fileno = "unknown"
        addr = "unknown"
        port = "unknown"
        cdelta = -1
        edelta = -1
while main_event.is_set():
count += 1
cdelta, edelta, fileno, addr, port = echo_once()
            _max['cdelta'] = max(_max['cdelta'], cdelta)
            _max['edelta'] = max(_max['edelta'], edelta)
    except KeyboardInterrupt:
        if main_event.is_set():
            main_event.clear()
    except Exception:
        tb = f"EXCEPT ('{addr}', {port}) with fd {fileno} over cdelta {cdelta} and edelta {edelta}\n"
        tb += traceback.format_exc()
        return None, tb
    # returning from a finally block would discard the traceback returned above,
    # so the success path returns here instead
    return _max, None


def echo_client_pool(client_limit):
try:
        # Ignore SIGINT while the pool forks so workers inherit SIG_IGN, then restore
        # the original handler so only the parent process reacts to Ctrl-C
        sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
        pool = mp.Pool(processes=client_limit)
        signal.signal(signal.SIGINT, sigint)
eid_proc = {}
pool_manager = mp.Manager()
main_event = pool_manager.Event()
main_event.set()
for eid in range(client_limit):
eid_proc[eid] = pool.apply_async(func=echo_forever, args=(main_event,))
while True:
alive = len([r for r in eid_proc.values() if not r.ready()])
print('{0}/{1} alive'.format(alive, client_limit))
if alive == 1:
print('All of the client processes are dead except one. Exiting loop...')
break
else:
time.sleep(1)
res = [r.get() for r in eid_proc.values() if r.ready()]
cdelta = [_max['cdelta'] for _max, tb in res if _max]
edelta = [_max['edelta'] for _max, tb in res if _max]
if cdelta:
cdelta = max(cdelta)
else:
cdelta = "unknown"
if edelta:
edelta = max(edelta)
else:
edelta = "unknown"
time.sleep(1)
print(f"Max time to establish: {cdelta}")
print(f"Max time echo reply: {edelta}")
main_event.clear()
except KeyboardInterrupt:
main_event.clear()
for proc in eid_proc.values():
proc.terminate()


def main(client_limit):
if client_limit == 1:
echo_once()
else:
echo_client_pool(client_limit)


if __name__ == "__main__":
main(client_limit=5)
75 changes: 75 additions & 0 deletions demos/sharing/server.py
#!/usr/bin/env python3
import logging
import functools
import rpyc
import threading
import random
import time


THREAD_SAFE = True # Toggles thread safe and unsafe behavior


def synchronize(lock):
    """ Decorator that acquires the given lock before the function call and releases it afterwards, even if the call raises """
    def sync_func(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return sync_func


class SharingComponent(object):
""" Initialized in the class definition of SharingService and shared by all instances of SharingService """
lock = threading.Lock()

@nnrepos commented on Dec 30, 2020:

Why is the lock outside of __init__? What if I instantiated multiple servers?

@comrumino (Author, Collaborator) replied on Jan 11, 2021:

Suppose we move the definition of lock to __init__. How would we rewrite the decorator line? Trying to do @synchronize(self.lock) would raise a NameError, because self is not bound when the class body and its decorators are evaluated.

Of course, this example has a shortcoming in the scenario you mentioned. The other question at hand is: does extending the demo to support multiple servers within one interpreter better serve the demo? At some point, any developer using RPyC will have to consider their software design choices independently of a primitive demo. The sharing component would be easy enough to move into a factory, but I'm not convinced the added complexity makes for a clearer demo.

I'm open to suggestions on a better design choice for lock initialization.

def __init__(self):
self.sequence_id = 0

def sleepy_sequence_id(self):
""" increment id and sometimes sleep to force race condition """
self.sequence_id += 1
_expected_sequence_id = self.sequence_id
if random.randint(0, 1) == 1:
time.sleep(1)
if self.sequence_id == _expected_sequence_id:
return self.sequence_id
else:
raise RuntimeError("Unexpected sequence_id behavior (race condition).")

@synchronize(lock)
def get_sequence_id(self):
""" provides a thread-safe execution frame to otherwise unsafe functions """
return self.sleepy_sequence_id()


class SharingService(rpyc.Service):
""" A class that allows for sharing components between connection instances """
__shared__ = SharingComponent()

@property
def shared(self):
""" convenient access to an otherwise long object name """
return SharingService.__shared__

def exposed_echo(self, message):
""" example of the potential perils when threading shared state """
if THREAD_SAFE:
seq_id = self.shared.get_sequence_id()
else:
seq_id = self.shared.sleepy_sequence_id()
if message == "Echo":
return f"Echo Reply {seq_id}"
else:
return f"Parameter Problem {seq_id}"


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
debugging_config = {'allow_all_attrs': True, 'sync_request_timeout': None}
echo_svc = rpyc.ThreadedServer(service=SharingService, port=18861, protocol_config=debugging_config)
echo_svc.start()
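
Assuming the server above is running locally on port 18861, a quick single-connection check (independent of the demo client) might look like the following; the printed sequence numbers are examples, since the shared counter is incremented by every connection:

import rpyc

# no request timeout, matching the demo client's configuration
conn = rpyc.connect("localhost", 18861, config={"sync_request_timeout": None})
print(conn.root.echo("Echo"))   # e.g. "Echo Reply 1"
print(conn.root.echo("ping"))   # e.g. "Parameter Problem 2"
conn.close()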
