Enable AMQP over SSL, topic exchanges, and graphite format #1

Merged
merged 4 commits into from Dec 17, 2013
View
@@ -1,28 +1,63 @@
# StatsD AMQP publisher backend
## Overview
+
This is a pluggable backend for [StatsD](https://github.com/etsy/statsd), which
-publishes stats to an AMQP queue instead of graphite
+publishes stats to an AMQP topic exchange instead of graphite.
## Installation
npm install statsd-amqp-backend
## Configuration
-You have to give basic information about your AMQP server to use
+
+You have to give basic information about your AMQP server to use:
+
```
-{ amqpHost: 'localhost'
-, amqpPort: 5672
-, amqpLogin: 'guest'
-, amqpPassword: 'guest'
-, amqpVhost: '/'
-, amqpQueue: 'statsd'
-, amqpDefaultExchange: ''
+{
+ backends: [ "statsd-amqp-backend" ]
+, amqp: {
+ host: 'localhost'
+ , port: 5672
+ , login: 'guest'
+ , password: 'guest'
+ , vhost: '/'
+ , defaultExchange: ''
+ , messageFormat: 'graphite' // can be 'graphite' or 'json'
+ , ssl: {
+ enabled : false
+ , keyFile : '/path/to/key/file'
+ , certFile : '/path/to/cert/file'
+ , caFile : '/path/to/cacert/file'
+ , rejectUnauthorized : false
+ }
+ }
}
```
+## AMQP
+
+When using `amqpMessageFormat: 'json'`, at flush time the entire metrics payload
+will be sent to the `amqpDefaultExchange` using the routing key `json_payload`,
+with contentType `application/json`.
+Example:
+
+```
+{"counters":{"my-metrics.bad_lines_seen":0,"my-metrics.packets_received":0},"gauges":{},"timers":{},"timer_counters":{},"sets":{},"counter_rates":{"my-metrics.bad_lines_seen":0,"my-metrics.packets_received":0},"timer_data":{},"pctThreshold":[90],"statsd_metrics":{"processing_time":0}}
+```
+
+When using `amqpMessageFormat: 'graphite'`, at flush time each metric will be sent
+individually to the `amqpDefaultExchange` using the metric's key name as the routing key,
+with contentType `text/graphite`.
+Example:
+
+```
+my-metrics.bad_lines_seen 0 1387057730
+my-metrics.macbook-aw.bad_lines_seen 0 1387057730
+```
+
## Dependencies
-- [node-amqp](https://github.com/postwait/node-amqp)
+- [amqplib](https://github.com/squaremo/amqp.node)
## Development
- [Bugs](https://github.com/mrtazz/statsd-amqp-backend/issues)
View
@@ -8,7 +8,7 @@
},
"version": "0.1.0",
"dependencies": {
- "amqp": "0.1.3"
+ "amqplib": ">=0.1"
},
"devDependencies": {
"nodeunit": "0.7.x"
@@ -8,25 +8,37 @@
*
* This backend supports the following config options:
*
- * amqpHost: Hostname of AMQP server.
- * amqpPort: Port to contact AMQP server at.
- * amqpLogin: Login for the AMQP server.
- * amqpPassword: Password for the AMQP server.
- * amqpVhost: vhost for the AMQP server.
- * amqpQueue: queue for the AMQP server.
- * amqpDefaultExchange: default exchange to use.
+ * amqp.host: Hostname of AMQP server.
+ * amqp.port: Port to contact AMQP server at.
+ * amqp.login: Login for the AMQP server.
+ * amqp.password: Password for the AMQP server.
+ * amqp.vhost: vhost for the AMQP server.
+ * amqp.defaultExchange: default AMQP topic exchange to use.
+ * amqp.messageFormat: AMQP message format: graphite or json
+ * amqp.ssl: SSL options for AMQP.
*/
-var util = require('util');
+var util = require('util');
+var fs = require('fs');
var amqp;
-var conn;
+var open;
+var channel;
var debug;
+var prefixStats;
var flushInterval;
-
-var amqpQueue;
+var flush_counts;
var amqpStats = {};
+var options = {};
+var exchangeOptions = {};
+
+// prefix configuration
+var globalPrefix;
+var prefixCounter;
+var prefixGauge;
+var prefixTimer;
+var globalSuffix;
var deepCopy = function(obj) {
if (Object.prototype.toString.call(obj) === '[object Array]') {
@@ -49,15 +61,143 @@ var deepCopy = function(obj) {
var flush_stats = function(ts, metrics)
{
var data = deepCopy(metrics);
+ var payload = [];
+ var key;
+ var value;
+ var result;
+ var ts_suffix = ' ' + ts + "\n";
- if (debug) {
- util.log("Publishing metrics: "+JSON.stringify(data));
- }
- conn.publish(amqpQueue, JSON.stringify(data));
+ return open.createChannel().then(function(ch) {
+ channel = ch;
+ var ok = ch.assertExchange(options.exchange, 'topic', {durable: true});
+ return ok.then(function() {
+ switch(options.format)
+ {
+ case 'graphite':
+ exchangeOptions = {
+ 'contentType': 'text/graphite',
+ 'appId': 'statsdAMQP',
+ 'deliveryMode': 2
+ }
-};
+ // Send counters
+ for (key in metrics.counters) {
+ value = metrics.counters[key];
+ var valuePerSecond = metrics.counter_rates[key];
+
+ result = globalPrefix + prefixCounter + key + '.rate' + globalSuffix + valuePerSecond + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixCounter + key + '.rate',
+ result: result
+ });
+
+ if (flush_counts) {
+ result = globalPrefix + prefixCounter + key + '.count' + globalSuffix + value + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixCounter + key + '.count',
+ result: result
+ });
+ }
+ }
+
+ // Send gauges
+ for (key in metrics.gauges) {
+ value = metrics.gauges[key];
+
+ result = globalPrefix + prefixGauge + key + globalSuffix + value + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixGauge + key,
+ result: result
+ });
+ }
+
+ // Send timers
+ for (key in metrics.timer_data) {
+ for (timer_data_key in metrics.timer_data[key]) {
+ if (typeof(metrics.timer_data[key][timer_data_key]) === 'number') {
+ result = globalPrefix + prefixTimer + key + timer_data_key + globalSuffix + metrics.timer_data[key][timer_data_key] + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixTimer + key + timer_data_key,
+ result: result
+ });
+ } else {
+ for (var timer_data_sub_key in metrics.timer_data[key][timer_data_key]) {
+ if (debug) {
+ util.log(metrics.timer_data[key][timer_data_key][timer_data_sub_key].toString());
+ }
+ result = globalPrefix + prefixTimer + key + timer_data_key + '.' + timer_data_sub_key + globalSuffix + metrics.timer_data[key][timer_data_key][timer_data_sub_key] + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixTimer + key + timer_data_key + '.' + timer_data_sub_key,
+ result: result
+ });
+ }
+ }
+ }
+ }
+
+ // Send other stats
+ result = globalPrefix + prefixStats + '.numStats' + globalSuffix + payload.length + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixStats + '.numStats',
+ result: result
+ });
+
+ // Send statsd metrics
+ for (key in metrics.statsd_metrics) {
+ result = globalPrefix + prefixStats + '.' + key + globalSuffix + metrics.statsd_metrics[key] + ts_suffix;
+ payload.push({
+ metric: globalPrefix + prefixStats + '.' + key,
+ result: result
+ });
+ }
+
+ post_stats(payload, function() {
+ if (debug) {
+ util.log("numStats: " + payload.length);
+ }
+ return ch.close();
+ });
+ break;
+ case 'json':
+ exchangeOptions = {
+ 'contentType': 'application/json',
+ 'appId': 'statsdAMQP',
+ 'deliveryMode': 2
+ }
+
+ payload.push({
+ metric: 'json_payload',
+ result: JSON.stringify(data)
+ });
+ post_stats(payload, function() {
+ return ch.close();
+ });
+ break;
+ }
+ });
+ });
+}
+var post_stats = function(payload, callback)
+{
+ try {
+ for (key in payload) {
+ data = payload[key];
+ channel.publish(options.exchange, data.metric, new Buffer(String(data.result)), exchangeOptions);
+ if (debug) {
+ util.log("Published: " + data.result);
+ }
+ }
+ amqpStats.last_flush = Math.round(new Date().getTime() / 1000);
+ callback();
+ } catch(e) {
+ if (debug) {
+ util.log(e);
+ }
+ amqpStats.last_exception = Math.round(new Date().getTime() / 1000);
+ }
+}
var backend_status = function(writeCb)
{
@@ -66,35 +206,74 @@ var backend_status = function(writeCb)
}
};
-exports.init = function(startup_time, config, events, amq)
+var connect = function(connectUri, sslOptions, cb)
{
- // take the amqp module as a param for testing
- amqp = typeof amq !== 'undefined' ? amq : require('amqp');
+ amqp.connect(connectUri, sslOptions).then(function(connection) {
+ connection.on('error', function(err) {
+ if (debug) {
+ util.log("Disconnected from AMQP server, retrying..");
+ }
+ connect(connectUri, sslOptions, function(cb) {
+ open = cb;
+ });
+ });
+ cb(connection);
+ }).then(null, console.warn);
+}
+
+exports.init = function(startup_time, config, events)
+{
+ // set defaults for prefixes & suffix
+ globalPrefix = "stats.";
+ prefixCounter = "counters.";
+ prefixGauge = "gauges.";
+ prefixTimer = "timers.";
+ globalSuffix = ' ';
+
+ amqp = require('amqplib');
debug = config.debug;
+ prefixStats = config.prefixStats || 'statsd';
+
// amqp settings
- amqpQueue = config.amqpQueue || 'statsd';
- var options = {};
-
- options.host = config.amqpHost;
- options.port = config.amqpPort || 5672;
- if (typeof config.amqpLogin !== 'undefined')
- {
- options.login = config.amqpLogin;
- }
- if (typeof config.amqpPassword !== 'undefined')
- {
- options.password = config.amqpPassword;
+ var sslOptions = {};
+ var connectPrefix = 'amqp://';
+
+ options.host = config.amqp.host || 'localhost';
+ options.port = config.amqp.port || 5672;
+ options.login = config.amqp.login || 'guest';
+ options.password = config.amqp.password || 'guest';
+ options.vhost = config.amqp.vhost || '/';
+ options.exchange = config.amqp.defaultExchange || '';
+ options.format = config.amqp.messageFormat || 'json';
+
+ // ssl settings
+ if (typeof config.amqp.ssl !== 'undefined' && config.amqp.ssl.enabled == true) {
+ connectPrefix = 'amqps://';
+ options.port = config.amqp.port || 5671;
+ var passphrase = config.amqp.ssl.passphrase || '';
+ var reject = config.amqp.ssl.rejectUnauthorized || false;
+
+ sslOptions = {
+ passphrase: passphrase,
+ cert: fs.readFileSync(config.amqp.ssl.certFile),
+ key: fs.readFileSync(config.amqp.ssl.keyFile),
+ ca: [fs.readFileSync(config.amqp.ssl.caFile)],
+ rejectUnauthorized: reject
+ }
}
- options.vhost = config.amqpVhost || '/';
- options.defaultExchange = config.amqpDefaultExchange || '';
- conn = amqp.createConnection(options);
+ connectUri = connectPrefix + options.login + ':' + options.password + '@' + options.host + ':' + options.port + '/' + options.vhost;
+ connect(connectUri, sslOptions, function(cb) {
+ open = cb;
+ });
amqpStats.last_flush = startup_time;
amqpStats.last_exception = startup_time;
flushInterval = config.flushInterval;
+ flush_counts = typeof(config.flush_counts) === "undefined" ? true : config.flush_counts;
+
events.on('flush', flush_stats);
events.on('status', backend_status);