Skip to content

Commit

Permalink
added ability to limit number of outstanding processes for each, map …
Browse files Browse the repository at this point in the history
…and select
  • Loading branch information
refractalize committed Jul 12, 2011
1 parent dfa36ff commit b2b39da
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 35 deletions.
64 changes: 50 additions & 14 deletions specs.js
@@ -1,24 +1,58 @@
var assert = require('assert');
var zo = require('zo').zo;
var zo = require('./zo').zo;
require('../cupoftea/cupoftea');

spec('integers', function () {
var i = 0;
var assertCorrectResult = function (result) {
assert.deepEqual(result, [1, 2, 3, 4]);
};

spec('should be zero', function () {
assert.equal(i, 0);
});

spec('should be greater than one', function () {
assert.equal(i, 0);
});
spec('each', function () {
var list = [1, 2, 3, 4];

spec('sync', function () {
var resultItems = [];

zo([1, 2, 3, 4]).each(function (item, done) {
resultItems.push(item);
done();
}).results(shouldCall(function (items) {
assertCorrectResult(resultItems);
}));
});

spec('async', function () {
var resultItems = [];

zo([1, 2, 3, 4]).each(function (item, done) {
process.nextTick(function () {
resultItems.push(item);
done();
})
}).results(shouldCall(function (items) {
assertCorrectResult(resultItems);
}));
});

spec('limits outstanding functions', function () {
var resultItems = [];
var outstanding = 0;

zo([1, 2, 3, 4]).each(function (item, done) {
outstanding++;
assert.ok(outstanding <= 2, 'expected maximum of 2 outstanding functions, got ' + outstanding);

process.nextTick(function () {
resultItems.push(item);
outstanding--;
done();
})
}, {limit: 2}).results(shouldCall(function (items) {
assertCorrectResult(resultItems);
}));
});
});

spec('reduce left', function () {
var assertCorrectResult = function (result) {
assert.deepEqual(result, [1, 2, 3, 4]);
};

spec('async', function () {
zo([1, 2, 3, 4]).reduce([], function(memo, item, into) {
process.nextTick(function() {
Expand Down Expand Up @@ -65,6 +99,8 @@ spec('callstack', function () {
items.push(n);
}

console.log('items');

zo(items).reduce(0, function (memo, item, into) {
into(memo + 1);
}).results(shouldCall(function (res) {
Expand Down
57 changes: 36 additions & 21 deletions zo-function.js
Expand Up @@ -8,20 +8,42 @@ function (items, pipeline) {
}
}

var pipelineElement = function (processItem, addItem) {
var pipelineElement = function (processItem, addItem, options) {
options = (options || {});
var maxOutstandingProcesses = options.limit;
pipeline.push(function (items, next) {
var n = items.length;
var processedItems = [];
var pendingProcesses = [];
var outstandingProcesses = 0;

var canStartAnotherProcess = function () {
return !maxOutstandingProcesses || outstandingProcesses < maxOutstandingProcesses;
}

if (n > 0) {
_(items).each(function (item) {
processItem(item, function (processedItem) {
addItem(processedItems, item, processedItem);
n--;
if (n == 0) {
next(processedItems);
}
});
var process = function (doPending) {
outstandingProcesses++;
processItem(item, function (processedItem) {
addItem(processedItems, item, processedItem);
n--;
outstandingProcesses--;
var pendingProcess = pendingProcesses.shift();
if (canStartAnotherProcess() && pendingProcess) {
pendingProcess();
}
if (n == 0) {
next(processedItems);
}
});
};

if (canStartAnotherProcess()) {
process();
} else {
pendingProcesses.push(process);
}
});
} else {
next(processedItems);
Expand All @@ -30,13 +52,6 @@ function (items, pipeline) {
return zo(items, pipeline);
};

var trampoline = function (f) {
var next;
do {
next = f();
} while (next);
};

var foldl = function (first, folder) {
pipeline.push(function (items, next) {
var fold = function (foldedResult, index, items, next) {
Expand Down Expand Up @@ -82,24 +97,24 @@ function (items, pipeline) {
});
runPipeline(items, pipeline);
},
map: function (mapper) {
map: function (mapper, options) {
return pipelineElement(mapper, function (mappedItems, item, mappedItem) {
mappedItems.push(mappedItem);
});
}, options);
},
foldr: foldr,
foldl: foldl,
reduce: foldl,
reduceRight: foldr,
select: function (selector) {
select: function (selector, options) {
return pipelineElement(selector, function (selectedItems, item, itemSelected) {
if (itemSelected) selectedItems.push(item);
});
}, options);
},
each: function (doForEach) {
each: function (doForEach, options) {
return pipelineElement(doForEach, function (selectedItems, item, itemSelected) {
selectedItems.push(item);
});
}, options);
},
};
};

0 comments on commit b2b39da

Please sign in to comment.