Permalink
Browse files

Add times function

  • Loading branch information...
1 parent a4e1d44 commit ef66d519fd9b94dcac2f1e645df6b9fd4aed8335 @wdavidw committed Nov 29, 2012
Showing with 167 additions and 24 deletions.
  1. +22 −15 lib/each.js
  2. +16 −9 src/each.coffee
  3. +129 −0 test/times.coffee
View
37 lib/each.js
@@ -10,7 +10,7 @@ Chained and parallel async iterator in one elegant function
*/
module.exports = function(elements) {
- var eacher, errors, events, isObject, keys, next, parallel, run, type;
+ var eacher, errors, events, isObject, keys, next, parallel, run, times, type;
type = typeof elements;
if (elements === null || type === 'undefined' || type === 'number' || type === 'string' || type === 'function' || type === 'boolean') {
elements = [elements];
@@ -28,10 +28,12 @@ module.exports = function(elements) {
end: [],
both: []
};
+ times = [];
eacher = {};
eacher.total = keys ? keys.length : elements.length;
eacher.started = 0;
eacher.done = 0;
+ times = 1;
eacher.paused = 0;
eacher.readable = true;
eacher.pause = function() {
@@ -45,22 +47,26 @@ module.exports = function(elements) {
if (typeof mode === 'number') {
parallel = mode;
} else if (mode) {
- parallel = eacher.total;
+ parallel = mode;
} else {
parallel = 1;
}
return eacher;
};
+ eacher.times = function(t) {
+ times = t;
+ return eacher;
+ };
eacher.on = function(ev, callback) {
events[ev].push(callback);
return eacher;
};
run = function() {
- var args, e, emitError, lboth, lerror, _i, _j, _k, _l, _len, _len1, _len2, _len3, _ref, _ref1, _ref2, _ref3;
+ var args, emit, emitError, index, lboth, lerror, _i, _j, _k, _l, _len, _len1, _len2, _len3, _ref, _ref1, _ref2, _ref3;
if (eacher.paused) {
return;
}
- if (eacher.done === eacher.total || (errors.length && eacher.started === eacher.done)) {
+ if (eacher.done === eacher.total * times || (errors.length && eacher.started === eacher.done)) {
eacher.readable = false;
if (errors.length) {
if (parallel !== 1) {
@@ -77,44 +83,45 @@ module.exports = function(elements) {
emitError = lerror || (!lerror && !lboth);
_ref = events.error;
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
- e = _ref[_i];
+ emit = _ref[_i];
if (emitError) {
- e.apply(null, args);
+ emit.apply(null, args);
}
}
} else {
args = [];
_ref1 = events.end;
for (_j = 0, _len1 = _ref1.length; _j < _len1; _j++) {
- e = _ref1[_j];
- e();
+ emit = _ref1[_j];
+ emit();
}
}
_ref2 = events.both;
for (_k = 0, _len2 = _ref2.length; _k < _len2; _k++) {
- e = _ref2[_k];
- e.apply(null, args);
+ emit = _ref2[_k];
+ emit.apply(null, args);
}
return;
}
if (errors.length !== 0) {
return;
}
- while (Math.min(parallel - eacher.started + eacher.done, eacher.total - eacher.started)) {
+ while ((parallel === true ? (eacher.total * times - eacher.started) > 0 : Math.min(parallel - eacher.started + eacher.done, eacher.total * times - eacher.started))) {
if (errors.length !== 0) {
break;
}
+ index = Math.floor(eacher.started / times);
if (keys) {
- args = [next, keys[eacher.started], elements[keys[eacher.started]]];
+ args = [next, keys[index], elements[keys[index]]];
} else {
- args = [next, elements[eacher.started], eacher.started];
+ args = [next, elements[index], index];
}
eacher.started++;
try {
_ref3 = events.item;
for (_l = 0, _len3 = _ref3.length; _l < _len3; _l++) {
- e = _ref3[_l];
- e.apply(null, args);
+ emit = _ref3[_l];
+ emit.apply(null, args);
}
} catch (e) {
if (eacher.readable) {
View
25 src/each.coffee
@@ -22,10 +22,12 @@ module.exports = (elements) ->
error: []
end: []
both: []
+ times = []
eacher = {}
eacher.total = if keys then keys.length else elements.length
eacher.started = 0
eacher.done = 0
+ times = 1
eacher.paused = 0
eacher.readable = true
eacher.pause = ->
@@ -37,17 +39,21 @@ module.exports = (elements) ->
# Concurrent
if typeof mode is 'number' then parallel = mode
# Parallel
- else if mode then parallel = eacher.total
+ # else if mode then parallel = eacher.total
+ else if mode then parallel = mode
# Sequential (in case parallel is called multiple times)
else parallel = 1
eacher
+ eacher.times = (t) ->
+ times = t
+ eacher
eacher.on = (ev, callback) ->
events[ev].push callback
eacher
run = () ->
return if eacher.paused
# This is the end
- if eacher.done is eacher.total or (errors.length and eacher.started is eacher.done)
+ if eacher.done is eacher.total * times or (errors.length and eacher.started is eacher.done)
eacher.readable = false
if errors.length
if parallel isnt 1
@@ -62,26 +68,27 @@ module.exports = (elements) ->
lerror = events.error.length
lboth = events.both.length
emitError = lerror or (not lerror and not lboth)
- for e in events.error then e args... if emitError
+ for emit in events.error then emit args... if emitError
else
args = []
# eacher.emit 'end'
- for e in events.end then e()
+ for emit in events.end then emit()
# return eacher.emit 'both', args...
- for e in events.both then e args...
+ for emit in events.both then emit args...
return
return if errors.length isnt 0
- while Math.min( (parallel - eacher.started + eacher.done), (eacher.total - eacher.started) )
+ while (if parallel is true then (eacher.total * times - eacher.started) > 0 else Math.min( (parallel - eacher.started + eacher.done), (eacher.total * times - eacher.started) ) )
# Stop on synchronously sent error
break if errors.length isnt 0
# Time to call our iterator
+ index = Math.floor(eacher.started / times)
if keys
- then args = [next, keys[eacher.started], elements[keys[eacher.started]]]
- else args = [next, elements[eacher.started], eacher.started]
+ then args = [next, keys[index], elements[keys[index]]]
+ else args = [next, elements[index], index]
eacher.started++
try
# eacher.emit 'item', args...
- for e in events.item then e args...
+ for emit in events.item then emit args...
catch e
# prevent next to be called if an error occurend inside the
# error, end or both callbacks
View
129 test/times.coffee
@@ -0,0 +1,129 @@
+
+should = require 'should'
+each = if process.env.EACH_COV then require '../lib-cov/each' else require '../lib/each'
+
+describe 'Sequential', ->
+ it 'should run nothing 10 times', (next) ->
+ started = ended = 0
+ each()
+ .parallel(null)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ # Check provided values
+ started.should.eql ended
+ should.not.exist element
+ index.should.eql 0
+ started++
+ setTimeout ->
+ ended++
+ started.should.eql ended
+ next()
+ , 10
+ .on 'end', ->
+ started.should.eql 10
+ next()
+ it 'should run an array 10 times', (next) ->
+ started = ended = 0
+ data = ['a', 'b', 'c']
+ each(data)
+ .parallel(null)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ # Check provided values
+ started.should.eql ended
+ element.should.eql data[Math.floor started / 10]
+ index.should.eql Math.floor started / 10
+ started++
+ setTimeout ->
+ ended++
+ started.should.eql ended
+ next()
+ , 10
+ .on 'end', ->
+ started.should.eql 30
+ next()
+
+describe 'Parallel', ->
+ it 'should run nothing 10 times', (next) ->
+ started = ended = 0
+ each()
+ .parallel(true)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ started.should.eql 0
+ ended.should.eql 0
+ process.nextTick -> started++
+ setTimeout ->
+ ended++
+ started.should.eql 10
+ next()
+ , 10
+ .on 'end', ->
+ started.should.eql 10
+ next()
+ it 'should run an array 10 times', (next) ->
+ started = ended = 0
+ each(['a', 'b', 'c'])
+ .parallel(true)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ started.should.eql 0
+ ended.should.eql 0
+ process.nextTick -> started++
+ setTimeout ->
+ ended++
+ started.should.eql 30
+ next()
+ , 10
+ .on 'end', ->
+ started.should.eql 30
+ next()
+
+describe 'Concurrent', ->
+ it 'should run nothing 10 times', (next) ->
+ started = ended = 0
+ runnings = []
+ interval = null
+ interval = setInterval ->
+ running = started - ended
+ runnings[running] ?= 0
+ runnings[running]++
+ , 10
+ each()
+ .parallel(3)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ process.nextTick -> started++
+ setTimeout ->
+ ended++
+ next()
+ , 100
+ .on 'end', ->
+ clearInterval interval
+ (
+ (typeof runnings[0] is 'undefined' or runnings[0] is 1) and
+ (runnings[1] >= 9 and runnings[1] <= 11) and
+ (typeof runnings[2] is 'undefined' or runnings[2] is 1) and
+ (runnings[3] >= 26 and runnings[3] <= 28)
+ ).should.be.ok
+ started.should.eql 10
+ next()
+ it 'should run an array 10 times', (next) ->
+ started = ended = 0
+ each(['a', 'b', 'c'])
+ .parallel(3)
+ .times(10)
+ .on 'item', (next, element, index) ->
+ started++
+ setTimeout ->
+ running = started - ended
+ total = 10 * 3
+ if started is 30
+ then running.should.eql started - ended
+ else running.should.eql 3
+ ended++
+ next()
+ , 100
+ .on 'end', ->
+ ended.should.eql 30
+ next()

0 comments on commit ef66d51

Please sign in to comment.