Browse files

batch viewing

  • Loading branch information...
1 parent abdfb2e commit db3e60bc40b6a1d851027bf4a74862c232e7366f @rhs committed Oct 11, 2011
Showing with 80 additions and 25 deletions.
  1. +15 −1 demo/common.py
  2. BIN demo/images/message.png
  3. +10 −3 demo/permute
  4. +49 −21 demo/viewer
  5. +6 −0 messaging.py
View
16 demo/common.py
@@ -20,13 +20,27 @@
import optparse, os, traceback
from client import *
-def loop(link, obj):
+def loop(link, obj, batch=False, timeout=3):
link.flow(100)
while link.pending(block=True):
msg = link.get()
link.disposition(msg.delivery_tag, dispatch(msg, obj))
if link.credit() < 50: link.flow(100)
settle(link)
+ if batch:
+ if timeout:
+ try:
+ more = link.pending(block=True, timeout=timeout)
+ except Timeout:
+ more = False
+ else:
+ more = link.pending()
+ if not more:
+ try:
+ obj.process()
+ except:
+ print "error processing"
+ traceback.print_exc()
settle(link)
View
BIN demo/images/message.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
13 demo/permute
@@ -66,25 +66,32 @@ def reset_all():
def setup(clients, brokers):
for b in brokers:
bh, bp, bu, bw = BRADDRS[b]
+ bp = str(bp)
for c in clients:
- send(create_link(node(c, b), "receiver", "%s" % c, bh, bp, bu, bw, target=c))
+ send(create_link(node(c, b), "receiver", "%s-via-%s" % (c, b), bh, bp, bu, bw, target=c))
for cc in clients:
- send(create_link(node(c, b), "sender", "%s-to-%s" % (cc, c), bh, bp, bu, bw, target=cc))
+ send(create_link(node(c, b), "sender", "%s-to-%s-via-%s" % (cc, c, b), bh, bp, bu, bw, target=cc))
def permute(start, stop, via, count=1):
for c1 in start:
for c2 in stop:
for b in via:
for i in range(count):
- send(send_message("%s-to-%s" % (c1, c2), "%s:%s" % (b, counter()), target=c1))
+ send(send_message("%s-to-%s-via-%s" % (c1, c2, b), "%s:%s:%s" % (c1, b, counter()), target=c1))
def self_ping(clients, brokers):
for c in clients:
permute([c], [c], brokers)
+def proprietary(vendors):
+ for v in vendors:
+ permute([v], [v], [v])
+
ACTIONS = {
"reset": reset_all,
"setup": lambda: setup(CLIENTS, BROKERS),
+ "proprietary": lambda: proprietary(BROKERS),
+ "interop": lambda: (permute([RH], [MS], [VM]), permute([MS], [RH], [VM])),
"all": lambda: permute(CLIENTS, CLIENTS, BROKERS),
"self-ping": self_ping
}
View
70 demo/viewer
@@ -30,6 +30,7 @@ for v in VENDORS:
LOGOS[v] = ImageSurface.create_from_png("images/broker-%s.png" % v)
for v in CLIENTS:
MESSAGES[v] = ImageSurface.create_from_png("images/message-%s.png" % v)
+MESSAGE = ImageSurface.create_from_png("images/message.png")
SCREEN_H = 1.0
SCREEN_W = 16.0/9.0
@@ -52,7 +53,7 @@ class Ball:
cr.translate(self.x, self.y)
cr.rotate(self.angle)
cr.translate(-scale/2, scale/2)
- ball = MESSAGES[self.vendor]
+ ball = MESSAGES.get(self.vendor, MESSAGE)
cr.scale(scale/ball.get_width(), -scale/ball.get_height())
cr.set_source_surface(ball)
cr.paint_with_alpha(self.alpha)
@@ -159,7 +160,7 @@ CLIS = {
parser = options()
parser.add_option("-s", "--script", help="drive graphics from script")
-parser.add_option("-l", "--listen", default=False,
+parser.add_option("-l", "--listen", action="store_true",
help="listen after running script")
parser.add_option("-f", "--fullscreen", action="store_true",
help="run in fullscreen mode")
@@ -229,8 +230,10 @@ def go(paths):
for i in range(fade + 1):
p.ball.alpha = i/float(fade)
window.redraw()
- elif p.start.broker:
- p.ball.vendor = p.start.dequeue()
+ elif p.start.broker and p.start.messages:
+ vnd = p.start.dequeue()
+ if p.ball.vendor == "none":
+ p.ball.vendor = vnd
# travel
for i in range(steps+1):
@@ -268,37 +271,62 @@ def script(fname):
else:
start, stop = line.split()
s = lookup(start)
- paths.append(Path(Ball(s.vendor), lookup(start), lookup(stop)))
+ if start[0] == "C":
+ org = s.vendor
+ else:
+ org = "none"
+ paths.append(Path(Ball(org), lookup(start), lookup(stop)))
time.sleep(1)
class Listener:
+ def __init__(self):
+ self.messages = []
+ self.sent = set()
+
def log(self, msg):
print "LOG:", msg
- action = msg["action"]
- vendor = msg["vendor"]
- cli = CLIS[vendor]
- brkv, num = msg["message-id"].split(":")
- brk = BRKS[brkv]
- if action == "sent":
- start = cli
- stop = brk
- else:
- start = brk
- stop = cli
- paths = [Path(Ball(start.vendor), start, stop)]
- go(paths)
+ if msg.get("message-id", "").count(":") == 2:
+ self.messages.append(msg)
+
+ def process(self):
+ paths = []
+ for msg in list(self.messages):
+ action = msg["action"]
+ vendor = msg["vendor"]
+ cli = CLIS[vendor]
+ mid = msg["message-id"]
+ orgv, brkv, num = mid.split(":")
+ brk = BRKS[brkv]
+ if action == "sent":
+ self.sent.add(mid)
+ self.messages.remove(msg)
+ paths.append(Path(Ball(orgv), cli, brk))
+ if paths: go(paths)
+ paths = []
+ for msg in list(self.messages):
+ action = msg["action"]
+ vendor = msg["vendor"]
+ cli = CLIS[vendor]
+ mid = msg["message-id"]
+ orgv, brkv, num = mid.split(":")
+ brk = BRKS[brkv]
+ if action == "received":
+ if mid in self.sent:
+ paths.append(Path(Ball(orgv), brk, cli))
+ self.messages.remove(msg)
+ if paths: go(paths)
if opts.script:
script(opts.script)
if opts.listen:
- conn = open_conn(args)
+ conn = open_conn(opts)
ssn = conn.session()
- lnk = ssn.receiver(source, limit=100)
+ lnk = ssn.receiver("log", limit=100)
try:
- loop(lnk, Listener())
+ loop(lnk, Listener(), True, timeout=1)
except KeyboardInterrupt:
pass
lnk.close()
View
6 messaging.py
@@ -47,6 +47,12 @@ def __init__(self, content=None, delivery_tag=None, **kwargs):
if hasattr(o, k):
setattr(o, k, v)
+ def get(self, key, default=None):
+ if self.application_properties:
+ return self.application_properties.get(key, default)
+ else:
+ return default
+
def __getitem__(self, key):
if self.application_properties:
return self.application_properties[key]

0 comments on commit db3e60b

Please sign in to comment.