Skip to content

Commit

Permalink
Improve queue declaration and binding.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredhanson committed Mar 31, 2014
1 parent d0877d2 commit c4812c8
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 30 deletions.
59 changes: 29 additions & 30 deletions lib/crane-amqp/broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
var EventEmitter = require('events').EventEmitter
, amqp = require('amqp')
, util = require('util')
, Queue = require('./queue')
, Message = require('./message')
, NoQueueError = require('./errors/noqueueerror')
, NoSubscriptionError = require('./errors/nosubscriptionerror')
Expand Down Expand Up @@ -286,6 +287,7 @@ Broker.prototype.unsubscribe = function(queue, options, cb) {
if (!q) { return cb(new NoQueueError('Queue "' + queue + '" not declared')); }
if (!ctag) { return cb(new NoSubscriptionError('Not subscribed to queue "' + queue + '"')); }

debug('unsubscribe %s', queue);
q.unsubscribe(ctag)
.addCallback(function(ok) {
delete self._ctags[queue];
Expand All @@ -301,54 +303,51 @@ Broker.prototype.unsubscribe = function(queue, options, cb) {
});
}

Broker.prototype.declare = function(name, options, cb) {
Broker.prototype.declare = function(queue, options, cb) {
if (typeof options == 'function') {
cb = options;
options = undefined;
}
options = options || {};
var bind = (options.bind === undefined) ? true : options.bind;

// AMQP uses period ('.') separators rather than slash ('/')
name = name.replace(/\//g, '.');

queue = queue.replace(/\//g, '.');
// Task queues are declared as durable, by default. This ensures that tasks
// are not lost in the event that that server is stopped or crashes.
options.durable = (options.durable === undefined) ? true : options.durable;
options.autoDelete = (options.autoDelete === undefined) ? false : options.autoDelete;

var q = this._connection.queue(name, options, function(q) {
q.removeListener('error', onQueueDeclareError);
var self = this;

var onError = function(err) {
return cb(err);
};

debug('declare %s', queue);
var q = this._connection.queue(queue, options, function(q) {
var wq = new Queue(q, self._exchange.name);
q.removeListener('error', onError);

var exchange = options.bind;
if (exchange) {
// AMQP uses period ('.') separators rather than slash ('/')
exchange = exchange.replace(/\//g, '.');

// TODO: support binding to multiple topics
var topic = options.topic || name;
topic = topic.replace(/\//g, '.');

q.bind(exchange, topic, function(q) {
q.removeListener('error', onQueueBindError);
return cb && cb();
if (bind) {
wq.bind(queue, function(err) {
if (err) { return cb(err); }
return cb(null, wq);
});

// Listen for events that indicate that the queue failed to be bound.
var onQueueBindError = function(err) {
return cb && cb(err);
};
q.once('error', onQueueBindError);
} else {
return cb && cb();
return cb(null, wq);
}
});
this._queues[name] = q;
this._queues[queue] = q;

// Listen for events that indicate that the queue failed to be declared.
var onQueueDeclareError = function(err) {
return cb && cb(err);
};
q.once('error', onQueueDeclareError);
// NOTE: This will occur if a queue is redeclared with different properties.
//
// For example, the underlying `amqp` emits an error with the following
// properties:
// - message: PRECONDITION_FAILED - parameters for queue 'foo' in
// vhost '/' not equivalent
// - code: 406
q.once('error', onError);
}


Expand Down
53 changes: 53 additions & 0 deletions lib/crane-amqp/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Module dependencies.
*/
var debug = require('debug')('crane-amqp');


/**
* `Broker` constructor.
*
* @api protected
*/
function Queue(queue, exchange) {
this._q = queue;
this._exchange = exchange;
}

/**
* Bind queue to topic.
*
* @api public
*/
Queue.prototype.bind = function(topic, cb) {
// AMQP uses period ('.') separators rather than slash ('/')
topic = topic.replace(/\//g, '.');

var q = this._q
, exchange = this._exchange;

var onError = function(err) {
return cb(err);
};

debug('bind %s %s %s', q.name, this._exchange, topic);
q.bind(exchange, topic, function(q) {
q.removeListener('error', onError);
return cb();
});

// NOTE: This will occur if an attempt is made to bind to an exchange that
// does not exist.
//
// For example, the underlying `amqp` emits an error with the following
// properties:
// - message: NOT_FOUND - no exchange 'foo' in vhost '/'
// - code: 404
q.once('error', onError);
}


/**
* Expose `Broker`.
*/
module.exports = Queue;

0 comments on commit c4812c8

Please sign in to comment.