Skip to content

Commit

Permalink
Merge pull request #5 from reneweb/master
Browse files Browse the repository at this point in the history
Add option to restrict queue size
  • Loading branch information
madbence committed Jul 4, 2016
2 parents 0ef7120 + 0fd1ed0 commit fa64456
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
8 changes: 5 additions & 3 deletions README.md
Expand Up @@ -26,7 +26,7 @@ var logstash = new Logstash({
port: 13333
});
logstash.send(message [, callback]);
```
```

## API

Expand All @@ -37,6 +37,7 @@ Takes the following parameters:
* **host**: remote hostname
* **port**: remote port
* **format** (optional): formatter function (by default the message gets JSON.stringified)
* **maxQueueSize** (optional): Restricts the amount of messages queued when there is no connection. If not specified the queue is not limited in size.

Example:

Expand All @@ -49,7 +50,8 @@ new Client({
message.formattedAt = new Date();
message.password = '!FILTERED!';
return JSON.stringify(message, null, 2);
}
},
maxQueueSize: 1000
});
```

Expand All @@ -58,7 +60,7 @@ new Client({
Takes the following parameters:

* **message**: an object what you are trying to send to your logstash instance
* **callback** (optional): a function called when the message has been sent
* **callback** (optional): a function called when the message has been sent

Example:

Expand Down
6 changes: 5 additions & 1 deletion lib/transports/transport.js
Expand Up @@ -25,6 +25,10 @@ Transport.prototype.send = function send(message, cb) {
message: message,
cb: cb
});

if(this.queue.length > this.options.maxQueueSize) {
this.queue.shift();
}
};

Transport.prototype._send = function _send() {
Expand All @@ -39,4 +43,4 @@ Transport.prototype.dequeue = function dequeue() {
}
};

module.exports = Transport;
module.exports = Transport;
21 changes: 20 additions & 1 deletion test/transports.js
Expand Up @@ -9,6 +9,25 @@ test('Offline support for transport', function(t) {
t.end();
});

test('Maximum queue size when offline', function(t) {
var transport = new (Transports.Transport)({maxQueueSize: 1});
transport.send('foo');
transport.send('foo1');
transport.send('foo2');
t.equal(transport.queue.length, 1, 'should not exceed maxQueueSize');
t.equal(transport.queue[0].message, 'foo2', 'should store message');
t.end();
});

test('Unlimited queue size when offline', function(t) {
var transport = new (Transports.Transport)();
for(var i = 0; i < 10000; i++) {
transport.send('foo');
}
t.equal(transport.queue.length, 10000, 'should include all messages');
t.end();
});

test('Send messages if transport connected', function(t) {
var transport = new (Transports.Transport)();
transport.connected = true;
Expand All @@ -18,4 +37,4 @@ test('Send messages if transport connected', function(t) {
};

transport.send('foo-bar');
});
});

0 comments on commit fa64456

Please sign in to comment.