Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add support for RabbitMQ's E2E Binding Extension #141

Closed
wants to merge 2 commits into from

2 participants

@jasonpincin

Added bind/unbind support to the Exchange object, allowing it to harness RabbitMQ's "Exchange to Exchange Bindings" (or E2E). See http://www.rabbitmq.com/e2e.html for additional information on this AMQP extension.

I've included documentation updates (for two new Exchange methods), two new tests (for exchange binding and unbinding), etc. Let me know if there's something more needed.

jasonpincin added some commits
@jasonpincin jasonpincin Added support for RabbitMQ E2E extention.
Added bind/unbind support to the Exchange object, allowing it to
harness RabbitMQ's "Exchange to Exchange Bindings" (or E2E). See
http://www.rabbitmq.com/e2e.html for additional information on this
AMQP extension.
f0697c0
@jasonpincin jasonpincin Documentation improvements.
Noted optional callback arguments on Exchange.bind/unbind, and emitted
events upon successful completion of each.
d5a9bca
@postwait
Owner

merged

@postwait postwait closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Oct 18, 2012
  1. @jasonpincin

    Added support for RabbitMQ E2E extention.

    jasonpincin authored
    Added bind/unbind support to the Exchange object, allowing it to
    harness RabbitMQ's "Exchange to Exchange Bindings" (or E2E). See
    http://www.rabbitmq.com/e2e.html for additional information on this
    AMQP extension.
Commits on Oct 19, 2012
  1. @jasonpincin

    Documentation improvements.

    jasonpincin authored
    Noted optional callback arguments on Exchange.bind/unbind, and emitted
    events upon successful completion of each.
