Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

implement raise

  • Loading branch information...
commit accf35e66feb918648b7b327d79dd7ddb9e40ac0 1 parent 09a8307
@sublee authored
View
6 zeronimo/__init__.py
@@ -10,10 +10,10 @@
"""
from __future__ import absolute_import
-from .collect import register, collect_remote_functions
from .core import Worker, Customer, Tunnel, Task
+from .functional import register, collect_remote_functions
__version__ = '0.0.dev'
-__all__ = ['register', 'collect_remote_functions',
- 'Worker', 'Customer', 'Tunnel', 'Task']
+__all__ = ['Worker', 'Customer', 'Tunnel', 'Task',
+ 'register', 'collect_remote_functions']
View
62 zeronimo/core.py
@@ -9,8 +9,8 @@
from __future__ import absolute_import
from contextlib import contextmanager, nested
import functools
-import types
-import uuid
+from types import MethodType
+from uuid import uuid4
from gevent import joinall, spawn
from gevent.coros import Semaphore
@@ -18,7 +18,7 @@
from gevent.queue import Queue
import zmq.green as zmq
-from .collect import collect_remote_functions
+from .functional import collect_remote_functions, should_yield
# task message behaviors
@@ -54,7 +54,7 @@ def run(self):
finally:
obj.running -= 1
assert obj.running >= 0
- obj.run, obj._actual_run = types.MethodType(run, obj), obj.run
+ obj.run, obj._actual_run = MethodType(run, obj), obj.run
return obj
def __init__(self, context=None):
@@ -74,7 +74,7 @@ class Worker(Communicator):
def __init__(self, obj, addr=None, **kwargs):
if addr is None:
- addr = 'inproc://{0}'.format(str(uuid.uuid4()))
+ addr = 'inproc://{0}'.format(str(uuid4()))
self.addr = addr
self.functions = {func.__name__: (func, plan)
for func, plan in collect_remote_functions(obj)}
@@ -104,17 +104,20 @@ def task_received(self, customer_addr, task_id, fn, args, kwargs):
else:
sock = False
try:
- value = func(*args, **kwargs)
+ val = func(*args, **kwargs)
except Exception, error:
sock and sock.send_pyobj((task_id, RAISE, error))
raise
- if isinstance(value, types.GeneratorType):
- # iterate the generator
- for item in value:
- sock and sock.send_pyobj((task_id, YIELD, item))
- sock and sock.send_pyobj((task_id, BREAK, None))
+ if should_yield(val):
+ try:
+ for item in val:
+ sock and sock.send_pyobj((task_id, YIELD, item))
+ except Exception, error:
+ sock and sock.send_pyobj((task_id, RAISE, error))
+ else:
+ sock and sock.send_pyobj((task_id, BREAK, None))
else:
- sock and sock.send_pyobj((task_id, RETURN, value))
+ sock and sock.send_pyobj((task_id, RETURN, val))
def _run_direct(self):
sock = self.context.socket(zmq.PULL)
@@ -132,7 +135,7 @@ class Customer(Communicator):
def __init__(self, addr=None, **kwargs):
if addr is None:
- addr = 'inproc://{0}'.format(str(uuid.uuid4()))
+ addr = 'inproc://{0}'.format(str(uuid4()))
self.addr = addr
self.tunnels = set()
self.lock = Semaphore()
@@ -149,8 +152,8 @@ def run(self):
sock = self.context.socket(zmq.PULL)
sock.bind(self.addr)
while self.running:
- task_id, do, value = sock.recv_pyobj()
- self.tasks[task_id].put(do, value)
+ task_id, do, val = sock.recv_pyobj()
+ self.tasks[task_id].put(do, val)
def register_tunnel(self, tunnel):
"""Registers the :class:`Tunnel` object to run and ensures a socket
@@ -206,8 +209,7 @@ def _znm_reflect(self, worker):
for fn in worker.functions:
if hasattr(self, fn):
raise AttributeError('{!r} is already used'.format(fn))
- invoke = functools.partial(self._znm_invoke, fn)
- setattr(self, fn, invoke)
+ setattr(self, fn, functools.partial(self._znm_invoke, fn))
def __enter__(self):
self._znm_customer.register_tunnel(self)
@@ -230,37 +232,37 @@ class Task(object):
def __init__(self, tunnel, id=None):
self.tunnel = tunnel
self.customer = tunnel._znm_customer
- self.id = str(uuid.uuid4()) if id is None else id
+ self.id = str(uuid4()) if id is None else id
self.queue = Queue()
def prepare(self):
self.customer.register_task(self)
- do, value = self.queue.get()
+ do, val = self.queue.get()
assert do == ACK
- def put(self, do, value):
- self.queue.put((do, value))
+ def put(self, do, val):
+ self.queue.put((do, val))
def __call__(self):
- do, value = self.queue.get()
+ do, val= self.queue.get()
if do in (RETURN, RAISE):
self.customer.unregister_task(self)
if do == RETURN:
- return value
+ return val
elif do == RAISE:
- raise value
+ raise val
elif do == YIELD:
- return self.make_generator(value)
+ return self.make_generator(val)
elif do == BREAK:
return iter([])
- def make_generator(self, first_value):
- yield first_value
+ def make_generator(self, first_val):
+ yield first_val
while True:
- do, value = self.queue.get()
+ do, val = self.queue.get()
if do == YIELD:
- yield value
+ yield val
elif do == RAISE:
- raise value
+ raise val
elif do == BREAK:
break
View
13 zeronimo/collect.py → zeronimo/functional.py
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
"""
- zeronimo.collect
- ~~~~~~~~~~~~~~~~
+ zeronimo.functional
+ ~~~~~~~~~~~~~~~~~~~
- The functions for collecting remote functions or methods.
+ Provides higher-order functions.
:copyright: (c) 2013 by Heungsub Lee
:license: BSD, see LICENSE for more details.
"""
-from collections import namedtuple
+from collections import Iterable, Sequence, Set, Mapping, namedtuple
Plan = namedtuple('Plan', ['fanout', 'reply'])
@@ -34,3 +34,8 @@ def collect_remote_functions(obj):
except AttributeError:
continue
yield func, plan
+
+
+def should_yield(val):
+ return (isinstance(val, Iterable) and
+ not isinstance(val, (Sequence, Set, Mapping)))
View
49 zeronimotests.py
@@ -6,6 +6,7 @@
import gevent
from gevent import joinall, killall, spawn
import pytest
+from pytest import raises
import zmq.green as zmq
import zeronimo
@@ -81,6 +82,35 @@ def jabberwocky(self):
yield 'All mimsy were the borogoves,'
yield 'And the mome raths outgrabe.'
+ @zeronimo.register
+ def xrange(self):
+ return xrange(5)
+
+ @zeronimo.register
+ def dict_view(self):
+ return dict(zip(xrange(5), xrange(5))).viewkeys()
+
+ @zeronimo.register
+ def dont_yield(self):
+ if False:
+ yield 'it should\'t be sent'
+ assert 0
+
+ @zeronimo.register
+ def divide_by_zero(self):
+ 0/0 /0/0 /0/0 /0/0 /0/0/0/0 /0/0
+ 0/0 /0 /0/0 /0/0 /0/0 /0/0 /0/0 /0
+ 0/0/0/0/0/0/0 /0/0/0/0/0/0 /0/0 /0/0 /0
+ 0/0/0/0/0/0/0 /0/0 /0/0 /0/0 /0/0
+ 0/0 /0 /0/0 /0/0 /0/0 /0/0/0/0 /0
+
+ @zeronimo.register
+ def launch_rocket(self):
+ yield 3
+ yield 2
+ yield 1
+ raise RuntimeError('Launch!')
+
@zeronimo.register(fanout=True)
def rycbar123(self):
for word in 'Run, you clever boy; and remember.'.split():
@@ -171,7 +201,24 @@ def test_direct_returning_worker(worker, customer):
@green
-def test_direct_yielding_worker_(worker, customer):
+def test_direct_yielding_worker(worker, customer):
ensure_worker(worker)
with customer.link(worker) as tunnel:
assert len(list(tunnel.jabberwocky())) == 4
+ assert list(tunnel.xrange()) == [0, 1, 2, 3, 4]
+ assert list(tunnel.dict_view()) == [0, 1, 2, 3, 4]
+ assert list(tunnel.dont_yield()) == []
+
+
+@green
+def test_direct_raising_worker(worker, customer):
+ ensure_worker(worker)
+ with customer.link(worker) as tunnel:
+ with raises(ZeroDivisionError):
+ tunnel.divide_by_zero()
+ rocket_launching = tunnel.launch_rocket()
+ assert rocket_launching.next() == 3
+ assert rocket_launching.next() == 2
+ assert rocket_launching.next() == 1
+ with raises(RuntimeError):
+ rocket_launching.next()
Please sign in to comment.
Something went wrong with that request. Please try again.