Skip to content
This repository has been archived by the owner on Oct 16, 2021. It is now read-only.

Commit

Permalink
Merge branch 'twin-cleanup' (closes issue #17)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxfischer2781 committed Apr 25, 2018
2 parents a7b8b20 + d3079bd commit 04f0adb
Show file tree
Hide file tree
Showing 36 changed files with 553 additions and 565 deletions.
3 changes: 3 additions & 0 deletions .readthedocs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
python:
version: 3
setup_py_install: true
8 changes: 4 additions & 4 deletions cpy2py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@

from cpy2py.meta import __version__
from cpy2py.utility.compat import NullHandler as _NullHandler
from cpy2py.proxy.proxy_object import TwinObject, localmethod
from cpy2py.proxy.proxy_func import twinfunction
from cpy2py.twinterpreter.twin_master import TwinMaster
from cpy2py.kernel import kernel_state
from cpy2py.proxy.baseclass import TwinObject, localmethod
from cpy2py.proxy.function import twinfunction
from cpy2py.twinterpreter.master import TwinMaster
from cpy2py.kernel import state as kernel_state


_base_logger = _logging.getLogger('__cpy2py__')
Expand Down
6 changes: 3 additions & 3 deletions cpy2py/ipyc/ipyc_socket.py → cpy2py/ipyc/duplex_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from cpy2py.utility.compat import BytesFile
from cpy2py.utility.utils import random_str
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state


class DuplexSocketIPyC(object):
Expand Down Expand Up @@ -62,7 +62,7 @@ def open(self):
self.client_socket, client_address = self.server_socket.accept()
self._logger.warning(
'<%s> [%s] accepted from %r',
kernel_state.TWIN_ID,
state.TWIN_ID,
self.__class__.__name__,
client_address
)
Expand All @@ -71,7 +71,7 @@ def open(self):
self.client_socket.connect(self.address)
self._logger.warning(
'<%s> [%s] connected from %r',
kernel_state.TWIN_ID,
state.TWIN_ID,
self.__class__.__name__,
self.client_socket.getsockname()
)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import threading

from cpy2py.utility.exceptions import format_exception
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.kernel import kernel_state
from cpy2py.kernel.kernel_single import SingleThreadKernelClient, SingleThreadKernelServer
from cpy2py.ipyc import exceptions
from cpy2py.kernel import state
from cpy2py.kernel.flavours.single import SingleThreadKernelClient, SingleThreadKernelServer


class AsyncKernelServer(SingleThreadKernelServer):
Expand All @@ -45,7 +45,7 @@ def __init__(self, peer_id, ipyc, pickle_protocol=2):
def _serve_requests(self):
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Server Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Server Listening', state.TWIN_ID, self.peer_id)
request_id, directive = self._server_recv()
if self._except_callback is not None:
raise self._except_callback # pylint: disable=raising-bad-type
Expand Down Expand Up @@ -80,14 +80,14 @@ def _digest_replies(self):
try:
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Client Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Client Listening', state.TWIN_ID, self.peer_id)
request_id, reply_body = self._client_recv()
request = self._requests.pop(request_id)
request[1] = reply_body
request[0].set()
del request_id, reply_body, request
except (ipyc_exceptions.IPyCTerminated, EOFError, ValueError):
self._logger.warning('<%s> [%s] Client Released', kernel_state.TWIN_ID, self.peer_id)
except (exceptions.IPyCTerminated, EOFError, ValueError):
self._logger.warning('<%s> [%s] Client Released', state.TWIN_ID, self.peer_id)
self.stop_local()
except Exception as err: # pylint: disable=broad-except
# DEBUG: sometimes, request_id raises KeyError even though it's in _requests - MF@20160518
Expand All @@ -96,7 +96,7 @@ def _digest_replies(self):
for key in self._requests:
self._logger.critical('Await : %r (%s)', key, type(key))
self._logger.critical(
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', kernel_state.TWIN_ID, self.peer_id, err
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', state.TWIN_ID, self.peer_id, err
)
format_exception(self._logger, 3)
raise
Expand Down
54 changes: 27 additions & 27 deletions cpy2py/kernel/kernel_single.py → cpy2py/kernel/flavours/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@
import logging
import threading

from cpy2py.kernel import kernel_state
from cpy2py.kernel import state

from cpy2py.utility.exceptions import format_exception
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.kernel.kernel_exceptions import StopTwinterpreter
from cpy2py.proxy import proxy_tracker
from cpy2py.kernel.kernel_requesthandler import RequestDispatcher, RequestHandler
from cpy2py.ipyc import exceptions
from cpy2py.kernel.exceptions import StopTwinterpreter
from cpy2py.proxy import tracker
from cpy2py.kernel.requesthandler import RequestDispatcher, RequestHandler


def _connect_ipyc(ipyc, pickle_protocol):
"""Connect pickle/unpickle trackers to a duplex IPyC"""
pickler = proxy_tracker.twin_pickler(ipyc.writer, pickle_protocol)
pickler = tracker.twin_pickler(ipyc.writer, pickle_protocol)
send = pickler.dump
unpickler = proxy_tracker.twin_unpickler(ipyc.reader)
unpickler = tracker.twin_unpickler(ipyc.reader)
recv = unpickler.load
return send, recv

Expand All @@ -56,12 +56,12 @@ class SingleThreadKernelServer(object):
:type pickle_protocol: int
"""
def __new__(cls, peer_id, *args, **kwargs): # pylint: disable=unused-argument
assert peer_id not in kernel_state.KERNEL_SERVERS, 'Twinterpreters must have unique IDs'
kernel_state.KERNEL_SERVERS[peer_id] = object.__new__(cls)
return kernel_state.KERNEL_SERVERS[peer_id]
assert peer_id not in state.KERNEL_SERVERS, 'Twinterpreters must have unique IDs'
state.KERNEL_SERVERS[peer_id] = object.__new__(cls)
return state.KERNEL_SERVERS[peer_id]

def __init__(self, peer_id, ipyc, pickle_protocol=2):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.server' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.server' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self._ipyc = ipyc
self._ipyc.open()
Expand All @@ -80,42 +80,42 @@ def run(self):
self._terminate.clear()
exit_code = 1
self._logger.warning(
'<%s> [%s] Starting %s @ %s', kernel_state.TWIN_ID, self.peer_id, self.__class__.__name__, time.asctime()
'<%s> [%s] Starting %s @ %s', state.TWIN_ID, self.peer_id, self.__class__.__name__, time.asctime()
)
try:
self._serve_requests()
except StopTwinterpreter as err:
# actively shutting down
self._logger.critical('<%s> [%s] TWIN KERNEL TERMINATED: %s', kernel_state.TWIN_ID, self.peer_id, err)
self._logger.critical('<%s> [%s] TWIN KERNEL TERMINATED: %s', state.TWIN_ID, self.peer_id, err)
exit_code = err.exit_code
# cPickle may raise EOFError by itself
except (ipyc_exceptions.IPyCTerminated, EOFError) as err:
except (exceptions.IPyCTerminated, EOFError) as err:
# regular shutdown by master
self._logger.critical('<%s> [%s] TWIN KERNEL RELEASED: %s', kernel_state.TWIN_ID, self.peer_id, err)
self._logger.critical('<%s> [%s] TWIN KERNEL RELEASED: %s', state.TWIN_ID, self.peer_id, err)
exit_code = 0
except Exception as err: # pylint: disable=broad-except
# unexpected shutdown
# provide extended traceback if requested
self._logger.critical(
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', kernel_state.TWIN_ID, self.peer_id, err
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', state.TWIN_ID, self.peer_id, err
)
format_exception(self._logger, 3)
# emulate regular python exit
import traceback
exit_code = 1
traceback.print_exc(file=sys.stderr)
print('TwinError: unhandled exception in', kernel_state.TWIN_ID, file=sys.stderr)
print('TwinError: unhandled exception in', state.TWIN_ID, file=sys.stderr)
finally:
self._terminate.set()
self._logger.critical('<%s> [%s] TWIN KERNEL SHUTDOWN: %d', kernel_state.TWIN_ID, self.peer_id, exit_code)
self._logger.critical('<%s> [%s] TWIN KERNEL SHUTDOWN: %d', state.TWIN_ID, self.peer_id, exit_code)
self._ipyc.close()
del kernel_state.KERNEL_SERVERS[self.peer_id]
del state.KERNEL_SERVERS[self.peer_id]
return exit_code

def _serve_requests(self):
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Server Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Server Listening', state.TWIN_ID, self.peer_id)
request_id, directive = self._server_recv()
self.request_handler.serve_request(request_id, directive)

Expand Down Expand Up @@ -144,19 +144,19 @@ class SingleThreadKernelClient(object):
:type pickle_protocol: int
"""
def __new__(cls, peer_id, *args, **kwargs): # pylint: disable=unused-argument
assert peer_id not in kernel_state.KERNEL_CLIENTS, 'Twinterpreters must have unique IDs'
kernel_state.KERNEL_CLIENTS[peer_id] = object.__new__(cls)
return kernel_state.KERNEL_CLIENTS[peer_id]
assert peer_id not in state.KERNEL_CLIENTS, 'Twinterpreters must have unique IDs'
state.KERNEL_CLIENTS[peer_id] = object.__new__(cls)
return state.KERNEL_CLIENTS[peer_id]

def __init__(self, peer_id, ipyc, pickle_protocol=2):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.client' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.client' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
# communication
self._ipyc = ipyc
self._ipyc.open()
self._client_send, self._client_recv = _connect_ipyc(ipyc, pickle_protocol)
self.request_dispatcher = RequestDispatcher(peer_id=self.peer_id, kernel_client=self)
kernel_state.KERNEL_INTERFACE[peer_id] = self.request_dispatcher
state.KERNEL_INTERFACE[peer_id] = self.request_dispatcher

def run_request(self, request_body):
my_id = threading.current_thread().ident
Expand All @@ -179,8 +179,8 @@ def stop_local(self):
"""Shutdown the local server"""
self._ipyc.close()
try:
del kernel_state.KERNEL_CLIENTS[self.peer_id]
del kernel_state.KERNEL_INTERFACE[self.peer_id]
del state.KERNEL_CLIENTS[self.peer_id]
del state.KERNEL_INTERFACE[self.peer_id]
except KeyError:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import random

from cpy2py.utility.thread_tools import FifoQueue, ItemError, ThreadGuard
from .kernel_async import AsyncKernelClient, AsyncKernelServer
from cpy2py.kernel.flavours.async import AsyncKernelClient, AsyncKernelServer


class MultiThreadKernelServer(AsyncKernelServer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# - # limitations under the License.
import logging

from cpy2py.kernel import kernel_state
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.kernel import state
from cpy2py.ipyc import exceptions
from cpy2py.utility.exceptions import format_exception, CPy2PyException
from cpy2py.kernel.kernel_exceptions import StopTwinterpreter, TwinterpeterTerminated
from cpy2py.kernel.exceptions import StopTwinterpreter, TwinterpeterTerminated


# Message Enums
Expand Down Expand Up @@ -70,7 +70,7 @@ class RequestHandler(object):
:type kernel_server: :py:class:`~cpy2py.kernel.kernel_single.SingleThreadKernelServer`
"""
def __init__(self, peer_id, kernel_server):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.handler' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.handler' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self.kernel_server = kernel_server
# instance => ref_count
Expand Down Expand Up @@ -100,7 +100,7 @@ def serve_request(self, request_id, directive):
try:
if __debug__:
self._logger.warning(
'<%s> [%s] Directive %s', kernel_state.TWIN_ID, self.peer_id, E_SYMBOL[directive_type]
'<%s> [%s] Directive %s', state.TWIN_ID, self.peer_id, E_SYMBOL[directive_type]
)
response = directive_method(directive_body)
# catch internal errors to reraise them
Expand All @@ -109,7 +109,7 @@ def serve_request(self, request_id, directive):
# send everything else back to calling scope
except Exception as err: # pylint: disable=broad-except
self.kernel_server.send_reply(request_id, (__E_EXCEPTION__, err))
self._logger.critical('<%s> [%s] TWIN KERNEL PAYLOAD EXCEPTION', kernel_state.TWIN_ID, self.peer_id)
self._logger.critical('<%s> [%s] TWIN KERNEL PAYLOAD EXCEPTION', state.TWIN_ID, self.peer_id)
format_exception(self._logger, 3)
if isinstance(err, (KeyboardInterrupt, SystemExit)):
raise StopTwinterpreter(message=err.__class__.__name__, exit_code=1)
Expand Down Expand Up @@ -200,7 +200,7 @@ class should not be instantiated manually. Use
empty_reply = (None, None)

def __init__(self, peer_id, kernel_client):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.dispatcher' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.dispatcher' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self.kernel_client = kernel_client
self.exit_code = None
Expand All @@ -209,7 +209,7 @@ def _dispatch_request(self, request_type, *args):
"""Forward a request to peer and return the result"""
try:
result_type, result_body = self.kernel_client.run_request((request_type, args))
except (ipyc_exceptions.IPyCTerminated, IOError, ValueError):
except (exceptions.IPyCTerminated, IOError, ValueError):
raise TwinterpeterTerminated(twin_id=self.peer_id)
if result_type == __E_SUCCESS__:
return result_body
Expand All @@ -223,7 +223,7 @@ def _dispatch_event(self, request_type, *args):
"""Forward a request to peer and return the result"""
try:
self.kernel_client.run_event((request_type, args))
except (ipyc_exceptions.IPyCTerminated, IOError, ValueError):
except (exceptions.IPyCTerminated, IOError, ValueError):
raise TwinterpeterTerminated(twin_id=self.peer_id)
return True

Expand Down
2 changes: 1 addition & 1 deletion cpy2py/kernel/kernel_state.py → cpy2py/kernel/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import os
import sys

from cpy2py.kernel.kernel_exceptions import TwinterpeterUnavailable
from cpy2py.kernel.exceptions import TwinterpeterUnavailable


# current twin state
Expand Down
12 changes: 6 additions & 6 deletions cpy2py/proxy/proxy_object.py → cpy2py/proxy/baseclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# - # limitations under the License.
import time

from cpy2py.proxy import proxy_tracker
from cpy2py.proxy.proxy_meta import TwinMeta
from cpy2py.proxy.proxy_twin import TwinProxy
from cpy2py.proxy import tracker
from cpy2py.proxy.metaclass import TwinMeta
from cpy2py.proxy.proxy import InstanceProxy


def instance_id(instance):
Expand All @@ -41,7 +41,7 @@ def new_twin_object(cls, *args, **kwargs): # pylint: disable=unused-argument
self = object.__new__(cls)
object.__setattr__(self, '__instance_id__', instance_id(self))
# register our reference for lookup
proxy_tracker.__active_instances__[self.__twin_id__, self.__instance_id__] = self
tracker.__active_instances__[self.__twin_id__, self.__instance_id__] = self
return self

# calling TwinMeta to set metaclass explicitly works for py2 and py3
Expand Down Expand Up @@ -76,6 +76,6 @@ def localmethod(method):
setattr(method, TwinMeta.mark_localmember, True)
return method

# TwinObject and TwinProxy are not created by metaclass, initialize manually
# TwinObject and InstanceProxy are not created by metaclass, initialize manually
TwinObject.__import_mod_name__ = (TwinObject.__module__, TwinObject.__name__)
TwinProxy.__import_mod_name__ = TwinObject.__import_mod_name__
InstanceProxy.__import_mod_name__ = TwinObject.__import_mod_name__
6 changes: 3 additions & 3 deletions cpy2py/proxy/proxy_func.py → cpy2py/proxy/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# - # See the License for the specific language governing permissions and
# - # limitations under the License.
from cpy2py.utility.proxy import clone_function_meta
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state


def twinfunction(twinterpreter_id):
Expand All @@ -28,7 +28,7 @@ def twinfunction(twinterpreter_id):
def decorator(func):
func.__twin_id__ = twinterpreter_id
# native twin, never redirect
if kernel_state.is_twinterpreter(twinterpreter_id):
if state.is_twinterpreter(twinterpreter_id):
return func
# redirect to kernel
# - must dispatch to the proxy, otherwise pickling will fail
Expand All @@ -40,7 +40,7 @@ def decorator(func):
def function_runner_factory(*fargs, **fkwargs):
def function_runner(*args, **kwargs):
return function_runner.dispatch_call(function_dispatch_proxy, *args, **kwargs)
function_runner.dispatch_call = kernel_state.get_kernel(twinterpreter_id).dispatch_call
function_runner.dispatch_call = state.get_kernel(twinterpreter_id).dispatch_call
function_dispatch_proxy.function_runner = function_runner
return function_runner(*fargs, **fkwargs)

Expand Down

0 comments on commit 04f0adb

Please sign in to comment.