Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: circuit status now contains a rolling window #34

Merged
merged 6 commits into from
Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ const CACHE = new WeakMap();
* @param options.rollingCountTimeout Sets the duration of the statistical
* rolling window, in milliseconds. This is how long Opossum keeps metrics for
* the circuit breaker to use and for publishing. Default: 10000
* @param options.rollingCountBuckets sets the number of buckets the rolling
* statistical window is divided into. So, if options.rollingCountTimeout is
* 10000, and options.rollingCountBuckets is 10, then the statistical window
* will be 1000 1 second snapshots in the statistical window. Default: 10
* @fires CircuitBreaker#halfOpen
*/
class CircuitBreaker extends EventEmitter {
constructor (action, options) {
super();
this.options = options;
this.options.rollingCountTimeout = options.rollingCountTimeout || 10000;
this.options.rollingCountBuckets = options.rollingCountBuckets || 10;
this.Promise = options.Promise;
this[STATUS] = new Status(this);
this[STATE] = CLOSED;
Expand Down Expand Up @@ -179,29 +184,31 @@ class CircuitBreaker extends EventEmitter {
* @fires CircuitBreaker#timeout
*/
fire () {
const args = Array.prototype.slice.call(arguments);

/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
*/
this.emit('fire', args);

if (CACHE.get(this) !== undefined) {
/**
* Emitted when the circuit breaker is using the cache
* @event CircuitBreaker#cacheHits
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHits');
this.emit('cacheHit');
return CACHE.get(this);
} else if (this.options.cache) {
/**
* Emitted when the circuit breaker is not using the cache but
* the cache option is enabled.
* @event CircuitBreaker#cacheHits
* Emitted when the circuit breaker does not find a value in
* the cache, but the cache option is enabled.
* @event CircuitBreaker#cacheMiss
*/
this.emit('cacheMisses');
this.emit('cacheMiss');
}

const args = Array.prototype.slice.call(arguments);
/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
*/
this.emit('fire', args);

if (this.opened || (this.halfOpen && this[PENDING_CLOSE])) {
/**
* Emitted when the circuit breaker is open and failing fast
Expand Down
160 changes: 134 additions & 26 deletions lib/status.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
'use strict';

const CIRCUIT_BREAKER = Symbol('circuit-breaker');
const CIRCUIT_OPEN = Symbol('circuit-open');
const STATS_WINDOW = Symbol('stats-window');
const LISTENERS = Symbol('listeners');
const FIRES = Symbol('fires');
const FAILS = Symbol('fails');

/**
* @class
Expand All @@ -9,55 +14,158 @@ const CIRCUIT_BREAKER = Symbol('circuit-breaker');
*/
class Status {
constructor (circuit) {
reset(this);
this[LISTENERS] = new Set();
this[CIRCUIT_BREAKER] = circuit;
circuit.on('success', () => this.successes++);
circuit.on('failure', () => this.failures++);
circuit.on('fallback', () => this.fallbacks++);
circuit.on('timeout', () => this.timeouts++);
circuit.on('fire', () => this.fires++);
circuit.on('reject', () => this.rejects++);
circuit.on('cacheHits', () => this.cacheHits++);
circuit.on('cacheMisses', () => this.cacheMisses++);
const interval = setInterval(
() => reset(this), circuit.options.rollingCountTimeout);
this[STATS_WINDOW] = [];
this[FIRES] = 0;
this[FAILS] = 0;
this[CIRCUIT_OPEN] = false;

// Keep total numbers for fires/failures
circuit.on('fire', () => this[FIRES]++);
circuit.on('failure', () => this[FAILS]++);

// Keep track of circuit open state
circuit.on('open', () => {
this[CIRCUIT_OPEN] = true;
this[STATS_WINDOW][0].isCircuitBreakerOpen = true;
});
circuit.on('close', () => {
this[CIRCUIT_OPEN] = false;
this[STATS_WINDOW][0].isCircuitBreakerOpen = false;
});

circuit.on('success', increment(this, 'successes'));
circuit.on('failure', increment(this, 'failures'));
circuit.on('fallback', increment(this, 'fallbacks'));
circuit.on('timeout', increment(this, 'timeouts'));
circuit.on('fire', increment(this, 'fires'));
circuit.on('reject', increment(this, 'rejects'));
circuit.on('cacheHit', increment(this, 'cacheHits'));
circuit.on('cacheMiss', increment(this, 'cacheMisses'));

// Set up our statistical rolling window
const buckets = circuit.options.rollingCountBuckets;
const timeout = circuit.options.rollingCountTimeout;

// Add the first bucket to the window
this[STATS_WINDOW].unshift(stats(this));

// TODO: do we guard against divide by zero, and for
// greater accuracy, do we require that timeout be
// evenly divisible by the number of buckets?
const bucketInterval = Math.floor(timeout / buckets);
const interval = setInterval(() => {
const window = this[STATS_WINDOW];
if (window.length === buckets) {
window.pop();
}
let next = stats(this);
window.unshift(next);
for (const listener of this[LISTENERS]) {
listener.call(listener, window[1]);
}
}, bucketInterval);
if (typeof interval.unref === 'function') interval.unref();
}
}

function reset (status) {
/**
* The number of times the breaker's action has failed
* Add a status listener which will be called with the most
* recently completed snapshot each time a new one is created.
* @param {any} listener
*/
status.failures = 0;
addSnapshotListener (listener) {
this[LISTENERS].add(listener);
}

/**
* The number of times a fallback function has been executed
* Gets the full stats window as an array of objects.
*/
status.fallbacks = 0;
get window () {
return this[STATS_WINDOW].slice();
}

/**
* The number of times the action for this breaker executed successfully
* during the current statistical window.
*/
get successes () {
return this[STATS_WINDOW][0].successes;
}

/**
* The number of times the breaker's action has failed
* during the current statistical window.
*/
status.successes = 0;
get failures () {
return this[STATS_WINDOW][0].failures;
}

/**
* The number of times this breaker been rejected because it was fired, but in the open state.
* The number of times a fallback function has been executed
* during the current statistical window.
*/
status.rejects = 0;
get fallbacks () {
return this[STATS_WINDOW][0].fallbacks;
}

/**
* The number of times during the current statistical window that
* this breaker been rejected because it was in the open state.
*/
get rejects () {
return this[STATS_WINDOW][0].rejects;
}

/**
* The number of times this circuit breaker has been fired
* during the current statistical window.
*/
status.fires = 0;
get fires () {
return this[STATS_WINDOW][0].fires;
}

/**
* The number of times this circuit breaker has timed out
* during the current statistical window.
*/
status.timeouts = 0;
get timeouts () {
return this[STATS_WINDOW][0].timeouts;
}

/**
* The number of the cache hits
* The number of times this circuit breaker has retrieved
* a value from the cache instead. If the circuit does not use
* caching, then this value will always be 0.
*/
status.cacheHits = 0;
get cacheHits () {
return this[STATS_WINDOW][0].cacheHits;
}

/**
* The number of the cache misses
* The number of times this circuit breaker has looked in the
* cache and found nothing. If the circuit does not use caching then
* this value will always be 0.
*/
status.cacheMisses = 0;
get cacheMisses () {
return this[STATS_WINDOW][0].cacheMisses;
}
}

const increment =
(status, property) => () => status[STATS_WINDOW][0][property]++;

const stats = (circuit) => ({
isCircuitBreakerOpen: circuit[CIRCUIT_OPEN],
failures: 0,
fallbacks: 0,
successes: 0,
rejects: 0,
fires: 0,
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
start: Date.now()
});

module.exports = exports = Status;
57 changes: 51 additions & 6 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ test('Passes parameters to the circuit function', (t) => {
});

test('Using cache', (t) => {
t.plan(7);
t.plan(9);
const expected = 34;
const options = {
cache: true
Expand All @@ -69,16 +69,18 @@ test('Using cache', (t) => {

breaker.fire(expected)
.then((arg) => {
t.equals(breaker.status.cacheHits, 0);
t.equals(breaker.status.cacheMisses, 1);
t.equals(breaker.status.cacheHits, 0, 'does not hit the cache');
t.equals(breaker.status.cacheMisses, 1, 'emits a cacheMiss');
t.equals(breaker.status.fires, 1, 'fired once');
t.equals(arg, expected, `cache hits:misses ${breaker.status.cacheHits}:${breaker.status.cacheMisses}`);
})
.catch(t.fail)
.then(() => {
breaker.fire(expected)
.then((arg) => {
t.equals(breaker.status.cacheHits, 1);
t.equals(breaker.status.cacheMisses, 1);
t.equals(breaker.status.cacheHits, 1, 'hit the cache');
t.equals(breaker.status.cacheMisses, 1, 'did not emit miss');
t.equals(breaker.status.fires, 2, 'fired twice');
t.equals(arg, expected, `cache hits:misses ${breaker.status.cacheHits}:${breaker.status.cacheMisses}`);
breaker.clearCache();
})
Expand Down Expand Up @@ -192,6 +194,15 @@ test('Breaker resets after a configurable amount of time', (t) => {
});
});

test('Breaker status reflects open state', (t) => {
t.plan(1);
const breaker = cb(passFail, {maxFailures: 0, resetTimeout: 100});
breaker.fire(-1)
.then(t.fail)
.catch(() => t.ok(breaker.status.window[0].isCircuitBreakerOpen))
.then(t.end);
});

test('Breaker resets for circuits with a fallback function', (t) => {
t.plan(2);
const fails = -1;
Expand Down Expand Up @@ -330,7 +341,8 @@ test('CircuitBreaker status', (t) => {
});

test('CircuitBreaker rolling counts', (t) => {
const breaker = cb(passFail, { rollingCountTimeout: 100 });
const opts = { rollingCountTimeout: 1000, rollingCountBuckets: 10 };
const breaker = cb(passFail, opts);
const deepEqual = (t, expected) => (actual) => t.deepEqual(actual, expected, 'expected status values');
Fidelity.all([
breaker.fire(10).then(deepEqual(t, 10)),
Expand All @@ -341,12 +353,45 @@ test('CircuitBreaker rolling counts', (t) => {
t.deepEqual(breaker.status.successes, 3, 'breaker succeeded 3 times'))
.then(() => {
setTimeout(() => {
const window = breaker.status.window;
t.ok(window.length > 1);
t.equal(window[window.length - 1].fires, 3, 'breaker stats are rolling');
t.deepEqual(breaker.status.successes, 0, 'breaker reset stats');
t.end();
}, 100);
});
});

test('CircuitBreaker status listeners', (t) => {
// 100ms snapshot intervals should ensure that event stats
// will be scattered across > 1 snapshot
const opts = { rollingCountTimeout: 2500, rollingCountBuckets: 25 };
const breaker = cb(passFail, opts);

const results = {
successes: 0,
fires: 0
};
breaker.status.addSnapshotListener((snapshot) => {
t.ok(snapshot.successes !== undefined, 'has successes stat');
t.ok(snapshot.fires !== undefined, 'has fires stat');
t.ok(snapshot.failures !== undefined, 'has failures stat');
t.ok(snapshot.fallbacks !== undefined, 'has fallbacks stat');
t.ok(snapshot.rejects !== undefined, 'has rejects stat');
t.ok(snapshot.timeouts !== undefined, 'has timeouts stat');

results.successes += snapshot.successes;
results.fires += snapshot.fires;
});
breaker.fire(10)
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => t.equal(results.fires, 5) && t.equal(results.successes, 5))
.then(t.end);
});

test('CircuitBreaker fallback event', (t) => {
t.plan(1);
const breaker = cb(passFail, {maxFailures: 0});
Expand Down