Permalink
Browse files

Extended amqp0-8 spec to include Unbind/Unbind-Ok methods, as well as…

… modified SubscribeClientImpl to unsubscribe by calling the Unbind command
  • Loading branch information...
1 parent 16c39a2 commit 718487dc9fabd00263f33ee1af753f3096859f92 Peter Kieltyka committed Oct 6, 2008
Showing with 78 additions and 29 deletions.
  1. +51 −5 codegen/specs/amqp0-8.xml
  2. +27 −24 src/org/amqp/patterns/impl/SubscribeClientImpl.as
View
@@ -3,14 +3,14 @@
<!--
Copyright Notice
================
-© Copyright JPMorgan Chase Bank & Co., Cisco Systems, Inc., Envoy Technologies Inc.,
-iMatix Corporation, IONA� Technologies, Red Hat, Inc.,
+© Copyright JPMorgan Chase Bank & Co., Cisco Systems, Inc., Envoy Technologies Inc.,
+iMatix Corporation, IONA� Technologies, Red Hat, Inc.,
TWIST Process Innovations, and 29West Inc. 2006. All rights reserved.
License
=======
JPMorgan Chase Bank & Co., Cisco Systems, Inc., Envoy Technologies Inc., iMatix
-Corporation, IONA� Technologies, Red Hat, Inc., TWIST Process Innovations, and
+Corporation, IONA� Technologies, Red Hat, Inc., TWIST Process Innovations, and
29West Inc. (collectively, the "Authors") each hereby grants to you a worldwide,
perpetual, royalty-free, nontransferable, nonexclusive license to
(i) copy, display, and implement the Advanced Messaging Queue Protocol
@@ -1213,6 +1213,7 @@ localised reply text
/ C:BIND S:BIND-OK
/ C:PURGE S:PURGE-OK
/ C:DELETE S:DELETE-OK
+ / C:UNBIND S:UNBIND-OK
</doc>
<chassis name="server" implement="MUST"/>
<chassis name="client" implement="MUST"/>
@@ -1588,7 +1589,52 @@ localised reply text
</doc>
</field>
</method>
- <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <!-- UNBIND - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="unbind" synchronous="1" index="50">
+ Unbind
+ <doc>
+ This methods unbinds a queue
+ </doc>
+ <chassis name="server" implement="MUST"/>
+ <response name="unbind-ok"/>
+ <field name="ticket" domain="access ticket">
+ <doc>The access ticket must be for the access realm that holds the
+ queue.</doc>
+ </field>
+ <field name="queue" domain="queue name">
+ <doc>
+ queue name
+ </doc>
+ </field>
+ <field name="exchange" domain="exchange name">
+ <doc>
+ exchange name
+ </doc>
+ </field>
+ <field name="routingkey" type="shortstr">
+ <doc>
+ routing key
+ </doc>
+ </field>
+ <field name="arguments" type="table">
+ <doc>
+ additional arguments
+ </doc>
+ </field>
+ </method>
+
+ <method name="unbind-ok" synchronous="1" index="51">
+ Unbind-ok
+ <doc>
+ Confirms a queue unbind
+ </doc>
+ <chassis name="client" implement="MUST"/>
+ </method>
+
+ <!-- UNBIND - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
<method name="delete" synchronous="1" index="40">
delete a queue
<doc>
@@ -2425,4 +2471,4 @@ confirm a successful rollback
<chassis name="client" implement="MUST"/>
</method>
</class>
-</amqp>
+</amqp>
@@ -17,7 +17,7 @@
**/
package org.amqp.patterns.impl
{
- import com.ericfeminella.utils.HashMap;
+ import de.polygonal.ds.HashMap;
import flash.events.EventDispatcher;
import flash.utils.ByteArray;
@@ -30,6 +30,7 @@ package org.amqp.patterns.impl
import org.amqp.methods.basic.Consume;
import org.amqp.methods.basic.Deliver;
import org.amqp.methods.queue.Declare;
+ import org.amqp.methods.queue.Unbind;
import org.amqp.patterns.CorrelatedMessageEvent;
import org.amqp.patterns.Dispatcher;
import org.amqp.patterns.SubscribeClient;
@@ -54,7 +55,7 @@ package org.amqp.patterns.impl
return;
}
- topics.insert(key, {callback:callback, consumerTag:null});
+ topics.insert(key, {callback:callback});
if (replyQueue != null) {
dispatch(key, null);
@@ -64,20 +65,24 @@ package org.amqp.patterns.impl
}
public function unsubscribe(key:String):void {
- var cancel:Cancel = new Cancel();
- var topic:* = topics.getValue(key);
- sessionHandler.unregister(topic.consumerTag);
+ var unbind:Unbind = new Unbind();
+ var topic:* = topics.find(key);
+ unbind.exchange = exchange;
+ unbind.queue = replyQueue;
+ unbind.routingkey = key;
+
+ sessionHandler.rpc(new Command(unbind), onUnbindOk);
+
+ dispatcher.removeEventListener(key, topic.callback);
topics.remove(key);
}
public function dispatch(o:*, callback:Function):void {
- var consume:Consume = new Consume();
- consume.queue = replyQueue;
- consume.noack = true;
- consume.consumertag = replyQueue + ":" + o;
- sessionHandler.register(consume, this);
-
bindQueue(exchange, replyQueue, o);
+
+ var topic:* = topics.find(o);
+ topics.insert(o, topic);
+ dispatcher.addEventListener(o, topic.callback);
}
override protected function onChannelOpenOk(event:ProtocolEvent):void {
@@ -91,32 +96,30 @@ package org.amqp.patterns.impl
var queue:Declare = new Declare();
queue.queue = q;
queue.autodelete = true;
- sessionHandler.dispatch(new Command(queue));
+ sessionHandler.rpc(new Command(queue), onQueueDeclareOk);
}
override protected function onQueueDeclareOk(event:ProtocolEvent):void {
replyQueue = getReplyQueue(event);
+
+ var consume:Consume = new Consume();
+ consume.queue = replyQueue;
+ consume.noack = true;
+ consume.consumertag = replyQueue;
+ sessionHandler.register(consume, this);
+
sendBuffer.drain();
}
- public function onConsumeOk(tag:String):void {
- var key:String = tag.split(":")[1];
- var topic:* = topics.getValue(key);
+ public function onConsumeOk(tag:String):void {}
- topic.consumerTag = tag;
- topics.put(key, topic);
+ public function onCancelOk(tag:String):void {}
- dispatcher.addEventListener(key, topic.callback);
- }
-
- public function onCancelOk(tag:String):void {
- //trace("cancelled");
- }
+ public function onUnbindOk(event:ProtocolEvent):void {}
public function onDeliver(method:Deliver,
properties:BasicProperties,
body:ByteArray):void {
-
trace("onDeliver");
var result:* = serializer.deserialize(body);

0 comments on commit 718487d

Please sign in to comment.