This page is out of date. Refresh to see the latest.
View
24 README.md
@@ -374,3 +374,27 @@ If the optional boolean second argument is set, the server will only
delete the exchange if it has no queue bindings. If the exchange has queue
bindings the server does not delete it but raises a channel exception
instead.
+
+### exchange.bind(srcExchange, routingKey [, callback])
+
+Binds the exchange (destination) to the given source exchange (srcExchange).
+When one exchange is bound to another, the destination (or receiving) exchange
+will receive all messages published to the source exchange that match the
+given routingKey.
+
+This method will emit `'exchangeBindOk'` when complete.
+
+Please note that Exchange to Exchange Bindings (E2E) are an extension to the
+AMQP spec introduced by RabbitMQ, and that by using this feature, you will be
+reliant on RabbitMQ's AMQP implementation. For more information on E2E
+Bindings with RabbitMQ see:
+
+http://www.rabbitmq.com/e2e.html
+
+### exchange.unbind(srcExchange, routingKey [, callback])
+
+Unbinds the exchange (destination) from the given source exchange (srcExchange).
+This is the reverse of the exchange.bind method above, and will stop messages
+from srcExchange/routingKey from being sent to the destination exchange.
+
+This method will emit `'exchangeUnbindOk'` when complete.
View
2  amqp-definitions-0-9-1.js
@@ -1,2 +1,2 @@
exports.constants = [[1,"frameMethod"],[2,"frameHeader"],[3,"frameBody"],[8,"frameHeartbeat"],[200,"replySuccess"],[206,"frameEnd"],[311,"contentTooLarge"],[313,"noConsumers"],[320,"connectionForced"],[402,"invalidPath"],[403,"accessRefused"],[404,"notFound"],[405,"resourceLocked"],[406,"preconditionFailed"],[501,"frameError"],[502,"syntaxError"],[503,"commandInvalid"],[504,"channelError"],[505,"unexpectedFrame"],[506,"resourceError"],[530,"notAllowed"],[540,"notImplemented"],[541,"internalError"],[4096,"frameMinSize"]];
-exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]},{"name":"confirm","index":85,"fields":[],"methods":[{"name":"select","index":10,"fields":[{"name":"noWait","domain":"bit"}]},{"name":"selectOk","index":11,"fields":[]}]}];
+exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]},{"name":"bind","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"destination","domain":"shortstr"},{"name":"source","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":31,"fields":[]},{"name":"unbind","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"destination","domain":"shortstr"},{"name":"source","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]},{"name":"confirm","index":85,"fields":[],"methods":[{"name":"select","index":10,"fields":[{"name":"noWait","domain":"bit"}]},{"name":"selectOk","index":11,"fields":[]}]}];
View
91 amqp.js
@@ -2000,6 +2000,8 @@ function Exchange (connection, channel, name, options, openCallback) {
Channel.call(this, connection, channel);
this.name = name;
this.binds = 0; // keep track of queues bound
+ this.exchangeBinds = 0; // keep track of exchanges bound
+ this.sourceExchanges = {};
this.options = options || { autoDelete: true};
this._openCallback = openCallback;
@@ -2129,6 +2131,23 @@ Exchange.prototype._onMethod = function (channel, method, args) {
this.emit('basic-return', args);
break;
+ case methods.exchangeBindOk:
+ if (this._bindCallback) {
+ // setting this._bindCallback to null before calling the callback allows for a subsequent bind within the callback
+ var cb = this._bindCallback;
+ this._bindCallback = null;
+ cb(this);
+ }
+ break;
+
+ case methods.exchangeUnbindOk:
+ if (this._unbindCallback) {
+ var cb = this._unbindCallback;
+ this._unbindCallback = null;
+ cb(this);
+ }
+ break;
+
default:
throw new Error("Uncaught method '" + method.name + "' with args " +
JSON.stringify(args));
@@ -2214,3 +2233,75 @@ Exchange.prototype.destroy = function (ifUnused) {
});
});
};
+
+// E2E Unbind
+// support RabbitMQ's exchange-to-exchange binding extension
+// http://www.rabbitmq.com/e2e.html
+Exchange.prototype.unbind = function (/* exchange, routingKey [, bindCallback] */) {
+ var self = this;
+
+ // Both arguments are required. The binding to the destination
+ // exchange/routingKey will be unbound.
+
+ var exchange = arguments[0]
+ , routingKey = arguments[1]
+ , callback = arguments[2]
+ ;
+
+ if(callback) this._unbindCallback = callback;
+
+ return this._taskPush(methods.exchangeUnbindOk, function () {
+ var source = exchange instanceof Exchange ? exchange.name : exchange;
+ var destination = self.name;
+
+ if(source in self.connection.exchanges) {
+ delete self.sourceExchanges[source];
+ self.connection.exchanges[source].exchangeBinds--;
+ }
+
+ self.connection._sendMethod(self.channel, methods.exchangeUnbind,
+ { reserved1: 0
+ , destination: destination
+ , source: source
+ , routingKey: routingKey
+ , noWait: false
+ , "arguments": {}
+ });
+ });
+};
+
+// E2E Bind
+// support RabbitMQ's exchange-to-exchange binding extension
+// http://www.rabbitmq.com/e2e.html
+Exchange.prototype.bind = function (/* exchange, routingKey [, bindCallback] */) {
+ var self = this;
+
+ // Two arguments are required. The binding to the destination
+ // exchange/routingKey will be established.
+
+ var exchange = arguments[0]
+ , routingKey = arguments[1]
+ , callback = arguments[2]
+ ;
+
+ if(callback) this._bindCallback = callback;
+
+
+ var source = exchange instanceof Exchange ? exchange.name : exchange;
+ var destination = self.name;
+
+ if(source in self.connection.exchanges) {
+ self.sourceExchanges[source] = self.connection.exchanges[source];
+ self.connection.exchanges[source].exchangeBinds++;
+ }
+
+ self.connection._sendMethod(self.channel, methods.exchangeBind,
+ { reserved1: 0
+ , destination: destination
+ , source: source
+ , routingKey: routingKey
+ , noWait: false
+ , "arguments": {}
+ });
+
+};
View
3  package.json
@@ -1,7 +1,7 @@
{ "name" : "amqp"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
-, "version" : "0.1.4"
+, "version" : "0.1.5"
, "preferGlobal" : true
, "author" : { "name" : "Ryan Dahl" }
, "contributors" :
@@ -17,6 +17,7 @@
, { "name" : "Andrei Vereha" }
, { "name" : "Mike Bardzinski" }
, { "name" : "James Carr" }
+ , { "name" : "Jason Pincin" }
]
, "repository" :
{ "type" : "git"
View
37 test/test-exchange-bind.js
@@ -0,0 +1,37 @@
+require('./harness');
+var testName = __filename.replace(__dirname+'/','').replace('.js','');
+var msgsReceived = 0;
+
+connection.addListener('ready', function () {
+ puts("connected to " + connection.serverProperties.product);
+ var callbackCalled = false;
+
+ connection.exchange('node.'+testName+'.dstExchange', {type: 'topic'}, function(dstExchange) {
+ connection.exchange('node.'+testName+'.srcExchange', {type: 'topic'}, function(srcExchange) {
+ dstExchange.bind(srcExchange, '#', function () {
+ connection.queue( 'node.'+testName+'.nestedExchangeQueue', { durable: false, autoDelete : true }, function (queue) {
+ queue.bind(dstExchange, '#', function () {
+ queue.subscribe(function ( msg ) {
+ puts(msg.data.toString());
+ msgsReceived++;
+ });
+ srcExchange.publish('node.'+testName+'.nestedExchangeTest',
+ 'Queue received message from non-directly-bound exchange.');
+
+ setTimeout(function () {
+ // wait one second to receive the message, then quit
+ queue.destroy();
+ dstExchange.destroy();
+ srcExchange.destroy();
+ connection.end();
+ }, 1000);
+ });
+ });
+ });
+ });
+ });
+});
+
+process.addListener('exit', function() {
+ assert.equal(1, msgsReceived);
+});
View
39 test/test-exchange-unbind.js
@@ -0,0 +1,39 @@
+require('./harness');
+var testName = __filename.replace(__dirname+'/','').replace('.js','');
+var msgsReceived = 0;
+connection.addListener('ready', function () {
+ puts("connected to " + connection.serverProperties.product);
+ var callbackCalled = false;
+
+ connection.exchange('node.'+testName+'.dstExchange', {type: 'topic'}, function(dstExchange) {
+ connection.exchange('node.'+testName+'.srcExchange', {type: 'topic'}, function(srcExchange) {
+ dstExchange.bind(srcExchange, '#', function () {
+ connection.queue( 'node.'+testName+'.nestedExchangeQueue', { durable: false, autoDelete : true }, function (queue) {
+ queue.bind(dstExchange, '#', function () {
+ queue.subscribe(function ( msg ) {
+ puts(msg.data.toString());
+ msgsReceived++;
+ });
+ srcExchange.publish('node.'+testName+'.nestedExchangeTest',
+ 'Queue received message from non-directly-bound exchange.');
+ dstExchange.unbind(srcExchange,'#', function () {
+ srcExchange.publish('node.'+testName+'.nestedExchangeTest',
+ 'You should NOT see this.');
+ });
+ setTimeout(function () {
+ // wait one second to receive the message, then quit
+ queue.destroy();
+ dstExchange.destroy();
+ srcExchange.destroy();
+ connection.end();
+ }, 1000);
+ });
+ });
+ });
+ });
+ });
+});
+
+process.addListener('exit', function() {
+ assert.equal(1, msgsReceived);
+});
Something went wrong with that request. Please try again.