Skip to content

Commit

Permalink
chunking pt 2
Browse files Browse the repository at this point in the history
  • Loading branch information
robertkowalski committed Jul 19, 2012
1 parent 581f3b9 commit 7e0695d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 22 deletions.
50 changes: 39 additions & 11 deletions gelf.js
@@ -1,7 +1,8 @@
var deflate = require('zlib').deflate, var deflate = require('zlib').deflate,
dgram = require('dgram'), dgram = require('dgram'),
EventEmitter = require('events').EventEmitter, EventEmitter = require('events').EventEmitter,
os = require('os'); os = require('os'),
crypto = require('crypto');


var Gelf = function(config) { var Gelf = function(config) {
var self = this; var self = this;
Expand All @@ -11,7 +12,7 @@ var Gelf = function(config) {
graylogPort: 12201, graylogPort: 12201,
graylogHostname: '127.0.0.1', graylogHostname: '127.0.0.1',
connection: 'wan', connection: 'wan',
maxChunkSizeWan: 1420, maxChunkSizeWan: 10,
maxChunkSizeLan: 8154 maxChunkSizeLan: 8154
}; };
} else { } else {
Expand Down Expand Up @@ -79,7 +80,7 @@ Gelf.prototype.compress = function(message, callback) {
}); });
}; };


Gelf.prototype.sendSingleMessage = function(message) { Gelf.prototype.sendMessage = function(message) {
var self = this, var self = this,
client = dgram.createSocket('udp4'); client = dgram.createSocket('udp4');


Expand All @@ -98,35 +99,62 @@ Gelf.prototype.processMessage = function(buffer) {


if (config.connection === 'wan') { if (config.connection === 'wan') {
if (chunkSize > config.maxChunkSizeWan) { if (chunkSize > config.maxChunkSizeWan) {
self.prepareMultipleChunks(buffer, config.maxChunkSizeWan, self.sendMultipleChunks); self.processMultipleChunks(buffer, config.maxChunkSizeWan);
return; return;
} }
} else if (self.config.connection === 'lan') { } else if (self.config.connection === 'lan') {
if (chunkSize > config.maxChunkSizeLan) { if (chunkSize > config.maxChunkSizeLan) {
self.prepareMultipleChunks(buffer, config.maxChunkSizeLan, self.sendMultipleChunks); self.processMultipleChunks(buffer, config.maxChunkSizeLan);
return; return;
} }
} }
self.sendSingleMessage(buffer); self.sendMessage(buffer);
}; };


Gelf.prototype.prepareMultipleChunks = function(buffer, chunkSize, callback) { Gelf.prototype.processMultipleChunks = function(buffer, chunkSize) {
var self = this;

var chunkArray = self.prepareMultipleChunks(buffer, chunkSize);
var datagrams = self.prepareDatagrams(chunkArray);

self.sendMultipleMessages(datagrams);
};

Gelf.prototype.prepareMultipleChunks = function(buffer, chunkSize) {
var chunkArray = [], var chunkArray = [],
index; index;


for (index = 0; index < buffer.length; index += chunkSize) { for (index = 0; index < buffer.length; index += chunkSize) {
chunkArray.push(buffer.slice(index, index + chunkSize)); chunkArray.push(Array.prototype.slice.call(buffer, index, index + chunkSize));
} }


callback && callback(chunkArray); return chunkArray;
};

Gelf.prototype.prepareDatagrams = function(chunkArray) {
var self = this,
count = chunkArray.length,
gelfMagicNumber = [0x1e, 0x0f],
msgId = self.getMessageId(),
datagrams = [];

chunkArray.forEach(function(chunk, index) {
datagrams[index] = new Buffer(gelfMagicNumber.concat(msgId, index, count, chunk));
});

return datagrams;
}; };


Gelf.prototype.getMessageId = function() { Gelf.prototype.getMessageId = function() {
return new Date().getTime() * 1000 + '' + Math.ceil(10000 * Math.random()); return Array.prototype.slice.call(crypto.randomBytes(8));
}; };


Gelf.prototype.sendMultipleChunks = function() { Gelf.prototype.sendMultipleMessages = function(datagrams) {
var self = this;


datagrams.forEach(function(buffer) {
self.sendSingleMessage(buffer);
});
}; };


module.exports = Gelf; module.exports = Gelf;
5 changes: 1 addition & 4 deletions test.js
Expand Up @@ -2,8 +2,5 @@ Gelf = require('./gelf');


var gelf = new Gelf(null); var gelf = new Gelf(null);


var message = {


}; gelf.emit('gelf.log', "wahwah");

gelf.emit('gelf.log', message);
20 changes: 13 additions & 7 deletions test/test.gelf.js
Expand Up @@ -36,7 +36,7 @@ describe('Gelf', function(done) {
server.bind(graylogStdPort); server.bind(graylogStdPort);


var gelf = new Gelf(null); var gelf = new Gelf(null);
gelf.sendSingleMessage(new Buffer('bar')); gelf.sendMessage(new Buffer('bar'));
}); });


it('should emit and receive events with attached event listeners', function(done) { it('should emit and receive events with attached event listeners', function(done) {
Expand Down Expand Up @@ -116,20 +116,26 @@ describe('Gelf', function(done) {
expect(stub).to.have.been.calledOnce; expect(stub).to.have.been.calledOnce;
done(); done();
}); });

sinon.stub(gelf, 'prepareDatagrams');
sinon.stub(gelf, 'sendMessage');
sinon.stub(gelf, 'sendMultipleMessages');

gelf.emit('gelf.log', 'mehgssssssggggggggguiguguigiugigigiugigigigig'); gelf.emit('gelf.log', 'mehgssssssggggggggguiguguigiugigigiugigigigig');
}); });


it('prepares chunks according to the given chunksize', function() { it('prepares chunks according to the given chunksize', function() {
var gelf = new Gelf(); var gelf = new Gelf();


var callback = sinon.spy(); expect(gelf.prepareMultipleChunks('123456789', 2)).to.deep.equal([['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9']]);
var secondCallback = sinon.spy(); expect(gelf.prepareMultipleChunks('1234567890', 2)).to.deep.equal([['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '0']]);
});


gelf.prepareMultipleChunks('123456789', 2, callback); it('returns an Array from the buffer when calling getMessageId()', function() {
gelf.prepareMultipleChunks('1234567890', 2, secondCallback); var gelf = new Gelf();


expect(callback).to.have.been.calledWith(['12', '34', '56', '78', '9']); expect(Array.isArray(gelf.getMessageId())).to.be.ok;
expect(secondCallback).to.have.been.calledWith(['12', '34', '56', '78', '90']); expect(gelf.getMessageId().length).to.deep.equal(8);
}); });


}); });

0 comments on commit 7e0695d

Please sign in to comment.