Skip to content
Browse files

implements remote iterator

  • Loading branch information...
1 parent da05f42 commit 09a830739e35200ef6788d784374d51fc9245a01 @sublee committed Apr 3, 2013
Showing with 88 additions and 38 deletions.
  1. +52 −34 zeronimo/core.py
  2. +36 −4 zeronimotests.py
View
86 zeronimo/core.py
@@ -26,6 +26,7 @@
RETURN = 2
RAISE = 3
YIELD = 4
+BREAK = 5
# socket type helpers
@@ -70,17 +71,15 @@ class Worker(Communicator):
addr = None
functions = None
- plans = None
def __init__(self, obj, addr=None, **kwargs):
if addr is None:
addr = 'inproc://{0}'.format(str(uuid.uuid4()))
self.addr = addr
- self.functions = {}
- self.plans = {}
+ self.functions = {func.__name__: (func, plan)
+ for func, plan in collect_remote_functions(obj)}
for func, plan in collect_remote_functions(obj):
- self.functions[func.__name__] = func
- self.plans[func.__name__] = plan
+ self.functions[func.__name__] = (func, plan)
super(Worker, self).__init__(**kwargs)
def possible_addrs(self, socket_type):
@@ -97,21 +96,31 @@ def run(self):
joinall([spawn(self._run_direct)])
def task_received(self, customer_addr, task_id, fn, args, kwargs):
- func = self.functions[fn]
- if not self.plans[fn].reply:
- spawn(func, *args, **kwargs)
- return
- sock = self.context.socket(zmq.PUSH)
- sock.connect(customer_addr)
- sock.send_pyobj((task_id, ACK, None))
- value = func(*args, **kwargs)
- sock.send_pyobj((task_id, RETURN, value))
+ func, plan = self.functions[fn]
+ if plan.reply:
+ sock = self.context.socket(zmq.PUSH)
+ sock.connect(customer_addr)
+ sock.send_pyobj((task_id, ACK, None))
+ else:
+ sock = False
+ try:
+ value = func(*args, **kwargs)
+ except Exception, error:
+ sock and sock.send_pyobj((task_id, RAISE, error))
+ raise
+ if isinstance(value, types.GeneratorType):
+ # iterate the generator
+ for item in value:
+ sock and sock.send_pyobj((task_id, YIELD, item))
+ sock and sock.send_pyobj((task_id, BREAK, None))
+ else:
+ sock and sock.send_pyobj((task_id, RETURN, value))
def _run_direct(self):
sock = self.context.socket(zmq.PULL)
sock.bind(self.addr)
while self.running:
- self.task_received(*sock.recv_pyobj())
+ spawn(self.task_received, *sock.recv_pyobj())
def _run_sub(self):
pass
@@ -140,8 +149,8 @@ def run(self):
sock = self.context.socket(zmq.PULL)
sock.bind(self.addr)
while self.running:
- task_id, behavior, value = sock.recv_pyobj()
- self.tasks[task_id].put(behavior, value)
+ task_id, do, value = sock.recv_pyobj()
+ self.tasks[task_id].put(do, value)
def register_tunnel(self, tunnel):
"""Registers the :class:`Tunnel` object to run and ensures a socket
@@ -181,16 +190,16 @@ def __init__(self, customer, worker, return_task=False):
self._znm_reflect(worker)
def _znm_invoke(self, fn, *args, **kwargs):
- plan = self._znm_worker.plans[fn]
+ plan = self._znm_worker.functions[fn][-1]
task = Task(self)
sock = self._znm_sockets[zmq.PUB if plan.fanout else zmq.PUSH]
sock.send_pyobj((self._znm_customer.addr, task.id, fn, args, kwargs))
if not plan.reply:
return
if not self._znm_customer.running:
spawn(self._znm_customer.run)
- task.ensure()
- return task if self._znm_return_task else task.get()
+ task.prepare()
+ return task if self._znm_return_task else task()
def _znm_reflect(self, worker):
"""Sets methods which follows remote functions with same name."""
@@ -224,25 +233,34 @@ def __init__(self, tunnel, id=None):
self.id = str(uuid.uuid4()) if id is None else id
self.queue = Queue()
- def ensure(self):
+ def prepare(self):
self.customer.register_task(self)
- behavior, value = self.queue.get()
- assert behavior == ACK
+ do, value = self.queue.get()
+ assert do == ACK
- def put(self, behavior, value):
- self.queue.put((behavior, value))
+ def put(self, do, value):
+ self.queue.put((do, value))
- def get(self):
- behavior, value = self.queue.get()
- if behavior == (RETURN, RAISE):
+ def __call__(self):
+ do, value = self.queue.get()
+ if do in (RETURN, RAISE):
self.customer.unregister_task(self)
- if behavior == RETURN:
+ if do == RETURN:
return value
-
- def generate(self):
+ elif do == RAISE:
+ raise value
+ elif do == YIELD:
+ return self.make_generator(value)
+ elif do == BREAK:
+ return iter([])
+
+ def make_generator(self, first_value):
+ yield first_value
while True:
- behavior, value = self.customer.recv(self.id)
- if behavior == YIELD:
+ do, value = self.queue.get()
+ if do == YIELD:
yield value
- elif behavior == RAISE:
+ elif do == RAISE:
raise value
+ elif do == BREAK:
+ break
View
40 zeronimotests.py
@@ -37,6 +37,11 @@ def zmq_bound(addr, socket_type, context=zmq_context):
return True
+def ensure_worker(worker):
+ spawn(worker.run)
+ busywait(lambda: zmq_bound(worker.addr, zmq.PUSH), until=True)
+
+
'''
FD_DIR = os.path.join(os.path.dirname(__file__), '_fds')
def generate_endpoint(protocol, name=None, offset=None):
@@ -69,6 +74,18 @@ def add(self, a, b):
return 'xoxoxoxoxoxo cutie'
return a + b
+ @zeronimo.register
+ def jabberwocky(self):
+ yield 'Twas brillig, and the slithy toves'
+ yield 'Did gyre and gimble in the wabe;'
+ yield 'All mimsy were the borogoves,'
+ yield 'And the mome raths outgrabe.'
+
+ @zeronimo.register(fanout=True)
+ def rycbar123(self):
+ for word in 'Run, you clever boy; and remember.'.split():
+ yield word
+
@pytest.fixture
def worker():
@@ -129,17 +146,32 @@ def run(self):
@green
-def test_direct_worker(worker, customer):
- spawn(worker.run)
- busywait(lambda: zmq_bound(worker.addr, zmq.PUSH), until=True)
+def test_tunnel(worker, customer):
+ ensure_worker(worker)
assert len(customer.tunnels) == 0
with customer.link(worker) as tunnel:
assert len(customer.tunnels) == 1
+ assert len(customer.tunnels) == 0
+ with customer.link(worker) as tunnel1, customer.link(worker) as tunnel2:
+ assert len(customer.tunnels) == 2
+ assert len(customer.tunnels) == 0
+
+
+@green
+def test_direct_returning_worker(worker, customer):
+ ensure_worker(worker)
+ with customer.link(worker) as tunnel:
assert tunnel.add(1, 1) == 'cutie'
assert tunnel.add(2, 2) == 'cutie'
assert tunnel.add(3, 3) == 'cutie'
assert tunnel.add(4, 4) == 'cutie'
assert tunnel.add(5, 5) == 'cutie'
assert tunnel.add(6, 6) == 'xoxoxoxoxoxo cutie'
assert tunnel.add(42, 12) == 54
- assert len(customer.tunnels) == 0
+
+
+@green
+def test_direct_yielding_worker_(worker, customer):
+ ensure_worker(worker)
+ with customer.link(worker) as tunnel:
+ assert len(list(tunnel.jabberwocky())) == 4

0 comments on commit 09a8307

Please sign in to comment.
Something went wrong with that request. Please try again.