Skip to content

Commit

Permalink
Merge pull request #67 from lamchakchan/66-error-queue-subscriptions
Browse files Browse the repository at this point in the history
Add getAll interface
  • Loading branch information
Lam Chan committed Apr 25, 2017
2 parents 3a68f94 + b0ecfd8 commit 4eeb588
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 10 deletions.
36 changes: 30 additions & 6 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ Subscribe to messages from the queue.

##### parameter(s)

- `queue` - the name of the queue to subscribe messages to. *[string]* **Required**
- `handlers` - a `key` / `handler` hash where the key reflects the name of the `message.event` or `routeKey`. And the handler reflects a `Function` as `(message, [ack, [reject, [requeue]]]) => {}`. *[Object]* **Required**
- `queue` - the name of the queue to subscribe messages to. A queue with the provided name will be created if one does not exist. *[string]* **Required**
- `handlers` - a `key` / `handler` hash where the key reflects the name of the `message.event` or `routeKey`. And the handler reflects a `Function` as `(message, [meta, [ack, [reject, [requeue]]]]) => {}`. *[Object]* **Required**
- `options` - optional settings. *[Object]* **Optional**
- `queue` - settings for the queue. [Settings](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) are proxied through to amqplib `assertQueue`. *[Object]* **Optional**
- `globalExchange` - value of the exchange to transact through for message publishing. Defaults to one provided in the [config](#config). *[string]* **Optional**
Expand Down Expand Up @@ -584,10 +584,6 @@ Pop a message directly off a queue. The payload returned is the RabbitMQ `paylo
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const message = {
// other stuff you want to send
}

bunnyBus.get('queue1', (err, result) => {
//result contains an rabbit payload object
//JSON.tostring(result.content) will contain the message that was sent.
Expand All @@ -599,6 +595,34 @@ bunnyBus.get('queue1')
.catch((err) =>{});
```
#### `getAll(queue, handler, [options, [callback]])`
Pop all messages directly off a queue until there is no more. Handler is called for each message that is popped.
##### parameter(s)
- `queue` - the name of the queue. *[string]* **Required**
- `handler` - a handler reflects a `Function` as `(message, [meta, [ack]]) => {}`. *[Function]* **Required**
- `options` - optional settings. *[Object]* **Optional**
- `get` - [Settings](http://www.squaremobius.net/amqp.node/channel_api.html#channel_get) are proxied through to amqplib `get`. *[Object]* **Optional**
- `meta` - allows for meta data regarding the payload to be returned. Turning this on will adjust the handler to be a `Function` as `(message, meta, [ack]) => {}`. *[boolean]* **Optional**
- `callback` - node style callback `(err) => {}`. This is called when all currently retrievable items have been passed to the provided `handler`. *[Function]* **Optional**
```Javascript
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const handler = (message, ack) => {
ack(() => {});
}

bunnyBus.getAll('queue1', handler, (err) => {});

// promise api
bunnyBus.getAll('queue1', handler)
.catch((err) =>{});
```
### Internal-use Methods
The following methods are available in the public API, but manual use of them is highly discouraged.
Expand Down
48 changes: 48 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,54 @@ class BunnyBus extends EventEmitter{
}
}

getAll(queue, handler, options, callback) {

callback = Helpers.reduceCallback(options, callback);

if (callback === undefined) {
return Helpers.toPromise($.promise, $.getAll, queue, handler, options);
}

const getOptions = (options && options.get);
const meta = (options && options.meta);

let endUntil = false;

Async.until(
() => endUntil,
(cb) => {

$.get(queue, getOptions, (err, payload) => {

if (payload) {
$.logger.trace(payload);

const parsedPayload = Helpers.parsePayload(payload);

if (meta) {
handler(
parsedPayload.message,
parsedPayload.metaData,
$._ack.bind(null, payload)
);
}
else {
handler(
parsedPayload.message,
$._ack.bind(null, payload)
);
}
}
else {
endUntil = true;
}
cb(err);
});
},
(err) => callback(err)
);
}

publish(message, options, callback) {

callback = Helpers.reduceCallback(options, callback);
Expand Down
57 changes: 57 additions & 0 deletions test/assertions/assertGetAll.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';

const Async = require('async');
const Code = require('code');
const expect = Code.expect;

const assertGetAll = (instance, message, queueName, meta, limit, callback) => {

const callbackLimit = limit + 1;
let callbackCounter = 0;

const options = {
meta
};

const callbackReconciler = () => {

if (++callbackCounter === callbackLimit) {
callback();
}
};

const handlerWithoutMeta = (sentMessage, ack) => {

expect(sentMessage).to.be.equal(message);
ack(callbackReconciler);
};

const handlerWithMeta = (sentMessage, sentMeta, ack) => {

expect(sentMessage).to.be.equal(message);
expect(sentMeta).to.exist();

ack(callbackReconciler);
};

const handler = meta ? handlerWithMeta : handlerWithoutMeta;

Async.waterfall([
(cb) => {

Async.times(
limit,
(n, next) => instance.send(message, queueName, next),
cb
);
},
(result, cb) => instance.getAll(queueName, handler, options, cb)
],
(err) => {

expect(err).to.be.null();
callbackReconciler();
});
};

module.exports = assertGetAll;
50 changes: 50 additions & 0 deletions test/assertions/assertGetAllPromise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

const Promise = require('bluebird');
const Code = require('code');
const expect = Code.expect;

const assertGetAll = (instance, message, queueName, meta, limit) => {

return new Promise((resolve, reject) => {

const callbackLimit = limit + 1;
let callbackCounter = 0;

const options = {
meta
};

const callbackReconciler = () => {

if (++callbackCounter === callbackLimit) {
resolve();
}
};

const handlerWithoutMeta = (sentMessage, ack) => {

expect(sentMessage).to.be.equal(message);
ack(callbackReconciler);
};

const handlerWithMeta = (sentMessage, sentMeta, ack) => {

expect(sentMessage).to.be.equal(message);
expect(sentMeta).to.exist();

ack(callbackReconciler);
};

const handler = meta ? handlerWithMeta : handlerWithoutMeta;

Promise
.resolve()
.then(() => Promise.mapSeries(new Array(limit), () => instance.send(message, queueName)))
.then(() => instance.getAll(queueName, handler, options))
.then(callbackReconciler)
.catch(reject);
});
};

module.exports = assertGetAll;
2 changes: 2 additions & 0 deletions test/assertions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module.exports = {
assertPublishPromise : require('./assertPublishPromise'),
assertSend : require('./assertSend'),
assertSendPromise : require('./assertSendPromise'),
assertGetAll : require('./assertGetAll'),
assertGetAllPromise : require('./assertGetAllPromise'),
assertConvertToBuffer : require('./assertConvertToBuffer'),
assertReduceCallback : require('./assertReduceCallback'),
assertUndefinedReduceCallback : require('./assertUndefinedReduceCallback'),
Expand Down
34 changes: 30 additions & 4 deletions test/integration-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ describe('positive integration tests - Callback api', () => {
});
});

describe('getAll', () => {

const queueName = 'test-get-all-queue-1';
const message = { name : 'bunnybus' };

beforeEach((done) => {

instance._closeConnection(done);
});

afterEach((done) => {

instance.deleteQueue(queueName, done);
});

it('should retrieve all message without meta flag', (done) => {

Assertions.assertGetAll(instance, message, queueName, false, 10, done);
});

it('should retrieve all message with meta flag', (done) => {

Assertions.assertGetAll(instance, message, queueName, true, 10, done);
});
});

describe('publish', () => {

const queueName = 'test-publish-queue-1';
Expand Down Expand Up @@ -414,7 +440,7 @@ describe('positive integration tests - Callback api', () => {
const queueName = 'test-subscribe-queue-1';
const errorQueueName = `${queueName}_error`;
const publishOptions = { routeKey : 'a.b' };
const subscribeOptions = { meta : true };
const subscribeOptionsWithMeta = { meta : true };
const messageObject = { event : 'a.b', name : 'bunnybus' };
const messageString = 'bunnybus';
const messageBuffer = new Buffer(messageString);
Expand Down Expand Up @@ -477,7 +503,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageObject)
],
(err) => {
Expand Down Expand Up @@ -521,7 +547,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageString, publishOptions)
],
(err) => {
Expand Down Expand Up @@ -565,7 +591,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageBuffer, publishOptions)
],
(err) => {
Expand Down
26 changes: 26 additions & 0 deletions test/integration-promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ describe('positive integration tests - Promise api', () => {
});
});

describe('getAll', () => {

const queueName = 'test-get-all-queue-1';
const message = { name : 'bunnybus' };

beforeEach(() => {

return instance._closeConnection();
});

afterEach(() => {

return instance.deleteQueue(queueName);
});

it('should retrieve all message without meta flag', () => {

return Assertions.assertGetAllPromise(instance, message, queueName, false, 10);
});

it('should retrieve all message with meta flag', () => {

return Assertions.assertGetAllPromise(instance, message, queueName, true, 10);
});
});

describe('publish', () => {

const queueName = 'test-publish-queue-1';
Expand Down

0 comments on commit 4eeb588

Please sign in to comment.