Skip to content

Commit

Permalink
Replaced functor in favor of partial compilation of _server function …
Browse files Browse the repository at this point in the history
…to avoid potential state complications and better readability; fixed assumptions about newstyle vs oldstyle services for factory._server
  • Loading branch information
comrumino committed Dec 25, 2020
1 parent f32fbd8 commit 9e22bc8
Showing 1 changed file with 25 additions and 30 deletions.
55 changes: 25 additions & 30 deletions rpyc/utils/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import with_statement
import socket
from contextlib import closing

from functools import partial
import threading
try:
from thread import interrupt_main
Expand All @@ -19,7 +19,7 @@

from rpyc.core.channel import Channel
from rpyc.core.stream import SocketStream, TunneledSocketStream, PipeStream
from rpyc.core.service import VoidService
from rpyc.core.service import VoidService, MasterService, SlaveService
from rpyc.utils.registry import UDPRegistryClient
from rpyc.lib import safe_import, spawn
ssl = safe_import("ssl")
Expand Down Expand Up @@ -264,30 +264,25 @@ def connect_subproc(args, service=VoidService, config={}):
return conn


# callable, picklable functor
# was un-pickleable nested function
class _ServerFunctor:
"""Functor for use in connect calls"""

def __init__(self, listener, service, config, args=None):
""" Holds functor inputs """
self.listener = listener
self.service = service
self.config = config
self.args = args

def __call__(self):
""" connects to process/socket """
with closing(self.listener):
client = self.listener.accept()[0]
conn = connect_stream(SocketStream(client), service=self.service, config=self.config)
try:
if self.args is not None:
for k in self.args:
conn._local_root.exposed_namespace[k] = self.args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()
def _server(listener, remote_service, remote_config, args=None):
try:
with closing(listener):
client = listener.accept()[0]
conn = connect_stream(SocketStream(client), service=remote_service, config=remote_config)
if isinstance(args, dict):
_oldstyle = (MasterService, SlaveService)
is_newstyle = isinstance(remote_service, type) and not issubclass(remote_service, _oldstyle)
is_newstyle |= not isinstance(remote_service, type) and not isinstance(remote_service, _oldstyle)
is_voidservice = isinstance(remote_service, type) and issubclass(remote_service, VoidService)
is_voidservice |= not isinstance(remote_service, type) and isinstance(remote_service, VoidService)
if is_newstyle and not is_voidservice:
conn._local_root.exposed_namespace.update(args)
elif not is_voidservice:
conn._local_root.namespace.update(args)

conn.serve_all()
except KeyboardInterrupt:
interrupt_main()


def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}):
Expand All @@ -302,8 +297,8 @@ def connect_thread(service=VoidService, config={}, remote_service=VoidService, r
listener = socket.socket()
listener.bind(("localhost", 0))
listener.listen(1)

spawn(_ServerFunctor(listener, remote_service, remote_config))
remote_server = partial(_server, listener, remote_service, remote_config)
spawn(remote_server)
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

Expand All @@ -327,8 +322,8 @@ def connect_multiprocess(service=VoidService, config={}, remote_service=VoidServ
listener = socket.socket()
listener.bind(("localhost", 0))
listener.listen(1)

t = Process(target=_ServerFunctor(listener, remote_service, remote_config, args))
remote_server = partial(_server, listener, remote_service, remote_config, args)
t = Process(target=remote_server)
t.start()
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

0 comments on commit 9e22bc8

Please sign in to comment.