Permalink
Browse files

initial import of StatsD

  • Loading branch information...
0 parents commit 122964c8b910594ac3a6055a6b9f981bdbbd8ff7 @kastner kastner committed Dec 30, 2010
Showing with 269 additions and 0 deletions.
  1. +39 −0 config.js
  2. +6 −0 exampleConfig.js
  3. +96 −0 php-example.php
  4. +128 −0 stats.js
@@ -0,0 +1,39 @@
+var fs = require('fs')
+ , sys = require('sys')
+
+var Configurator = function (file) {
+
+ var self = this;
+ var config = {};
+ var oldConfig = {};
+
+ this.updateConfig = function () {
+ sys.log('reading config file: ' + file);
+
+ fs.readFile(file, function (err, data) {
+ if (err) { throw err; }
+ old_config = self.config;
+
+ self.config = process.compile('config = ' + data, file);
+ self.emit('configChanged', self.config);
+ });
+ };
+
+ this.updateConfig();
+
+ fs.watchFile(file, function (curr, prev) {
+ if (curr.ino != prev.ino) { self.updateConfig(); }
+ });
+};
+
+sys.inherits(Configurator, process.EventEmitter);
+
+exports.Configurator = Configurator;
+
+exports.configFile = function(file, callbackFunc) {
+ var config = new Configurator(file);
+ config.on('configChanged', function() {
+ callbackFunc(config.config, config.oldConfig);
+ });
+};
+
@@ -0,0 +1,6 @@
+{
+ graphitePort: 2003
+, graphiteHost: "graphite.host.com"
+, port: 8125
+}
+
@@ -0,0 +1,96 @@
+<?php
+
+/**
+ * Sends statistics to the stats daemon over UDP
+ *
+ **/
+
+class StatsD {
+
+ /**
+ * Log timing information
+ *
+ * @param string $stats The metric to in log timing info for.
+ * @param float $time The ellapsed time (ms) to log
+ * @param float|1 $sampleRate the rate (0-1) for sampling.
+ **/
+ public static function timing($stat, $time, $sampleRate=1) {
+ StatsD::send(array($stat => "$time|ms"), $sampleRate);
+ }
+
+ /**
+ * Increments one or more stats counters
+ *
+ * @param string|array $stats The metric(s) to increment.
+ * @param float|1 $sampleRate the rate (0-1) for sampling.
+ * @return boolean
+ **/
+ public static function increment($stats, $sampleRate=1) {
+ StatsD::updateStats($stats, 1, $sampleRate);
+ }
+
+ /**
+ * Decrements one or more stats counters.
+ *
+ * @param string|array $stats The metric(s) to decrement.
+ * @param float|1 $sampleRate the rate (0-1) for sampling.
+ * @return boolean
+ **/
+ public static function decrement($stats, $sampleRate=1) {
+ StatsD::updateStats($stats, -1, $sampleRate);
+ }
+
+ /**
+ * Updates one or more stats counters by arbitrary amounts.
+ *
+ * @param string|array $stats The metric(s) to update. Should be either a string or array of metrics.
+ * @param int|1 $delta The amount to increment/decrement each metric by.
+ * @param float|1 $sampleRate the rate (0-1) for sampling.
+ * @return boolean
+ **/
+ public static function updateStats($stats, $delta=1, $sampleRate=1) {
+ if (!is_array($stats)) { $stats = array($stats); }
+ $data = array();
+ foreach($stats as $stat) {
+ $data[$stat] = "$delta|c";
+ }
+
+ StatsD::send($data, $sampleRate);
+ }
+
+ /*
+ * Squirt the metrics over UDP
+ **/
+ public static function send($data, $sampleRate=1) {
+ $config = Config::getInstance();
+ if (! $config->isEnabled("statsd")) { return; }
+
+ // sampling
+ $sampledData = array();
+
+ if ($sampleRate < 1) {
+ foreach ($data as $stat => $value) {
+ if ((mt_rand() / mt_getrandmax()) <= $sampleRate) {
+ $sampledData[$stat] = "$value|@$sampleRate";
+ }
+ }
+ } else {
+ $sampledData = $data;
+ }
+
+ if (empty($sampledData)) { return; }
+
+ // Wrap this in a try/catch - failures in any of this should be silently ignored
+ try {
+ $host = $config->getConfig("statsd.host");
+ $port = $config->getConfig("statsd.port");
+ $fp = fsockopen("udp://$host", $port, $errno, $errstr);
+ if (! $fp) { return; }
+ foreach ($sampledData as $stat => $value) {
+ fwrite($fp, "$stat:$value");
+ }
+ fclose($fp);
+ } catch (Exception $e) {
+ }
+ }
+}
128 stats.js
@@ -0,0 +1,128 @@
+var dgram = require('dgram')
+ , sys = require('sys')
+ , net = require('net')
+ , config = require('./config')
+
+var counters = {};
+var timers = {};
+var debugInt, flushInt, server;
+
+config.configFile(process.argv[2], function (config, oldConfig) {
+ if (! config.debug && debugInt) {
+ clearInterval(debugInt);
+ debugInt = false;
+ }
+
+ if (config.debug) {
+ if (debugInt !== undefined) { clearInterval(debugInt); }
+ debugInt = setInterval(function () {
+ sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers));
+ }, config.debugInterval || 10000);
+ }
+
+ if (server === undefined) {
+ server = dgram.createSocket('udp4', function (msg, rinfo) {
+ if (config.dumpMessages) { sys.log(msg.toString()); }
+ var bits = msg.toString().split(':');
+ var key = bits.shift()
+ .replace(/\s+/g, '_')
+ .replace(/\//g, '-')
+ .replace(/[^a-zA-Z_\-0-9\.]/g, '');
+
+ if (bits.length == 0) {
+ bits.push("1");
+ }
+
+ for (var i = 0; i < bits.length; i++) {
+ var sampleRate = 1;
+ var fields = bits[i].split("|");
+ if (fields[1] == "ms") {
+ if (! timers[key]) {
+ timers[key] = [];
+ }
+ timers[key].push(Number(fields[0] || 0));
+ } else {
+ if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
+ sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
+ }
+ if (! counters[key]) {
+ counters[key] = 0;
+ }
+ counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
+ }
+ }
+ });
+
+ server.bind(config.port || 8125);
+
+ var flushInterval = Number(config.flushInterval || 10000);
+
+ flushInt = setInterval(function () {
+ var statString = '';
+ var ts = Math.round(new Date().getTime() / 1000);
+ var numStats = 0;
+ var key;
+
+ for (key in counters) {
+ var value = counters[key] / (flushInterval / 1000);
+ var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
+ statString += message;
+ counters[key] = 0;
+
+ numStats += 1;
+ }
+
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ var pctThreshold = config.percentThreshold || 90;
+ var values = timers[key].sort(function (a,b) { return a-b; });
+ var count = values.length;
+ var min = values[0];
+ var max = values[count - 1];
+
+ var mean = min;
+ var maxAtThreshold = max;
+
+ if (count > 1) {
+ var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
+ var numInThreshold = count - thresholdIndex;
+ values = values.slice(0, numInThreshold);
+ maxAtThreshold = values[numInThreshold - 1];
+
+ // average the remaining timings
+ var sum = 0;
+ for (var i = 0; i < numInThreshold; i++) {
+ sum += values[i];
+ }
+
+ mean = sum / numInThreshold;
+ }
+
+ timers[key] = [];
+
+ var message = "";
+ message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+ statString += message;
+
+ numStats += 1;
+ }
+ }
+
+ statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
+
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+
+ graphite.on('connect', function() {
+ this.write(statString);
+ this.end();
+ });
+
+ }, flushInterval);
+ }
+
+});
+

0 comments on commit 122964c

Please sign in to comment.