diff --git a/lib/crane-amqp/broker.js b/lib/crane-amqp/broker.js index dbd3d20..bfec945 100644 --- a/lib/crane-amqp/broker.js +++ b/lib/crane-amqp/broker.js @@ -70,14 +70,17 @@ Broker.prototype.connect = function(options, readyListener) { // AMQP uses period ('.') separators rather than slash ('/') name = name.replace(/\//g, '.'); - - opts.type = (opts.type === undefined) ? 'direct' : opts.type; - opts.durable = (opts.durable === undefined) ? true : opts.durable; - opts.autoDelete = (opts.autoDelete === undefined) ? false : opts.autoDelete; - opts.confirm = (opts.confirm === undefined) ? true : opts.confirm; + if (name.indexOf('amq.') !== 0) { + opts.type = (opts.type === undefined) ? 'direct' : opts.type; + opts.durable = (opts.durable === undefined) ? true : opts.durable; + opts.autoDelete = (opts.autoDelete === undefined) ? false : opts.autoDelete; + opts.confirm = (opts.confirm === undefined) ? true : opts.confirm; + } else { + // Options for built-in exchanges can not be overridden. + opts = {}; + } debug('exchange %s %s', name); - self._exchange = self._connection.exchange(name, opts, function(exchange) { return self.emit('ready'); }); @@ -140,7 +143,7 @@ Broker.prototype.declare = function(name, options, cb) { q.once('error', onQueueDeclareError); } -Broker.prototype.enqueue = function(queue, msg, options, cb) { +Broker.prototype.enqueue = function(topic, msg, options, cb) { if (typeof options == 'function') { cb = options; options = undefined; @@ -148,27 +151,26 @@ Broker.prototype.enqueue = function(queue, msg, options, cb) { options = options || {}; // AMQP uses period ('.') separators rather than slash ('/') - queue = queue.replace(/\//g, '.'); + topic = topic.replace(/\//g, '.'); options.deliveryMode = (options.deliveryMode === undefined) ? PERSISTENT_MODE : options.deliveryMode; // TODO: This option appears to have no effect. Why? //options.mandatory = (options.mandatory === undefined) ? true : options.mandatory; - // FIXME: not working properly - /* if (this._exchange.options && this._exchange.options.confirm) { - this._exchange.publish(queue, msg, options, function(hadError) { + debug('publish %s (confirm)', topic); + this._exchange.publish(topic, msg, options, function(hadError) { var err; if (hadError) { - err = new Error('Failed to enqueue task in queue "' + queue + '"'); + err = new Error('Failed to publish message to topic "' + topic + '"'); } return cb(err); }); - } else { */ - this._exchange.publish(queue, msg, options); - // TODO: call cb on nextTick - return cb && cb(); - /*}*/ + } else { + debug('publish %s', topic); + this._exchange.publish(topic, msg, options); + if (cb) { return process.nextTick(cb); } + } } Broker.prototype.subscribe = function(queue, options, cb) {