Skip to content

Commit

Permalink
Merge branch 'master' into selective-iface
Browse files Browse the repository at this point in the history
  • Loading branch information
bluesmoon committed Feb 7, 2012
2 parents c2ab6c9 + c9d827a commit c6c3ef1
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 20 deletions.
22 changes: 19 additions & 3 deletions README.md
Expand Up @@ -10,7 +10,7 @@ Concepts
--------

* *buckets*
Each stat is in it's own "bucket". They are not predefined anywhere. Buckets can be named anything that will translate to Graphite (periods make folders, etc)
Each stat is in its own "bucket". They are not predefined anywhere. Buckets can be named anything that will translate to Graphite (periods make folders, etc)

* *values*
Each stat will have a value. How it is interpreted depends on modifiers
Expand All @@ -23,15 +23,15 @@ Counting

gorets:1|c

This is a simple counter. Add 1 to the "gorets" bucket. It stays in memory until the flush interval.
This is a simple counter. Add 1 to the "gorets" bucket. It stays in memory until the flush interval `config.flushInterval`.


Timing
------

glork:320|ms

The glork took 320ms to complete this time. StatsD figures out 90th percentile, average (mean), lower and upper bounds for the flush interval.
The glork took 320ms to complete this time. StatsD figures out 90th percentile, average (mean), lower and upper bounds for the flush interval. The percentile threshold can be tweaked with `config.percentThreshold`.

Sampling
--------
Expand All @@ -40,6 +40,16 @@ Sampling

Tells StatsD that this counter is being sent sampled every 1/10th of the time.

Debugging
---------

There are additional config variables available for debugging:

* `debug` - log exceptions and periodically print out information on counters and timers
* `debugInterval` - interval for printing out information on counters and timers
* `dumpMessages` - print debug info on incoming messages

For more information, check the `exampleConfig.js`.

Guts
----
Expand Down Expand Up @@ -92,6 +102,12 @@ Installation and Configuration

node stats.js /path/to/config

Tests
-----

A test framework has been added using node-unit and some custom code to start and manipulate statsd. Please add tests under test/ for any new features or bug fixes encountered. Testing a live server can be tricky, attempts were made to eliminate race conditions but it may be possible to encounter a stuck state. If doing dev work, a `killall node` will kill any stray test servers in the background (don't do this on a production machine!).

Tests can be executd with `./run_tests.sh`.

Inspiration
-----------
Expand Down
2 changes: 1 addition & 1 deletion config.js
Expand Up @@ -14,7 +14,7 @@ var Configurator = function (file) {
if (err) { throw err; }
old_config = self.config;

self.config = process.compile('config = ' + data, file);
self.config = eval('config = ' + fs.readFileSync(file));
self.emit('configChanged', self.config);
});
};
Expand Down
26 changes: 25 additions & 1 deletion exampleConfig.js
@@ -1,6 +1,30 @@
/*
Required Variables:
port: StatsD listening port [default: 8125]
Graphite Required Variables:
(Leave these unset to avoid sending stats to Graphite.
Set debug flag and leave these unset to run in 'dry' debug mode -
useful for testing statsd clients without a Graphite server.)
graphiteHost: hostname or IP of Graphite server
graphitePort: port of Graphite server
Optional Variables:
debug: debug flag [default: false]
debugInterval: interval to print debug information [ms, default: 10000]
dumpMessages: log all incoming messages
flushInterval: interval (in ms) to flush to Graphite
percentThreshold: for time information, calculate the Nth percentile
[%, default: 90]
*/
{
graphitePort: 2003
, graphiteHost: "graphite.host.com"
, port: 8125
}

