Skip to content

Commit

Permalink
Merge branch '229_consumer_cancel_notify' of https://github.com/cmoes…
Browse files Browse the repository at this point in the history
  • Loading branch information
postwait committed Mar 12, 2014
2 parents beb346c + f8e3ee8 commit 26b98e4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 2 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,15 @@ You can also specify additional client properties for your connection
by setting the `clientProperties` field on the `options` object.

{ clientProperties: { applicationName: 'myApplication'
, capabilities: { consumer_cancel_notify: true
}
}
}

If the `consumer_cancel_notify` capability is set to `true` (as above), then
RabbitMQ's [Consumer Cancel Notification](http://www.rabbitmq.com/consumer-cancel.html)
feature will be enabled.

By default the following client properties are set

{ product: 'node-amqp'
Expand Down Expand Up @@ -309,6 +315,11 @@ The `messageObject` can be used to acknowledge a given message using:
```javascript
messageObject.acknowledge(false); // use true if you want to acknowledge all previous messages of the queue
```
If the `consumer_cancel_notify` capability was enabled when the connection was
created, the queue will emit `basicCancel` upon receiving a consumer cancel
notification from the server. The queue's channel will be automatically closed.
In a clustered environment, developers may want to consider automatically
re-subscribing to the queue on this event.

This method will emit `'basicQosOk'` when ready.

Expand Down Expand Up @@ -405,6 +416,9 @@ the queue will only be deleted if there are no consumers. If
+options.ifEmpty+ is true, the queue will only be deleted if it has no
messages.

Note: the successful destruction of a queue will cause a consumer cancel
notification to be emitted (for clients who have enabled the
`consumer_cancel_notify` option when creating the connection).



Expand Down
4 changes: 2 additions & 2 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ Channel.prototype._onChannelMethod = function(channel, method, args) {
}
};

Channel.prototype.close = function() {
Channel.prototype.close = function(reason) {
this.state = 'closing';
this.connection._sendMethod(this.channel, methods.channelClose,
{'replyText': 'Goodbye from node',
{'replyText': reason ? reason : 'Goodbye from node',
'replyCode': 200,
'classId': 0,
'methodId': 0});
Expand Down
4 changes: 4 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ Queue.prototype._onMethod = function (channel, method, args) {
case methods.queueDeleteOk:
break;

case methods.basicCancel:
this.close("Closed due to basicCancel received on consumer (" + args.consumerTag + ")");
break;

default:
throw new Error("Uncaught method '" + method.name + "' with args " +
JSON.stringify(args) + "; tasks = " + JSON.stringify(this._tasks));
Expand Down
48 changes: 48 additions & 0 deletions test/test-consumer-cancel-notify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
var harness = require('./harness');

if (typeof(options.clientProperties) === 'undefined') {
options.clientProperties = {};
}
if (typeof(options.clientProperties.capabilities) === 'undefined') {
options.clientProperties.capabilities = {};
}
options.clientProperties.capabilities.consumer_cancel_notify = true;

var connection = harness.run();
var notifyCount = 0;

connection.once('ready', function() {
// set a timer to close things if we're not done in one second
var finisherId = setTimeout(function () {
connection.end();
}, 1000);

connection.queue('node-ccn-queue', function(q) {
q.bind("#")
q.on('queueBindOk', function() {
q.on('basicCancel', function(args) {
notifyCount++;
});

q.on('close', function() {
connection.end();
clearTimeout(finisherId);
});

q.on('basicConsumeOk', function () {
connection.queue('node-ccn-queue', function(q2) {
q2.destroy();
});
});

q.subscribe(function (m) {
// no-op
})
});
});
});


process.addListener('exit', function () {
assert.equal(1, notifyCount);
});

0 comments on commit 26b98e4

Please sign in to comment.