Permalink
Browse files

function spec to tunnel from application

  • Loading branch information...
1 parent 846fec0 commit 00bd7113c21e40bee23ab60760f2b5285bfd1890 @sublee committed Apr 4, 2013
Showing with 55 additions and 45 deletions.
  1. +50 −33 zeronimo/core.py
  2. +5 −12 zeronimotests.py
View
@@ -120,43 +120,42 @@ def possible_addrs(self, socket_type):
socket_type_name = SOCKET_TYPE_NAMES[socket_type]
raise ValueError('{!r} is not acceptable'.format(socket_type_name))
- def run_task(self, customer_addr, task_id, fn, args, kwargs):
- spec = self.blueprint[fn]
+ def run_task(self, fn, args, kwargs, customer_addr, task_id, waiting):
run_id = uuid_str()
print 'worker recv %s%r from %s:%s of %s' % \
(fn, args, task_id, run_id, customer_addr)
- if spec.reply:
+ if waiting:
sock = self.context.socket(zmq.PUSH)
sock.connect(customer_addr)
#TODO: addrs[0] -> public_addr
- sock.send_pyobj((task_id, run_id, ACK, (self.addrs[0], run_id)))
+ sock.send_pyobj((ACK, (self.addrs[0], run_id), task_id, run_id))
else:
sock = False
try:
- val = spec.func(*args, **kwargs)
+ val = self.blueprint[fn].func(*args, **kwargs)
except Exception, error:
print 'worker %s %r to %s:%s' % \
(dt(RAISE), error, task_id, run_id)
- sock and sock.send_pyobj((task_id, run_id, RAISE, error))
+ sock and sock.send_pyobj((RAISE, error, task_id, run_id))
raise
if should_yield(val):
try:
for item in val:
print 'worker %s %r to %s:%s' % \
(dt(YIELD), item, task_id, run_id)
- sock and sock.send_pyobj((task_id, run_id, YIELD, item))
+ sock and sock.send_pyobj((YIELD, item, task_id, run_id))
except Exception, error:
print 'worker %s %r to %s:%s' % \
(dt(RAISE), error, task_id, run_id)
- sock and sock.send_pyobj((task_id, run_id, RAISE, error))
+ sock and sock.send_pyobj((RAISE, error, task_id, run_id))
else:
print 'worker %s %r to %s:%s' % \
(dt(BREAK), None, task_id, run_id)
- sock and sock.send_pyobj((task_id, run_id, BREAK, None))
+ sock and sock.send_pyobj((BREAK, None, task_id, run_id))
else:
print 'worker %s %r to %s:%s' % \
(dt(RETURN), val, task_id, run_id)
- sock and sock.send_pyobj((task_id, run_id, RETURN, val))
+ sock and sock.send_pyobj((RETURN, val, task_id, run_id))
def run(self):
self.sock = self.context.socket(zmq.PULL)
@@ -238,7 +237,7 @@ def run(self):
self.sock.bind(self.addr)
while self.tunnels:
try:
- task_id, run_id, do, val = self.sock.recv_pyobj()
+ do, val, task_id, run_id = self.sock.recv_pyobj()
except zmq.ZMQError:
continue
if do == ACK:
@@ -276,14 +275,17 @@ class Tunnel(object):
:type return_task: bool
"""
- def __init__(self, customer, workers, return_task=False):
- self._znm_verify_workers(workers)
+ def __init__(self, customer, workers, task=False, fanout=False, wait=True):
self._znm_customer = customer
self._znm_workers = workers
- self._znm_blueprint = workers[0].blueprint
- self._znm_return_task = return_task
self._znm_sockets = {}
- self._znm_reflect(self._znm_blueprint)
+ # options
+ self._znm_task = task
+ self._znm_fanout = fanout
+ self._znm_wait = wait
+ #self._znm_verify_workers(workers)
+ #self._znm_blueprint = workers[0].blueprint
+ #self._znm_reflect(self._znm_blueprint)
def _znm_verify_workers(self, workers):
worker = workers[0]
@@ -301,21 +303,35 @@ def _znm_reflect(self, blueprint):
def _znm_invoke(self, fn, *args, **kwargs):
"""Invokes remote function."""
- ack_task = Task(self)
- spec = self._znm_blueprint[fn]
- sock = self._znm_sockets[zmq.PUB if spec.fanout else zmq.PUSH]
- sock.send_pyobj((
- self._znm_customer.addr, ack_task.id, fn, args, kwargs))
- if not spec.reply:
- # immediately if workers won't reply
+ task = Task(self)
+ #spec = self._znm_blueprint[fn]
+ sock = self._znm_sockets[zmq.PUB if self._znm_fanout else zmq.PUSH]
+ sock.send_pyobj((fn, args, kwargs,
+ self._znm_customer.addr, task.id, self._znm_wait))
+ if not self._znm_wait:
+ # immediately if workers won't wait
return
if not self._znm_customer.running:
spawn(self._znm_customer.run)
- tasks = ack_task.acknowledge(fanout=spec.fanout)
- if self._znm_return_task:
- return tasks if spec.fanout else tasks[0]
- else:
- return (task() for task in tasks) if spec.fanout else tasks[0]()
+ return task.collect()
+
+ def __call__(self, task=None, fanout=None, wait=None):
+ """Creates a :class:`Tunnel` object which follows same consumer and
+ workers but replaced options.
+ """
+ if task is None:
+ task = self._znm_task
+ if fanout is None:
+ fanout = self._znm_fanout
+ if wait is None:
+ wait = self._znm_wait
+ opts = (task, fanout, wait)
+ tunnel = Tunnel(self._znm_customer, self._znm_workers, *opts)
+ tunnel._znm_sockets = self._znm_sockets
+ return tunnel
+
+ def __getattr__(self, attr):
+ return functools.partial(self._znm_invoke, attr)
def __enter__(self):
self._znm_customer.register_tunnel(self)
@@ -343,33 +359,34 @@ def __init__(self, tunnel, id=None, run_id=None):
self.run_id = run_id
self.queue = Queue()
- def acknowledge(self, fanout=False, timeout=0.01):
+ def collect(self, timeout=0.01):
+ assert self.tunnel._znm_wait
self.customer.register_task(self)
msgs = []
with Timeout(timeout, False):
while True:
msgs.append(self.queue.get())
- if not fanout:
+ if not self.tunnel._znm_fanout:
break
self.customer.unregister_task(self)
if not msgs:
raise NoWorkerError('There are no workers which respond')
- if fanout:
+ if self.tunnel._znm_fanout:
tasks = []
for do, (worker_addr, run_id) in msgs:
assert do == ACK
each_task = Task(self.tunnel, self.id, run_id)
each_task.worker_addr = worker_addr
tasks.append(each_task)
self.customer.register_task(each_task)
- return tasks
+ return tasks if self.tunnel._znm_task else (t() for t in tasks)
else:
do, val = msgs[0]
assert len(msgs) == 1
assert do == ACK
self.worker_addr, self.run_id = val
self.customer.register_task(self)
- return [self]
+ return self if self.tunnel._znm_task else self()
def put(self, do, val):
print 'task(%s:%s) recv %s %r' % \
View
@@ -117,7 +117,7 @@ def launch_rocket(self):
yield 1
raise RuntimeError('Launch!')
- @zeronimo.register(fanout=True)
+ @zeronimo.register
def rycbar123(self):
for word in 'run, you clever boy; and remember.'.split():
yield word
@@ -146,7 +146,7 @@ def customer{x}():
# tests
-def test_blueprint_extraction():
+def _test_blueprint_extraction():
class App(object):
@zeronimo.register
def foo(self):
@@ -178,7 +178,7 @@ def baz(self):
assert blueprint['baz'].reply
-def test_fingerprint():
+def _test_fingerprint():
class Nothing(object): pass
blueprint = dict(zeronimo.functional.extract_blueprint(Application))
blueprint2 = dict(zeronimo.functional.extract_blueprint(Application()))
@@ -284,7 +284,7 @@ def test(tunnel):
@green
def test_1to2(customer, worker1, worker2):
start_workers([worker1, worker2])
- with customer.link([worker1, worker2], return_task=True) as tunnel:
+ with customer.link([worker1, worker2], task=True) as tunnel:
task1 = tunnel.add(1, 1)
task2 = tunnel.add(2, 2)
assert task1() == 'cutie'
@@ -296,7 +296,7 @@ def test_1to2(customer, worker1, worker2):
def test_fanout(customer, worker1, worker2):
start_workers([worker1, worker2])
with customer.link([worker1, worker2]) as tunnel:
- for rycbar123 in tunnel.rycbar123():
+ for rycbar123 in tunnel(fanout=True).rycbar123():
assert rycbar123.next() == 'run,'
assert rycbar123.next() == 'you'
assert rycbar123.next() == 'clever'
@@ -313,10 +313,3 @@ def test_slow(customer, worker):
with Timeout(0.1):
tunnel.sleep()
assert tunnel.sleep() == 'slept'
-
-
-@green
-def test_link_to_different_workers(customer, worker):
- worker2 = zeronimo.Worker(2)
- with pytest.raises(ValueError):
- customer.link([worker, worker2])

0 comments on commit 00bd711

Please sign in to comment.