Skip to content

Commit

Permalink
Merge pull request #412 from dand-oss/master
Browse files Browse the repository at this point in the history
allow connect_multiprocess() and connect_thread() to pickle
  • Loading branch information
comrumino committed Dec 25, 2020
2 parents 28be845 + 2191913 commit f32fbd8
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions rpyc/utils/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,32 @@ def connect_subproc(args, service=VoidService, config={}):
return conn


# callable, picklable functor
# was un-pickleable nested function
class _ServerFunctor:
"""Functor for use in connect calls"""

def __init__(self, listener, service, config, args=None):
""" Holds functor inputs """
self.listener = listener
self.service = service
self.config = config
self.args = args

def __call__(self):
""" connects to process/socket """
with closing(self.listener):
client = self.listener.accept()[0]
conn = connect_stream(SocketStream(client), service=self.service, config=self.config)
try:
if self.args is not None:
for k in self.args:
conn._local_root.exposed_namespace[k] = self.args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()


def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}):
"""starts an rpyc server on a new thread, bound to an arbitrary port,
and connects to it over a socket.
Expand All @@ -277,17 +303,7 @@ def connect_thread(service=VoidService, config={}, remote_service=VoidService, r
listener.bind(("localhost", 0))
listener.listen(1)

def server(listener=listener):
with closing(listener):
client = listener.accept()[0]
conn = connect_stream(SocketStream(client), service=remote_service,
config=remote_config)
try:
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()

spawn(server)
spawn(_ServerFunctor(listener, remote_service, remote_config))
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

Expand All @@ -312,18 +328,7 @@ def connect_multiprocess(service=VoidService, config={}, remote_service=VoidServ
listener.bind(("localhost", 0))
listener.listen(1)

def server(listener=listener, args=args):
with closing(listener):
client = listener.accept()[0]
conn = connect_stream(SocketStream(client), service=remote_service, config=remote_config)
try:
for k in args:
conn._local_root.exposed_namespace[k] = args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()

t = Process(target=server)
t = Process(target=_ServerFunctor(listener, remote_service, remote_config, args))
t.start()
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

0 comments on commit f32fbd8

Please sign in to comment.