diff --git a/.travis.yml b/.travis.yml index 6023fbd..a53ceaa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,7 @@ language: node_js node_js: - - "0.12" - - "0.10" - - "iojs" - - "4" - - "5" + - "6" + - "7" + - "8" after_success: - "npm run post-to-coveralls-io" diff --git a/lib/index.js b/lib/index.js index f29ecf9..bedbbb8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -13,7 +13,13 @@ var toUpdate = []; var inStream; var order = []; var orderNextIdx = -1; -var flushing = false; +var flushingUpdateQueue = false; +var flushingStreamValue = false; + +function flushing() { + return flushingUpdateQueue || flushingStreamValue; +} + /** @namespace */ var flyd = {} @@ -180,9 +186,10 @@ flyd.endsOn = function(endS, s) { * var squaredNumbers = flyd.map(function(n) { return n*n; }, numbers); */ // Library functions use self callback to accept (null, undefined) update triggers. -flyd.map = curryN(2, function(f, s) { +function map(f, s) { return combine(function(s, self) { self(f(s.val)); }, [s]); -}) +} +flyd.map = curryN(2, map) /** * Chain a stream @@ -379,7 +386,7 @@ flyd.curryN = curryN * var numbers = flyd.stream(0); * var squaredNumbers = numbers.map(function(n) { return n*n; }); */ -function boundMap(f) { return flyd.map(f, this); } +function boundMap(f) { return map(f, this); } /** * Returns the result of applying function `fn` to this stream @@ -421,7 +428,7 @@ function chain(f, s) { internalEnded(newS.end); // Update self on call -- newS is never handed out so deps don't matter - last = flyd.map(own, newS); + last = map(own, newS); }, [s]); flyd.endsOn(flatEnd.end, flatStream); @@ -524,12 +531,12 @@ function streamToString() { function createStream() { function s(n) { if (arguments.length === 0) return s.val - updateStreamValue(s, n) + updateStreamValue(n, s) return s } s.hasVal = false; s.val = undefined; - s.vals = []; + s.updaters = []; s.listeners = []; s.queued = false; s.end = undefined; @@ -580,11 +587,23 @@ function createDependentStream(deps, fn) { * @param {stream} stream - the stream to check depencencies from * @return {Boolean} `true` if all dependencies have vales, `false` otherwise */ -function initialDepsNotMet(stream) { +function initialDependenciesMet(stream) { stream.depsMet = stream.deps.every(function(s) { return s.hasVal; }); - return !stream.depsMet; + return stream.depsMet; +} + +function dependenciesAreMet(stream) { + return stream.depsMet === true || initialDependenciesMet(stream); +} + +function isEnded(stream) { + return stream.end && stream.end.val === true; +} + +function listenersNeedUpdating(s) { + return s.listeners.some(function(s) { return s.shouldUpdate; }); } /** @@ -593,19 +612,9 @@ function initialDepsNotMet(stream) { * @param {stream} stream - the stream to update */ function updateStream(s) { - if ((s.depsMet !== true && initialDepsNotMet(s)) || - (s.end !== undefined && s.end.val === true)) { - if (toUpdate.length > 0 && inStream !== undefined) { - toUpdate.push(function() { - updateStream(s); - }); - } - return; - } + if (isEnded(s) || !dependenciesAreMet(s)) return; if (inStream !== undefined) { - toUpdate.push(function() { - updateStream(s); - }); + updateLaterUsing(updateStream, s); return; } inStream = s; @@ -617,7 +626,15 @@ function updateStream(s) { inStream = undefined; if (s.depsChanged !== undefined) s.depsChanged = []; s.shouldUpdate = false; - if (flushing === false) flushUpdate(); + if (flushing() === false) flushUpdate(); + if (listenersNeedUpdating(s)) { + if (!flushingStreamValue) s(s.val) + else { + s.listeners.forEach(function(listener) { + if (listener.shouldUpdate) updateLaterUsing(updateStream, listener); + }); + } + } } /** @@ -625,7 +642,7 @@ function updateStream(s) { * Update the dependencies of a stream * @param {stream} stream */ -function updateDeps(s) { +function updateListeners(s) { var i, o, list var listeners = s.listeners; for (i = 0; i < listeners.length; ++i) { @@ -663,16 +680,23 @@ function findDeps(s) { } } +function updateLaterUsing(updater, stream) { + toUpdate.push(stream); + stream.updaters.push(updater); + stream.shouldUpdate = true; +} + /** * @private */ function flushUpdate() { - flushing = true; + flushingUpdateQueue = true; while (toUpdate.length > 0) { - var updater = toUpdate.shift(); - updater(); + var stream = toUpdate.shift(); + var nextUpdateFn = stream.updaters.shift(); + if (nextUpdateFn && stream.shouldUpdate) nextUpdateFn(stream); } - flushing = false; + flushingUpdateQueue = false; } /** @@ -681,19 +705,18 @@ function flushUpdate() { * @param {stream} stream * @param {*} value */ -function updateStreamValue(s, n) { +function updateStreamValue(n, s) { s.val = n; s.hasVal = true; if (inStream === undefined) { - flushing = true; - updateDeps(s); - if (toUpdate.length > 0) flushUpdate(); else flushing = false; + flushingStreamValue = true; + updateListeners(s); + if (toUpdate.length > 0) flushUpdate(); + flushingStreamValue = false; } else if (inStream === s) { markListeners(s, s.listeners); } else { - toUpdate.push(function() { - updateStreamValue(s, n); - }); + updateLaterUsing(function(s) { updateStreamValue(n, s); }, s); } } diff --git a/module/switchlatest/index.js b/module/switchlatest/index.js index 74b831a..699f73e 100644 --- a/module/switchlatest/index.js +++ b/module/switchlatest/index.js @@ -1,11 +1,12 @@ var flyd = require('../../lib'); +var takeUntil = require('../takeuntil'); +var drop = require('ramda/src/drop'); + +var dropCurrentValue = flyd.transduce(drop(1)); module.exports = function(s) { - var inner; - return flyd.combine(function(s, self) { - inner = s(); - flyd.endsOn(flyd.merge(s, inner.end), flyd.combine(function(inner) { - self(inner()); - }, [inner])); + return flyd.combine(function(stream$, self) { + var value$ = stream$(); + flyd.on(self, takeUntil(value$, dropCurrentValue(stream$))); }, [s]); }; diff --git a/module/switchlatest/test/index.js b/module/switchlatest/test/index.js index 123db1b..650ebbf 100644 --- a/module/switchlatest/test/index.js +++ b/module/switchlatest/test/index.js @@ -4,7 +4,7 @@ var assert = require('assert'); var switchLatest = require('../index.js'); -describe('takeUntil', function() { +describe('switchLatest', function() { it('emits values from first stream in stream', function() { var result = []; var source = stream(); diff --git a/module/takeuntil/test/index.js b/module/takeuntil/test/index.js index 8bda6f5..b5be138 100644 --- a/module/takeuntil/test/index.js +++ b/module/takeuntil/test/index.js @@ -11,7 +11,7 @@ describe('takeUntil', function() { var terminator = stream(); var s = takeUntil(source, terminator); flyd.map(function(v) { result.push(v); }, s); - s(1)(2)(3); + source(1)(2)(3); assert.deepEqual(result, [1, 2, 3]); }); it('ends when value emitted from second stream', function() { diff --git a/package.json b/package.json index 8b142eb..14d47cd 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ }, "scripts": { "test-lib": "mocha", - "test": "eslint --fix lib/ test/ module/ && mocha -R dot test/*.js module/**/test/*.js", + "test": "eslint --fix lib/ test/ module/ && mocha test/*.js module/**/test/*.js", "docs": "documentation -f md lib/index.js > API.md", "perf": "./perf/run-benchmarks", "coverage": "istanbul cover _mocha -- -R spec", diff --git a/test/index.js b/test/index.js index ca4e399..74cc58e 100644 --- a/test/index.js +++ b/test/index.js @@ -279,19 +279,34 @@ describe('stream', function() { it('can create multi-level dependent streams inside a stream body', function() { var result = 0; var externalStream = stream(0); + function mapper(val) { + ++result; + return val + 1; + } stream(1).map(function() { externalStream - .map(function() { - result += 1; - return 0; - }) - .map(function() { - result += 2; - return 0; - }); + .map(mapper) + .map(mapper); return; }); - assert.equal(result, 3); + assert.equal(result, 2); + }); + it('can create multi-level dependent streams inside a stream body part 2', function() { + var result = ''; + var externalStream = stream(0); + var theStream = stream(1); + function mapper(val) { + result += '' + val; + return val + 1; + } + theStream.map(function() { + externalStream + .map(mapper) + .map(mapper); + return; + }); + theStream(1); + assert.equal(result, '0101'); }); }); @@ -1006,6 +1021,19 @@ describe('stream', function() { [], [1, 3, 2], [2, 8, 7, 6], [3, 5, 4] ]); }); + it('nested streams atomic update', function() { + var invocationCount = 0; + var mapper = function(val) { + invocationCount += 1; + return val + 1; + }; + stream(1).map(function() { + stream(0) + .map(mapper) + .map(mapper); + }); + assert.equal(invocationCount, 2); + }); }); describe('fantasy-land', function() {