Permalink
Browse files

Reworked SubscribeClientImpl so it creates a consumer for each subscr…

…iption
  • Loading branch information...
1 parent 0557ca6 commit ecfa04f09d9d374baf3a2581bcbcd5f3bf91659e Peter Kieltyka committed Aug 26, 2008
View
@@ -1,3 +1,4 @@
+.DS_Store
.actionScriptProperties
.flexLibProperties
.project
View
Binary file not shown.
@@ -18,15 +18,17 @@
package org.amqp.patterns.impl
{
import com.ericfeminella.utils.HashMap;
-
+
import flash.events.EventDispatcher;
import flash.utils.ByteArray;
-
+
import org.amqp.BasicConsumer;
import org.amqp.Command;
import org.amqp.Connection;
import org.amqp.ProtocolEvent;
import org.amqp.headers.BasicProperties;
+ import org.amqp.methods.basic.Cancel;
+ import org.amqp.methods.basic.CancelOk;
import org.amqp.methods.basic.Consume;
import org.amqp.methods.basic.Deliver;
import org.amqp.methods.queue.Declare;
@@ -36,8 +38,6 @@ package org.amqp.patterns.impl
public class SubscribeClientImpl extends AbstractDelegate implements SubscribeClient, BasicConsumer, Dispatcher
{
- private const CONSUME_HANDLER:Function = consumeHandler;
-
private var replyQueue:String = null;
private var topics:HashMap = new HashMap();
private var dispatcher:EventDispatcher = new EventDispatcher();
@@ -53,36 +53,39 @@ package org.amqp.patterns.impl
return;
}
- topics.put(key, callback);
+ topics.put(key, {callback:callback, consumerTag:null});
if (replyQueue != null) {
- dispatch(key, CONSUME_HANDLER);
+ dispatch(key, null);
}else {
- sendBuffer.buffer(key, CONSUME_HANDLER);
+ sendBuffer.buffer(key, null);
}
}
public function unsubscribe(key:String):void {
+ var cancel:Cancel = new Cancel();
+ var topic:* = topics.getValue(key);
+
+ cancel.consumertag = topic.consumerTag;
+ sessionHandler.dispatch(new Command(cancel));
@pkieltyka

pkieltyka Aug 28, 2008

Owner

As of this commit, there is a single queue per connection with a consumer registered for each topic. This is a much better design, however when calling Basic.Cancel for a particular consumerTag, the consumer is not removed and messages for a consumer continue to be delivered to onDeliver. Also, onCancelOk is returned with the proper tag. Perhaps a bug in the rabbitmq server?

+ sessionHandler.addEventListener(new org.amqp.methods.basic.CancelOk, onCancelOk);
+
+ dispatcher.removeEventListener(key, topic.callback);
topics.remove(key);
- dispatcher.removeEventListener(key, CONSUME_HANDLER);
}
public function dispatch(o:*, callback:Function):void {
+ var consume:Consume = new Consume();
+ consume.queue = replyQueue;
+ consume.noack = true;
+ consume.consumertag = replyQueue + ":" + o;
+ sessionHandler.register(consume, this);
+
bindQueue(exchange, replyQueue, o);
- dispatcher.addEventListener(o, callback);
- }
-
- public function consumeHandler(event:CorrelatedMessageEvent):void {
- var key:String = event.type;
-
- if (topics.containsKey(key)) {
- var topic:* = topics.getValue(key);
- topic.call(null, event);
- }
}
override protected function onChannelOpenOk(event:ProtocolEvent):void {
- declareExchange(exchange,exchangeType);
+ declareExchange(exchange, exchangeType);
setupReplyQueue();
}
@@ -95,26 +98,26 @@ package org.amqp.patterns.impl
override protected function onQueueDeclareOk(event:ProtocolEvent):void {
replyQueue = getReplyQueue(event);
- var consume:Consume = new Consume();
- consume.queue = replyQueue;
- consume.noack = true;
- sessionHandler.register(consume, this);
sendBuffer.drain();
}
- public function onConsumeOk(tag:String):void {}
+ public function onConsumeOk(tag:String):void {
+ var key:String = tag.split(":")[1];
+ var topic:* = topics.getValue(key);
+
+ topic.consumerTag = tag;
+ topics.put(key, topic);
+
+ dispatcher.addEventListener(key, topic.callback);
+ }
public function onCancelOk(tag:String):void {}
public function onDeliver(method:Deliver,
properties:BasicProperties,
body:ByteArray):void {
- // replyto will always be null since we dont know the queue name
- // ..maybe we set an optional user id in the constructor?
- //if (properties.replyto != replyQueue) {
- var result:* = serializer.deserialize(body);
- dispatcher.dispatchEvent(new CorrelatedMessageEvent(properties.correlationid, result));
- //}
+ var result:* = serializer.deserialize(body);
+ dispatcher.dispatchEvent(new CorrelatedMessageEvent(properties.correlationid, result));
}
}
}
View
Binary file not shown.

2 comments on commit ecfa04f

Collaborator

0x6e6562 replied Aug 26, 2008

Is the new version of flexunit?

Owner

pkieltyka replied Aug 26, 2008

Yes

Please sign in to comment.