Skip to content

Commit

Permalink
Split out the graphite persistence code into a separate backend
Browse files Browse the repository at this point in the history
module.

This provides a basic framework for adding multiple, pluggable,
backends for persisting stats.
  • Loading branch information
mheffner committed Apr 26, 2012
1 parent 980b582 commit 17454a3
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 113 deletions.
139 changes: 139 additions & 0 deletions backends/graphite.js
@@ -0,0 +1,139 @@
/*
* Flushes stats to graphite.
*/

var net = require('net'),
util = require('util');

var debug;
var flushInterval;
var graphiteHost;
var graphitePort;

var graphiteStats = {};

var post_stats = function(statString) {
if (graphiteHost) {

try {
var graphite = net.createConnection(graphitePort, graphiteHost);
graphite.addListener('error', function(connectionException){
if (debug) {
util.log(connectionException);
}
});
graphite.on('connect', function() {
this.write(statString);
this.end();
graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
});
} catch(e){
if (debug) {
util.log(e);
}
graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
}
}
}

var flush_stats = function(metrics) {
var statString = '';
var ts = Math.round(new Date().getTime() / 1000);
var numStats = 0;
var key;

var counters = metrics.counters;
var gauges = metrics.gauges;
var timers = metrics.timers;
var pctThreshold = metrics.pctThreshold;

for (key in counters) {
var value = counters[key];
var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate

statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";

counters[key] = 0;
numStats += 1;
}

for (key in timers) {
if (timers[key].length > 0) {
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];

var mean = min;
var maxAtThreshold = max;

var message = "";

var key2;

for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var thresholdIndex = Math.round(((100 - pct) / 100) * count);
var numInThreshold = count - thresholdIndex;
var pctValues = values.slice(0, numInThreshold);
maxAtThreshold = pctValues[numInThreshold - 1];

// average the remaining timings
var sum = 0;
for (var i = 0; i < numInThreshold; i++) {
sum += pctValues[i];
}

mean = sum / numInThreshold;
}

var clean_pct = '' + pct;
clean_pct.replace('.', '_');
message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
}

timers[key] = [];

message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
statString += message;

numStats += 1;
}
}

for (key in gauges) {
statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
numStats += 1;
}

statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
post_stats(statString);
};

var write_stats = function(writeCb) {
for (stat in graphiteStats) {
writeCb(stat, graphiteStats[stat]);
}
};

var init_backend = function(startup_time, config, metrics) {
debug = config.debug;
flushInterval = Number(config.flushInterval || 10000);
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;

graphiteStats.last_flush = startup_time;
graphiteStats.last_exception = startup_time;

var flushInt = setInterval(function() {
flush_stats(metrics);
}, flushInterval);
};

exports.init = init_backend;
exports.write_stats = write_stats;
3 changes: 3 additions & 0 deletions exampleConfig.js
Expand Up @@ -15,6 +15,9 @@ Graphite Required Variables:
Optional Variables:
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
debug: debug flag [default: false]
port: port to listen for messages on over UDP [default: 8125]
mgmt_port: port to run the management TCP interface on [default: 8126]
Expand Down
172 changes: 59 additions & 113 deletions stats.js
Expand Up @@ -8,14 +8,33 @@ var keyCounter = {};
var counters = {};
var timers = {};
var gauges = {};
var debugInt, flushInt, keyFlushInt, server, mgmtServer;
var pctThreshold = null;
var backends = [];
var debugInt, keyFlushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);

var loadBackend = function(config, name) {
var backendmod = require("./backends/" + name);
backends.push({
name: name,
mod: backendmod
});

var metrics = {
counters: counters,
gauges: gauges,
timers: timers,
pctThreshold: pctThreshold
}

if (config.debug) {
util.log("Loading backend: " + name);
}

backendmod.init(startup_time, config, metrics);
};

var stats = {
graphite: {
last_flush: startup_time,
last_exception: startup_time
},
messages: {
last_msg_seen: startup_time,
bad_lines_seen: 0,
Expand Down Expand Up @@ -108,20 +127,33 @@ config.configFile(process.argv[2], function (config, oldConfig) {

stream.write("uptime: " + uptime + "\n");

for (group in stats) {
for (metric in stats[group]) {
var val;
var stat_writer = function(group, metric, val) {
var delta;

if (metric.match("^last_")) {
delta = now - val;
}
else {
delta = val;
}

if (metric.match("^last_")) {
val = now - stats[group][metric];
}
else {
val = stats[group][metric];
}
stream.write(group + "." + metric + ": " + delta + "\n");
};

stream.write(group + "." + metric + ": " + val + "\n");
// Loop through the base stats
for (group in stats) {
for (metric in stats[group]) {
stat_writer(group, metric, stats[group][metric]);
}
}

// Retrieve stats from each backend
for (var i = 0; i < backends.length; i++) {
backends[i].mod.write_stats(function(stat, val) {
stat_writer(backends[i].name, stat, val);
});
}

stream.write("END\n\n");
break;

Expand Down Expand Up @@ -181,107 +213,19 @@ config.configFile(process.argv[2], function (config, oldConfig) {

util.log("server is up");

var flushInterval = Number(config.flushInterval || 10000);

var pctThreshold = config.percentThreshold || 90;
pctThreshold = config.percentThreshold || 90;
if (!Array.isArray(pctThreshold)) {
pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same
}

flushInt = setInterval(function () {
var statString = '';
var ts = Math.round(new Date().getTime() / 1000);
var numStats = 0;
var key;

for (key in counters) {
var value = counters[key];
var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate

statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";

counters[key] = 0;
numStats += 1;
if (config.backends) {
for (var i = 0; i < config.backends.length; i++) {
loadBackend(config, config.backends[i]);
}

for (key in timers) {
if (timers[key].length > 0) {
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];

var mean = min;
var maxAtThreshold = max;

var message = "";

var key2;

for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var thresholdIndex = Math.round(((100 - pct) / 100) * count);
var numInThreshold = count - thresholdIndex;
var pctValues = values.slice(0, numInThreshold);
maxAtThreshold = pctValues[numInThreshold - 1];

// average the remaining timings
var sum = 0;
for (var i = 0; i < numInThreshold; i++) {
sum += pctValues[i];
}

mean = sum / numInThreshold;
}

var clean_pct = '' + pct;
clean_pct.replace('.', '_');
message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
}

timers[key] = [];

message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
statString += message;

numStats += 1;
}
}

for (key in gauges) {
statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
numStats += 1;
}

statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";

if (config.graphiteHost) {
try {
var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
graphite.addListener('error', function(connectionException){
if (config.debug) {
util.log(connectionException);
}
});
graphite.on('connect', function() {
this.write(statString);
this.end();
stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
});
} catch(e){
if (config.debug) {
util.log(e);
}
stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
}
}

}, flushInterval);
} else {
// The default backend is graphite
loadBackend(config, 'graphite');
}

if (keyFlushInterval > 0) {
var keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
Expand Down Expand Up @@ -314,6 +258,8 @@ config.configFile(process.argv[2], function (config, oldConfig) {
}, keyFlushInterval);
}

}
});

;

}
})

0 comments on commit 17454a3

Please sign in to comment.