Browse files

add docstrings for `send` and `recv`

  • Loading branch information...
1 parent 4cfb40a commit 1e6ab8e042e80b64956bc7e0a9b26ebd25b18f3d @sublee committed Apr 5, 2013
Showing with 26 additions and 4 deletions.
  1. +10 −3 zeronimo.py
  2. +16 −1 zeronimotests.py
View
13 zeronimo.py
@@ -111,12 +111,19 @@ def ensure_list(val):
return list(val) if isinstance(val, (Sequence, Set)) else [val]
-def send(sock, obj, flags=0, topic=''):
+# ZMQ messaging functions
+
+
+def send(sock, obj, flags=0, prefix=''):
+ """Same with :meth:`zmq.Socket.send_pyobj` but can append prefix for
+ filtering subscription.
+ """
msg = pickle.dumps(obj)
- return sock.send(topic + msg, flags)
+ return sock.send(prefix + msg, flags)
def recv(sock, flags=0):
+ """Same with :meth:`zmq.Socket.recv_pyobj`."""
msg = sock.recv(flags)
return pickle.loads(msg)
@@ -386,7 +393,7 @@ def _znm_invoke(self, name, *args, **kwargs):
invocation = Invocation(name, args, kwargs, task.id, customer_addr)
print 'tunnel send %r' % (invocation,)
topic = self._znm_fanout_topic if self._znm_fanout else ''
- send(sock, invocation, topic=topic)
+ send(sock, invocation, prefix=topic)
if not self._znm_wait:
# immediately if workers won't wait
return
View
17 zeronimotests.py
@@ -14,7 +14,7 @@
zmq_context = zmq.Context()
-#gevent.hub.get_hub().print_exception = lambda *a, **k: 'do not print exception'
+gevent.hub.get_hub().print_exception = lambda *a, **k: 'do not print exception'
@decorator
@@ -57,6 +57,10 @@ def start_workers(workers):
class Application(object):
@zeronimo.remote
+ def simple(self):
+ return 'ok'
+
+ @zeronimo.remote
def add(self, a, b):
"""Koreans' mathematical addition."""
if a == b:
@@ -291,3 +295,14 @@ def test_link_to_addrs(customer, worker):
start_workers([worker])
with customer.link([(worker.addrs, worker.fanout_addrs)]) as tunnel:
assert tunnel.add(1, 1) == 'cutie'
+
+
+@green
+def _test_reject(customer, worker1, worker2):
+ start_workers([worker1, worker2])
+ with customer.link([worker1, worker2]) as tunnel:
+ assert len(list(tunnel(fanout=True).simple())) == 2
+ worker2.reject_all()
+ assert len(list(tunnel(fanout=True).simple())) == 1
+ worker2.accept_all()
+ assert len(list(tunnel(fanout=True).simple())) == 2

0 comments on commit 1e6ab8e

Please sign in to comment.