Permalink
Browse files

new concepts: spec, blueprint, signature

  • Loading branch information...
1 parent 6677f8d commit ac51871c888a5659d223f24d83101c54e7f925ba @sublee committed Apr 3, 2013
Showing with 83 additions and 56 deletions.
  1. +2 −2 zeronimo/__init__.py
  2. +26 −30 zeronimo/core.py
  3. +27 −9 zeronimo/functional.py
  4. +28 −15 zeronimotests.py
@@ -11,9 +11,9 @@
from __future__ import absolute_import
from .core import Worker, Customer, Tunnel, Task
-from .functional import register, collect_remote_functions
+from .functional import register, extract_blueprint, sign_blueprint
__version__ = '0.0.dev'
__all__ = ['Worker', 'Customer', 'Tunnel', 'Task',
- 'register', 'collect_remote_functions']
+ 'register', 'extract_blueprint', 'sign_blueprint']
View
@@ -18,7 +18,7 @@
from gevent.queue import Queue
import zmq.green as zmq
-from .functional import collect_remote_functions, should_yield
+from .functional import extract_blueprint, sign_blueprint, should_yield
# task message behaviors
@@ -70,16 +70,14 @@ def __del__(self):
class Worker(Communicator):
addr = None
- functions = None
+ blueprint = None
def __init__(self, obj, addr=None, **kwargs):
if addr is None:
addr = 'inproc://{0}'.format(str(uuid4()))
self.addr = addr
- self.functions = {func.__name__: (func, plan)
- for func, plan in collect_remote_functions(obj)}
- for func, plan in collect_remote_functions(obj):
- self.functions[func.__name__] = (func, plan)
+ self.blueprint = extract_blueprint(obj)
+ self.signature = sign_blueprint(self.blueprint)
super(Worker, self).__init__(**kwargs)
def possible_addrs(self, socket_type):
@@ -103,15 +101,15 @@ def serve(self, sock_type, addr):
spawn(self.task_received, *sock.recv_pyobj())
def task_received(self, customer_addr, task_id, fn, args, kwargs):
- func, plan = self.functions[fn]
- if plan.reply:
+ spec = self.blueprint[fn]
+ if spec.reply:
sock = self.context.socket(zmq.PUSH)
sock.connect(customer_addr)
sock.send_pyobj((task_id, ACK, self.addr))
else:
sock = False
try:
- val = func(*args, **kwargs)
+ val = spec.func(*args, **kwargs)
except Exception, error:
sock and sock.send_pyobj((task_id, RAISE, error))
raise
@@ -177,48 +175,46 @@ class Tunnel(object):
request of RPC through the customer's sockets.
:param customer: the :class:`Customer` object.
- :param workers: the :class:`Worker` objects. All workers have to contain
- same remote functions.
+ :param workers: the :class:`Worker` objects. All workers must have same
+ signature.
:param return_task: if set to ``True``, the remote functions return a
:class:`Task` object instead of received value.
:type return_task: bool
"""
def __init__(self, customer, workers, return_task=False):
self._znm_customer = customer
- self._znm_workers = self._znm_verify_workers(workers)
- self._znm_worker = self._znm_workers[0]
+ workers, blueprint = self._znm_verify_workers(workers)
+ self._znm_workers = workers
+ self._znm_blueprint = blueprint
self._znm_return_task = return_task
self._znm_sockets = {}
- self._znm_reflect(self._znm_worker)
+ self._znm_reflect(blueprint)
def _znm_verify_workers(self, workers):
if isinstance(workers, Worker):
worker = workers
workers = [worker]
- def get_plans(worker):
- return [(fn, plan)
- for fn, (__, plan) in worker.functions.iteritems()]
- std_plans = get_plans(workers[0])
- for worker in workers[1:]:
- if get_plans(worker) != std_plans:
- raise ValueError('All workers have to contain '
- 'same remote functions')
- return workers
-
- def _znm_reflect(self, worker):
+ worker = workers[0]
+ blueprint = worker.blueprint
+ for other_worker in workers[1:]:
+ if worker.signature != other_worker.signature:
+ raise ValueError('All workers must have same signature')
+ return workers, blueprint
+
+ def _znm_reflect(self, blueprint):
"""Sets methods which follows remote functions with same name."""
- for fn in worker.functions:
+ for fn in blueprint.viewkeys():
if hasattr(self, fn):
raise AttributeError('{!r} is already used'.format(fn))
setattr(self, fn, functools.partial(self._znm_invoke, fn))
def _znm_invoke(self, fn, *args, **kwargs):
- plan = self._znm_worker.functions[fn][-1]
+ spec = self._znm_blueprint[fn]
task = Task(self)
- sock = self._znm_sockets[zmq.PUB if plan.fanout else zmq.PUSH]
+ sock = self._znm_sockets[zmq.PUB if spec.fanout else zmq.PUSH]
sock.send_pyobj((self._znm_customer.addr, task.id, fn, args, kwargs))
- if not plan.reply:
+ if not spec.reply:
return
if not self._znm_customer.running:
spawn(self._znm_customer.run)
@@ -236,7 +232,7 @@ def __enter__(self):
return self
def __exit__(self, error, error_type, traceback):
- for sock in self._znm_sockets.itervalues():
+ for sock in self._znm_sockets.viewvalues():
sock.close()
self._znm_sockets.clear()
self._znm_customer.unregister_tunnel(self)
@@ -9,33 +9,51 @@
:license: BSD, see LICENSE for more details.
"""
from collections import Iterable, Sequence, Set, Mapping, namedtuple
+import hashlib
+import inspect
-Plan = namedtuple('Plan', ['fanout', 'reply'])
+Spec = namedtuple('Spec', [
+ 'func', 'args', 'varargs', 'keywords', 'fanout', 'reply'])
def register(f=None, fanout=False, reply=True):
"""This decorator makes a function to be collected by
- :func:`collect_remote_functions` for being invokable by remote clients.
+ :func:`collect_blueprint` for being invokable by remote clients.
"""
- plan = Plan(fanout, reply)
def decorator(f):
- f._znm_plan = plan
+ f._znm_fanout = fanout
+ f._znm_reply = reply
return f
return decorator(f) if f is not None else decorator
-def collect_remote_functions(obj):
+def extract_blueprint(obj):
"""Collects remote functions from the object."""
+ blueprint = {}
for attr in dir(obj):
func = getattr(obj, attr)
try:
- plan = func._znm_plan
+ fanout, reply = func._znm_fanout, func._znm_reply
except AttributeError:
continue
- yield func, plan
+ args, varargs, keywords = inspect.getargspec(func)[:3]
+ spec = Spec(func, args, varargs, keywords, fanout, reply)
+ blueprint[func.__name__] = spec
+ return blueprint
+
+
+def sign_blueprint(blueprint):
+ hexh = lambda x: hex(hash(x))
+ md5, sha1 = hashlib.md5(), hashlib.sha1()
+ for fn, spec in blueprint.viewitems():
+ fingerprint = ' '.join(map(repr, (fn,) + spec[1:])) + '\n'
+ md5.update(fingerprint)
+ sha1.update(fingerprint)
+ return '-'.join([md5.hexdigest(), sha1.hexdigest()])
def should_yield(val):
- return (isinstance(val, Iterable) and
- not isinstance(val, (Sequence, Set, Mapping)))
+ return (
+ isinstance(val, Iterable) and
+ not isinstance(val, (Sequence, Set, Mapping)))
View
@@ -145,7 +145,7 @@ def customer2():
# tests
-def test_remote_method_collection():
+def test_blueprint_extraction():
class App(object):
@zeronimo.register
def foo(self):
@@ -159,21 +159,34 @@ def baz(self):
yield 'baz-%s-end' % id(self)
# collect from an object
app = App()
- plans = dict(zeronimo.collect_remote_functions(app))
- assert not plans[app.foo].fanout
- assert plans[app.foo].reply
- assert plans[app.bar].fanout
- assert plans[app.bar].reply
- assert not plans[app.baz].fanout
- assert plans[app.baz].reply
+ blueprint = dict(zeronimo.extract_blueprint(app))
+ assert not blueprint['foo'].fanout
+ assert blueprint['foo'].reply
+ assert blueprint['foo'].reply
+ assert blueprint['bar'].fanout
+ assert blueprint['bar'].reply
+ assert not blueprint['baz'].fanout
+ assert blueprint['baz'].reply
# collect from a class
- plans = dict(zeronimo.collect_remote_functions(App))
- assert not plans[App.foo].fanout
- assert plans[App.foo].reply
- assert plans[App.bar].fanout
- assert plans[App.bar].reply
- assert not plans[App.baz].fanout
- assert plans[App.baz].reply
+ blueprint = dict(zeronimo.extract_blueprint(App))
+ assert not blueprint['foo'].fanout
+ assert blueprint['foo'].reply
+ assert blueprint['bar'].fanout
+ assert blueprint['bar'].reply
+ assert not blueprint['baz'].fanout
+ assert blueprint['baz'].reply
+
+
+def test_signature():
+ class Nothing(object): pass
+ blueprint = dict(zeronimo.extract_blueprint(Application))
+ blueprint2 = dict(zeronimo.extract_blueprint(Application()))
+ blueprint3 = dict(zeronimo.extract_blueprint(Nothing))
+ signature = zeronimo.sign_blueprint(blueprint)
+ signature2 = zeronimo.sign_blueprint(blueprint2)
+ signature3 = zeronimo.sign_blueprint(blueprint3)
+ assert signature == signature2
+ assert signature != signature3
def test_default_addr(customer, worker):

0 comments on commit ac51871

Please sign in to comment.