diff --git a/lib/status.js b/lib/status.js index 40525047..50799136 100644 --- a/lib/status.js +++ b/lib/status.js @@ -1,26 +1,38 @@ 'use strict'; -const CIRCUIT_BREAKER = Symbol('circuit-breaker'); const CIRCUIT_OPEN = Symbol('circuit-open'); -const STATS_WINDOW = Symbol('stats-window'); -const LISTENERS = Symbol('listeners'); +const WINDOW = Symbol('window'); +const BUCKETS = Symbol('buckets'); +const TIMEOUT = Symbol('timeout'); const FIRES = Symbol('fires'); const FAILS = Symbol('fails'); +const EventEmitter = require('events').EventEmitter; + /** * @class * Tracks execution status for a given {@link CircuitBreaker} * @param {CircuitBreaker} circuit the {@link CircuitBreaker} to track status for */ -class Status { +class Status extends EventEmitter { constructor (circuit) { - this[LISTENERS] = new Set(); - this[CIRCUIT_BREAKER] = circuit; - this[STATS_WINDOW] = []; + super(); this[FIRES] = 0; this[FAILS] = 0; this[CIRCUIT_OPEN] = false; + // Set up our statistical rolling window + this[BUCKETS] = circuit.options.rollingCountBuckets; + this[TIMEOUT] = circuit.options.rollingCountTimeout; + this[WINDOW] = new Array(this[BUCKETS]); + this.snapshot(); // take the first snapshot + + const interval = setInterval(this.snapshot.bind(this), + Math.floor(this[TIMEOUT] / this[BUCKETS])); + + // No unref() in the browser + if (typeof interval.unref === 'function') interval.unref(); + // Keep total numbers for fires/failures circuit.on('fire', () => this[FIRES]++); circuit.on('failure', () => this[FAILS]++); @@ -28,11 +40,12 @@ class Status { // Keep track of circuit open state circuit.on('open', () => { this[CIRCUIT_OPEN] = true; - this[STATS_WINDOW][0].isCircuitBreakerOpen = true; + this[WINDOW][0].isCircuitBreakerOpen = true; }); + circuit.on('close', () => { this[CIRCUIT_OPEN] = false; - this[STATS_WINDOW][0].isCircuitBreakerOpen = false; + this[WINDOW][0].isCircuitBreakerOpen = false; }); circuit.on('success', increment(this, 'successes')); @@ -43,46 +56,25 @@ class Status { circuit.on('reject', increment(this, 'rejects')); circuit.on('cacheHit', increment(this, 'cacheHits')); circuit.on('cacheMiss', increment(this, 'cacheMisses')); - - // Set up our statistical rolling window - const buckets = circuit.options.rollingCountBuckets; - const timeout = circuit.options.rollingCountTimeout; - - // Add the first bucket to the window - this[STATS_WINDOW].unshift(stats(this)); - - // TODO: do we guard against divide by zero, and for - // greater accuracy, do we require that timeout be - // evenly divisible by the number of buckets? - const bucketInterval = Math.floor(timeout / buckets); - const interval = setInterval(() => { - const window = this[STATS_WINDOW]; - if (window.length === buckets) { - window.pop(); - } - let next = stats(this); - window.unshift(next); - for (const listener of this[LISTENERS]) { - listener.call(listener, window[1]); - } - }, bucketInterval); - if (typeof interval.unref === 'function') interval.unref(); } - /** - * Add a status listener which will be called with the most - * recently completed snapshot each time a new one is created. - * @param {any} listener - */ - addSnapshotListener (listener) { - this[LISTENERS].add(listener); + snapshot () { + this[WINDOW].pop(); + let next = stats(this); + + this[WINDOW].unshift(next); + /** + * @emits 'snapshot' when a new status snapshot is taken + */ + this.emit('snapshot', next); + return next; } /** * Gets the full stats window as an array of objects. */ get window () { - return this[STATS_WINDOW].slice(); + return this[WINDOW].slice(); } /** @@ -90,15 +82,14 @@ class Status { * during the current statistical window. */ get successes () { - return this[STATS_WINDOW][0].successes; + return this[WINDOW][0].successes; } /** * The number of times the breaker's action has failed - * during the current statistical window. */ get failures () { - return this[STATS_WINDOW][0].failures; + return this[FAILS]; } /** @@ -106,7 +97,7 @@ class Status { * during the current statistical window. */ get fallbacks () { - return this[STATS_WINDOW][0].fallbacks; + return this[WINDOW][0].fallbacks; } /** @@ -114,15 +105,14 @@ class Status { * this breaker been rejected because it was in the open state. */ get rejects () { - return this[STATS_WINDOW][0].rejects; + return this[WINDOW][0].rejects; } /** - * The number of times this circuit breaker has been fired - * during the current statistical window. + * The total number of times this circuit breaker has been fired */ get fires () { - return this[STATS_WINDOW][0].fires; + return this[FIRES]; } /** @@ -130,7 +120,7 @@ class Status { * during the current statistical window. */ get timeouts () { - return this[STATS_WINDOW][0].timeouts; + return this[WINDOW][0].timeouts; } /** @@ -139,7 +129,7 @@ class Status { * caching, then this value will always be 0. */ get cacheHits () { - return this[STATS_WINDOW][0].cacheHits; + return this[WINDOW][0].cacheHits; } /** @@ -148,14 +138,16 @@ class Status { * this value will always be 0. */ get cacheMisses () { - return this[STATS_WINDOW][0].cacheMisses; + return this[WINDOW][0].cacheMisses; } } const increment = - (status, property) => () => status[STATS_WINDOW][0][property]++; + (status, property) => () => { + status[WINDOW][0][property]++; + }; -const stats = (circuit) => ({ +const stats = circuit => ({ isCircuitBreakerOpen: circuit[CIRCUIT_OPEN], failures: 0, fallbacks: 0, diff --git a/test/test.js b/test/test.js index 1d97ccb3..79f120ef 100644 --- a/test/test.js +++ b/test/test.js @@ -341,7 +341,7 @@ test('CircuitBreaker status', (t) => { }); test('CircuitBreaker rolling counts', (t) => { - const opts = { rollingCountTimeout: 1000, rollingCountBuckets: 10 }; + const opts = { rollingCountTimeout: 200, rollingCountBuckets: 2 }; const breaker = cb(passFail, opts); const deepEqual = (t, expected) => (actual) => t.deepEqual(actual, expected, 'expected status values'); Fidelity.all([ @@ -355,24 +355,17 @@ test('CircuitBreaker rolling counts', (t) => { setTimeout(() => { const window = breaker.status.window; t.ok(window.length > 1); - t.equal(window[window.length - 1].fires, 3, 'breaker stats are rolling'); t.deepEqual(breaker.status.successes, 0, 'breaker reset stats'); t.end(); - }, 100); + }, 300); }); }); test('CircuitBreaker status listeners', (t) => { - // 100ms snapshot intervals should ensure that event stats - // will be scattered across > 1 snapshot const opts = { rollingCountTimeout: 2500, rollingCountBuckets: 25 }; const breaker = cb(passFail, opts); - const results = { - successes: 0, - fires: 0 - }; - breaker.status.addSnapshotListener((snapshot) => { + breaker.status.on('snapshot', (snapshot) => { t.ok(snapshot.successes !== undefined, 'has successes stat'); t.ok(snapshot.fires !== undefined, 'has fires stat'); t.ok(snapshot.failures !== undefined, 'has failures stat'); @@ -380,16 +373,9 @@ test('CircuitBreaker status listeners', (t) => { t.ok(snapshot.rejects !== undefined, 'has rejects stat'); t.ok(snapshot.timeouts !== undefined, 'has timeouts stat'); - results.successes += snapshot.successes; - results.fires += snapshot.fires; + breaker.status.removeAllListeners('snapshot'); }); - breaker.fire(10) - .then(() => breaker.fire(10)) - .then(() => breaker.fire(10)) - .then(() => breaker.fire(10)) - .then(() => breaker.fire(10)) - .then(() => t.equal(results.fires, 5) && t.equal(results.successes, 5)) - .then(t.end); + breaker.fire(10).then(_ => t.end()); }); test('CircuitBreaker fallback event', (t) => {