Permalink
Browse files

Make map and filter more concurrent

Makes Promise.map, Promise.filter, .filter and .map more concurrent. Previously
they waited for the entire input array to be fulfilled before starting to
process it.

The callback order is still the same as before, e.g. if 4th promise in the array
fulfills while 2nd one is still pending, the callback for 4th value in the array
is not called until 2nd and 3rd are fulfilled and had their callbacks executed.
  • Loading branch information...
petkaantonov committed Mar 29, 2014
1 parent e2c3412 commit 8085922fb95a9987fda0cf2337598ab4a98dc315
Showing with 174 additions and 86 deletions.
  1. +1 −1 Gruntfile.js
  2. +1 −1 src/filter.js
  3. +131 −83 src/map.js
  4. +41 −1 test/mocha/when_map.js
View
@@ -53,7 +53,7 @@ module.exports = function( grunt ) {
"call_get.js": ['Promise'],
"filter.js": ['Promise', 'Promise$_CreatePromiseArray', 'PromiseArray', 'apiRejection'],
"generators.js": ['Promise', 'apiRejection', 'INTERNAL'],
"map.js": ['Promise', 'Promise$_CreatePromiseArray', 'PromiseArray', 'apiRejection'],
"map.js": ['Promise', 'PromiseArray', 'INTERNAL', 'apiRejection'],
"nodeify.js": ['Promise'],
"promisify.js": ['Promise', 'INTERNAL'],
"props.js": ['Promise', 'PromiseArray'],
View
@@ -4,7 +4,7 @@ module.exports = function(Promise) {
var isArray = require("./util.js").isArray;
function Promise$_filter(booleans) {
var values = this._settledValue;
var values = this instanceof Promise ? this._settledValue : this;
ASSERT(isArray(values));
ASSERT(isArray(booleans));
ASSERT(values.length === booleans.length);
View
@@ -1,103 +1,151 @@
"use strict";
module.exports = function(
Promise, Promise$_CreatePromiseArray, PromiseArray, apiRejection) {
module.exports = function(Promise, PromiseArray, INTERNAL, apiRejection) {
var ASSERT = require("./assert.js");
var all = Promise.all;
var util = require("./util.js");
var canAttach = require("./errors.js").canAttach;
var isArray = util.isArray;
var _cast = Promise._cast;
function Promise$_mapper(values) {
var fn = this;
var receiver = void 0;
if (typeof fn !== "function") {
receiver = fn.receiver;
fn = fn.fn;
}
ASSERT(typeof fn === "function");
var shouldDefer = false;
var ret = new Array(values.length);
if (receiver === void 0) {
for (var i = 0, len = values.length; i < len; ++i) {
var value = fn(values[i], i, len);
if (!shouldDefer) {
var maybePromise = Promise._cast(value,
Promise$_mapper, void 0);
if (maybePromise instanceof Promise) {
if (maybePromise.isFulfilled()) {
ret[i] = maybePromise._settledValue;
continue;
}
else {
shouldDefer = true;
}
value = maybePromise;
}
}
ret[i] = value;
}
}
else {
for (var i = 0, len = values.length; i < len; ++i) {
var value = fn.call(receiver, values[i], i, len);
if (!shouldDefer) {
var maybePromise = Promise._cast(value,
Promise$_mapper, void 0);
if (maybePromise instanceof Promise) {
if (maybePromise.isFulfilled()) {
ret[i] = maybePromise._settledValue;
continue;
}
else {
shouldDefer = true;
}
value = maybePromise;
}
}
ret[i] = value;
}
}
return shouldDefer
? Promise$_CreatePromiseArray(ret, PromiseArray,
Promise$_mapper, void 0).promise()
: ret;
function unpack(values) {
ASSERT(this.length === 4);
return Promise$_Map(values, this[0], this[1], this[2], this[3]);
}
function Promise$_Map(promises, fn, useBound, caller, ref) {
if (typeof fn !== "function") {
return apiRejection(NOT_FUNCTION_ERROR);
}
if (useBound === USE_BOUND && promises._isBound()) {
fn = {
fn: fn,
receiver: promises._boundTo
};
var receiver = void 0;
if (useBound === USE_BOUND) {
if (promises._isBound()) {
receiver = promises._boundTo;
}
}
else if (useBound !== DONT_USE_BOUND) {
receiver = useBound;
}
var ret = Promise$_CreatePromiseArray(
promises,
PromiseArray,
caller,
useBound === USE_BOUND && promises._isBound()
? promises._boundTo
: void 0
).promise();
if (ref !== void 0) {
ref.ref = ret;
var shouldUnwrapItems = ref !== void 0;
if (shouldUnwrapItems) ref.ref = promises;
if (promises instanceof Promise) {
return promises._then(unpack, void 0, void 0,
[fn, receiver, caller, ref], void 0, Promise$_Map);
}
else if (!isArray(promises)) {
return apiRejection(COLLECTION_ERROR);
}
return ret._then(
Promise$_mapper,
void 0,
void 0,
fn,
void 0,
caller
);
var promise = new Promise(INTERNAL);
if (receiver !== void 0) promise._setBoundTo(receiver);
promise._setTrace(caller, void 0);
var mapping = new Mapping(promise,
fn,
promises,
receiver,
shouldUnwrapItems);
mapping.init();
return promise;
}
var pending = {};
function Mapping(promise, callback, items, receiver, shouldUnwrapItems) {
this.shouldUnwrapItems = shouldUnwrapItems;
this.index = 0;
this.items = items;
this.callback = callback;
this.receiver = receiver;
this.promise = promise;
this.result = new Array(items.length);
}
util.inherits(Mapping, PromiseArray);
Mapping.prototype.init = function Mapping$init() {
var items = this.items;
var len = items.length;
var result = this.result;
var isRejected = false;
for (var i = 0; i < len; ++i) {
var maybePromise = _cast(items[i], void 0, void 0);
if (maybePromise instanceof Promise) {
if (maybePromise.isPending()) {
result[i] = pending;
maybePromise._proxyPromiseArray(this, i);
}
else if (maybePromise.isFulfilled()) {
result[i] = maybePromise.value();
}
else {
maybePromise._unsetRejectionIsUnhandled();
if (!isRejected) {
this.reject(maybePromise.reason());
isRejected = true;
}
}
}
else {
result[i] = maybePromise;
}
}
if (!isRejected) this.iterate();
};
Mapping.prototype.isResolved = function Mapping$isResolved() {
return this.promise === null;
};
Mapping.prototype._promiseProgressed =
function Mapping$_promiseProgressed(value) {
if (this.isResolved()) return;
this.promise._progress(value);
};
Mapping.prototype._promiseFulfilled =
function Mapping$_promiseFulfilled(value, index) {
if (this.isResolved()) return;
this.result[index] = value;
if (this.shouldUnwrapItems) this.items[index] = value;
if (this.index === index) this.iterate();
};
Mapping.prototype._promiseRejected =
function Mapping$_promiseRejected(reason) {
this.reject(reason);
};
Mapping.prototype.reject = function Mapping$reject(reason) {
if (this.isResolved()) return;
var trace = canAttach(reason) ? reason : new Error(reason + "");
this.promise._attachExtraTrace(trace);
this.promise._reject(reason, trace);
};
Mapping.prototype.iterate = function Mapping$iterate() {
var i = this.index;
var items = this.items;
var result = this.result;
var len = items.length;
var result = this.result;
var receiver = this.receiver;
var callback = this.callback;
for (; i < len; ++i) {
var value = result[i];
if (value === pending) {
this.index = i;
return;
}
try { result[i] = callback.call(receiver, value, i, len); }
catch (e) { return this.reject(e); }
}
this.promise._follow(all(result));
this.items = this.result = this.callback = this.promise = null;
};
Promise.prototype.map = function Promise$map(fn, ref) {
return Promise$_Map(this, fn, USE_BOUND, this.map, ref);
};
View
@@ -209,7 +209,6 @@ describe("when.map-test", function () {
}
//This test didn't contain the mapper argument so I assume
//when.js uses identity mapper in such cases.
//In bluebird it's illegal to call Promise.map without mapper function
return when.map(input, identity).then(function () {
assert(ncall === 6);
@@ -240,4 +239,45 @@ describe("when.map-test", function () {
});
function delay(val, ms) {
return new when(function(resolve) {
setTimeout(function() {
resolve(val);
}, ms);
});
}
specify("should be concurrent but process in-order", function(done) {
var firstProcessed = false;
var secondProcessed = false;
var thirdProcessed = false;
var first = delay(1, 20);
var second = delay(2, 100).then(function(){secondProcessed = true});
var third = delay(3, 100).then(function(){thirdProcessed = true});
when.map([first, second, third], function(integer) {
if (integer === 1) {
return delay(0, 10).then(function() {
assert(!secondProcessed);
assert(!thirdProcessed);
return delay(0, 10);
}).then(function() {
assert(!secondProcessed);
assert(!thirdProcessed);
return delay(0, 10);
}).then(function(){
firstProcessed = true;
});
}
else {
assert(firstProcessed);
}
}).then(function() {
assert(firstProcessed);
assert(secondProcessed);
assert(thirdProcessed);
done();
});
});
});

0 comments on commit 8085922

Please sign in to comment.