Permalink
Browse files

remove concepts: blueprint, fingerprint

  • Loading branch information...
1 parent 153ba60 commit a2d3f5229932e410bd9d64dae21eb5e577d1c521 @sublee committed Apr 4, 2013
Showing with 36 additions and 87 deletions.
  1. +2 −2 setup.py
  2. +2 −2 zeronimo/__init__.py
  3. +5 −26 zeronimo/core.py
  4. +9 −28 zeronimo/functional.py
  5. +18 −29 zeronimotests.py
View
@@ -18,7 +18,7 @@
class Application(object):
- @zeronimo.register(fanout=True)
+ @zeronimo.remote
def whoami(self):
# hostname
yield socket.gethostname()
@@ -31,7 +31,7 @@ def whoami(self):
customer = zeronimo.Customer()
with customer.link([worker]) as tunnel:
- for result in tunnel.whoami():
+ for result in tunnel(fanout=True).whoami():
print 'hostname=', result.next()
print 'public address=', result.next()
View
@@ -11,8 +11,8 @@
from __future__ import absolute_import
from .core import Worker, Customer, Tunnel, Task
-from .functional import register
+from .functional import remote
__version__ = '0.0.dev'
-__all__ = ['Worker', 'Customer', 'Tunnel', 'Task', 'register']
+__all__ = ['Worker', 'Customer', 'Tunnel', 'Task', 'remote']
View
@@ -19,7 +19,7 @@
import zmq.green as zmq
from .exceptions import ZeronimoError, NoWorkerError
-from .functional import extract_blueprint, make_fingerprint, should_yield
+from .functional import collect_remote_functions, should_yield
# task message behaviors
@@ -95,8 +95,7 @@ class Worker(Communicator):
addrs = None
fanout_addrs = None
fanout_filters = None
- blueprint = None
- fingerprint = None
+ functions = None
def __init__(self, obj, addrs=None, fanout_addrs=None, fanout_filters='',
**kwargs):
@@ -107,8 +106,7 @@ def __init__(self, obj, addrs=None, fanout_addrs=None, fanout_filters='',
self.addrs = addrs
self.fanout_addrs = fanout_addrs
self.fanout_filters = fanout_filters
- self.blueprint = extract_blueprint(obj)
- self.fingerprint = make_fingerprint(self.blueprint)
+ self.functions = collect_remote_functions(obj)
super(Worker, self).__init__(**kwargs)
def possible_addrs(self, socket_type):
@@ -132,7 +130,7 @@ def run_task(self, fn, args, kwargs, customer_addr, task_id, waiting):
else:
sock = False
try:
- val = self.blueprint[fn].func(*args, **kwargs)
+ val = self.functions[fn](*args, **kwargs)
except Exception, error:
print 'worker %s %r to %s:%s' % \
(dt(RAISE), error, task_id, run_id)
@@ -268,8 +266,7 @@ class Tunnel(object):
request of RPC through the customer's sockets.
:param customer: the :class:`Customer` object.
- :param workers: the :class:`Worker` objects. All workers must have same
- fingerprint.
+ :param workers: the :class:`Worker` objects.
:param return_task: if set to ``True``, the remote functions return a
:class:`Task` object instead of received value.
:type return_task: bool
@@ -284,31 +281,13 @@ def __init__(self, customer, workers,
self._znm_wait = wait
self._znm_fanout = fanout
self._znm_as_task = as_task
- #self._znm_verify_workers(workers)
- #self._znm_blueprint = workers[0].blueprint
- #self._znm_reflect(self._znm_blueprint)
def _znm_is_alive(self):
return self in self._znm_customer.tunnels
- def _znm_verify_workers(self, workers):
- worker = workers[0]
- for other_worker in workers[1:]:
- if worker.fingerprint != other_worker.fingerprint:
- raise ValueError('All workers must have same fingerprint')
- return workers
-
- def _znm_reflect(self, blueprint):
- """Sets methods which follows remote functions with same name."""
- for fn in blueprint.viewkeys():
- if hasattr(self, fn):
- raise AttributeError('{!r} is already used'.format(fn))
- setattr(self, fn, functools.partial(self._znm_invoke, fn))
-
def _znm_invoke(self, fn, *args, **kwargs):
"""Invokes remote function."""
task = Task(self)
- #spec = self._znm_blueprint[fn]
sock = self._znm_sockets[zmq.PUB if self._znm_fanout else zmq.PUSH]
sock.send_pyobj((fn, args, kwargs,
self._znm_customer.addr, task.id, self._znm_wait))
View
@@ -15,41 +15,22 @@
from gevent.coros import Semaphore
-Spec = namedtuple('Spec', ['func', 'fanout', 'reply'])
-
-
-def register(f=None, fanout=False, reply=True):
+def remote(func):
"""This decorator makes a function to be collected by
- :func:`collect_blueprint` for being invokable by remote clients.
+ :func:`collect_remote_functions` for being invokable by remote clients.
"""
- def decorator(f):
- f._znm_fanout = fanout
- f._znm_reply = reply
- return f
- return decorator(f) if f is not None else decorator
+ func._znm = True
+ return func
-def extract_blueprint(obj):
+def collect_remote_functions(obj):
"""Collects remote functions from the object."""
- blueprint = {}
+ functions = {}
for attr in dir(obj):
func = getattr(obj, attr)
- try:
- fanout, reply = func._znm_fanout, func._znm_reply
- except AttributeError:
- continue
- blueprint[func.__name__] = Spec(func, fanout, reply)
- return blueprint
-
-
-def make_fingerprint(blueprint):
- hexh = lambda x: hex(hash(x))
- md5, sha1 = hashlib.md5(), hashlib.sha1()
- for fn, spec in blueprint.viewitems():
- frag = ' '.join(map(repr, [fn, spec.fanout, spec.reply])) + '\n'
- md5.update(frag)
- sha1.update(frag)
- return '-'.join([md5.hexdigest(), sha1.hexdigest()])
+ if hasattr(func, '_znm') and func._znm:
+ functions[attr] = func
+ return functions
def should_yield(val):
View
@@ -71,7 +71,7 @@ def generate_endpoint(protocol, name=None, offset=None):
class Application(object):
- @zeronimo.register
+ @zeronimo.remote
def add(self, a, b):
"""Koreans' mathematical addition."""
if a == b:
@@ -81,48 +81,48 @@ def add(self, a, b):
return 'xoxoxoxoxoxo cutie'
return a + b
- @zeronimo.register
+ @zeronimo.remote
def jabberwocky(self):
yield 'Twas brillig, and the slithy toves'
yield 'Did gyre and gimble in the wabe;'
yield 'All mimsy were the borogoves,'
yield 'And the mome raths outgrabe.'
- @zeronimo.register
+ @zeronimo.remote
def xrange(self):
return xrange(5)
- @zeronimo.register
+ @zeronimo.remote
def dict_view(self):
return dict(zip(xrange(5), xrange(5))).viewkeys()
- @zeronimo.register
+ @zeronimo.remote
def dont_yield(self):
if False:
yield 'it should\'t be sent'
assert 0
- @zeronimo.register
+ @zeronimo.remote
def zero_div(self):
0/0 /0/0 /0/0 /0/0 /0/0/0/0 /0/0
0/0 /0 /0/0 /0/0 /0/0 /0/0 /0/0 /0
0/0/0/0/0/0/0 /0/0/0/0/0/0 /0/0 /0/0 /0
0/0/0/0/0/0/0 /0/0 /0/0 /0/0 /0/0
0/0 /0 /0/0 /0/0 /0/0 /0/0/0/0 /0
- @zeronimo.register
+ @zeronimo.remote
def launch_rocket(self):
yield 3
yield 2
yield 1
raise RuntimeError('Launch!')
- @zeronimo.register
+ @zeronimo.remote
def rycbar123(self):
for word in 'run, you clever boy; and remember.'.split():
yield word
- @zeronimo.register
+ @zeronimo.remote
def sleep(self):
gevent.sleep(0.1)
return 'slept'
@@ -146,36 +146,25 @@ def customer{x}():
# tests
-def _test_blueprint_extraction():
+def test_remote_function_collection():
class App(object):
- @zeronimo.register
+ @zeronimo.remote
def foo(self):
return 'foo-%s' % id(self)
- @zeronimo.register(fanout=True)
+ @zeronimo.remote
def bar(self):
return 'bar-%s' % id(self)
- @zeronimo.register
+ @zeronimo.remote
def baz(self):
yield 'baz-%s-begin' % id(self)
yield 'baz-%s-end' % id(self)
# collect from an object
app = App()
- blueprint = dict(zeronimo.functional.extract_blueprint(app))
- assert not blueprint['foo'].fanout
- assert blueprint['foo'].reply
- assert blueprint['foo'].reply
- assert blueprint['bar'].fanout
- assert blueprint['bar'].reply
- assert not blueprint['baz'].fanout
- assert blueprint['baz'].reply
+ functions = dict(zeronimo.functional.collect_remote_functions(app))
+ assert set(functions.keys()) == set(['foo', 'bar', 'baz'])
# collect from a class
- blueprint = dict(zeronimo.functional.extract_blueprint(App))
- assert not blueprint['foo'].fanout
- assert blueprint['foo'].reply
- assert blueprint['bar'].fanout
- assert blueprint['bar'].reply
- assert not blueprint['baz'].fanout
- assert blueprint['baz'].reply
+ functions = dict(zeronimo.functional.collect_remote_functions(App))
+ assert set(functions.keys()) == set(['foo', 'bar', 'baz'])
def _test_fingerprint():
@@ -307,7 +296,7 @@ def test_fanout(customer, worker1, worker2):
assert rycbar123.next() == 'remember.'
with pytest.raises(ZeroDivisionError):
tunnel(fanout=True).zero_div()
- failures = tunnel(as_task=True, fanout=True).zero_div()
+ failures = tunnel(fanout=True, as_task=True).zero_div()
assert len(failures) == 2
with pytest.raises(ZeroDivisionError):
failures[0]()

0 comments on commit a2d3f52

Please sign in to comment.