Skip to content

Commit

Permalink
Merge pull request #180 from nordfjord/fix/atomic-update
Browse files Browse the repository at this point in the history
Fixes #179
  • Loading branch information
paldepind committed Jul 11, 2018
2 parents 190d68a + 1f831a7 commit 98b3a70
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 57 deletions.
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
language: node_js
node_js:
- "0.12"
- "0.10"
- "iojs"
- "4"
- "5"
- "6"
- "7"
- "8"
after_success:
- "npm run post-to-coveralls-io"
91 changes: 57 additions & 34 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ var toUpdate = [];
var inStream;
var order = [];
var orderNextIdx = -1;
var flushing = false;
var flushingUpdateQueue = false;
var flushingStreamValue = false;

function flushing() {
return flushingUpdateQueue || flushingStreamValue;
}


/** @namespace */
var flyd = {}
Expand Down Expand Up @@ -180,9 +186,10 @@ flyd.endsOn = function(endS, s) {
* var squaredNumbers = flyd.map(function(n) { return n*n; }, numbers);
*/
// Library functions use self callback to accept (null, undefined) update triggers.
flyd.map = curryN(2, function(f, s) {
function map(f, s) {
return combine(function(s, self) { self(f(s.val)); }, [s]);
})
}
flyd.map = curryN(2, map)

/**
* Chain a stream
Expand Down Expand Up @@ -379,7 +386,7 @@ flyd.curryN = curryN
* var numbers = flyd.stream(0);
* var squaredNumbers = numbers.map(function(n) { return n*n; });
*/
function boundMap(f) { return flyd.map(f, this); }
function boundMap(f) { return map(f, this); }

/**
* Returns the result of applying function `fn` to this stream
Expand Down Expand Up @@ -421,7 +428,7 @@ function chain(f, s) {
internalEnded(newS.end);

// Update self on call -- newS is never handed out so deps don't matter
last = flyd.map(own, newS);
last = map(own, newS);
}, [s]);

flyd.endsOn(flatEnd.end, flatStream);
Expand Down Expand Up @@ -524,12 +531,12 @@ function streamToString() {
function createStream() {
function s(n) {
if (arguments.length === 0) return s.val
updateStreamValue(s, n)
updateStreamValue(n, s)
return s
}
s.hasVal = false;
s.val = undefined;
s.vals = [];
s.updaters = [];
s.listeners = [];
s.queued = false;
s.end = undefined;
Expand Down Expand Up @@ -580,11 +587,23 @@ function createDependentStream(deps, fn) {
* @param {stream} stream - the stream to check depencencies from
* @return {Boolean} `true` if all dependencies have vales, `false` otherwise
*/
function initialDepsNotMet(stream) {
function initialDependenciesMet(stream) {
stream.depsMet = stream.deps.every(function(s) {
return s.hasVal;
});
return !stream.depsMet;
return stream.depsMet;
}

function dependenciesAreMet(stream) {
return stream.depsMet === true || initialDependenciesMet(stream);
}

function isEnded(stream) {
return stream.end && stream.end.val === true;
}

function listenersNeedUpdating(s) {
return s.listeners.some(function(s) { return s.shouldUpdate; });
}

/**
Expand All @@ -593,19 +612,9 @@ function initialDepsNotMet(stream) {
* @param {stream} stream - the stream to update
*/
function updateStream(s) {
if ((s.depsMet !== true && initialDepsNotMet(s)) ||
(s.end !== undefined && s.end.val === true)) {
if (toUpdate.length > 0 && inStream !== undefined) {
toUpdate.push(function() {
updateStream(s);
});
}
return;
}
if (isEnded(s) || !dependenciesAreMet(s)) return;
if (inStream !== undefined) {
toUpdate.push(function() {
updateStream(s);
});
updateLaterUsing(updateStream, s);
return;
}
inStream = s;
Expand All @@ -617,15 +626,23 @@ function updateStream(s) {
inStream = undefined;
if (s.depsChanged !== undefined) s.depsChanged = [];
s.shouldUpdate = false;
if (flushing === false) flushUpdate();
if (flushing() === false) flushUpdate();
if (listenersNeedUpdating(s)) {
if (!flushingStreamValue) s(s.val)
else {
s.listeners.forEach(function(listener) {
if (listener.shouldUpdate) updateLaterUsing(updateStream, listener);
});
}
}
}

/**
* @private
* Update the dependencies of a stream
* @param {stream} stream
*/
function updateDeps(s) {
function updateListeners(s) {
var i, o, list
var listeners = s.listeners;
for (i = 0; i < listeners.length; ++i) {
Expand Down Expand Up @@ -663,16 +680,23 @@ function findDeps(s) {
}
}

function updateLaterUsing(updater, stream) {
toUpdate.push(stream);
stream.updaters.push(updater);
stream.shouldUpdate = true;
}

/**
* @private
*/
function flushUpdate() {
flushing = true;
flushingUpdateQueue = true;
while (toUpdate.length > 0) {
var updater = toUpdate.shift();
updater();
var stream = toUpdate.shift();
var nextUpdateFn = stream.updaters.shift();
if (nextUpdateFn && stream.shouldUpdate) nextUpdateFn(stream);
}
flushing = false;
flushingUpdateQueue = false;
}

/**
Expand All @@ -681,19 +705,18 @@ function flushUpdate() {
* @param {stream} stream
* @param {*} value
*/
function updateStreamValue(s, n) {
function updateStreamValue(n, s) {
s.val = n;
s.hasVal = true;
if (inStream === undefined) {
flushing = true;
updateDeps(s);
if (toUpdate.length > 0) flushUpdate(); else flushing = false;
flushingStreamValue = true;
updateListeners(s);
if (toUpdate.length > 0) flushUpdate();
flushingStreamValue = false;
} else if (inStream === s) {
markListeners(s, s.listeners);
} else {
toUpdate.push(function() {
updateStreamValue(s, n);
});
updateLaterUsing(function(s) { updateStreamValue(n, s); }, s);
}
}

Expand Down
13 changes: 7 additions & 6 deletions module/switchlatest/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
var flyd = require('../../lib');
var takeUntil = require('../takeuntil');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = function(s) {
var inner;
return flyd.combine(function(s, self) {
inner = s();
flyd.endsOn(flyd.merge(s, inner.end), flyd.combine(function(inner) {
self(inner());
}, [inner]));
return flyd.combine(function(stream$, self) {
var value$ = stream$();
flyd.on(self, takeUntil(value$, dropCurrentValue(stream$)));
}, [s]);
};
2 changes: 1 addition & 1 deletion module/switchlatest/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var assert = require('assert');

var switchLatest = require('../index.js');

describe('takeUntil', function() {
describe('switchLatest', function() {
it('emits values from first stream in stream', function() {
var result = [];
var source = stream();
Expand Down
2 changes: 1 addition & 1 deletion module/takeuntil/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('takeUntil', function() {
var terminator = stream();
var s = takeUntil(source, terminator);
flyd.map(function(v) { result.push(v); }, s);
s(1)(2)(3);
source(1)(2)(3);
assert.deepEqual(result, [1, 2, 3]);
});
it('ends when value emitted from second stream', function() {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
},
"scripts": {
"test-lib": "mocha",
"test": "eslint --fix lib/ test/ module/ && mocha -R dot test/*.js module/**/test/*.js",
"test": "eslint --fix lib/ test/ module/ && mocha test/*.js module/**/test/*.js",
"docs": "documentation -f md lib/index.js > API.md",
"perf": "./perf/run-benchmarks",
"coverage": "istanbul cover _mocha -- -R spec",
Expand Down
46 changes: 37 additions & 9 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,19 +279,34 @@ describe('stream', function() {
it('can create multi-level dependent streams inside a stream body', function() {
var result = 0;
var externalStream = stream(0);
function mapper(val) {
++result;
return val + 1;
}
stream(1).map(function() {
externalStream
.map(function() {
result += 1;
return 0;
})
.map(function() {
result += 2;
return 0;
});
.map(mapper)
.map(mapper);
return;
});
assert.equal(result, 3);
assert.equal(result, 2);
});
it('can create multi-level dependent streams inside a stream body part 2', function() {
var result = '';
var externalStream = stream(0);
var theStream = stream(1);
function mapper(val) {
result += '' + val;
return val + 1;
}
theStream.map(function() {
externalStream
.map(mapper)
.map(mapper);
return;
});
theStream(1);
assert.equal(result, '0101');
});
});

Expand Down Expand Up @@ -1006,6 +1021,19 @@ describe('stream', function() {
[], [1, 3, 2], [2, 8, 7, 6], [3, 5, 4]
]);
});
it('nested streams atomic update', function() {
var invocationCount = 0;
var mapper = function(val) {
invocationCount += 1;
return val + 1;
};
stream(1).map(function() {
stream(0)
.map(mapper)
.map(mapper);
});
assert.equal(invocationCount, 2);
});
});

describe('fantasy-land', function() {
Expand Down

0 comments on commit 98b3a70

Please sign in to comment.