Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first commit

  • Loading branch information...
commit ebac5e5f4a594efcf77ca4140af031840402517a 0 parents
Damodharan Rajalingam authored
23 LICENSE
@@ -0,0 +1,23 @@
+(The MIT License)
+
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Author(s): Damodharan Rajalingam (damu at yahoo-inc dot com)
146 README.md
@@ -0,0 +1,146 @@
+# GELF Receiver
+
+This is a NodeJS module to implement a [GELF][1] receiver. The module can
+currently handle following kinds of GELF events.
+
+* Uncompresssed
+* GZIPed
+* ZLIBed
+* Chunked - The chunked message can again be any of the three formats above
+
+# API
+
+To create a GELF receiver one must `require('gelfr')`,then call the
+createGELFServer() to create a server instance, subscribe to *'message'*
+event and start the server.
+
+ var gelfr = require('gelfr');
+ var s = gelfr.createGELFServer();
+ s.on('message', function (msg) { console.log(JSON.stringify(msg) });
+ s.start();
+
+## Functions
+
+### createGELFServer(options)
+
+Creates a new GELF receiver. The *options* argument is optional and can
+have following fields:
+
+* *proto* - udp4 or udp6. (Default: *udp4*)
+* *port* - port to listen for events. Default is *0* i.e. any random port
+ available for listening
+* *addr* - address to bind to. (Default: *'0.0.0.0'*)
+* *listener* - Listener function for *message* events. None by default
+* *notifyBadMsg* - true or false. If set emits badmessage event when
+ invalid messages arrive. (Default: *false*)
+* *chunkTTL* - TTL for reconstruction of chunked messages in seconds. If
+ all parts of a chunked message does not arrive within this time it will be
+ dropped. (Default: *2 seconds*)
+* *parseJSON* - ture or false - Set whether the message should be parsed as
+ into a JSON object. (Default: *false*)
+
+## GELF Server Functions
+
+`createGELFServer(options)` returns a receiver object. The receiver object
+has the following functions asssociated with it.
+
+### start()
+
+Start the server
+
+### stop()
+
+Stop the server
+
+### address()
+
+Return the address of the server. Returns the IP address and port to which
+the receiver is listening to.
+
+### parseJSON(flag)
+
+If no flag is passed returns whether the received message will be parsed
+into an object or not. If the flag is passed (true or false) then it
+enables/disables parsing of received message to an object.
+
+## Events
+
+GELF Receiver emits following events
+
+### ready
+
+`function () { }`
+
+Emitted when server is ready to receive packets.
+
+### end
+
+`function () { }`
+
+Emitted when server has stopped and will no more receive any packets and
+will not emit any more events.
+
+### message
+
+`function (msg) { }`
+
+Emitted when a new message is received. For chunked messages this event
+will be emitted once all the chunks have been received and the message is
+reassembled. The *msg* argument passed is an object with following
+properties.
+
+ {
+ compression: 'GZIP' or 'ZLIB' or 'NONE', /* compression method */
+ chunked: true or false, /* whether the message was chunked */
+ chunks: <int>, /* number of chunks. for non-chunked messages 1 */
+ data: 'utf8-string' or Parsed-json-object, /* payload */
+ sender: 'ip-address', /* ip adress string of the sender */
+ }
+
+### error
+
+`function (err) { }`
+
+Emitted when there is some error.
+
+### badmessage
+
+`function (err) { }`
+
+Emitted when a bad/invalid message is received and notifyBadMsg option was
+set to *true* when creating the listener. The *err* object looks like below:
+
+ {
+ error: "error_string",
+ data: Binary_data, /* the data packet that caused error */
+ sender: 'ip-address', /* ip address of the sender */
+ }
+
+[1]: https://github.com/Graylog2/graylog2-docs/wiki/GELF "GELF Format"
+
+# License
+(The MIT License)
+
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+# Authors
+
+* Damodharan Rajalingam (damu at yahoo-inc dot com)
53 examples/gelf-receiver.js
@@ -0,0 +1,53 @@
+/*
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Author(s): Damodharan Rajalingam (damu at yahoo-inc dot com)
+*/
+
+var g = require('../index');
+
+s = g.createGELFServer({
+ port: 11211, /* Port to listen for GELF events. Optional. Default: 0 i.e any random port*/
+ addr: "0.0.0.0", /* Address to bing to. Optional. Default: 0.0.0.0 (all addresses) */
+ proto: "udp4" /* udp4 or udp6. Optional. Default: udp4 */
+ });
+
+s.on('message', function(msg) {
+ console.log("Received : " + JSON.stringify(msg));
+ });
+
+s.on('ready', function() {
+ console.log("Server started to listen for packets");
+ });
+
+s.on('error', function(err) {
+ console.log("Error: " + JSON.stringify(err));
+ });
+
+s.on('end', function() {
+ console.log("Server has stopped");
+ });
+
+process.on('SIGINT', function() {
+ s.stop();
+ });
+
+s.start();
227 index.js
@@ -0,0 +1,227 @@
+/*
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Author(s): Damodharan Rajalingam (damu at yahoo-inc dot com)
+*/
+var dgram = require("dgram"),
+ zlib = require("zlib"),
+ util = require("util"),
+ EventEmitter = require("events").EventEmitter,
+ ttlStore = require("ttlStore").ttlStore;
+
+var DEFAULT_PORT = 0;
+var DEFAULT_ADDR = "0.0.0.0";
+var DEFAULT_CHUNK_TTL = 5;
+var DEFAULT_JSON_PARSE = false;
+
+function concatBuffers(bufs, totalBytes) {
+ var data = new Buffer(totalBytes);
+ var totalChunks = bufs.length;
+ var offset=0;
+// console.log("Total chunks: " + totalChunks);
+ for(i=0; i<totalChunks; i++) {
+ chunk=bufs[i];
+// console.log("Copying chunk of size " + chunk.length + " at offset " + offset);
+ chunk.copy(data, offset);
+ offset += chunk.length;
+ }
+ return data;
+}
+
+function gelfServer(options) {
+ if(!(this instanceof gelfServer)) return new gelfServer(options);
+ if(!options) options={};
+ var self = this;
+ self._port = options.port || DEFAULT_PORT;
+ self._addr = options.addr || DEFAULT_ADDR;
+ self._shouldNotifyBadMsg = options.notifyBadMsg || false;
+ self._chunkTTL = (options.chunkTTL && options.chunkTTL > 0) ? options.chunkTTL : DEFAULT_CHUNK_TTL;
+ self._chunkedMessages = new ttlStore({ttl: self._chunkTTL});
+ self._parseJSON = (options.parseJSON === true ? true : false);
+
+ var proto = (options.hasOwnProperty('proto') && /udp[46]/.test(options.proto) ? options.proto : 'udp4');
+ if(options.listener)
+ self.addListener('message', listener);
+ var sock = dgram.createSocket(proto);
+ sock.on('message', function(msg, r) {
+ self._handleMessage(msg, r);
+ });
+ sock.on('listening', function() { self.emit('ready') });
+ sock.on('close', function() { self.emit('end') });
+ sock.on('error', function(err) { throw err });
+ self.sock = sock;
+}
+
+util.inherits(gelfServer, EventEmitter);
+
+gelfServer.prototype.start = function() {
+ this.sock.bind(this._port, this._addr);
+}
+
+gelfServer.prototype.stop = function() {
+ this.sock.close();
+ this._chunkedMessages.close();
+ this._chunkedMessages = undefined;
+}
+
+gelfServer.prototype.address = function() {
+ return this.sock.address();
+}
+
+gelfServer.prototype._notifyBadMsg = function(err, data, sender) {
+ if(this._shouldNotifyBadMsg === true) {
+ this.emit('badmessage', { 'error': err, 'data': data.toString('base64'), 'sender': sender });
+ }
+}
+
+gelfServer.prototype._handleChunkedData = function (msg, r, chunked, chunks) {
+ var self = this;
+ var msgId = msg.slice(2,10).toString('base64') + ":" + r.address; //msgId in packet + remote address form the key
+ var seq = msg[10];
+ var total = msg[11];
+ var Chunk = msg.slice(12);
+ var chunkedMessages = self._chunkedMessages;
+ if(chunked) {
+ // This is a message recontructed from chunked messages
+ // i.e. a chunked message inside another chunked message
+ self._chunkedMessages.remove(msgId);
+ this._notifyBadMsg('Chunked message inside a chunked message', msg, r);
+ return null;
+ }
+// console.log("Received Chunk: " + (seq+1) + "/ " + total + " of msg " + msgId);
+ if(chunkedMessages.has(msgId)) {
+ var cmsg = chunkedMessages.get(msgId);
+ // Part of an already read chunked message
+ if(!cmsg.chunks[seq]) {
+ cmsg.chunksReceived += 1;
+ cmsg.chunks[seq] = Chunk;
+ cmsg.totalLen += Chunk.length;
+ }
+
+ //console.log(JSON.stringify(cmsg));
+ if(cmsg.totalChunks == cmsg.chunksReceived) {
+ // console.log("Complete message for msg:" + msgId + " received");
+ fullMsg=concatBuffers(cmsg.chunks, cmsg.totalLen);
+// console.log(fullMsg);
+ process.nextTick(function() {
+ self._handleMessage(fullMsg, r, true, cmsg.totalChunks);
+ self._chunkedMessages.remove(msgId);
+ });
+ }
+ } else {
+ // New chunked message
+// console.log("Received new chunked message: " + msgId);
+ var newChunk = {
+ "totalChunks" : total,
+ "chunksReceived" : 1,
+ "chunks" : new Array(),
+ "totalLen" : 0
+ };
+ newChunk.chunks[seq] = Chunk;
+ newChunk.totalLen += Chunk.length;
+ chunkedMessages.put(msgId, newChunk);
+ }
+}
+
+gelfServer.prototype._handleZlibData = function(msg, r, chunked, chunks) {
+ var self = this;
+ zlib.unzip(msg, function(err,data) {
+ if(!err) {
+ self.emit('message',
+ {
+ 'data': self._decodeMsg(data.toString('utf-8')),
+ 'compression': 'ZLIB',
+ 'chunked': (chunked ? true : false),
+ 'chunks': (chunked && chunks ? chunks : 1),
+ 'sender': r.address
+ });
+ } else {
+ self._notifyBadMsg("Unzip Failed: " + err, msg, r);
+ }
+ });
+}
+
+gelfServer.prototype._handleGzipData = function(msg, r, chunked, chunks) {
+ var self = this;
+ zlib.gunzip(msg, function(err,data) {
+ if(!err) {
+ self.emit('message',
+ {
+ 'data': self._decodeMsg(data.toString('utf-8')),
+ 'compression': 'GZIP',
+ 'chunked': (chunked ? true : false),
+ 'chunks': (chunked && chunks ? chunks : 1),
+ 'sender': r.address
+ });
+ } else {
+ self._notifyBadMsg("GUnzip Failed: " + err, msg, r);
+ }
+ });
+}
+
+gelfServer.prototype._handleUncompressedData = function(msg, r, chunked, chunks) {
+ var self = this;
+ self.emit('message',
+ {
+ 'data': self._decodeMsg(msg.slice(2).toString('utf-8')),
+ 'compression': 'NONE',
+ 'chunked': (chunked ? true : false),
+ 'chunks': (chunked && chunks ? chunks : 1),
+ 'sender': r.address
+ });
+}
+
+gelfServer.prototype._handleMessage = function(msg, r, chunked, chunks) {
+ if(msg[0] == 0x78 && msg[1] == 0x9c) { // ZLib-ed message
+ this._handleZlibData(msg, r, chunked, chunks);
+ } else if(msg[0] == 0x1f && msg[1] == 0x8b) { // Gzipped message
+ this._handleGzipData(msg, r, chunked, chunks);
+ } else if(msg[0] == 0x1e && msg[1] == 0x0f) { // Chunked message
+ this._handleChunkedData(msg,r, chunked, chunks);
+ } else if(msg[0] == 0x1f && msg[1] == 0x3c) { //Uncompressed
+ this._handleUncompressedData(msg, r, chunked, chunks);
+ } else {
+ this._notifyBadMsg("Invalid magic number", msg, r);
+ }
+}
+
+gelfServer.prototype.parseJSON = function (flag) {
+ if(flag == undefined) {
+ return self._parseJSON;
+ }
+ if(flag === true || flag === false) {
+ self._parseJSON = flag;
+ }
+}
+
+gelfServer.prototype._decodeMsg = function (msg) {
+ if(this._parseJSON == true) {
+ return JSON.parse(msg);
+ } else {
+ return msg;
+ }
+}
+
+function createGELFServer(options) {
+ return new gelfServer(options);
+}
+
+exports.createGELFServer = createGELFServer;
82 node_modules/ttlStore/index.js
@@ -0,0 +1,82 @@
+/*
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Author(s): Damodharan Rajalingam (damu at yahoo-inc dot com)
+*/
+
+function reaper(bins, reap_interval) {
+ var oldBin = bins.pop();
+ delete oldBin; //delete the really-old bin
+ bins.unshift({}); //push a new 'new' bin
+}
+
+function ttlStore(options) {
+ this._ttl = options['ttl'] || 0;
+ this._bins = [
+ {}, // new
+ {}, // old
+ {} // really old
+ ];
+ if(this._ttl && this._ttl > 0) {
+ this._reap_interval = Math.ceil((this._ttl * 1000)/2);
+ var self=this;
+ this._reaper = setInterval(function() { reaper(self._bins, self._reap_interval); }, self._reap_interval);
+ }
+}
+
+ttlStore.prototype.put = function(key, val) {
+ this._bins[0][key] = val;
+ delete this._bins[1][key];
+ delete this._bins[2][key];
+}
+
+ttlStore.prototype.get = function(key) {
+ if(this._bins[0].hasOwnProperty(key)) return this._bins[0][key];
+ if(this._bins[1].hasOwnProperty(key))return this._bins[1][key];
+ return undefined;
+}
+
+ttlStore.prototype.remove = function(key) {
+ if(this._bins[0].hasOwnProperty(key)) {
+ var res = this._bins[0][key];
+ delete this._bins[0][key];
+ delete this._bins[1][key];
+ return res;
+ }
+ if(this._bins[1].hasOwnProperty(key)) {
+ var res = this._bins[1][key];
+ delete this._bins[1][key];
+ return res;
+ }
+ return undefined;
+}
+
+ttlStore.prototype.close = function() {
+ clearInterval(this._reaper);
+ delete this._bins;
+}
+
+ttlStore.prototype.has = function(key) {
+ if(!key) return false;
+ return (this._bins[0].hasOwnProperty(key) || this._bins[1].hasOwnProperty(key));
+}
+
+exports.ttlStore = ttlStore;
17 package.json
@@ -0,0 +1,17 @@
+{
+ "author": "Damodharan Rajalingam <damu@yahoo-inc.com>",
+ "name": "gelfr",
+ "description": "Node package to receive GELF events",
+ "version": "0.0.1",
+ "keywords": [ "gelf", "logging", "server", "monitoring" ],
+ "scripts": {
+ "test": "node test/gelfr-tests.js"
+ }
+ "main": "index.js",
+ "dependencies": {},
+ "devDependencies": {},
+ "optionalDependencies": {},
+ "engines": {
+ "node": ">=0.6"
+ }
+}
218 test/gelfr-tests.js
@@ -0,0 +1,218 @@
+/*
+Copyright(c) 2012 Yahoo! Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Author(s): Damodharan Rajalingam (damu at yahoo-inc dot com)
+*/
+
+var zlib = require('zlib'),
+ dgram = require('dgram'),
+ os = require('os'),
+ assert=require('assert'),
+ gs = require('../index');
+
+/*
+ Test information
+*/
+var recv_test_results = {
+ 'GZIP': '',
+ 'ZLIB': '',
+ 'NONE': '',
+ 'chunked-GZIP': '',
+ 'chunked-ZLIB': '',
+ 'chunked-NONE': ''
+};
+
+/* Set the event to true when the event fires */
+var event_test_results = {
+ 'ready': false,
+ 'end': false,
+ 'badmessage': false
+ // 'message' event is covered by data receive tests
+}
+
+/* Number of messages received */
+var messages_count = 0;
+
+var data = {
+ "host": os.hostname(),
+ "version":"1.0",
+ "timestamp": Math.floor(+new Date() / 1000),
+ "facility":"gelf-tester",
+ "short_message":"This is a test message to test the gelfr module."
+};
+
+var json_data = JSON.stringify(data);
+
+/*
+ Utility functions
+*/
+
+/* Function to send multiple UDP packets */
+function sendUDPPackets(dataArray, port, addr) {
+ if(dataArray.length != 0) {
+ data = dataArray.shift();
+ client.send(data, 0, data.length, port, addr, function (err,bytes) {
+ if(!err) { sendUDPPackets(dataArray, port, addr); }
+ });
+ }
+}
+
+/* Function to chunk messages */
+function getChunks(arr,msg, id_str, csize, max_chunks) {
+ var chunk_size = csize || 100;
+ var total_chunks=Math.ceil(msg.length/chunk_size);
+ var chunks_to_send = Math.min(total_chunks, max_chunks||Infinity);
+ var msg_id=new Buffer(id_str+(+new Date()));
+ var data=new Buffer(msg);
+ var i=0;
+ var rem=data.length;
+ var dArr = [];
+ while(i<chunks_to_send) {
+ var cbuf;
+ if(rem > chunk_size) {
+ cbuf = new Buffer(12+chunk_size);
+ data.copy(cbuf,12,i*chunk_size, (i+1)*chunk_size);
+ } else {
+ cbuf = new Buffer(12+rem);
+ data.copy(cbuf,12,i*chunk_size, i*chunk_size+rem);
+ }
+ cbuf[0]=0x1e;
+ cbuf[1]=0x0f;
+ cbuf[11]=total_chunks;
+ cbuf[10]=i; // sequence number
+ msg_id.copy(cbuf,2,0,8);
+ arr.push(cbuf);
+ i++;
+ rem -= chunk_size;
+ }
+}
+
+function assertResults() {
+ /* Test if all message types have been received successfully */
+ for(var test in recv_test_results) {
+ console.log("Testing message type: " + test);
+ assert.equal(recv_test_results[test], json_data, "Data received for " + test + " is incorrect");
+ }
+
+ console.log("Check if all messages were received and no invalid messages delivered");
+ assert.equal(messages_count, 6, "Invalid message count");
+
+ /* Test if events fired correctly */
+ for(var e in event_test_results) {
+ console.log("Testing event: " + e);
+ assert.equal(event_test_results[e], true, "Event " + e + " was not triggered");
+ }
+ console.log("All tests OK");
+}
+
+/*
+ Initialize the server
+*/
+var server = gs.createGELFServer({
+ 'notifyBadMsg' : true,
+ 'addr': '127.0.0.1',
+ 'port': 11212,
+ 'chunkTTL': 2
+});
+
+server.on('ready', function() {
+ var addr = server.address();
+ console.log("Server started successfully. (Listening on " + addr.address + ":" + addr.port + ")");
+ event_test_results.ready = true;
+ });
+
+server.on('end', function() {
+ console.log("Server shutdown successfully");
+ event_test_results.end = true;
+ });
+
+server.on('badmessage', function(err) {
+ console.log("Bad message received");
+ event_test_results.badmessage = true;
+ });
+
+server.on('message', function (msg) {
+ var test_type = (msg.chunked ? 'chunked-' : '') + msg.compression;
+ console.log("Received message of type " + test_type);
+ recv_test_results[test_type] = msg.data;
+ console.log("Received message: " + msg.data);
+ messages_count++;
+ });
+
+server.start();
+
+/*
+ Client setup and send test data
+*/
+var client = dgram.createSocket('udp4');
+var server_addr = '127.0.0.1';
+var server_port = server.address().port;
+var arr=[];
+var completed=0;
+var total_types = 3; // ZLIB, GZIP and UNCOMPRESSED
+/* ZLIB data */
+zlib.deflate(json_data, function(err,data) {
+ if(!err) {
+ client.send(data, 0,data.length, server_port, server_addr);
+ getChunks(arr,data,'zlib-chunked');
+ completed++;
+ if(completed == total_types) { sendUDPPackets(arr,server_port,server_addr); }
+ } else {
+ console.log("Error ZLIB-ing data: " + err);
+ }
+ });
+
+/* GZIP data */
+zlib.gzip(json_data, function(err,data) {
+ if(!err) {
+ client.send(data, 0, data.length, server_port, server_addr);
+ getChunks(arr,data,'gzip-chunked');
+ completed++;
+ if(completed == total_types) { sendUDPPackets(arr,server_port,server_addr); }
+ } else {
+ console.log("Error GZIP-ing data: " + err);
+ }
+ });
+
+/* Uncompressed data */
+var udata = new Buffer('00' + json_data);
+udata[0]=0x1f;
+udata[1]=0x3c;
+client.send(udata, 0, udata.length, server_port, server_addr);
+getChunks(arr,udata,'none-chunked');
+// Send an incomplete chunk message by restricting max_chunks to 1
+getChunks(arr,udata,'incomplete-chunked',100,1);
+completed++;
+if(completed == total_types) { sendUDPPackets(arr,server_port,server_addr); }
+
+/* Bad data */
+var bdata = new Buffer(udata.length);
+udata.copy(bdata);
+bdata[0]=0;
+client.send(bdata, 0, bdata.length, server_port, server_addr);
+
+/* Stop the server after waiting for 1 sec */
+setTimeout( function() {
+ server.stop();
+ client.close();
+ assertResults();
+ }, 2000);
+
Please sign in to comment.
Something went wrong with that request. Please try again.