Skip to content

Commit

Permalink
fix: make Status an EventEmitter
Browse files Browse the repository at this point in the history
Fixes: #40
  • Loading branch information
lance committed Mar 30, 2017
1 parent 05c0a2f commit 8aad11a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 73 deletions.
100 changes: 46 additions & 54 deletions lib/status.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,51 @@
'use strict';

const CIRCUIT_BREAKER = Symbol('circuit-breaker');
const CIRCUIT_OPEN = Symbol('circuit-open');
const STATS_WINDOW = Symbol('stats-window');
const LISTENERS = Symbol('listeners');
const WINDOW = Symbol('window');
const BUCKETS = Symbol('buckets');
const TIMEOUT = Symbol('timeout');
const FIRES = Symbol('fires');
const FAILS = Symbol('fails');

const EventEmitter = require('events').EventEmitter;

/**
* @class
* Tracks execution status for a given {@link CircuitBreaker}
* @param {CircuitBreaker} circuit the {@link CircuitBreaker} to track status for
*/
class Status {
class Status extends EventEmitter {
constructor (circuit) {
this[LISTENERS] = new Set();
this[CIRCUIT_BREAKER] = circuit;
this[STATS_WINDOW] = [];
super();
this[FIRES] = 0;
this[FAILS] = 0;
this[CIRCUIT_OPEN] = false;

// Set up our statistical rolling window
this[BUCKETS] = circuit.options.rollingCountBuckets;
this[TIMEOUT] = circuit.options.rollingCountTimeout;
this[WINDOW] = new Array(this[BUCKETS]);
this.snapshot(); // take the first snapshot

const interval = setInterval(this.snapshot.bind(this),
Math.floor(this[TIMEOUT] / this[BUCKETS]));

// No unref() in the browser
if (typeof interval.unref === 'function') interval.unref();

// Keep total numbers for fires/failures
circuit.on('fire', () => this[FIRES]++);
circuit.on('failure', () => this[FAILS]++);

// Keep track of circuit open state
circuit.on('open', () => {
this[CIRCUIT_OPEN] = true;
this[STATS_WINDOW][0].isCircuitBreakerOpen = true;
this[WINDOW][0].isCircuitBreakerOpen = true;
});

circuit.on('close', () => {
this[CIRCUIT_OPEN] = false;
this[STATS_WINDOW][0].isCircuitBreakerOpen = false;
this[WINDOW][0].isCircuitBreakerOpen = false;
});

circuit.on('success', increment(this, 'successes'));
Expand All @@ -43,94 +56,71 @@ class Status {
circuit.on('reject', increment(this, 'rejects'));
circuit.on('cacheHit', increment(this, 'cacheHits'));
circuit.on('cacheMiss', increment(this, 'cacheMisses'));

// Set up our statistical rolling window
const buckets = circuit.options.rollingCountBuckets;
const timeout = circuit.options.rollingCountTimeout;

// Add the first bucket to the window
this[STATS_WINDOW].unshift(stats(this));

// TODO: do we guard against divide by zero, and for
// greater accuracy, do we require that timeout be
// evenly divisible by the number of buckets?
const bucketInterval = Math.floor(timeout / buckets);
const interval = setInterval(() => {
const window = this[STATS_WINDOW];
if (window.length === buckets) {
window.pop();
}
let next = stats(this);
window.unshift(next);
for (const listener of this[LISTENERS]) {
listener.call(listener, window[1]);
}
}, bucketInterval);
if (typeof interval.unref === 'function') interval.unref();
}

/**
* Add a status listener which will be called with the most
* recently completed snapshot each time a new one is created.
* @param {any} listener
*/
addSnapshotListener (listener) {
this[LISTENERS].add(listener);
snapshot () {
this[WINDOW].pop();
let next = stats(this);

this[WINDOW].unshift(next);
/**
* @emits 'snapshot' when a new status snapshot is taken
*/
this.emit('snapshot', next);
return next;
}

/**
* Gets the full stats window as an array of objects.
*/
get window () {
return this[STATS_WINDOW].slice();
return this[WINDOW].slice();
}

/**
* The number of times the action for this breaker executed successfully
* during the current statistical window.
*/
get successes () {
return this[STATS_WINDOW][0].successes;
return this[WINDOW][0].successes;
}

/**
* The number of times the breaker's action has failed
* during the current statistical window.
*/
get failures () {
return this[STATS_WINDOW][0].failures;
return this[FAILS];
}

/**
* The number of times a fallback function has been executed
* during the current statistical window.
*/
get fallbacks () {
return this[STATS_WINDOW][0].fallbacks;
return this[WINDOW][0].fallbacks;
}

/**
* The number of times during the current statistical window that
* this breaker been rejected because it was in the open state.
*/
get rejects () {
return this[STATS_WINDOW][0].rejects;
return this[WINDOW][0].rejects;
}

/**
* The number of times this circuit breaker has been fired
* during the current statistical window.
* The total number of times this circuit breaker has been fired
*/
get fires () {
return this[STATS_WINDOW][0].fires;
return this[FIRES];
}

/**
* The number of times this circuit breaker has timed out
* during the current statistical window.
*/
get timeouts () {
return this[STATS_WINDOW][0].timeouts;
return this[WINDOW][0].timeouts;
}

/**
Expand All @@ -139,7 +129,7 @@ class Status {
* caching, then this value will always be 0.
*/
get cacheHits () {
return this[STATS_WINDOW][0].cacheHits;
return this[WINDOW][0].cacheHits;
}

/**
Expand All @@ -148,14 +138,16 @@ class Status {
* this value will always be 0.
*/
get cacheMisses () {
return this[STATS_WINDOW][0].cacheMisses;
return this[WINDOW][0].cacheMisses;
}
}

const increment =
(status, property) => () => status[STATS_WINDOW][0][property]++;
(status, property) => () => {
status[WINDOW][0][property]++;
};

const stats = (circuit) => ({
const stats = circuit => ({
isCircuitBreakerOpen: circuit[CIRCUIT_OPEN],
failures: 0,
fallbacks: 0,
Expand Down
24 changes: 5 additions & 19 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ test('CircuitBreaker status', (t) => {
});

test('CircuitBreaker rolling counts', (t) => {
const opts = { rollingCountTimeout: 1000, rollingCountBuckets: 10 };
const opts = { rollingCountTimeout: 200, rollingCountBuckets: 2 };
const breaker = cb(passFail, opts);
const deepEqual = (t, expected) => (actual) => t.deepEqual(actual, expected, 'expected status values');
Fidelity.all([
Expand All @@ -355,41 +355,27 @@ test('CircuitBreaker rolling counts', (t) => {
setTimeout(() => {
const window = breaker.status.window;
t.ok(window.length > 1);
t.equal(window[window.length - 1].fires, 3, 'breaker stats are rolling');
t.deepEqual(breaker.status.successes, 0, 'breaker reset stats');
t.end();
}, 100);
}, 300);
});
});

test('CircuitBreaker status listeners', (t) => {
// 100ms snapshot intervals should ensure that event stats
// will be scattered across > 1 snapshot
const opts = { rollingCountTimeout: 2500, rollingCountBuckets: 25 };
const breaker = cb(passFail, opts);

const results = {
successes: 0,
fires: 0
};
breaker.status.addSnapshotListener((snapshot) => {
breaker.status.on('snapshot', (snapshot) => {
t.ok(snapshot.successes !== undefined, 'has successes stat');
t.ok(snapshot.fires !== undefined, 'has fires stat');
t.ok(snapshot.failures !== undefined, 'has failures stat');
t.ok(snapshot.fallbacks !== undefined, 'has fallbacks stat');
t.ok(snapshot.rejects !== undefined, 'has rejects stat');
t.ok(snapshot.timeouts !== undefined, 'has timeouts stat');

results.successes += snapshot.successes;
results.fires += snapshot.fires;
breaker.status.removeAllListeners('snapshot');
});
breaker.fire(10)
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => t.equal(results.fires, 5) && t.equal(results.successes, 5))
.then(t.end);
breaker.fire(10).then(_ => t.end());
});

test('CircuitBreaker fallback event', (t) => {
Expand Down

0 comments on commit 8aad11a

Please sign in to comment.