diff --git a/.travis.yml b/.travis.yml index e1d6320..a31f82e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: node_js node_js: - 4 - 5 + - 6 # Make sure we have new NPM. before_install: diff --git a/package.json b/package.json index 0c03eb4..c1348c6 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,10 @@ "coverage": "aegir-coverage", "coverage-publish": "aegir-coverage publish" }, + "repository": { + "type": "git", + "url": "git+https://github.com/diasdavid/js-multistream.git" + }, "pre-commit": [ "lint", "test" @@ -38,7 +42,7 @@ "license": "MIT", "dependencies": { "babel-runtime": "^6.6.1", - "length-prefixed-message": "^3.0.3", + "length-prefixed-stream": "^1.5.0", "lodash.range": "^3.1.5", "run-series": "^1.1.4", "varint": "^4.0.0" @@ -57,4 +61,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/dialer.js b/src/dialer.js index b0426ac..1a0b272 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,10 +1,10 @@ 'use strict' -const lpm = require('length-prefixed-message') +const lps = require('length-prefixed-stream') const PROTOCOL_ID = require('./protocol-id') const varint = require('varint') const range = require('lodash.range') -const parallel = require('run-parallel') +const series = require('run-series') exports = module.exports = Dialer @@ -12,14 +12,20 @@ function Dialer () { if (!(this instanceof Dialer)) { return new Dialer() } + + const encode = lps.encode() + const decode = lps.decode() let conn // perform the multistream handshake this.handle = (_conn, callback) => { - lpm.read(_conn, (buffer) => { + encode.pipe(_conn) + _conn.pipe(decode) + + decode.once('data', (buffer) => { const msg = buffer.toString().slice(0, -1) if (msg === PROTOCOL_ID) { - lpm.write(_conn, new Buffer(PROTOCOL_ID + '\n')) + encode.write(new Buffer(PROTOCOL_ID + '\n')) conn = _conn callback() } else { @@ -33,8 +39,8 @@ function Dialer () { return callback(new Error('multistream handshake has not finalized yet')) } - lpm.write(conn, new Buffer(protocol + '\n')) - lpm.read(conn, function (msgBuffer) { + encode.write(new Buffer(protocol + '\n')) + decode.once('data', function (msgBuffer) { const msg = msgBuffer.toString().slice(0, -1) if (msg === protocol) { return callback(null, conn) @@ -46,14 +52,14 @@ function Dialer () { } this.ls = (callback) => { - lpm.write(conn, new Buffer('ls' + '\n')) + encode.write(new Buffer('ls' + '\n')) let protos = [] - lpm.read(conn, function (msgBuffer) { + decode.once('data', function (msgBuffer) { const size = varint.decode(msgBuffer) // eslint-disable-line const nProtos = varint.decode(msgBuffer, varint.decode.bytes) - times(nProtos, (n, next) => { - lpm.read(conn, function (msgBuffer) { + timesSeries(nProtos, (n, next) => { + decode.once('data', function (msgBuffer) { protos.push(msgBuffer.toString().slice(0, -1)) next() }) @@ -67,6 +73,6 @@ function Dialer () { } } -function times (i, work, callback) { - parallel(range(i).map((i) => (callback) => work(i, callback)), callback) +function timesSeries (i, work, callback) { + series(range(i).map((i) => (callback) => work(i, callback)), callback) } diff --git a/src/listener.js b/src/listener.js index fcec23f..ead8ee5 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,6 +1,6 @@ 'use strict' -const lpm = require('length-prefixed-message') +const lps = require('length-prefixed-stream') const PROTOCOL_ID = require('./protocol-id') const varint = require('varint') @@ -12,28 +12,30 @@ function Listener () { } const handlers = {} + const encode = lps.encode() + const decode = lps.decode() let conn // perform the multistream handshake this.handle = (_conn, callback) => { - lpm.write(_conn, new Buffer(PROTOCOL_ID + '\n')) - listenerMultistreamHandshakeCheck() + encode.pipe(_conn) + _conn.pipe(decode) - function listenerMultistreamHandshakeCheck () { - lpm.read(_conn, (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - conn = _conn - lpm.read(conn, incMsg) - callback() - } else { - // TODO This would be where we try to support other versions - // of multistream (backwards compatible). Currently we have - // just one, so this never happens. - return callback(new Error('not supported version of multistream')) - } - }) - } + encode.write(new Buffer(PROTOCOL_ID + '\n')) + + decode.once('data', (buffer) => { + const msg = buffer.toString().slice(0, -1) + if (msg === PROTOCOL_ID) { + conn = _conn + decode.once('data', incMsg) + callback() + } else { + // TODO This would be where we try to support other versions + // of multistream (backwards compatible). Currently we have + // just one, so this never happens. + return callback(new Error('not supported version of multistream')) + } + }) function incMsg (msgBuffer) { const msg = msgBuffer.toString().slice(0, -1) @@ -51,23 +53,22 @@ function Listener () { var nProtoVI = new Buffer(varint.encode(nProtos)) var sizeVI = new Buffer(varint.encode(size)) var buf = Buffer.concat([nProtoVI, sizeVI, new Buffer('\n')]) - lpm.write(conn, buf) + encode.write(buf) protos.forEach((proto) => { - lpm.write(conn, new Buffer(proto + '\n')) + encode.write(new Buffer(proto + '\n')) }) } if (handlers[msg]) { // Protocol supported, ACK back - lpm.write(conn, new Buffer(msg + '\n')) + encode.write(new Buffer(msg + '\n')) return handlers[msg](conn) } else { // Protocol not supported, wait for new handshake - lpm.write(conn, new Buffer('na' + '\n')) + encode.write(new Buffer('na' + '\n')) } - // continue listening - lpm.read(conn, incMsg) + decode.once('data', incMsg) } } diff --git a/test/multistream.spec.js b/test/multistream.spec.js index ed2186a..41c3cf3 100644 --- a/test/multistream.spec.js +++ b/test/multistream.spec.js @@ -72,6 +72,59 @@ describe('multistream normal mode', function () { ], done) }) + it('handle and select a protocol, respecting pause and resume ', (done) => { + const sp = streamPair.create() + const dialerConn = sp + const listenerConn = sp.other + let handled = false + let msl + let msd + series([ + (next) => { + parallel([ + (cb) => { + msl = new multistream.Listener() + expect(msl).to.exist + msl.handle(listenerConn, cb) + }, + (cb) => { + msd = new multistream.Dialer() + expect(msd).to.exist + msd.handle(dialerConn, cb) + } + ], next) + }, + (next) => { + dialerConn.cork() + listenerConn.pause() + + msl.addHandler('/monkey/1.0.0', (conn) => { + handled = true + conn.pipe(conn) + }) + next() + }, + (next) => { + msd.select('/monkey/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + expect(data.toString()).to.equal('banana') + expect(handled).to.be.eql(true) + next() + })) + conn.write('banana') + conn.end() + }) + setTimeout(() => { + expect(handled).to.be.eql(false) + dialerConn.uncork() + listenerConn.resume() + }, 100) + } + ], done) + }) + it('select non existing proto', (done) => { const sp = streamPair.create() const dialerConn = sp