Skip to content

Commit

Permalink
feat: Addition of Hystrix Mertrics Stream. GH-ISSUE nodeshift#39
Browse files Browse the repository at this point in the history
A stream is now provided using the circuit.hystrixStats.getHystrixStream method.
This stream can be easily turned into a SSE stream for use with a Hystrix Dashboard.
  • Loading branch information
lholmquist committed Apr 5, 2017
1 parent 3619678 commit a6b6591
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 0 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,11 @@ you create the breaker. E.g.
// Force opossum to use native JS promises
const breaker = circuitBreaker(readFile, { Promise: Promise });
```

### Hystrix Metrics

A Hystrix Stream is available for use with a Hystrix Dashboard using the `circuitBreaker.hystrixStats.getHystrixStream` method.

This method returns a [Node.js Stream](https://nodejs.org/api/stream.html), which makes it easy to create an SSE stream that will be compliant with a Hystrix Dashboard.

Additional Reading: [Hystrix Metrics Event Stream](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream), [Turbine](https://github.com/Netflix/Turbine/wiki), [Hystrix Dashboard](https://github.com/Netflix/Hystrix/wiki/Dashboard)
11 changes: 11 additions & 0 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const EventEmitter = require('events');
const Status = require('./status');
const HystrixStats = require('./hystrix-stats');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand All @@ -12,6 +13,7 @@ const FALLBACK_FUNCTION = Symbol('fallback');
const STATUS = Symbol('status');
const NAME = Symbol('name');
const GROUP = Symbol('group');
const HYSTRIX_STATS = Symbol('hystrix-stats');
const CACHE = new WeakMap();

/**
Expand Down Expand Up @@ -95,6 +97,9 @@ class CircuitBreaker extends EventEmitter {
if (this.options.cache) {
CACHE.set(this, undefined);
}

// Register this instance of the circuit breaker with the hystrix stats listener
this[HYSTRIX_STATS] = new HystrixStats(this);
}

/**
Expand Down Expand Up @@ -179,6 +184,12 @@ class CircuitBreaker extends EventEmitter {
return this[STATUS].stats;
}

/**
A convenience function that returns the hystrixStats
*/
get hystrixStats () {
return this[HYSTRIX_STATS];
}
/**
* Provide a fallback function for this {@link CircuitBreaker}. This
* function will be executed when the circuit is `fire`d and fails.
Expand Down
69 changes: 69 additions & 0 deletions lib/hystrix-formatter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
'use strict';

// A function to map our stats data to the hystrix format
// returns JSON
function hystrixFormatter (stats) {
const json = {};
json.type = 'HystrixCommand';
json.name = stats.name;
json.group = stats.group;
json.currentTime = new Date();
json.isCircuitBreakerOpen = !stats.closed;
json.errorPercentage = stats.fires === 0 ? 0 : (stats.failures / stats.fires) * 100;
json.errorCount = stats.failures;
json.requestCount = stats.fires;
json.rollingCountBadRequests = stats.failures;
json.rollingCountCollapsedRequests = 0;
json.rollingCountEmit = stats.fires;
json.rollingCountExceptionsThrown = 0;
json.rollingCountFailure = stats.failures;
json.rollingCountFallbackEmit = stats.fallbacks;
json.rollingCountFallbackFailure = 0;
json.rollingCountFallbackMissing = 0;
json.rollingCountFallbackRejection = 0;
json.rollingCountFallbackSuccess = 0;
json.rollingCountResponsesFromCache = stats.cacheHits;
json.rollingCountSemaphoreRejected = stats.rejects;
json.rollingCountShortCircuited = stats.rejects;
json.rollingCountSuccess = stats.successes;
json.rollingCountThreadPoolRejected = 0;
json.rollingCountTimeout = stats.timeouts;
json.currentConcurrentExecutionCount = 0;
json.rollingMaxConcurrentExecutionCount = 0;
// TODO: caluclate these latency values
json.latencyExecute_mean = 0;
json.latencyExecute = {
'0': 0,
'25': 0,
'50': 0,
'75': 0,
'90': 0,
'95': 0,
'99': 0,
'99.5': 0,
'100': 0
};
json.latencyTotal_mean = 0;
json.latencyTotal = { '0': 0, '25': 0, '50': 0, '75': 0, '90': 0, '95': 0, '99': 0, '99.5': 0, '100': 0 };
json.propertyValue_circuitBreakerRequestVolumeThreshold = 5;
json.propertyValue_circuitBreakerSleepWindowInMilliseconds = stats.options.resetTimeout;
json.propertyValue_circuitBreakerErrorThresholdPercentage = stats.options.errorThresholdPercentage;
json.propertyValue_circuitBreakerForceOpen = false;
json.propertyValue_circuitBreakerForceClosed = false;
json.propertyValue_circuitBreakerEnabled = true; // Whether circuit breaker should be enabled.
json.propertyValue_executionIsolationStrategy = 'THREAD';
json.propertyValue_executionIsolationThreadTimeoutInMilliseconds = 300;
json.propertyValue_executionTimeoutInMilliseconds = stats.options.timeout;
json.propertyValue_executionIsolationThreadInterruptOnTimeout = true;
json.propertyValue_executionIsolationThreadPoolKeyOverride = null;
json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_metricsRollingStatisticalWindowInMilliseconds = 10000;
json.propertyValue_requestCacheEnabled = stats.options.cache || false;
json.propertyValue_requestLogEnabled = true;
json.reportingHosts = 1;

return json;
}

module.exports = exports = hystrixFormatter;
72 changes: 72 additions & 0 deletions lib/hystrix-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use strict';

const stream = require('stream');
const hystrixFormatter = require('./hystrix-formatter');

/**
* @class
* <p>
* Stream Hystrix Metrics for a given {@link CircuitBreaker}.
* A HystrixStats instance is created for every {@link CircuitBreaker}
* and does not typically need to be created by a user.
* </p>
* <p>
* A HystrixStats instance will listen for all events on the {@link CircuitBreaker.status.snapshot}
* and format the data to the proper Hystrix format. Making it easy to construct an Event Stream for a client
* </p>
*
* @example
* const circuit = circuitBreaker(fs.readFile, {});
*
* circuit.hystrixStats.getHystrixStream().pipe(response);
* @see CircuitBreaker#hystrixStats
*/
class HystrixStats {
constructor (circuit) {
this._circuit = circuit;

// Listen for the stats's snapshot event
this._circuit.status.on('snapshot', this._hystrixSnapshotListener.bind(this));

this._readableStream = new stream.Readable({
objectMode: true
});

// Need a _read() function to satisfy the protocol
this._readableStream._read = () => {};
this._readableStream.resume();

this._hystrixStream = new stream.Transform({
objectMode: true
});

// Need a _transform() function to satisfy the protocol
this._hystrixStream._transform = this._hystrixTransformer;
this._hystrixStream.resume();

this._readableStream.pipe(this._hystrixStream);
}

// The stats coming in should be already "Reduced"
_hystrixTransformer (stats, encoding, cb) {
const formattedStats = hystrixFormatter(stats);

// Need to take the stats and map them to the hystrix format
return cb(null, `data: ${JSON.stringify(formattedStats)}\n\n`);
}

/**
A convenience function that returns the hystrxStream
*/
getHystrixStream () {
return this._hystrixStream;
}

// This will take the stats data from the listener and push it on the stream to be transformed
_hystrixSnapshotListener (stats) {
const circuit = this._circuit;
this._readableStream.push(Object.assign({}, {name: circuit.name, closed: circuit.closed, group: circuit.group, options: circuit.options}, stats));
}
}

module.exports = exports = HystrixStats;
1 change: 1 addition & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ test('api', (t) => {
t.ok(breaker.closed, 'CircuitBreaker.closed');
t.ok(breaker.status, 'CircuitBreaker.status');
t.ok(breaker.options, 'CircuitBreaker.options');
t.ok(breaker.hystrixStats, 'CircuitBreaker.hystrixStats');
t.equals(breaker.action, passFail, 'CircuitBreaker.action');
t.end();
});
Expand Down

0 comments on commit a6b6591

Please sign in to comment.