diff --git a/README.md b/README.md
index d288ca60..6b1cd299 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,10 @@
 StatsD [![Build Status](https://secure.travis-ci.org/etsy/statsd.png)](http://travis-ci.org/etsy/statsd)
 ======
 
-A network daemon for aggregating statistics (counters, timers, and gauges), rolling them up, then sending them to [graphite][graphite].
+A network daemon that runs on the [Node.js][node] platform and
+listens for statistics, like counters and timers, sent over [UDP][udp]
+and sends aggregates to one or more pluggable backend services (e.g.,
+[Graphite][graphite]).
 
 We ([Etsy][etsy]) [blogged][blog post] about how it works and why we created it.
 
@@ -16,7 +19,8 @@ Concepts
   Each stat will have a value. How it is interpreted depends on modifiers
 
 * *flush*
-  After the flush interval timeout (default 10 seconds), stats are munged and sent over to Graphite.
+  After the flush interval timeout (default 10 seconds), stats are
+  aggregated and sent to an upstream backend service.
 
 Counting
 --------
@@ -64,14 +68,33 @@ There are additional config variables available for debugging:
 
 For more information, check the `exampleConfig.js`.
 
-Guts
------
+Supported Backends
+------------------
 
-* [UDP][udp]
-  Client libraries use UDP to send information to the StatsD daemon.
+StatsD supports multiple, pluggable, backend modules that can publish
+statistics from the local StatsD daemon to a backend service or data
+store. Backend services can retain statistics for
+longer durations in a time series data store, visualize statistics in
+graphs or tables, or generate alerts based on defined thresholds. A
+backend can also correlate statistics sent from StatsD daemons running
+across multiple hosts in an infrastructure.
 
-* [NodeJS][node]
-* [Graphite][graphite]
+StatsD supports the following backends:
+
+* [Graphite][graphite] (`graphite`): Graphite is an open-source
+  time-series data store that provides visualization through a
+  web-browser interface.
+
+By default, the `graphite` backend will be loaded automatically. To
+select which backends are loaded, set the `backends` configuration
+variable to the list of backend modules to load. Each backend module
+must exist by its name in the `backends/` top-level directory.
+
+To add a new backend, see the section *Backend Interface* below that
+describes the backend module interface.
+
+Graphite Schema
+---------------
 
 Graphite uses "schemas" to define the different round robin datasets it houses (analogous to RRAs in rrdtool). Here's what Etsy is using for the stats databases:
 
@@ -100,11 +123,17 @@ A really simple TCP management interface is available by default on port 8126 or
 The stats output currently will give you:
 
 * uptime: the number of seconds elapsed since statsd started
-* graphite.last_flush: the number of seconds elapsed since the last successful flush to graphite
-* graphite.last_exception: the number of seconds elapsed since the last exception thrown whilst flushing to graphite
 * messages.last_msg_seen: the number of elapsed seconds since statsd received a message
 * messages.bad_lines_seen: the number of bad lines seen since startup
 
+Each backend will also publish a set of statistics, prefixed by its
+module name.
+
+Graphite:
+
+* graphite.last_flush: the number of seconds elapsed since the last successful flush to graphite
+* graphite.last_exception: the number of seconds elapsed since the last exception thrown whilst flushing to graphite
+
 A simple nagios check can be found in the utils/ directory that can be used to check metric thresholds, for example the number of seconds since the last successful flush to graphite.
 
 Installation and Configuration
@@ -124,6 +153,64 @@ A test framework has been added using node-unit and some custom code to start an
 
 Tests can be executd with `./run_tests.sh`.
 
+Backend Interface
+-----------------
+
+Backend modules are Node.js [modules][nodemods] that listen for a
+number of events emitted from StatsD. Each backend module should
+export the following initialization function:
+
+* `init(startup_time, config, events)`: This method is invoked from StatsD to
+  initialize the backend module. It accepts three parameters:
+  `startup_time` is the startup time of StatsD in epoch seconds,
+  `config` is the parsed config file hash, and `events` is the event
+  emitter that backends can use to listen for events.
+
+  The backend module should return `true` from init() to indicate
+  success. A return of `false` indicates a failure to load the module
+  (missing configuration?) and will cause StatsD to exit.
+
+Backends can listen for the following events emitted by StatsD from
+the `events` object:
+
+* Event: **'flush'**
+
+  Parameters: `(time_stamp, metrics)`
+
+  Emitted on each flush interval so that backends can push aggregate
+  metrics to their respective backend services. The event is passed
+  two parameters: `time_stamp` is the current time in epoch seconds
+  and `metrics` is a hash representing the StatsD statistics:
+
+  ```
+metrics: {
+  counters: counters,
+  gauges: gauges,
+  timers: timers,
+  pctThreshold: pctThreshold
+}
+  ```
+
+  Each backend module is passed the same set of statistics, so a
+  backend module should treat the metrics as immutable
+  structures. StatsD will reset timers and counters after each
+  listener has handled the event.
+
+* Event: **'status'**
+
+  Parameters: `(writeCb)`
+
+  Emitted when a user invokes a *stats* command on the management
+  server port. It allows each backend module to dump backend-specific
+  status statistics to the management port.
+
+  The `writeCb` callback function has a signature of `f(error,
+  backend_name, stat_name, stat_value)`. The backend module should
+  invoke this method with each stat_name and stat_value that should be
+  sent to the management port. StatsD will prefix each stat name with
+  the `backend_name`. The backend should set `error` to *null*, or, in
+  the case of a failure, an appropriate error.
+
 Inspiration
 -----------
 
@@ -151,7 +238,8 @@ We'll do our best to get your changes in!
 [etsy]: http://www.etsy.com
 [blog post]: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
 [node]: http://nodejs.org
-[udp]: http://enwp.org/udp
+[nodemods]: http://nodejs.org/api/modules.html
+[udp]: http://en.wikipedia.org/wiki/User_Datagram_Protocol
 
 
 Contributors
diff --git a/backends/graphite.js b/backends/graphite.js
new file mode 100644
index 00000000..69692fb5
--- /dev/null
+++ b/backends/graphite.js
@@ -0,0 +1,143 @@
+/*
+ * Flush stats to graphite (http://graphite.wikidot.com/).
+ *
+ * To enable this backend, include 'graphite' in the backends
+ * configuration array:
+ *
+ *   backends: ['graphite']
+ *
+ * This backend supports the following config options:
+ *
+ *   graphiteHost: Hostname of graphite server.
+ *   graphitePort: Port to contact graphite server at.
+ */
+
+var net = require('net'),
+   util = require('util');
+
+var debug;
+var flushInterval;
+var graphiteHost;
+var graphitePort;
+
+var graphiteStats = {};
+
+var post_stats = function graphite_post_stats(statString) {
+  if (graphiteHost) {
+    try {
+      var graphite = net.createConnection(graphitePort, graphiteHost);
+      graphite.addListener('error', function(connectionException){
+        if (debug) {
+          util.log(connectionException);
+        }
+      });
+      graphite.on('connect', function() {
+        this.write(statString);
+        this.end();
+        graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
+      });
+    } catch(e){
+      if (debug) {
+        util.log(e);
+      }
+      graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
+    }
+  }
+}
+
+var flush_stats = function graphite_flush(ts, metrics) {
+  var statString = '';
+  var numStats = 0;
+  var key;
+
+  var counters = metrics.counters;
+  var gauges = metrics.gauges;
+  var timers = metrics.timers;
+  var pctThreshold = metrics.pctThreshold;
+
+  for (key in counters) {
+    var value = counters[key];
+    var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate
+
+    statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
+    statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
+
+    numStats += 1;
+  }
+
+  for (key in timers) {
+    if (timers[key].length > 0) {
+      var values = timers[key].sort(function (a,b) { return a-b; });
+      var count = values.length;
+      var min = values[0];
+      var max = values[count - 1];
+
+      var mean = min;
+      var maxAtThreshold = max;
+
+      var message = "";
+
+      var key2;
+
+      for (key2 in pctThreshold) {
+        var pct = pctThreshold[key2];
+        if (count > 1) {
+          var thresholdIndex = Math.round(((100 - pct) / 100) * count);
+          var numInThreshold = count - thresholdIndex;
+          var pctValues = values.slice(0, numInThreshold);
+          maxAtThreshold = pctValues[numInThreshold - 1];
+
+          // average the remaining timings
+          var sum = 0;
+          for (var i = 0; i < numInThreshold; i++) {
+            sum += pctValues[i];
+          }
+
+          mean = sum / numInThreshold;
+        }
+
+        var clean_pct = '' + pct;
+        clean_pct.replace('.', '_');
+        message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
+        message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
+      }
+
+      message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
+      message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
+      message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+      statString += message;
+
+      numStats += 1;
+    }
+  }
+
+  for (key in gauges) {
+    statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
+    numStats += 1;
+  }
+
+  statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
+  post_stats(statString);
+};
+
+var backend_status = function graphite_status(writeCb) {
+  for (stat in graphiteStats) {
+    writeCb(null, 'graphite', stat, graphiteStats[stat]);
+  }
+};
+
+exports.init = function graphite_init(startup_time, config, events) {
+  debug = config.debug;
+  graphiteHost = config.graphiteHost;
+  graphitePort = config.graphitePort;
+
+  graphiteStats.last_flush = startup_time;
+  graphiteStats.last_exception = startup_time;
+
+  flushInterval = config.flushInterval;
+
+  events.on('flush', flush_stats);
+  events.on('status', backend_status);
+
+  return true;
+};
diff --git a/exampleConfig.js b/exampleConfig.js
index af08b4a4..465c0087 100644
--- a/exampleConfig.js
+++ b/exampleConfig.js
@@ -15,6 +15,9 @@ Graphite Required Variables:
 
 Optional Variables:
 
+  backends:         an array of backends to load. Each backend must exist
+                    by name in the directory backends/. If not specified,
+                    the default graphite backend will be loaded.
   debug:            debug flag [default: false]
   port:             port to listen for messages on over UDP [default: 8125]
   mgmt_port:        port to run the management TCP interface on [default: 8126]
diff --git a/stats.js b/stats.js
index 362d49c3..adf67bc1 100644
--- a/stats.js
+++ b/stats.js
@@ -3,19 +3,61 @@ var dgram = require('dgram')
   , net = require('net')
   , config = require('./config')
   , fs = require('fs')
+  , events = require('events')
 
 var keyCounter = {};
 var counters = {};
 var timers = {};
 var gauges = {};
-var debugInt, flushInt, keyFlushInt, server, mgmtServer;
+var pctThreshold = null;
+var debugInt, flushInterval, keyFlushInt, server, mgmtServer;
 var startup_time = Math.round(new Date().getTime() / 1000);
+var backendEvents = new events.EventEmitter();
+
+// Load and init the backend from the backends/ directory.
+function loadBackend(config, name) {
+  var backendmod = require("./backends/" + name);
+
+  if (config.debug) {
+    util.log("Loading backend: " + name);
+  }
+
+  var ret = backendmod.init(startup_time, config, backendEvents);
+  if (!ret) {
+    util.log("Failed to load backend: " + name);
+    process.exit(1);
+  }
+};
+
+// Flush metrics to each backend.
+function flushMetrics() {
+  var time_stamp = Math.round(new Date().getTime() / 1000);
+
+  var metrics_hash = {
+    counters: counters,
+    gauges: gauges,
+    timers: timers,
+    pctThreshold: pctThreshold
+  }
+
+  // After all listeners, reset the stats
+  backendEvents.once('flush', function clear_metrics(ts, metrics) {
+    // Clear the counters
+    for (key in metrics.counters) {
+      metrics.counters[key] = 0;
+    }
+
+    // Clear the timers
+    for (key in metrics.timers) {
+      metrics.timers[key] = [];
+    }
+  });
+
+  // Flush metrics to each backend.
+  backendEvents.emit('flush', time_stamp, metrics_hash);
+};
 
 var stats = {
-  graphite: {
-    last_flush: startup_time,
-    last_exception: startup_time
-  },
   messages: {
     last_msg_seen: startup_time,
     bad_lines_seen: 0,
@@ -108,21 +150,40 @@ config.configFile(process.argv[2], function (config, oldConfig) {
 
             stream.write("uptime: " + uptime + "\n");
 
-            for (group in stats) {
-              for (metric in stats[group]) {
-                var val;
+            var stat_writer = function(group, metric, val) {
+              var delta;
 
-                if (metric.match("^last_")) {
-                  val = now - stats[group][metric];
-                }
-                else {
-                  val = stats[group][metric];
-                }
+              if (metric.match("^last_")) {
+                delta = now - val;
+              }
+              else {
+                delta = val;
+              }
+
+              stream.write(group + "." + metric + ": " + delta + "\n");
+            };
 
-                stream.write(group + "." + metric + ": " + val + "\n");
+            // Loop through the base stats
+            for (group in stats) {
+              for (metric in stats[group]) {
+                stat_writer(group, metric, stats[group][metric]);
               }
             }
-            stream.write("END\n\n");
+
+            backendEvents.once('status', function(writeCb) {
+              stream.write("END\n\n");
+            });
+
+            // Let each backend contribute its status
+            backendEvents.emit('status', function(err, name, stat, val) {
+              if (err) {
+                util.log("Failed to read stats for backend " +
+                         name + ": " + err);
+              } else {
+                stat_writer(name, stat, val);
+              }
+            });
+
             break;
 
           case "counters":
@@ -181,107 +242,25 @@ config.configFile(process.argv[2], function (config, oldConfig) {
 
     util.log("server is up");
 
-    var flushInterval = Number(config.flushInterval || 10000);
-
-    var pctThreshold = config.percentThreshold || 90;
+    pctThreshold = config.percentThreshold || 90;
     if (!Array.isArray(pctThreshold)) {
       pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same
     }
 
-    flushInt = setInterval(function () {
-      var statString = '';
-      var ts = Math.round(new Date().getTime() / 1000);
-      var numStats = 0;
-      var key;
-
-      for (key in counters) {
-        var value = counters[key];
-        var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate
+    flushInterval = Number(config.flushInterval || 10000);
+    config.flushInterval = flushInterval;
 
-        statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
-        statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
-
-        counters[key] = 0;
-        numStats += 1;
-      }
-
-      for (key in timers) {
-        if (timers[key].length > 0) {
-          var values = timers[key].sort(function (a,b) { return a-b; });
-          var count = values.length;
-          var min = values[0];
-          var max = values[count - 1];
-
-          var mean = min;
-          var maxAtThreshold = max;
-
-          var message = "";
-
-          var key2;
-
-          for (key2 in pctThreshold) {
-            var pct = pctThreshold[key2];
-            if (count > 1) {
-              var thresholdIndex = Math.round(((100 - pct) / 100) * count);
-              var numInThreshold = count - thresholdIndex;
-              var pctValues = values.slice(0, numInThreshold);
-              maxAtThreshold = pctValues[numInThreshold - 1];
-
-              // average the remaining timings
-              var sum = 0;
-              for (var i = 0; i < numInThreshold; i++) {
-                sum += pctValues[i];
-              }
-
-              mean = sum / numInThreshold;
-            }
-
-            var clean_pct = '' + pct;
-            clean_pct.replace('.', '_');
-            message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
-            message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
-          }
-
-          timers[key] = [];
-
-          message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
-          message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
-          message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
-          statString += message;
-
-          numStats += 1;
-        }
-      }
-
-      for (key in gauges) {
-        statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
-        numStats += 1;
-      }
-
-      statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
-
-      if (config.graphiteHost) {
-        try {
-          var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
-          graphite.addListener('error', function(connectionException){
-            if (config.debug) {
-              util.log(connectionException);
-            }
-          });
-          graphite.on('connect', function() {
-            this.write(statString);
-            this.end();
-            stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
-          });
-        } catch(e){
-          if (config.debug) {
-            util.log(e);
-          }
-          stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
-        }
+    if (config.backends) {
+      for (var i = 0; i < config.backends.length; i++) {
+        loadBackend(config, config.backends[i]);
       }
+    } else {
+      // The default backend is graphite
+      loadBackend(config, 'graphite');
+    }
 
-    }, flushInterval);
+    // Setup the flush timer
+    var flushInt = setInterval(flushMetrics, flushInterval);
 
     if (keyFlushInterval > 0) {
       var keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
@@ -314,6 +293,8 @@ config.configFile(process.argv[2], function (config, oldConfig) {
       }, keyFlushInterval);
     }
 
-  }
-});
+
+    ;
+  }
+})
 
diff --git a/test/graphite_tests.js b/test/graphite_tests.js
index 5c50c2c1..381a9cf2 100644
--- a/test/graphite_tests.js
+++ b/test/graphite_tests.js
@@ -42,7 +42,7 @@ var statsd_send = function(data,sock,host,port,cb){
 var collect_for = function(server,timeout,cb){
   var received = [];
   var in_flight = 0;
-  var start_time = new Date().getTime();
+  var timed_out = false;
   var collector = function(req,res){
     in_flight += 1;
     var body = '';
@@ -50,7 +50,7 @@ var collect_for = function(server,timeout,cb){
     req.on('end',function(){
       received = received.concat(body.split("\n"));
       in_flight -= 1;
-      if((in_flight < 1) && (new Date().getTime() > (start_time + timeout))){
+      if((in_flight < 1) && timed_out){
         server.removeListener('request',collector);
         cb(received);
       }
@@ -58,8 +58,9 @@ var collect_for = function(server,timeout,cb){
   }
 
   setTimeout(function (){
-    server.removeListener('connection',collector);
-    if((in_flight < 1)){
+    timed_out = true;
+    if((in_flight < 1)) {
+      server.removeListener('connection',collector);
       cb(received);
     }
   },timeout);
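
The Backend Interface section added to the README above can be illustrated with a small third-party backend. The following is only a minimal sketch, not part of this patch: the module name `console`, the file path `backends/console.js`, and the `num_stats` statistic are assumptions made for illustration. Only `init()`, the `'flush'` event, and the `'status'` event come from the documented interface.

```javascript
// Hypothetical backend module, e.g. saved as backends/console.js.
// The name 'console' and the num_stats statistic are illustrative
// assumptions; they do not exist in this patch.

var consoleStats = {};

// 'flush' handler: summarize the metrics without modifying them, since
// every backend is passed the same metrics hash.
function flushStats(timeStamp, metrics) {
  var numStats = Object.keys(metrics.counters).length +
                 Object.keys(metrics.gauges).length;
  var key;

  // Count only timers that received at least one value in this interval.
  for (key in metrics.timers) {
    if (metrics.timers[key].length > 0) {
      numStats += 1;
    }
  }

  consoleStats.last_flush = timeStamp;
  consoleStats.num_stats = numStats;
  console.log('flushed ' + numStats + ' stats at ' + timeStamp);
}

// 'status' handler: report each stat through the supplied callback,
// using the signature f(error, backend_name, stat_name, stat_value).
function backendStatus(writeCb) {
  for (var stat in consoleStats) {
    writeCb(null, 'console', stat, consoleStats[stat]);
  }
}

// Called once by StatsD; returning true signals a successful load.
function init(startupTime, config, events) {
  consoleStats.last_flush = startupTime;
  consoleStats.num_stats = 0;

  events.on('flush', flushStats);
  events.on('status', backendStatus);

  return true;
}

exports.init = init;
```

Assuming the file is placed in the `backends/` directory, it would presumably be enabled with `backends: ['graphite', 'console']` in the config file. Because the stat is named `last_flush`, the management interface in this patch would report it as seconds elapsed rather than as a raw timestamp.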