Permalink
Browse files

initial pass at demo controller and client, plus a few minor interop …

…fixes
  • Loading branch information...
1 parent 2df53d5 commit ab2da8c5fdf2ecb5822e16cd2200d51fb41876a8 @rhs committed Sep 28, 2011
Showing with 433 additions and 9 deletions.
  1. +1 −1 broker
  2. +1 −1 brokerlib.py
  3. +44 −0 demo/README
  4. +4 −0 demo/broker
  5. +97 −0 demo/client
  6. +92 −0 demo/common.py
  7. +65 −0 demo/controller
  8. +83 −0 demo/demo
  9. +12 −0 demo/demo-nodes
  10. +2 −0 demo/demo-users
  11. +23 −0 demo/visual.py
  12. +3 −2 recv
  13. +2 −2 sasl.py
  14. +4 −3 window.py
View
2 broker
@@ -78,6 +78,7 @@ try:
broker.passwords = passwords
broker.traces = opts.trace.split()
+ window = Window(lambda *args: selector.stop())
nodes = {}
for n in opts.nodes:
exec open(n) in globals(), nodes
@@ -96,7 +97,6 @@ try:
broker.nodes[a] = Queue(thresholds.get(a))
selector = Selector()
- window = Window(lambda *args: selector.stop())
broker.listener = window.redraw
broker.bind(opts.interface, opts.port)
selector.register(broker)
View
@@ -413,7 +413,7 @@ def process_receiver(self, link, connection):
if l.resumed:
link.settle(t, None)
elif r.settled and not isinstance(l.state, TransactionalState):
- state = target.settle(t, r.state)
+ state = target.settle(t, l.state)
link.settle(t, state)
if target.capacity() and link.credit() < 10: link.flow(10)
View
@@ -0,0 +1,44 @@
+The scripts in this directory require the python amqp implementation
+(the parent of the directory containing this README) to be in the
+PYTHONPATH. This can be done with:
+
+ export PYTHONPATH=<path_to_amqp>
+
+Note that this file is found at <path_to_amqp>/demo/README.
+
+The demo directory contains the following scripts:
+
+ broker -- a simple shell script to start the control broker
+ controller -- the demo controller application
+ client -- the demo client application
+ demo -- a control script used to trigger actions
+
+To use the demo take the following steps:
+
+ 1. Start the control broker, e.g.:
+
+ ./broker -t frm
+
+ 2. Start the controller application, e.g.:
+
+ ./controller
+
+ 3. Start as many client applications as you like. These should
+ announce their presence to the controller via the control queue.
+ The python client application can be started thusly:
+
+ ./client
+
+ 4. Use the demo app to create links and send messages:
+
+ ./demo create-link node1 ref=blah role=sender host=0.0.0.0 port=5672 user=demo password=demo
+ ./demo send-message blah m1
+ ./demo send-message blah m2
+ ./demo send-message blah m3
+ ./demo create-link node1 ref=bleh role=receiver host=0.0.0.0 port=5672 user=demo password=demo
+
+The controller and client apps will print useful info on stdout when
+interesting stuff happen. You can access the usage for any of the
+scripts by using the -h option, e.g.:
+
+ ./controller -h
View
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+export PYTHONPATH=$PWD/..
+exec ../broker -a -u demo-users -n demo-nodes "$@"
View
@@ -0,0 +1,97 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from threading import Thread
+from common import *
+
+class Link:
+
+ def __init__(self, client, conn, link):
+ self.client = client
+ self.conn = conn
+ self.link = link
+
+ def send(self, msg):
+ test = Message(content=msg.content, message_id=msg["message-id"])
+ test["opcode"] = "test"
+ test["vendor"] = "redhat"
+ self.link.send(settled=True, message=test)
+
+ log = Message()
+ log["opcode"] = "log"
+ log["action"] = "sent"
+ log["message-id"] = msg["message-id"]
+ log["vendor"] = test["vendor"]
+ self.client.ctl.send(settled=True, message=log)
+
+ def test(self, msg):
+ print "TEST:", msg
+
+ log = Message()
+ log["opcode"] = "log"
+ log["action"] = "received"
+ log["message-id"] = msg.properties.message_id
+ self.client.ctl.send(settled=True, message=log)
+
+ return ACCEPTED
+
+class Client:
+
+ def __init__(self, ssn, lnk):
+ self.ssn = ssn
+ self.lnk = lnk
+ self.links = {}
+ self.ctl = self.ssn.sender("control")
+ msg = Message()
+ msg["opcode"] = "announce"
+ msg["vendor"] = "redhat"
+ msg["address"] = self.lnk.address
+ self.ctl.send(msg, settled=True)
+
+ def create_link(self, msg):
+ conn = Connection(auth=True)
+# conn.tracing(*self.tracing)
+ conn.connect(host=msg["host"], port=int(msg["port"]))
+ conn.open(mechanism="PLAIN", username=msg["sasl-user"],
+ password=msg["sasl-password"])
+ ssn = conn.session()
+ address = msg["address"]
+ if msg["role"] == "sender":
+ snd = Link(self, conn, ssn.sender(address))
+ self.links[msg["link-ref"]] = snd
+ else:
+ rcv = Link(self, conn, ssn.receiver(address))
+ def run():
+ loop(rcv.link, rcv)
+ t = Thread(target=run)
+ t.setDaemon(True)
+ t.start()
+
+ return ACCEPTED
+
+ def send_message(self, msg):
+ ref = msg["link-ref"]
+ if ref not in self.links:
+ raise ValueError("unknown link reference: %r" % msg)
+ link = self.links[ref]
+ link.send(msg)
+ return ACCEPTED
+
+main(Client, Source(dynamic=True))
View
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, traceback
+from client import *
+
+def loop(link, obj):
+ link.flow(100)
+ while link.pending(block=True):
+ msg = link.get()
+ link.disposition(msg.delivery_tag, dispatch(msg, obj))
+ if link.credit() < 50: link.flow(100)
+ settle(link)
+
+ settle(link)
+
+ for tag, local, remote in self.lnk.get_unsettled():
+ print "UNSETTLED:", tag, local, remote
+
+def settle(link):
+ for tag, _, _ in link.get_remote(settled=True):
+ link.settle(tag)
+
+def dispatch(msg, obj):
+ try:
+ opcode = msg["opcode"].replace("-", "_")
+ except KeyError:
+ print "malformed message:", msg
+ return ACCEPTED
+ else:
+ try:
+ return getattr(obj, opcode, getattr(obj, "unknown", unknown))(msg)
+ except:
+ print "error processing message:", msg
+ traceback.print_exc()
+ return ACCEPTED
+
+def unknown(msg):
+ print "unknown opcode:", msg
+ return ACCEPTED
+
+def options():
+ parser = optparse.OptionParser(usage="usage: %prog [options] <address>",
+ description="receive messages")
+ parser.add_option("-H", "--host",
+ help="host to connect to (default 0.0.0.0)")
+ parser.add_option("-p", "--port", type=int, default=5672,
+ help="port to connect to (default %default)")
+ parser.add_option("-u", "--username", default="demo",
+ help="username to use for authentication")
+ parser.add_option("-w", "--password", default="demo",
+ help="password to use for authentication")
+ parser.add_option("-t", "--trace", default="err",
+ help="enable tracing for specified categories")
+
+ return parser.parse_args()
+
+def main(cls, source):
+ opts, args = options()
+
+ if args:
+ parser.error("unrecognized arguments")
+
+ conn = Connection(auth=True)
+ conn.tracing(*opts.trace.split())
+ conn.connect(opts.host or os.getenv('AMQP_BROKER') or "0.0.0.0", opts.port)
+ conn.open(mechanism="PLAIN", username=opts.username, password=opts.password)
+ ssn = conn.session()
+ lnk = ssn.receiver(source, limit=100)
+ try:
+ loop(lnk, cls(ssn, lnk))
+ except KeyboardInterrupt:
+ pass
+ lnk.close()
+ ssn.close()
+ conn.close()
View
@@ -0,0 +1,65 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from common import *
+
+class Client:
+
+ def __init__(self, vendor, address, link):
+ self.vendor = vendor
+ self.address = address
+ self.link = link
+
+ def send(self, msg):
+ self.link.send(settled=True, message=msg)
+
+ def __repr__(self):
+ return "Client(%r, %r)" % (self.vendor, self.address)
+
+class Controller:
+
+ def __init__(self, ssn, lnk):
+ self.ssn = ssn
+ self.clients = []
+
+ def relay(self, msg):
+ for c in self.clients:
+ c.send(msg)
+
+ def announce(self, msg):
+ vendor = msg["vendor"]
+ address = msg["address"]
+ link = self.ssn.sender(address)
+ c = Client(vendor, address, link)
+ self.clients.append(c)
+ print "The %s client is listening on: %s" % (vendor, address)
+ return ACCEPTED
+
+ def create_link(self, msg):
+ self.relay(msg)
+
+ def send_message(self, msg):
+ self.relay(msg)
+
+ def log(self, msg):
+ print "LOG:", msg
+ return ACCEPTED
+
+main(Controller, "control")
Oops, something went wrong.

0 comments on commit ab2da8c

Please sign in to comment.