Skip to content

Commit

Permalink
rewrite for simple streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mirkokiefer committed Sep 9, 2013
1 parent ad1de4c commit 9916e87
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 142 deletions.
187 changes: 95 additions & 92 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,85 +1,84 @@

var EventEmitter = require('events').EventEmitter

var iterators = {}
var stream = {}

iterators.forEach = function(iterator, fn, cb) {
iterator.next(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(err, res)
iterators.forEach(iterator, fn, cb)
})
stream.forEach = function(inputStream, fn) {
return continuable
function continuable(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(res)
continuable(cb)
})
}
}

iterators.forEachAsync = function(iterator, fn, cb) {
iterator.next(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(err, res, function() {
iterators.forEachAsync(iterator, fn, cb)
stream.forEachAsync = function(inputStream, fn) {
return continuable
function continuable(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(res, function(err) {
if (err) return cb(err)
continuable(cb)
})
})
})
}
}

iterators.map = function(iterator, fn) {
return {
next: function(cb) {
iterator.next(function(err, res) {
if ((res === undefined) || err) return cb(err, undefined)
var mappedRes = fn(err, res)
cb(err, mappedRes)
})
}
stream.map = function(inputStream, fn) {
return {read: read}
function read(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
cb(null, fn(res))
})
}
}

iterators.mapAsync = function(iterator, fn) {
return {
next: function(cb) {
iterator.next(function(err, res) {
if ((res === undefined) || err) return cb(err, undefined)
fn(err, res, function(err, mappedRes) {
cb(err, mappedRes)
})
})
}
stream.mapAsync = function(inputStream, fn) {
return {read: read}
function read(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(res, cb)
})
}
}

iterators.filter = function(iterator, fn) {
var next = function(cb) {
iterator.next(function(err, res) {
stream.filter = function(inputStream, fn) {
return {read: read}
function read(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
if (fn(err, res)) {
if (fn(res)) {
cb(null, res)
} else {
next(cb)
read(cb)
}
})
}
return {
next: next
}
}

iterators.filterAsync = function(iterator, fn) {
var next = function(cb) {
iterator.next(function(err, res) {
stream.filterAsync = function(inputStream, fn) {
return {read: read}
function read(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return cb(err, undefined)
fn(err, res, function(err, passedFilter) {
if (passedFilter) {
fn(res, function(err, passedFilter) {
if (err) return cb(err)
if (passedFilter) {
cb(null, res)
} else {
next(cb)
read(cb)
}
})
})
}
return {
next: next
}
}

iterators.buffer = function(iterator, size) {
stream.buffer = function(inputStream, size) {
var buffer = []
var bufferingInProgress = false
var hasEnded = false
Expand All @@ -104,7 +103,7 @@ iterators.buffer = function(iterator, size) {
bufferingInProgress = false
return
}
iterator.next(function(err, res) {
inputStream.read(function(err, res) {
if (res === undefined) hasEnded = true
buffer.push(res)
bufferEvents.emit('data')
Expand All @@ -114,7 +113,7 @@ iterators.buffer = function(iterator, size) {

var publicObj = {
bufferFillRatio: function() { return buffer.length / size },
next: function(cb) {
read: function(cb) {
readBuffer(function(err, res) {
cb(err, res)
})
Expand All @@ -123,19 +122,18 @@ iterators.buffer = function(iterator, size) {
return publicObj
}

iterators.fromArray = function(array, cb) {
stream.fromArray = function(array, cb) {
var i = 0
return {
next: function(cb) {
if (i == array.length) return cb(null, undefined)
var value = array[i]
i++
cb(null, value)
}
return {read: read}
function read(cb) {
if (i == array.length) return cb(null, undefined)
var value = array[i]
i++
cb(null, value)
}
}

iterators.fromReadableStream = function(readable) {
stream.fromReadableStream = function(readable) {
var isReadable = true
var hasEnded = false

Expand All @@ -146,66 +144,71 @@ iterators.fromReadableStream = function(readable) {
hasEnded = true
})

var next = function(cb) {
function read(cb) {
if (hasEnded) {
return cb(null, undefined)
}
if (isReadable) {
var res = readable.read()
if (res === null) {
isReadable = false
return next(cb)
return read(cb)
}
cb(null, res)
} else {
var onEnd = function() { next(cb) }
var onEnd = function() { read(cb) }
readable.once('readable', function() {
readable.removeListener('end', onEnd)
next(cb)
read(cb)
})
readable.once('end', onEnd)
}
}

return {next: next}
return {read: read}
}

iterators.toArray = function(iterator, cb) {
var array = []
iterators.forEach(iterator, function(err, each) {
array.push(each)
}, function() {
cb(null, array)
})
stream.toArray = function(inputStream) {
return function(cb) {
var array = []
stream.forEach(inputStream, function(each) {
array.push(each)
})(function(err) {
if (err) return cb(err)
cb(null, array)
})
}
}

iterators.range = function(iterator, opts) {
stream.range = function(inputStream, opts) {
var from = opts.from
var to = opts.to
var pos = -1
var next = function(cb) {
iterator.next(function(err, value) {
return {read: read}
function read(cb) {
inputStream.read(function(err, value) {
if (value === undefined || pos >= to) return cb(err, undefined)
pos++
if (pos < from) return next(cb)
if (pos > to) return cb(null, undefined)
cb(err, value)
if (pos < from) return read(cb)
cb(null, value)
})
}
return {next: next}
}

iterators.toWritableStream = function(iterator, writeStream, encoding, cb) {
write(cb);
function write(cb) {
iterator.next(function(err, res) {
if (res === undefined) return writeStream.write('', encoding, cb)
if (writeStream.write(res, encoding)) {
write(cb)
} else {
writeStream.once('drain', function() { write(cb) })
}
})
stream.toWritableStream = function(inputStream, writeStream, encoding) {
return function(cb) {
write(cb)
function write(cb) {
inputStream.read(function(err, res) {
if (res === undefined) return writeStream.write('', encoding, cb)
if (writeStream.write(res, encoding)) {
write(cb)
} else {
writeStream.once('drain', function() { write(cb) })
}
})
}
}
}

module.exports = iterators
module.exports = stream
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"homepage": "https://github.com/mirkokiefer/async-iterators",
"main": "index.js",
"scripts": {
"test": "node_modules/.bin/mocha -R spec -t 60000"
"test": "node_modules/.bin/mocha -R spec -t 5000"
},
"author": "Mirko Kiefer <mail@mirkokiefer.com>",
"license": "MIT",
Expand Down
Loading

0 comments on commit 9916e87

Please sign in to comment.