Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getAll interface #67

Merged
merged 2 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ Subscribe to messages from the queue.

##### parameter(s)

- `queue` - the name of the queue to subscribe messages to. *[string]* **Required**
- `handlers` - a `key` / `handler` hash where the key reflects the name of the `message.event` or `routeKey`. And the handler reflects a `Function` as `(message, [ack, [reject, [requeue]]]) => {}`. *[Object]* **Required**
- `queue` - the name of the queue to subscribe messages to. A queue with the provided name will be created if one does not exist. *[string]* **Required**
- `handlers` - a `key` / `handler` hash where the key reflects the name of the `message.event` or `routeKey`. And the handler reflects a `Function` as `(message, [meta, [ack, [reject, [requeue]]]]) => {}`. *[Object]* **Required**
- `options` - optional settings. *[Object]* **Optional**
- `queue` - settings for the queue. [Settings](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) are proxied through to amqplib `assertQueue`. *[Object]* **Optional**
- `globalExchange` - value of the exchange to transact through for message publishing. Defaults to one provided in the [config](#config). *[string]* **Optional**
Expand Down Expand Up @@ -584,10 +584,6 @@ Pop a message directly off a queue. The payload returned is the RabbitMQ `paylo
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const message = {
// other stuff you want to send
}

bunnyBus.get('queue1', (err, result) => {
//result contains an rabbit payload object
//JSON.tostring(result.content) will contain the message that was sent.
Expand All @@ -599,6 +595,34 @@ bunnyBus.get('queue1')
.catch((err) =>{});
```

#### `getAll(queue, handler, [options, [callback]])`

Pop all messages directly off a queue until there is no more. Handler is called for each message that is popped.

##### parameter(s)

- `queue` - the name of the queue. *[string]* **Required**
- `handler` - a handler reflects a `Function` as `(message, [meta, [ack]]) => {}`. *[Function]* **Required**
- `options` - optional settings. *[Object]* **Optional**
- `get` - [Settings](http://www.squaremobius.net/amqp.node/channel_api.html#channel_get) are proxied through to amqplib `get`. *[Object]* **Optional**
- `meta` - allows for meta data regarding the payload to be returned. Turning this on will adjust the handler to be a `Function` as `(message, meta, [ack]) => {}`. *[boolean]* **Optional**
- `callback` - node style callback `(err) => {}`. This is called when all currently retrievable items have been passed to the provided `handler`. *[Function]* **Optional**

```Javascript
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const handler = (message, ack) => {
ack(() => {});
}

bunnyBus.getAll('queue1', handler, (err) => {});

// promise api
bunnyBus.getAll('queue1', handler)
.catch((err) =>{});
```

### Internal-use Methods

The following methods are available in the public API, but manual use of them is highly discouraged.
Expand Down
48 changes: 48 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,54 @@ class BunnyBus extends EventEmitter{
}
}

getAll(queue, handler, options, callback) {

callback = Helpers.reduceCallback(options, callback);

if (callback === undefined) {
return Helpers.toPromise($.promise, $.getAll, queue, handler, options);
}

const getOptions = (options && options.get);
const meta = (options && options.meta);

let endUntil = false;

Async.until(
() => endUntil,
(cb) => {

$.get(queue, getOptions, (err, payload) => {

if (payload) {
$.logger.trace(payload);

const parsedPayload = Helpers.parsePayload(payload);

if (meta) {
handler(
parsedPayload.message,
parsedPayload.metaData,
$._ack.bind(null, payload)
);
}
else {
handler(
parsedPayload.message,
$._ack.bind(null, payload)
);
}
}
else {
endUntil = true;
}
cb(err);
});
},
(err) => callback(err)
);
}

publish(message, options, callback) {

callback = Helpers.reduceCallback(options, callback);
Expand Down
57 changes: 57 additions & 0 deletions test/assertions/assertGetAll.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';

const Async = require('async');
const Code = require('code');
const expect = Code.expect;

const assertGetAll = (instance, message, queueName, meta, limit, callback) => {

const callbackLimit = limit + 1;
let callbackCounter = 0;

const options = {
meta
};

const callbackReconciler = () => {

if (++callbackCounter === callbackLimit) {
callback();
}
};

const handlerWithoutMeta = (sentMessage, ack) => {

expect(sentMessage).to.be.equal(message);
ack(callbackReconciler);
};

const handlerWithMeta = (sentMessage, sentMeta, ack) => {

expect(sentMessage).to.be.equal(message);
expect(sentMeta).to.exist();

ack(callbackReconciler);
};

const handler = meta ? handlerWithMeta : handlerWithoutMeta;

Async.waterfall([
(cb) => {

Async.times(
limit,
(n, next) => instance.send(message, queueName, next),
cb
);
},
(result, cb) => instance.getAll(queueName, handler, options, cb)
],
(err) => {

expect(err).to.be.null();
callbackReconciler();
});
};

module.exports = assertGetAll;
50 changes: 50 additions & 0 deletions test/assertions/assertGetAllPromise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

const Promise = require('bluebird');
const Code = require('code');
const expect = Code.expect;

const assertGetAll = (instance, message, queueName, meta, limit) => {

return new Promise((resolve, reject) => {

const callbackLimit = limit + 1;
let callbackCounter = 0;

const options = {
meta
};

const callbackReconciler = () => {

if (++callbackCounter === callbackLimit) {
resolve();
}
};

const handlerWithoutMeta = (sentMessage, ack) => {

expect(sentMessage).to.be.equal(message);
ack(callbackReconciler);
};

const handlerWithMeta = (sentMessage, sentMeta, ack) => {

expect(sentMessage).to.be.equal(message);
expect(sentMeta).to.exist();

ack(callbackReconciler);
};

const handler = meta ? handlerWithMeta : handlerWithoutMeta;

Promise
.resolve()
.then(() => Promise.mapSeries(new Array(limit), () => instance.send(message, queueName)))
.then(() => instance.getAll(queueName, handler, options))
.then(callbackReconciler)
.catch(reject);
});
};

module.exports = assertGetAll;
2 changes: 2 additions & 0 deletions test/assertions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module.exports = {
assertPublishPromise : require('./assertPublishPromise'),
assertSend : require('./assertSend'),
assertSendPromise : require('./assertSendPromise'),
assertGetAll : require('./assertGetAll'),
assertGetAllPromise : require('./assertGetAllPromise'),
assertConvertToBuffer : require('./assertConvertToBuffer'),
assertReduceCallback : require('./assertReduceCallback'),
assertUndefinedReduceCallback : require('./assertUndefinedReduceCallback'),
Expand Down
34 changes: 30 additions & 4 deletions test/integration-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ describe('positive integration tests - Callback api', () => {
});
});

describe('getAll', () => {

const queueName = 'test-get-all-queue-1';
const message = { name : 'bunnybus' };

beforeEach((done) => {

instance._closeConnection(done);
});

afterEach((done) => {

instance.deleteQueue(queueName, done);
});

it('should retrieve all message without meta flag', (done) => {

Assertions.assertGetAll(instance, message, queueName, false, 10, done);
});

it('should retrieve all message with meta flag', (done) => {

Assertions.assertGetAll(instance, message, queueName, true, 10, done);
});
});

describe('publish', () => {

const queueName = 'test-publish-queue-1';
Expand Down Expand Up @@ -414,7 +440,7 @@ describe('positive integration tests - Callback api', () => {
const queueName = 'test-subscribe-queue-1';
const errorQueueName = `${queueName}_error`;
const publishOptions = { routeKey : 'a.b' };
const subscribeOptions = { meta : true };
const subscribeOptionsWithMeta = { meta : true };
const messageObject = { event : 'a.b', name : 'bunnybus' };
const messageString = 'bunnybus';
const messageBuffer = new Buffer(messageString);
Expand Down Expand Up @@ -477,7 +503,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageObject)
],
(err) => {
Expand Down Expand Up @@ -521,7 +547,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageString, publishOptions)
],
(err) => {
Expand Down Expand Up @@ -565,7 +591,7 @@ describe('positive integration tests - Callback api', () => {
};

Async.waterfall([
instance.subscribe.bind(instance, queueName, handlers, subscribeOptions),
instance.subscribe.bind(instance, queueName, handlers, subscribeOptionsWithMeta),
instance.publish.bind(instance, messageBuffer, publishOptions)
],
(err) => {
Expand Down
26 changes: 26 additions & 0 deletions test/integration-promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ describe('positive integration tests - Promise api', () => {
});
});

describe('getAll', () => {

const queueName = 'test-get-all-queue-1';
const message = { name : 'bunnybus' };

beforeEach(() => {

return instance._closeConnection();
});

afterEach(() => {

return instance.deleteQueue(queueName);
});

it('should retrieve all message without meta flag', () => {

return Assertions.assertGetAllPromise(instance, message, queueName, false, 10);
});

it('should retrieve all message with meta flag', () => {

return Assertions.assertGetAllPromise(instance, message, queueName, true, 10);
});
});

describe('publish', () => {

const queueName = 'test-publish-queue-1';
Expand Down