Skip to content

Commit

Permalink
Support custom runner
Browse files Browse the repository at this point in the history
  • Loading branch information
zoubin committed Dec 9, 2015
1 parent 97e5ee9 commit 800e497
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 46 deletions.
89 changes: 86 additions & 3 deletions README.md
Expand Up @@ -7,8 +7,6 @@

Run async or sync callbacks, such as [gulp tasks](https://github.com/gulpjs/gulp/blob/master/docs/API.md#fn).

The main ideas are borrowed from [orchestrator](https://github.com/orchestrator/orchestrator/blob/master/lib/runTask.js).

## Usage

```javascript
Expand Down Expand Up @@ -92,7 +90,7 @@ run(function (a, b, next) {

```

### run.thunkify(fn, context)
### run.thunkify(fn)

Return a new function to run `fn` later with a list of arguments.

Expand All @@ -110,3 +108,88 @@ task(2, 1).then(function (res) {

```

### Runner = run.Runner

`var runner = Runner(opts)`

Create a custom `Runner` instance to run callbacks.

#### opts

By default, callbacks returning a stream is thought to be in progress before the stream ends.
However, if `opts.stream` is `false`,
callbacks returning a stream wil be treated as synchronous.

If `opts.promise` is `false`,
callbacks returning a promise will be treated as synchronous.

If `opts.async` is `false`,
callbacks can only be made asynchronous
by returning a promise (when `opts.promise` is `true`),
or returning a stream (when `opts.stream` is `true`).

```javascript
var Stream = require('stream')
var Readable = Stream.Readable
var Writable = Stream.Writable

;(function handleStream() {
var runner = require('..').Runner({ stream: true })

var outputs = []
var rs = createReadable([1, 2])

return runner.thunkify(function () {
setTimeout(function() {
var ws = createWritable(outputs)
rs.pipe(ws)
}, 0)
return rs
})().then(function (res) {
// []
console.log(res)
})
})()
.then(function doNotHandleStream() {
var runner = require('..').Runner({ stream: false })

var outputs = []
var rs = createReadable([1, 2])

runner.thunkify(function () {
setTimeout(function() {
var ws = createWritable(outputs)
rs.pipe(ws)
}, 0)
return rs
})().then(function (res) {
// true
console.log(res[0] === rs)
})
})

function createReadable(input) {
var stream = Readable({ objectMode: true })
var i = 0
stream._read = function () {
if (i < input.length) {
this.push(input[i++])
} else {
this.push(null)
}
}
return stream
}

function createWritable(output) {
var stream = Writable({ objectMode: true })
var i = 0
stream._write = function (data, _, next) {
output.push(data)
next()
}
return stream
}

```

61 changes: 61 additions & 0 deletions example/runner.js
@@ -0,0 +1,61 @@
var Stream = require('stream')
var Readable = Stream.Readable
var Writable = Stream.Writable

;(function handleStream() {
var runner = require('..').Runner({ stream: true })

var outputs = []
var rs = createReadable([1, 2])

return runner.thunkify(function () {
setTimeout(function() {
var ws = createWritable(outputs)
rs.pipe(ws)
}, 0)
return rs
})().then(function (res) {
// []
console.log(res)
})
})()
.then(function doNotHandleStream() {
var runner = require('..').Runner({ stream: false })

var outputs = []
var rs = createReadable([1, 2])

runner.thunkify(function () {
setTimeout(function() {
var ws = createWritable(outputs)
rs.pipe(ws)
}, 0)
return rs
})().then(function (res) {
// true
console.log(res[0] === rs)
})
})

function createReadable(input) {
var stream = Readable({ objectMode: true })
var i = 0
stream._read = function () {
if (i < input.length) {
this.push(input[i++])
} else {
this.push(null)
}
}
return stream
}

function createWritable(output) {
var stream = Writable({ objectMode: true })
var i = 0
stream._write = function (data, _, next) {
output.push(data)
next()
}
return stream
}
103 changes: 63 additions & 40 deletions index.js
@@ -1,59 +1,78 @@
var eos = require('end-of-stream')
var consume = require('stream-consume')

exports = module.exports = function (cb) {
return thunkify(cb).apply(null, slice(arguments, 1))
function Runner(opts) {
if (!(this instanceof Runner)) {
return new Runner(opts)
}
opts = opts || {}
// should handle functions returning a stream
this.stream = opts.stream !== false
// should handle functions returning a promise
this.promise = opts.promise !== false
// should handle asynchronous functions accepting a callback
this.async = opts.async !== false
}

exports.thunkify = thunkify

function thunkify(fn, ctx) {
if (typeof fn === 'string') {
fn = ctx[fn]
}
Runner.prototype.thunkify = function(fn) {
var self = this

return function () {
var ctx = this
var args = slice(arguments)

// For async callbacks,
// For async functions,
// the number of arguments provided should be at least one less than declared.
var maybeAsync = fn.length > args.length
var async = self.async && fn.length > args.length

return new Promise(function (resolve, reject) {
function done(err) {
if (err) {
return reject(err)
}
resolve(normalizeResult(slice(arguments, 1)))
if (err) return reject(err)
resolve(self.normalize(slice(arguments, 1)))
}

var r = fn.apply(ctx, maybeAsync ? args.concat(done) : args)

if (isPromise(r)) {
r.then(done.bind(null, null), done)
return
if (async) {
args = args.concat(done)
}

if (isStream(r)) {
eos(r, {
error: true,
readable: r.readable,
writable: r.writable && !r.readable,
}, function (err) {
done(err)
})
consume(r)
return
var r = fn.apply(ctx, args)

if (self.promise && isPromise(r)) {
return r.then(done.bind(null, null), done)
}

if (!maybeAsync) {
done(null, r)
if (self.stream && isStream(r)) {
return self.eos(r, done)
}

if (!async) done(null, r)
})
}
}

Runner.prototype.eos = function(stream, done) {
eos(stream, {
error: true,
readable: stream.readable,
writable: stream.writable && !stream.readable,
}, function (err) {
done(err)
})
consume(stream)
}

Runner.prototype.normalize = function(res) {
var top
while (res.length) {
top = res.pop()
if (typeof top !== 'undefined') {
res.push(top)
break
}
}
return res
}

function isPromise(o) {
return o && typeof o.then === 'function'
}
Expand All @@ -66,15 +85,19 @@ function slice(o, from, to) {
return Array.prototype.slice.call(o, from, to)
}

function normalizeResult(res) {
var top
while (res.length) {
top = res.pop()
if (typeof top !== 'undefined') {
res.push(top)
break
}
var runner = new Runner()
var thunkify = runner.thunkify.bind(runner)

exports = module.exports = function (ctx, cb) {
if (typeof ctx === 'function') {
return thunkify(ctx).apply(null, slice(arguments, 1))
}
return res
if (typeof cb === 'string') {
cb = ctx[cb]
}
return thunkify(cb).apply(ctx, slice(arguments, 2))
}

exports.thunkify = thunkify
exports.Runner = Runner

6 changes: 3 additions & 3 deletions test/index.js
Expand Up @@ -36,11 +36,11 @@ test('context', function(t) {
},
}

thunkify(function () {
run(ctx, function () {
t.equal(this, ctx)
}, ctx)()
})

thunkify('fn', ctx)()
run(ctx, 'fn')
.then(function (res) {
t.equal(res[0], ctx)
})
Expand Down
48 changes: 48 additions & 0 deletions test/runner.js
@@ -0,0 +1,48 @@
var test = require('tap').test
var Runner = require('..').Runner
var Stream = require('./lib/stream')

test('stream', function(t) {
t.plan(2)

var runner = Runner({ stream: false })
var outputs = []
var rs = Stream.Readable([1, 2])
runner.thunkify(function () {
setTimeout(function() {
var ws = Stream.Writable(outputs)
rs.pipe(ws)
}, 0)
return rs
})()
.then(function (res) {
t.same(outputs, [])
t.equal(res[0], rs)
})
})

test('promise', function(t) {
t.plan(1)

var runner = Runner({ promise: false })
var p = new Promise(function (resolve) {
setTimeout(resolve, 0)
})
runner.thunkify(function () {
return p
})()
.then(function (res) {
t.equal(res[0], p)
})
})

test('async', function(t) {
t.plan(2)

var runner = Runner({ async: false })
runner.thunkify(function (cb) {
t.notOk(cb)
t.equal(arguments.length, 0)
})()
})

0 comments on commit 800e497

Please sign in to comment.