diff --git a/backends/graphite.js b/backends/graphite.js new file mode 100644 index 00000000..4b3b29e8 --- /dev/null +++ b/backends/graphite.js @@ -0,0 +1,139 @@ +/* + * Flushes stats to graphite. + */ + +var net = require('net'), + util = require('util'); + +var debug; +var flushInterval; +var graphiteHost; +var graphitePort; + +var graphiteStats = {}; + +var post_stats = function(statString) { + if (graphiteHost) { + + try { + var graphite = net.createConnection(graphitePort, graphiteHost); + graphite.addListener('error', function(connectionException){ + if (debug) { + util.log(connectionException); + } + }); + graphite.on('connect', function() { + this.write(statString); + this.end(); + graphiteStats.last_flush = Math.round(new Date().getTime() / 1000); + }); + } catch(e){ + if (debug) { + util.log(e); + } + graphiteStats.last_exception = Math.round(new Date().getTime() / 1000); + } + } +} + +var flush_stats = function(metrics) { + var statString = ''; + var ts = Math.round(new Date().getTime() / 1000); + var numStats = 0; + var key; + + var counters = metrics.counters; + var gauges = metrics.gauges; + var timers = metrics.timers; + var pctThreshold = metrics.pctThreshold; + + for (key in counters) { + var value = counters[key]; + var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate + + statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n"; + statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n"; + + counters[key] = 0; + numStats += 1; + } + + for (key in timers) { + if (timers[key].length > 0) { + var values = timers[key].sort(function (a,b) { return a-b; }); + var count = values.length; + var min = values[0]; + var max = values[count - 1]; + + var mean = min; + var maxAtThreshold = max; + + var message = ""; + + var key2; + + for (key2 in pctThreshold) { + var pct = pctThreshold[key2]; + if (count > 1) { + var thresholdIndex = Math.round(((100 - pct) / 100) * count); + var numInThreshold = count - thresholdIndex; + var pctValues = values.slice(0, numInThreshold); + maxAtThreshold = pctValues[numInThreshold - 1]; + + // average the remaining timings + var sum = 0; + for (var i = 0; i < numInThreshold; i++) { + sum += pctValues[i]; + } + + mean = sum / numInThreshold; + } + + var clean_pct = '' + pct; + clean_pct.replace('.', '_'); + message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n"; + } + + timers[key] = []; + + message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n"; + statString += message; + + numStats += 1; + } + } + + for (key in gauges) { + statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n"; + numStats += 1; + } + + statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; + post_stats(statString); +}; + +var write_stats = function(writeCb) { + for (stat in graphiteStats) { + writeCb(stat, graphiteStats[stat]); + } +}; + +var init_backend = function(startup_time, config, metrics) { + debug = config.debug; + flushInterval = Number(config.flushInterval || 10000); + graphiteHost = config.graphiteHost; + graphitePort = config.graphitePort; + + graphiteStats.last_flush = startup_time; + graphiteStats.last_exception = startup_time; + + var flushInt = setInterval(function() { + flush_stats(metrics); + }, flushInterval); +}; + +exports.init = init_backend; +exports.write_stats = write_stats; diff --git a/exampleConfig.js b/exampleConfig.js index af08b4a4..465c0087 100644 --- a/exampleConfig.js +++ b/exampleConfig.js @@ -15,6 +15,9 @@ Graphite Required Variables: Optional Variables: + backends: an array of backends to load. Each backend must exist + by name in the directory backends/. If not specified, + the default graphite backend will be loaded. debug: debug flag [default: false] port: port to listen for messages on over UDP [default: 8125] mgmt_port: port to run the management TCP interface on [default: 8126] diff --git a/stats.js b/stats.js index 362d49c3..d1129851 100644 --- a/stats.js +++ b/stats.js @@ -8,14 +8,33 @@ var keyCounter = {}; var counters = {}; var timers = {}; var gauges = {}; -var debugInt, flushInt, keyFlushInt, server, mgmtServer; +var pctThreshold = null; +var backends = []; +var debugInt, keyFlushInt, server, mgmtServer; var startup_time = Math.round(new Date().getTime() / 1000); +var loadBackend = function(config, name) { + var backendmod = require("./backends/" + name); + backends.push({ + name: name, + mod: backendmod + }); + + var metrics = { + counters: counters, + gauges: gauges, + timers: timers, + pctThreshold: pctThreshold + } + + if (config.debug) { + util.log("Loading backend: " + name); + } + + backendmod.init(startup_time, config, metrics); +}; + var stats = { - graphite: { - last_flush: startup_time, - last_exception: startup_time - }, messages: { last_msg_seen: startup_time, bad_lines_seen: 0, @@ -108,20 +127,33 @@ config.configFile(process.argv[2], function (config, oldConfig) { stream.write("uptime: " + uptime + "\n"); - for (group in stats) { - for (metric in stats[group]) { - var val; + var stat_writer = function(group, metric, val) { + var delta; + + if (metric.match("^last_")) { + delta = now - val; + } + else { + delta = val; + } - if (metric.match("^last_")) { - val = now - stats[group][metric]; - } - else { - val = stats[group][metric]; - } + stream.write(group + "." + metric + ": " + delta + "\n"); + }; - stream.write(group + "." + metric + ": " + val + "\n"); + // Loop through the base stats + for (group in stats) { + for (metric in stats[group]) { + stat_writer(group, metric, stats[group][metric]); } } + + // Retrieve stats from each backend + for (var i = 0; i < backends.length; i++) { + backends[i].mod.write_stats(function(stat, val) { + stat_writer(backends[i].name, stat, val); + }); + } + stream.write("END\n\n"); break; @@ -181,107 +213,19 @@ config.configFile(process.argv[2], function (config, oldConfig) { util.log("server is up"); - var flushInterval = Number(config.flushInterval || 10000); - - var pctThreshold = config.percentThreshold || 90; + pctThreshold = config.percentThreshold || 90; if (!Array.isArray(pctThreshold)) { pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same } - flushInt = setInterval(function () { - var statString = ''; - var ts = Math.round(new Date().getTime() / 1000); - var numStats = 0; - var key; - - for (key in counters) { - var value = counters[key]; - var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate - - statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n"; - statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n"; - - counters[key] = 0; - numStats += 1; + if (config.backends) { + for (var i = 0; i < config.backends.length; i++) { + loadBackend(config, config.backends[i]); } - - for (key in timers) { - if (timers[key].length > 0) { - var values = timers[key].sort(function (a,b) { return a-b; }); - var count = values.length; - var min = values[0]; - var max = values[count - 1]; - - var mean = min; - var maxAtThreshold = max; - - var message = ""; - - var key2; - - for (key2 in pctThreshold) { - var pct = pctThreshold[key2]; - if (count > 1) { - var thresholdIndex = Math.round(((100 - pct) / 100) * count); - var numInThreshold = count - thresholdIndex; - var pctValues = values.slice(0, numInThreshold); - maxAtThreshold = pctValues[numInThreshold - 1]; - - // average the remaining timings - var sum = 0; - for (var i = 0; i < numInThreshold; i++) { - sum += pctValues[i]; - } - - mean = sum / numInThreshold; - } - - var clean_pct = '' + pct; - clean_pct.replace('.', '_'); - message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n"; - message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n"; - } - - timers[key] = []; - - message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n"; - message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n"; - message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n"; - statString += message; - - numStats += 1; - } - } - - for (key in gauges) { - statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n"; - numStats += 1; - } - - statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; - - if (config.graphiteHost) { - try { - var graphite = net.createConnection(config.graphitePort, config.graphiteHost); - graphite.addListener('error', function(connectionException){ - if (config.debug) { - util.log(connectionException); - } - }); - graphite.on('connect', function() { - this.write(statString); - this.end(); - stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000); - }); - } catch(e){ - if (config.debug) { - util.log(e); - } - stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000); - } - } - - }, flushInterval); + } else { + // The default backend is graphite + loadBackend(config, 'graphite'); + } if (keyFlushInterval > 0) { var keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100); @@ -314,6 +258,8 @@ config.configFile(process.argv[2], function (config, oldConfig) { }, keyFlushInterval); } - } -}); + + ; + } +})