Skip to content

Commit

Permalink
Fix bug where by confirm incorrectly used on built-in exchanges.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredhanson committed Mar 29, 2014
1 parent 2cbb196 commit d580300
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions lib/crane-amqp/broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ Broker.prototype.connect = function(options, readyListener) {

// AMQP uses period ('.') separators rather than slash ('/')
name = name.replace(/\//g, '.');

opts.type = (opts.type === undefined) ? 'direct' : opts.type;
opts.durable = (opts.durable === undefined) ? true : opts.durable;
opts.autoDelete = (opts.autoDelete === undefined) ? false : opts.autoDelete;
opts.confirm = (opts.confirm === undefined) ? true : opts.confirm;
if (name.indexOf('amq.') !== 0) {
opts.type = (opts.type === undefined) ? 'direct' : opts.type;
opts.durable = (opts.durable === undefined) ? true : opts.durable;
opts.autoDelete = (opts.autoDelete === undefined) ? false : opts.autoDelete;
opts.confirm = (opts.confirm === undefined) ? true : opts.confirm;
} else {
// Options for built-in exchanges can not be overridden.
opts = {};
}

debug('exchange %s %s', name);

self._exchange = self._connection.exchange(name, opts, function(exchange) {
return self.emit('ready');
});
Expand Down Expand Up @@ -140,35 +143,34 @@ Broker.prototype.declare = function(name, options, cb) {
q.once('error', onQueueDeclareError);
}

Broker.prototype.enqueue = function(queue, msg, options, cb) {
Broker.prototype.enqueue = function(topic, msg, options, cb) {
if (typeof options == 'function') {
cb = options;
options = undefined;
}
options = options || {};

// AMQP uses period ('.') separators rather than slash ('/')
queue = queue.replace(/\//g, '.');
topic = topic.replace(/\//g, '.');

options.deliveryMode = (options.deliveryMode === undefined) ? PERSISTENT_MODE : options.deliveryMode;
// TODO: This option appears to have no effect. Why?
//options.mandatory = (options.mandatory === undefined) ? true : options.mandatory;

// FIXME: not working properly
/*
if (this._exchange.options && this._exchange.options.confirm) {
this._exchange.publish(queue, msg, options, function(hadError) {
debug('publish %s (confirm)', topic);
this._exchange.publish(topic, msg, options, function(hadError) {
var err;
if (hadError) {
err = new Error('Failed to enqueue task in queue "' + queue + '"');
err = new Error('Failed to publish message to topic "' + topic + '"');
}
return cb(err);
});
} else { */
this._exchange.publish(queue, msg, options);
// TODO: call cb on nextTick
return cb && cb();
/*}*/
} else {
debug('publish %s', topic);
this._exchange.publish(topic, msg, options);
if (cb) { return process.nextTick(cb); }
}
}

Broker.prototype.subscribe = function(queue, options, cb) {
Expand Down

0 comments on commit d580300

Please sign in to comment.