Skip to content

Commit

Permalink
Fix multiple end callback in sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
wdavidw committed Mar 10, 2012
1 parent 9a1ca9a commit 67d8ca8
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 32 deletions.
16 changes: 9 additions & 7 deletions lib/each.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module.exports = (elements) ->
then args = [next, keys[started], elements[keys[started]]]
else args = [next, elements[started], started]
started++
# console.log 'start'
try
eacher.emit 'item', args...
catch e
Expand All @@ -54,6 +55,7 @@ module.exports = (elements) ->
run = () ->
return if eacher.paused
# This is the end
# console.log (done is total or (errors.length and started is done)), '=====', done, total, 'OR', errors.length isnt 0, started, done
if done is total or (errors.length and started is done)
eacher.readable = false
if errors.length
Expand All @@ -74,15 +76,15 @@ module.exports = (elements) ->
args = []
eacher.emit 'end'
return eacher.emit 'both', args...
return if errors.length
# Dont use for... since done may change in sync mode
call() while Math.min( (parallel - started + done), (total - started) )
return if errors.length isnt 0
while Math.min( (parallel - started + done), (total - started) )
# Stop on synchronously sent error
break if errors.length isnt 0
call()
# call()
next = (err) ->
errors.push err if err? and err instanceof Error
done++
run()
process.nextTick ->
# Empty iteration
return run() if total is 0
run() for key in [0 ... Math.min(parallel, total)]
process.nextTick run
eacher
65 changes: 41 additions & 24 deletions test/error.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ describe 'Error', ->
errs[1].message.should.eql 'Testing error in 7'
each( [ {id: 1}, {id: 2}, {id: 3}, {id: 4}, {id: 5}, {id: 6}, {id: 7}, {id: 8}, {id: 9}, {id: 10}, {id: 11} ] )
.parallel( 4 )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
setTimeout ->
if element.id is 6 or element.id is 7
n new Error "Testing error in #{element.id}"
next new Error "Testing error in #{element.id}"
else
n()
next()
, 100
.on 'error', (err, errs) ->
error_assert.call null, err, errs
Expand All @@ -40,14 +40,14 @@ describe 'Error', ->
current = 0
each( [{id: 1}, {id: 2}, {id: 3}, {id: 4}] )
.parallel( true )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
setTimeout ->
if element.id is 1 or element.id is 3
n( new Error "Testing error in #{element.id}" )
next( new Error "Testing error in #{element.id}" )
else
n()
next()
, 100
.on 'error', (err, errs) ->
err.message.should.eql 'Multiple errors (2)'
Expand All @@ -64,14 +64,14 @@ describe 'Error', ->
current = 0
each( [{id: 1}, {id: 2}, {id: 3}, {id: 4}] )
.parallel( true )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
setTimeout ->
if element.id is 3
n( new Error "Testing error in #{element.id}" )
next( new Error "Testing error in #{element.id}" )
else
n()
next()
, 100
.on 'error', (err, errs) ->
err.message.should.eql 'Testing error in 3'
Expand All @@ -84,14 +84,14 @@ describe 'Error', ->
current = 0
each( [{id: 1}, {id: 2}, {id: 3}, {id: 4}] )
.parallel( true )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
setTimeout ->
if element.id is 1 or element.id is 3
n( new Error "Testing error in #{element.id}" )
next( new Error "Testing error in #{element.id}" )
else
n()
next()
, 100
.on 'both', (err, errs) ->
err.message.should.eql 'Multiple errors (2)'
Expand All @@ -105,27 +105,44 @@ describe 'Error', ->
current = 0
each( [{id: 1}, {id: 2}, {id: 3}, {id: 4}] )
.parallel( true )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
if element.id is 1 or element.id is 3
n( new Error "Testing error in #{element.id}" )
else setTimeout n, 100
next( new Error "Testing error in #{element.id}" )
else setTimeout next, 100
.on 'both', (err, errs) ->
err.message.should.eql 'Multiple errors (2)'
errs.length.should.eql 2
errs[0].message.should.eql 'Testing error in 1'
errs[1].message.should.eql 'Testing error in 3'
return next()
it 'Sequential # error callback', (next) ->
# In this specific case, since the item handler
# send error sequentially, we are only receiving
# one error
err.message.should.eql 'Testing error in 1'
errs.length.should.eql 1
next()
it 'Sequential # sync # error callback', (next) ->
current = 0
each( [ {id: 1}, {id: 2}, {id: 3} ] )
.on 'item', (next, element, index) ->
index.should.eql current
current++
if element.id is 2
next( new Error 'Testing error' )
else next()
.on 'error', (err) ->
err.message.should.eql 'Testing error'
next()
.on 'end', (err, errs) ->
false.should.be.ok
it 'Sequential # async # error callback', (next) ->
current = 0
each( [ {id: 1}, {id: 2}, {id: 3} ] )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
index.should.eql current
current++
if element.id is 2
n( new Error 'Testing error' )
else setTimeout n, 100
setTimeout ->
next( new Error 'Testing error' )
, 100
else setTimeout next, 100
.on 'error', (err) ->
err.message.should.eql 'Testing error'
next()
Expand Down
2 changes: 1 addition & 1 deletion test/readable_stream.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe 'Readable Stream', ->
it 'multiple pause # next before resume', (next) ->
eacher = each( [ {id: 1}, {id: 2}, {id: 3}, {id: 4}, {id: 5}, {id: 6}, {id: 7}, {id: 8}, {id: 9} ] )
.parallel( 4 )
.on 'item', (n, element, index) ->
.on 'item', (next, element, index) ->
if element.id % 2 is 0
eacher.pause()
setTimeout ->
Expand Down

0 comments on commit 67d8ca8

Please sign in to comment.