Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

new concepts: Invocation, Reply frame and ACCEPT/REJECT method

  • Loading branch information...
commit d7524d82775c42fa6daed7dee13a93bcb4d14077 1 parent a2d3f52
@sublee authored
View
25 setup.py
@@ -11,6 +11,14 @@
- A customer can invoke to any remote worker in the worker cluster.
- A customer can invoke to all remote workers in the worker cluster.
+.. _ZeroMQ: http://www.zeromq.org/
+
+Example
+=======
+
+Server-side
+-----------
+
.. sourcecode:: python
import socket
@@ -28,15 +36,26 @@ def whoami(self):
yield sock.getsockname()[0]
worker = zeronimo.Worker(Application())
+ worker.bind('ipc://worker')
+ worker.bind_fanout('ipc://worker_fanout')
+ worker.subscribe('')
+ worker.run()
+
+Client-side
+-----------
+
+.. sourcecode:: python
+
+ import zeronimo
+
customer = zeronimo.Customer()
+ customer.bind('ipc://customer')
- with customer.link([worker]) as tunnel:
+ with customer.link(['ipc://worker'], ['ipc://worker_fanout']) as tunnel:
for result in tunnel(fanout=True).whoami():
print 'hostname=', result.next()
print 'public address=', result.next()
-.. _ZeroMQ: http://www.zeromq.org/
-
"""
from __future__ import with_statement
import distutils
View
337 zeronimo/core.py → zeronimo.py
@@ -1,14 +1,15 @@
# -*- coding: utf-8 -*-
"""
- zeronimo.core
- ~~~~~~~~~~~~~
+ zeronimo
+ ~~~~~~~~
:copyright: (c) 2013 by Heungsub Lee
:license: BSD, see LICENSE for more details.
"""
-from __future__ import absolute_import
+from collections import namedtuple, Iterable, Sequence, Set, Mapping
from contextlib import contextmanager, nested
import functools
+import hashlib
from types import MethodType
import uuid
@@ -18,16 +19,9 @@
from gevent.queue import Queue, Empty
import zmq.green as zmq
-from .exceptions import ZeronimoError, NoWorkerError
-from .functional import collect_remote_functions, should_yield
-
-# task message behaviors
-ACK = 1
-RETURN = 2
-RAISE = 3
-YIELD = 4
-BREAK = 5
+__version__ = '0.0.dev'
+__all__ = []
# socket type helpers
@@ -40,31 +34,89 @@
PUSH_PULL = (zmq.PUSH, zmq.PULL)
+# exceptions
+class ZeronimoError(RuntimeError): pass
+class ZeronimoWarning(RuntimeWarning): pass
+class AcceptanceError(ZeronimoError): pass
+class SubscriptionWarning(ZeronimoWarning): pass
+
+
+# message frames
+class Invocation(namedtuple('Invocation', [
+ 'name', 'args', 'kwargs', 'task_id', 'customer_addr'])):
+
+ def __repr__(self):
+ args = (type(self).__name__,) + self
+ return '{}({!r}, {}, {}, {!r}, {!r})'.format(*args)
+
+
+class Reply(namedtuple('Reply', [
+ 'method', 'data', 'task_id', 'run_id', 'worker_addr'])):
+
+ def __repr__(self):
+ method = {1: 'ACCEPT', 0: 'REJECT',
+ 10: 'RETURN', 11: 'RAISE',
+ 12: 'YIELD', 13: 'BREAK'}[self.method]
+ args = (type(self).__name__, method) + self[1:]
+ return '{}({}, {!r}, {!r}, {!r}, {!r})'.format(*args)
+
+
+# methods
+ACCEPT = 1
+REJECT = 0
+RETURN = 10
+RAISE = 11
+YIELD = 12
+BREAK = 13
+
+
def st(x):
return SOCKET_TYPE_NAMES[x]
def dt(x):
- return {1: 'ACK', 2: 'RETURN', 3: 'RAISE', 4: 'YIELD', 5: 'BREAK'}[x]
-
-
-def generate_inproc_addr():
- return 'inproc://{0}'.format(uuid_str())
+ return {1: 'ACCEPT', 0: 'REJECT',
+ 10: 'RETURN', 11: 'RAISE', 12: 'YIELD', 13: 'BREAK'}[x]
def uuid_str():
- import hashlib
return hashlib.md5(str(uuid.uuid4())).hexdigest()[:6]
-class Communicator(object):
+def remote(func):
+ """This decorator makes a function to be collected by
+ :func:`collect_remote_functions` for being invokable by remote clients.
+ """
+ func._znm = True
+ return func
+
+
+def collect_remote_functions(obj):
+ """Collects remote functions from the object."""
+ functions = {}
+ for attr in dir(obj):
+ func = getattr(obj, attr)
+ if hasattr(func, '_znm') and func._znm:
+ functions[attr] = func
+ return functions
+
+
+def should_yield(val):
+ return (
+ isinstance(val, Iterable) and
+ not isinstance(val, (Sequence, Set, Mapping)))
+
+
+class Base(object):
"""Manages ZeroMQ sockets."""
running = 0
context = None
+ sock = None
+ addrs = None
def __new__(cls, *args, **kwargs):
- obj = super(Communicator, cls).__new__(cls)
+ obj = super(Base, cls).__new__(cls)
obj._running_lock = Semaphore()
def run(self):
if self._running_lock.locked():
@@ -82,110 +134,130 @@ def run(self):
def __init__(self, context=None):
self.context = context
-
- def run(self):
- raise NotImplementedError
+ self.reset_sockets()
def __del__(self):
self.running = 0
+ def reset_sockets(self):
+ if self.sock is not None:
+ self.sock.close()
+ self.sock = self.context.socket(zmq.PULL)
+ self.addrs = set()
-class Worker(Communicator):
+ def bind(self, addr):
+ self.sock.bind(addr)
+ self.addrs.add(addr)
- addrs = None
+ def unbind(self, addr):
+ self.sock.unbind(addr)
+ self.addrs.remove(addr)
+
+ def run(self):
+ raise NotImplementedError
+
+
+class Worker(Base):
+
+ functions = None
+ fanout_sock = None
fanout_addrs = None
fanout_filters = None
- functions = None
- def __init__(self, obj, addrs=None, fanout_addrs=None, fanout_filters='',
+ def __init__(self, obj, bind=None, bind_fanout=None, subscribe=None,
**kwargs):
- if addrs is None:
- addrs = [generate_inproc_addr()]
- if fanout_addrs is None:
- fanout_addrs = [generate_inproc_addr()]
- self.addrs = addrs
- self.fanout_addrs = fanout_addrs
- self.fanout_filters = fanout_filters
- self.functions = collect_remote_functions(obj)
super(Worker, self).__init__(**kwargs)
+ self.functions = collect_remote_functions(obj)
+ bind and self.bind(bind)
+ bind_fanout and self.bind_fanout(bind_fanout)
+ if subscribe is not None:
+ self.subscribe(subscribe)
+
+ def possible_addrs(self, sock_type):
+ return self.addrs if sock_type == zmq.PULL else self.fanout_addrs
+
+ def reset_sockets(self):
+ super(Worker, self).reset_sockets()
+ if self.fanout_sock is not None:
+ self.fanout_sock.close()
+ self.fanout_sock = self.context.socket(zmq.SUB)
+ self.fanout_addrs = set()
+ self.fanout_filters = set()
- def possible_addrs(self, socket_type):
- if socket_type == zmq.PULL:
- return self.addrs
- elif socket_type == zmq.SUB:
- return self.fanout_addrs
- else:
- socket_type_name = SOCKET_TYPE_NAMES[socket_type]
- raise ValueError('{!r} is not acceptable'.format(socket_type_name))
+ def bind_fanout(self, addr):
+ self.fanout_sock.bind(addr)
+ self.fanout_addrs.add(addr)
+
+ def unbind_fanout(self, addr):
+ self.fanout_sock.unbind(addr)
+ self.fanout_addrs.remove(addr)
+
+ def subscribe(self, fanout_filter):
+ self.fanout_sock.setsockopt(zmq.SUBSCRIBE, fanout_filter)
+ self.fanout_filters.add(fanout_filter)
+
+ def unsubscribe(self, fanout_filter):
+ self.fanout_sock.setsockopt(zmq.UNSUBSCRIBE, fanout_filter)
+ try:
+ self.fanout_filters.remove(fanout_filter)
+ except KeyError:
+ pass
- def run_task(self, fn, args, kwargs, customer_addr, task_id, waiting):
+ def run_task(self, invocation):
run_id = uuid_str()
- print 'worker recv %s%r from %s:%s of %s' % \
- (fn, args, task_id, run_id, customer_addr)
- if waiting:
- sock = self.context.socket(zmq.PUSH)
- sock.connect(customer_addr)
- #TODO: addrs[0] -> public_addr
- sock.send_pyobj((ACK, (self.addrs[0], run_id), task_id, run_id))
- else:
+ meta = (invocation.task_id, run_id, list(self.addrs)[0])
+ name = invocation.name
+ args = invocation.args
+ kwargs = invocation.kwargs
+ print 'worker recv %r (run_id=%s)' % (invocation, run_id)
+ if invocation.customer_addr is None:
sock = False
+ else:
+ sock = self.context.socket(zmq.PUSH)
+ sock.connect(invocation.customer_addr)
+ sock.send_pyobj(Reply(ACCEPT, None, *meta))
try:
- val = self.functions[fn](*args, **kwargs)
+ val = self.functions[name](*args, **kwargs)
except Exception, error:
- print 'worker %s %r to %s:%s' % \
- (dt(RAISE), error, task_id, run_id)
- sock and sock.send_pyobj((RAISE, error, task_id, run_id))
+ print 'worker send %r' % (Reply(RAISE, error, *meta),)
+ sock and sock.send_pyobj(Reply(RAISE, error, *meta))
raise
if should_yield(val):
try:
for item in val:
- print 'worker %s %r to %s:%s' % \
- (dt(YIELD), item, task_id, run_id)
- sock and sock.send_pyobj((YIELD, item, task_id, run_id))
+ print 'worker send %r' % (Reply(YIELD, item, *meta),)
+ sock and sock.send_pyobj(Reply(YIELD, item, *meta))
except Exception, error:
- print 'worker %s %r to %s:%s' % \
- (dt(RAISE), error, task_id, run_id)
- sock and sock.send_pyobj((RAISE, error, task_id, run_id))
+ print 'worker send %r' % (Reply(RAISE, error, *meta),)
+ sock and sock.send_pyobj(Reply(RAISE, error, *meta))
else:
- print 'worker %s %r to %s:%s' % \
- (dt(BREAK), None, task_id, run_id)
- sock and sock.send_pyobj((BREAK, None, task_id, run_id))
+ print 'worker send %r' % (Reply(BREAK, None, *meta),)
+ sock and sock.send_pyobj(Reply(BREAK, None, *meta))
else:
- print 'worker %s %r to %s:%s' % \
- (dt(RETURN), val, task_id, run_id)
- sock and sock.send_pyobj((RETURN, val, task_id, run_id))
+ print 'worker send %r' % (Reply(RETURN, val, *meta),)
+ sock and sock.send_pyobj(Reply(RETURN, val, *meta))
def run(self):
- self.sock = self.context.socket(zmq.PULL)
- self.fanout_sock = self.context.socket(zmq.SUB)
- self.fanout_sock.setsockopt(zmq.SUBSCRIBE, '')
- # bind addresses
- for addr in self.addrs:
- self.sock.bind(addr)
- for addr in self.fanout_addrs:
- self.fanout_sock.bind(addr)
- # serve both sockets
+ if not self.fanout_filters:
+ from warnings import warn
+ warn('Didn\'t subscribe any topic', SubscriptionWarning)
def serve(sock):
while self.running:
- spawn(self.run_task, *sock.recv_pyobj())
+ spawn(self.run_task, sock.recv_pyobj())
joinall([spawn(serve, self.sock), spawn(serve, self.fanout_sock)])
-class Customer(Communicator):
+class Customer(Base):
- addr = None
- sock = None
tunnels = None
tasks = None
- def __init__(self, addr=None, **kwargs):
- if addr is None:
- addr = 'inproc://{0}'.format(uuid_str())
- self.addr = addr
+ def __init__(self, bind=None, **kwargs):
+ super(Customer, self).__init__(**kwargs)
self.tunnels = set()
self.tasks = {}
self._missing_tasks = {}
- super(Customer, self).__init__(**kwargs)
+ bind and self.bind(bind)
def link(self, *args, **kwargs):
return Tunnel(self, *args, **kwargs)
@@ -225,20 +297,19 @@ def _restore_missing_messages(self, task):
del self._missing_tasks[task.id]
try:
while missing.queue:
- task.put(*missing.queue.get(block=False))
+ task.queue.put(missing.queue.get(block=False))
except Empty:
pass
def run(self):
- assert self.sock is None
- self.sock = self.context.socket(zmq.PULL)
- self.sock.bind(self.addr)
while self.tunnels:
try:
- do, val, task_id, run_id = self.sock.recv_pyobj()
+ reply = self.sock.recv_pyobj()
except zmq.ZMQError:
continue
- if do == ACK:
+ task_id = reply.task_id
+ run_id = reply.run_id
+ if reply.method in (ACCEPT, REJECT):
run_id = None
try:
tasks = self.tasks[task_id]
@@ -257,7 +328,8 @@ def run(self):
self._missing_tasks[task_id] = {run_id: task}
elif run_id not in self._missing_tasks[task_id]:
self._missing_tasks[task_id][run_id] = task
- task.put(do, val)
+ print 'customer recv %r' % (reply,)
+ task.queue.put(reply)
self.sock = None
@@ -285,12 +357,14 @@ def __init__(self, customer, workers,
def _znm_is_alive(self):
return self in self._znm_customer.tunnels
- def _znm_invoke(self, fn, *args, **kwargs):
+ def _znm_invoke(self, name, *args, **kwargs):
"""Invokes remote function."""
+ customer_addr = list(self._znm_customer.addrs)[0] \
+ if self._znm_wait else None
task = Task(self)
sock = self._znm_sockets[zmq.PUB if self._znm_fanout else zmq.PUSH]
- sock.send_pyobj((fn, args, kwargs,
- self._znm_customer.addr, task.id, self._znm_wait))
+ print 'tunnel send %r' % (Invocation(name, args, kwargs, task.id, customer_addr),)
+ sock.send_pyobj(Invocation(name, args, kwargs, task.id, customer_addr))
if not self._znm_wait:
# immediately if workers won't wait
return
@@ -345,58 +419,59 @@ def __init__(self, tunnel, id=None, run_id=None):
def collect(self, timeout=0.01):
assert self.tunnel._znm_wait
self.customer.register_task(self)
- msgs = []
+ replies = []
with Timeout(timeout, False):
while True:
- msgs.append(self.queue.get())
+ reply = self.queue.get()
+ assert isinstance(reply, Reply)
+ replies.append(reply)
if not self.tunnel._znm_fanout:
break
self.customer.unregister_task(self)
- if not msgs:
- raise NoWorkerError('There are no workers which respond')
+ if not replies:
+ raise AcceptanceError('No workers which accepted')
if self.tunnel._znm_fanout:
tasks = []
- for do, (worker_addr, run_id) in msgs:
- assert do == ACK
- each_task = Task(self.tunnel, self.id, run_id)
- each_task.worker_addr = worker_addr
+ for reply in replies:
+ assert reply.method == ACCEPT
+ each_task = Task(self.tunnel, self.id, reply.run_id)
+ each_task.worker_addr = reply.worker_addr
tasks.append(each_task)
self.customer.register_task(each_task)
return tasks if self.tunnel._znm_as_task else [t() for t in tasks]
else:
- do, val = msgs[0]
- assert len(msgs) == 1
- assert do == ACK
- self.worker_addr, self.run_id = val
+ reply = replies[0]
+ assert len(replies) == 1
+ assert reply.method == ACCEPT
+ self.worker_addr = reply.worker_addr
+ self.run_id = reply.run_id
self.customer.register_task(self)
return self if self.tunnel._znm_as_task else self()
- def put(self, do, val):
- print 'task(%s:%s) recv %s %r' % \
- (self.id, self.run_id, dt(do), val)
- self.queue.put((do, val))
-
def __call__(self):
- do, val= self.queue.get()
- if do in (RETURN, RAISE):
+ reply = self.queue.get()
+ print 'task recv %r' % (reply,)
+ assert reply.method not in (ACCEPT, REJECT)
+ if reply.method in (RETURN, RAISE):
self.customer.unregister_task(self)
- assert do != ACK
- if do == RETURN:
- return val
- elif do == RAISE:
- raise val
- elif do == YIELD:
- return self.make_generator(val)
- elif do == BREAK:
+ if reply.method == RETURN:
+ return reply.data
+ elif reply.method == RAISE:
+ raise reply.data
+ elif reply.method == YIELD:
+ return self.make_generator(reply)
+ elif reply.method == BREAK:
return iter([])
- def make_generator(self, first_val):
- yield first_val
+ def make_generator(self, first_reply):
+ yield first_reply.data
while True:
- do, val = self.queue.get()
- if do == YIELD:
- yield val
- elif do == RAISE:
- raise val
- elif do == BREAK:
+ reply = self.queue.get()
+ print 'task recv %r' % (reply,)
+ assert reply.method not in (ACCEPT, REJECT, RETURN)
+ if reply.method == YIELD:
+ yield reply.data
+ elif reply.method == RAISE:
+ raise reply.data
+ elif reply.method == BREAK:
break
View
18 zeronimo/__init__.py
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- zeronimo
- ~~~~~~~~
-
- A distributed RPC solution based on ØMQ.
-
- :copyright: (c) 2013 by Heungsub Lee
- :license: BSD, see LICENSE for more details.
-"""
-from __future__ import absolute_import
-
-from .core import Worker, Customer, Tunnel, Task
-from .functional import remote
-
-
-__version__ = '0.0.dev'
-__all__ = ['Worker', 'Customer', 'Tunnel', 'Task', 'remote']
View
16 zeronimo/exceptions.py
@@ -1,16 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- zeronimo.exceptions
- ~~~~~~~~~~~~~~~~~~~
-
- :copyright: (c) 2013 by Heungsub Lee
- :license: BSD, see LICENSE for more details.
-"""
-import gevent
-
-
-class ZeronimoError(RuntimeError): pass
-class NoWorkerError(ZeronimoError): pass
-
-#: an alias for :exc:`gevent.Timeout`.
-TimeoutError = gevent.Timeout
View
39 zeronimo/functional.py
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- zeronimo.functional
- ~~~~~~~~~~~~~~~~~~~
-
- Provides higher-order functions.
-
- :copyright: (c) 2013 by Heungsub Lee
- :license: BSD, see LICENSE for more details.
-"""
-from collections import Iterable, Sequence, Set, Mapping, namedtuple
-import functools
-import hashlib
-
-from gevent.coros import Semaphore
-
-
-def remote(func):
- """This decorator makes a function to be collected by
- :func:`collect_remote_functions` for being invokable by remote clients.
- """
- func._znm = True
- return func
-
-
-def collect_remote_functions(obj):
- """Collects remote functions from the object."""
- functions = {}
- for attr in dir(obj):
- func = getattr(obj, attr)
- if hasattr(func, '_znm') and func._znm:
- functions[attr] = func
- return functions
-
-
-def should_yield(val):
- return (
- isinstance(val, Iterable) and
- not isinstance(val, (Sequence, Set, Mapping)))
View
81 zeronimotests.py
@@ -1,4 +1,5 @@
import functools
+from itertools import chain
import os
import textwrap
import uuid
@@ -13,12 +14,11 @@
zmq_context = zmq.Context()
-#gevent.hub.get_hub().print_exception = lambda *a, **k: 'do not print exception'
+gevent.hub.get_hub().print_exception = lambda *a, **k: 'do not print exception'
@decorator
def green(f, *args, **kwargs):
- print
return spawn(f, *args, **kwargs).get()
@@ -40,35 +40,20 @@ def is_connectable(addr, socket_type, context=zmq_context):
return True
+def inproc():
+ return 'inproc://{0}'.format(zeronimo.uuid_str())
+
+
def start_workers(workers):
waits = []
for worker in workers:
spawn(worker.run)
- until = lambda: is_connectable(worker.addrs[0], zmq.PUSH)
- waits.append(spawn(busywait, until, until=True, timeout=1))
+ for addr in chain(worker.addrs, worker.fanout_addrs):
+ until = lambda: is_connectable(addr, zmq.PUSH)
+ waits.append(spawn(busywait, until, until=True, timeout=1))
joinall(waits)
-'''
-FD_DIR = os.path.join(os.path.dirname(__file__), '_fds')
-def generate_endpoint(protocol, name=None, offset=None):
- if protocol in 'inproc':
- if name is None:
- name = str(uuid.uuid4())
- endpoint = 'inproc://{}'.format(name)
- if offset is not None:
- endpoint = '-'.join([endpoint, str(offset)])
- return endpoint
- elif protocol == 'ipc':
- if not os.isdir(FD_DIR):
- os.makedir(FD_DIR)
- fd = int(sorted(os.listdir(FD_DIR), reverse=True)[0]) + 1
- return 'ipc://_fds/{}'.format(fd)
- elif protocol == 'tcp':
- return 'tcp://*:*'
-'''
-
-
class Application(object):
@zeronimo.remote
@@ -136,10 +121,11 @@ def sleep(self):
@pytest.fixture
def worker{x}():
app = Application()
- return zeronimo.Worker(app, context=zmq_context)
+ return zeronimo.Worker(app, inproc(), inproc(), '',
+ context=zmq_context)
@pytest.fixture
def customer{x}():
- return zeronimo.Customer(context=zmq_context)
+ return zeronimo.Customer(inproc(), context=zmq_context)
''').format(x=x if x else ''))
@@ -160,39 +146,23 @@ def baz(self):
yield 'baz-%s-end' % id(self)
# collect from an object
app = App()
- functions = dict(zeronimo.functional.collect_remote_functions(app))
+ functions = dict(zeronimo.collect_remote_functions(app))
assert set(functions.keys()) == set(['foo', 'bar', 'baz'])
# collect from a class
- functions = dict(zeronimo.functional.collect_remote_functions(App))
+ functions = dict(zeronimo.collect_remote_functions(App))
assert set(functions.keys()) == set(['foo', 'bar', 'baz'])
-def _test_fingerprint():
- class Nothing(object): pass
- blueprint = dict(zeronimo.functional.extract_blueprint(Application))
- blueprint2 = dict(zeronimo.functional.extract_blueprint(Application()))
- blueprint3 = dict(zeronimo.functional.extract_blueprint(Nothing))
- fingerprint = zeronimo.functional.make_fingerprint(blueprint)
- fingerprint2 = zeronimo.functional.make_fingerprint(blueprint2)
- fingerprint3 = zeronimo.functional.make_fingerprint(blueprint3)
- assert fingerprint == fingerprint2
- assert fingerprint != fingerprint3
-
-
-def test_default_addr(customer, worker):
- assert worker.addrs[0].startswith('inproc://')
- assert customer.addr.startswith('inproc://')
-
-
def test_running():
- from zeronimo.core import Communicator
- class TestingCommunicator(Communicator):
+ class Runner(zeronimo.Base):
+ def reset_sockets(self):
+ pass
def run(self):
assert self.running
- comm = TestingCommunicator()
- assert not comm.running
- comm.run()
- assert not comm.running
+ runner = Runner()
+ assert not runner.running
+ runner.run()
+ assert not runner.running
@green
@@ -202,12 +172,14 @@ def test_tunnel(customer, worker):
with customer.link([worker]) as tunnel:
assert len(customer.tunnels) == 1
assert len(customer.tunnels) == 0
+ '''
with customer.link([worker]) as tunnel1, \
customer.link([worker]) as tunnel2:
assert not customer.running
assert len(customer.tunnels) == 2
tunnel1.add(0, 0)
assert customer.running
+ '''
assert len(customer.tunnels) == 0
busywait(lambda: customer.running, timeout=1)
assert not customer.running
@@ -312,3 +284,10 @@ def test_slow(customer, worker):
with Timeout(0.1):
tunnel.sleep()
assert tunnel.sleep() == 'slept'
+
+
+@green
+def _test_link_to_addrs(customer, worker):
+ start_workers([worker])
+ with customer.link(worker.addrs) as tunnel:
+ assert tunnel.add(1, 1) == 'cutie'
Please sign in to comment.
Something went wrong with that request. Please try again.