Skip to content

Commit

Permalink
Publishing should not know about the queues you are publishing to.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Allen committed Jan 7, 2015
1 parent fae7407 commit 1987757
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 97 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ coverage
*.out
*.pid
*.gz
*.swp

pids
logs
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: node_js
node_js:
- 0.10
- "0.10"
services:
- rabbitmq
before_script:
Expand Down
26 changes: 7 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ A simple wrapper to https://github.com/squaremo/amqp.node.
Allows you to have any number of publish queues, one consume queue and to perform
consume and publish operations.

- All queues will be declared (made to exist).
- If you specify a routing key for a publish queue, then a binding will be set up
- You can specify a queue which will be declared (made to exist). This will be
the queue from which you will consume.
- If you specify a routing key for the queue, then a binding will be set up
(I.e. a mapping that tells AMQP to route message with that routing key to that
queue on the exchange you have specified).
- Any options you specify at the per-queue level are passed directly through to
Expand All @@ -29,20 +30,10 @@ var AMQP = require('amqp-wrapper');
var config = {
url: process.env.AMQP_URL,
exchange: process.env.AMQP_EXCHANGE,
queues: {
consume: {
name: process.env.AMQP_CONSUME,
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},
publish: [
{
name: process.env.AMQP_RESPONSE,
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY,
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE}
},
{ // ...
}
]
queue: {
name: process.env.AMQP_CONSUME,
routingKey: process.env.AMQP_ROUTING_KEY, // If supplied, queue is bound to this key on the exchange.
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},
// Set the QOS/prefetch.
prefetch: 100
Expand All @@ -67,9 +58,6 @@ callback(err, requeue)
// Start consuming:
amqp.consume(handleMessage);

// Publishing to one of the queues declared on connect.
amqp.publishToQueue(name, payload, done);

// Publishing to arbitrary routing key.
amqp.publish(routingKey, payload, options, done);
```
Expand Down
36 changes: 7 additions & 29 deletions amqp.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
'use strict';

var amqp = require('amqplib/callback_api');
var _ = require('lodash');
var async = require('async');
var stringifysafe = require('json-stringify-safe');
var queueSetup = require('./lib/queue-setup');
var debug = require('debug')('amqp-wrapper');
Expand Down Expand Up @@ -47,36 +45,14 @@ module.exports = function(config) {
if (err) {
return cb(err);
}
var tasks = [];
if (config.queues.publish && config.queues.publish instanceof Array) {
tasks.push(function(callback) {
queueSetup.setupForPublish(channel, config, callback);
});
if (config.queue && config.queue.name) {
queueSetup.setupForConsume(channel, config, cb);
} else {
cb();
}
if (config.queues.consume && config.queues.consume.name) {
tasks.push(function(callback) {
queueSetup.setupForConsume(channel, config, callback);
});
}
async.series(tasks, cb);
}
},

/**
* Publish a message to one of the AMQP queues specified on connect.
* @param {string} name The name of the queue to use.
* @param {string} message The message to publish.
* @param {Function(err)} callback The callback to call when done.
*/
publishToQueue: function(name, message, callback) {
if (typeof message === 'object') {
message = stringifysafe(message);
}
var publishQueue = _.find(config.queues.publish, {'name': name});
channel.publish(config.exchange, publishQueue.routingKey,
new Buffer(message), {}, callback);
},

/**
* Publish a message using the specified routing key.
* @param {string} routingKey The name of the queue to use.
Expand Down Expand Up @@ -133,9 +109,11 @@ module.exports = function(config) {
}
}

channel.consume(config.queues.consume.name, callback, {noAck: false});
channel.consume(config.queue.name, callback, {noAck: false});
}
};

return ret;
};

// vim: set et sw=2:
35 changes: 16 additions & 19 deletions lib/queue-setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,21 @@ function maybeDeclareDeadLetters(channel, queue, callback) {
* For publishing, we assert the queue is there and bind it to the routing
* key we are going to use.
*/
exports.setupForPublish = function(channel, params, callback) {
async.each(
params.queues.publish,
function(queue, cb) {
debug('setupForPublish()', queue);
async.series([
async.apply(maybeDeclareDeadLetters, channel, queue),
channel.assertQueue.bind(channel, queue.name, queue.options),
channel.bindQueue.bind(channel,
queue.name, params.exchange, queue.routingKey, {})
], cb);
}, callback);
};

// For consuming, we only assert the queue is there.
exports.setupForConsume = function(channel, params, callback) {
debug('setupForConsume()');
channel.prefetch(params.prefetch);
channel.assertQueue(params.queues.consume.name,
params.queues.consume.options, callback);
var queue = params.queue;
debug('setupForConsume()', queue);
async.series([
maybeDeclareDeadLetters.bind(undefined, channel, queue),
channel.assertQueue.bind(channel, queue.name, queue.options),
function(callback) {
if (queue.routingKey) {
channel.bindQueue(queue.name, params.exchange, queue.routingKey, {},
callback);
} else {
callback();
}
}
], callback);
};

// vim: set sw=2 et:
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"amqplib": "^0.2.1",
"async": "^0.9.0",
"debug": "^1.0.4",
"lodash": "^2.4.1",
"json-stringify-safe": "^5.0.0"
},
"devDependencies": {
Expand Down
9 changes: 9 additions & 0 deletions test/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module.exports = {
url: 'amqp://guest:guest@localhost/amqp-wrapper-testing',
exchange: 'mytestexchange',
queue: {
name: 'myconsumequeue',
routingKey: 'myRoutingQueue',
options: {deadLetterExchange: 'wow'}
}
};
18 changes: 0 additions & 18 deletions test/config.json

This file was deleted.

8 changes: 8 additions & 0 deletions test/configNoKeyNoExchange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = {
url: 'amqp://guest:guest@localhost/amqp-wrapper-testing',
exchange: 'hasone',
queue: {
name: 'myconsumequeue',
options: {}
}
};
49 changes: 39 additions & 10 deletions test/lib/amqp.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
'use strict';

var exec = require('child_process').exec;
var SandboxedModule = require('sandboxed-module');
var AMQP = require('../../amqp');

var expect = require('expect.js');

describe('AMQP', function() {
// Set up a vhost for testing purposes so we don't pollute /.
before(function(done) {
exec('curl -u guest:guest -H "content-type:application/json" ' +
'-XPUT http://localhost:15672/api/vhosts/amqp-wrapper-testing', next);
function next(err) {
if (err) {
return done(err);
}
exec('curl -u guest:guest -H "content-type:application/json" ' +
'-XPUT http://localhost:15672/' +
'api/permissions/amqp-wrapper-testing/guest ' +
'-d \'{"configure":".*","write":".*","read":".*"}\'',
done);
}
});

var config = require('../config');
describe('#constructor', function() {
it('should throw with empty constructor', function(done) {
Expand Down Expand Up @@ -44,7 +61,7 @@ describe('AMQP', function() {
var amqp = AMQP(config);
amqp.connect(done);
});
it('should setup for publishing and consuming', function(done) {
it('should declare your queue, and bind it', function(done) {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../../amqp', {
requires: {
Expand All @@ -57,22 +74,32 @@ describe('AMQP', function() {
return done(err);
}

// two queues, one of which is dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(3);
// Bind the publishing queue, and its dead letter queue.
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue, and its dead letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2);
done();
});
});
});
describe('#publishToQueue', function() {
it('should call the callback successfully', function(done) {
var amqp = AMQP(config);
amqp.connect(function(err) {
it('should just declare if you don\'t specify routing key', function(done) {
var amqpLibMock = require('./amqplibmock')();
var config = require('../configNoKeyNoExchange');
var mockedAMQP = SandboxedModule.require('../../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config);

mockedAMQP.connect(function(err) {
if (err) {
return done(err);
}
amqp.publishToQueue('mypublishqueue', 'test', done);

// one queue, not dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(1);
// No binding.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(0);
done();
});
});
});
Expand Down Expand Up @@ -180,3 +207,5 @@ describe('AMQP', function() {
});
});
});

// vim: set et sw=2 colorcolumn=80:

0 comments on commit 1987757

Please sign in to comment.