2 changes: 1 addition & 1 deletion python_example.py
Expand Up @@ -72,7 +72,7 @@ def send(data, sample_rate=1):
import random
if random.random() <= sample_rate:
for stat in data.keys():
value = data[stat]
value = sampled_data[stat]
sampled_data[stat] = "%s|@%s" %(value, sample_rate)
else:
sampled_data=data
Expand Down
12 changes: 12 additions & 0 deletions run_tests.sh
@@ -0,0 +1,12 @@
#!/usr/bin/env node
try {
var reporter = require('nodeunit').reporters.default;
}
catch(e) {
console.log("Cannot find nodeunit module.");
console.log("Make sure to run 'npm install nodeunit'");
process.exit();
}

process.chdir(__dirname);
reporter.run(['test/']);
32 changes: 18 additions & 14 deletions stats.js
Expand Up @@ -131,6 +131,8 @@ config.configFile(process.argv[2], function (config, oldConfig) {
server.bind(config.port || 8125, config.address || undefined);
mgmtServer.listen(config.mgmt_port || 8126, config.mgmt_address || undefined);

sys.log("server is up");

var flushInterval = Number(config.flushInterval || 10000);

flushInt = setInterval(function () {
Expand Down Expand Up @@ -191,23 +193,25 @@ config.configFile(process.argv[2], function (config, oldConfig) {

statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";

try {
var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
graphite.addListener('error', function(connectionException){
if (config.graphiteHost) {
try {
var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
graphite.addListener('error', function(connectionException){
if (config.debug) {
sys.log(connectionException);
}
});
graphite.on('connect', function() {
this.write(statString);
this.end();
stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
});
} catch(e){
if (config.debug) {
sys.log(connectionException);
sys.log(e);
}
});
graphite.on('connect', function() {
this.write(statString);
this.end();
stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
});
} catch(e){
if (config.debug) {
sys.log(e);
stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
}
stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
}

}, flushInterval);
Expand Down
227 changes: 227 additions & 0 deletions test/graphite_tests.js
@@ -0,0 +1,227 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
sys = require('sys'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');


var writeconfig = function(text,worker,cb,obj){
temp.open({suffix: '-statsdconf.js'}, function(err, info) {
if (err) throw err;
fs.write(info.fd, text);
fs.close(info.fd, function(err) {
if (err) throw err;
worker(info.path,cb,obj);
});
});
}

var array_contents_are_equal = function(first,second){
var intlen = _.intersection(first,second).length;
var unlen = _.union(first,second).length;
return (intlen == unlen) && (intlen == first.length);
}

var statsd_send = function(data,sock,host,port,cb){
send_data = new Buffer(data);
sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
if (err) {
throw err;
}
cb();
});
}

// keep collecting data until a specified timeout period has elapsed
// this will let us capture all data chunks so we don't miss one
var collect_for = function(server,timeout,cb){
var received = [];
var in_flight = 0;
var start_time = new Date().getTime();
var collector = function(req,res){
in_flight += 1;
var body = '';
req.on('data',function(data){ body += data; });
req.on('end',function(){
received = received.concat(body.split("\n"));
in_flight -= 1;
if((in_flight < 1) && (new Date().getTime() > (start_time + timeout))){
server.removeListener('request',collector);
cb(received);
}
});
}

setTimeout(function (){
server.removeListener('connection',collector);
if((in_flight < 1)){
cb(received);
}
},timeout);

server.on('connection',collector);
}

module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";

this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');

this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;

writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});

},callback,this);
},
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},

send_well_formed_posts: function (test) {
test.expect(2);

// we should integrate a timeout into this
this.acceptor.once('connection',function(c){
var body = '';
c.on('data',function(d){ body += d; });
c.on('end',function(){
var rows = body.split("\n");
var entries = _.map(rows, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsd.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsd.numStats' })['statsd.numStats'],0);
test.done();
});
});
},

timers_are_valid: function (test) {
test.expect(3);

var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 1);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 1');

var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean should be ' + testvalue);

test.done();
});
});
});
},

counts_are_valid: function (test) {
test.expect(4);

var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 1);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 1');

var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000)));

var testcountvalue_test = function(post){
var mykey = 'stats_counts.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue);

test.done();
});
});
});
}
}

0 comments on commit c6c3ef1

Please sign in to comment.