diff --git a/README.md b/README.md index 3174ea3..7a655db 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,33 @@ ms.ls() `callback` is a function of type `function (err, protocols)` where `err` is an error object that gets passed if something wrong happend and `protocols` is an array of the supported protocols in the other end. +### This module uses `pull-streams` + +We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). + +You can learn more about pull-streams at: + +- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) +- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) +- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) +- [pull-streams documentation](https://pull-stream.github.io/) + +#### Converting `pull-streams` to Node.js Streams + +If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: + +```js +const pullToStream = require('pull-stream-to-stream') + +const nodeStreamInstance = pullToStream(pullStreamInstance) +// nodeStreamInstance is an instance of a Node.js Stream +``` + +To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. + + + + ## Maintainers Captain: [@diasdavid](https://github.com/diasdavid). @@ -161,4 +188,3 @@ Small note: If editing the Readme, please conform to the [standard-readme](https ## License [MIT](LICENSE) © David Dias - diff --git a/package.json b/package.json index 5c4f54d..9e9ca5b 100644 --- a/package.json +++ b/package.json @@ -41,19 +41,23 @@ "author": "David Dias ", "license": "MIT", "dependencies": { - "babel-runtime": "^6.6.1", - "length-prefixed-stream": "^1.5.0", - "lodash.range": "^3.1.5", - "run-series": "^1.1.4", - "varint": "^4.0.0" + "babel-runtime": "^6.11.6", + "debug": "^2.2.0", + "interface-connection": "^0.2.1", + "lodash.isfunction": "^3.0.8", + "lodash.range": "^3.1.7", + "pull-handshake": "^1.1.3", + "pull-length-prefixed": "^1.1.0", + "pull-stream": "^3.4.3", + "varint": "^4.0.1" }, "devDependencies": { - "aegir": "^3.1.0", - "run-parallel": "^1.1.6", - "bl": "^1.1.2", + "aegir": "^8.0.0", "chai": "^3.5.0", - "pre-commit": "^1.1.2", - "stream-pair": "^1.0.3" + "pre-commit": "^1.1.3", + "pull-pair": "^1.1.0", + "run-parallel": "^1.1.6", + "run-series": "^1.1.4" }, "contributors": [ "David Dias ", @@ -61,4 +65,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/agreement.js b/src/agreement.js new file mode 100644 index 0000000..4a925d5 --- /dev/null +++ b/src/agreement.js @@ -0,0 +1,100 @@ +'use strict' + +const handshake = require('pull-handshake') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('multistream:agreement') +log.error = debug('multistream:agreement:error') + +exports.select = (multicodec, callback) => { + const stream = handshake({ + timeout: 60 * 1000 + }, callback) + + const shake = stream.handshake + + log('writing multicodec %s', multicodec) + writeEncoded(shake, new Buffer(multicodec + '\n'), callback) + + lp.decodeFromReader(shake, (err, data) => { + if (err) { + return callback(err) + } + const protocol = data.toString().slice(0, -1) + + if (protocol !== multicodec) { + return callback(new Error(`"${multicodec}" not supported`), shake.rest()) + } + + log('multicodec ack') + callback(null, shake.rest()) + }) + + return stream +} + +exports.handlerSelector = (rawConn, handlersMap) => { + const cb = (err) => { + // incoming errors are irrelevant for the app + log.error(err) + } + + const stream = handshake({ + timeout: 60 * 1000 + }, cb) + + const shake = stream.handshake + + next() + + function next () { + lp.decodeFromReader(shake, (err, data) => { + if (err) { + return cb(err) + } + log('received: %s', data.toString()) + const protocol = data.toString().slice(0, -1) + const result = Object.keys(handlersMap).filter((id) => id === protocol) + const key = result && result[0] + + if (key) { + log('ack: %s', protocol) + writeEncoded(shake, data, cb) + handlersMap[key](new Connection(shake.rest(), rawConn)) + } else { + log('received multicodec of not supported protocol: %s', protocol) + writeEncoded(shake, new Buffer('na\n')) + next() + } + }) + } + + return stream +} + +// prefixes a message with a varint +function encode (msg, cb) { + const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] + + pull( + pull.values(values), + lp.encode(), + pull.collect((err, encoded) => { + if (err) { + return cb(err) + } + cb(null, encoded[0]) + }) + ) +} + +function writeEncoded (writer, msg, cb) { + encode(msg, (err, msg) => { + if (err) { + return cb(err) + } + writer.write(msg) + }) +} diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..fc0d0a2 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,5 @@ +'use strict' + +module.exports = { + PROTOCOL_ID: '/multistream/1.0.0' +} diff --git a/src/dialer.js b/src/dialer.js index 1a0b272..a94249d 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,78 +1,100 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') const varint = require('varint') -const range = require('lodash.range') -const series = require('run-series') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('multistream:dialer') -exports = module.exports = Dialer +const PROTOCOL_ID = require('./constants').PROTOCOL_ID +const agrmt = require('./agreement') -function Dialer () { - if (!(this instanceof Dialer)) { - return new Dialer() +module.exports = class Dialer { + constructor () { + this.conn = null } - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - encode.write(new Buffer(PROTOCOL_ID + '\n')) - conn = _conn - callback() - } else { - callback(new Error('Incompatible multistream')) + handle (rawConn, callback) { + log('handling connection') + const ms = agrmt.select(PROTOCOL_ID, (err, conn) => { + if (err) { + return callback(err) } + log('handshake success') + + this.conn = new Connection(conn, rawConn) + + callback() }) + pull(rawConn, ms, rawConn) } - this.select = (protocol, callback) => { - if (!conn) { + select (protocol, callback) { + log('dialer select %s', protocol) + if (!this.conn) { return callback(new Error('multistream handshake has not finalized yet')) } - encode.write(new Buffer(protocol + '\n')) - decode.once('data', function (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - if (msg === protocol) { - return callback(null, conn) - } - if (msg === 'na') { - return callback(new Error(protocol + ' not supported')) + const selectStream = agrmt.select(protocol, (err, conn) => { + if (err) { + this.conn = new Connection(conn, this.conn) + return callback(err) } + callback(null, new Connection(conn, this.conn)) }) + + pull( + this.conn, + selectStream, + this.conn + ) } - this.ls = (callback) => { - encode.write(new Buffer('ls' + '\n')) - let protos = [] - decode.once('data', function (msgBuffer) { - const size = varint.decode(msgBuffer) // eslint-disable-line - const nProtos = varint.decode(msgBuffer, varint.decode.bytes) - - timesSeries(nProtos, (n, next) => { - decode.once('data', function (msgBuffer) { - protos.push(msgBuffer.toString().slice(0, -1)) - next() + ls (callback) { + const lsStream = agrmt.select('ls', (err, conn) => { + if (err) { + return callback(err) + } + + pull( + conn, + lp.decode(), + collectLs(conn), + pull.map(stringify), + pull.collect((err, list) => { + if (err) { + return callback(err) + } + callback(null, list.slice(1)) }) - }, (err) => { - if (err) { - return callback(err) - } - callback(null, protos) - }) + ) }) + + pull( + this.conn, + lsStream, + this.conn + ) } } -function timesSeries (i, work, callback) { - series(range(i).map((i) => (callback) => work(i, callback)), callback) +function stringify (buf) { + return buf.toString().slice(0, -1) +} + +function collectLs (conn) { + let first = true + let counter = 0 + + return pull.take((msg) => { + if (first) { + varint.decode(msg) + counter = varint.decode(msg, varint.decode.bytes) + return true + } + + return counter-- > 0 + }) } diff --git a/src/listener.js b/src/listener.js index ead8ee5..f2956f8 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,83 +1,93 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') const varint = require('varint') +const isFunction = require('lodash.isfunction') +const assert = require('assert') +const debug = require('debug') +const log = debug('libp2p:multistream:listener') +const Connection = require('interface-connection').Connection -exports = module.exports = Listener +const PROTOCOL_ID = require('./constants').PROTOCOL_ID +const agrmt = require('./agreement') -function Listener () { - if (!(this instanceof Listener)) { - return new Listener() +module.exports = class Listener { + constructor () { + this.handlers = { + ls: (conn) => this._ls(conn) + } } - const handlers = {} - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - encode.write(new Buffer(PROTOCOL_ID + '\n')) - - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - conn = _conn - decode.once('data', incMsg) - callback() - } else { - // TODO This would be where we try to support other versions - // of multistream (backwards compatible). Currently we have - // just one, so this never happens. - return callback(new Error('not supported version of multistream')) - } - }) + handle (rawConn, callback) { + log('handling connection') - function incMsg (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - - if (msg === 'ls') { - const protos = Object.keys(handlers) - const nProtos = protos.length - // total size of the list of protocols, including varint and newline - const size = protos.reduce((size, proto) => { - var p = new Buffer(proto + '\n') - var el = varint.encodingLength(p.length) - return size + el - }, 0) - - var nProtoVI = new Buffer(varint.encode(nProtos)) - var sizeVI = new Buffer(varint.encode(size)) - var buf = Buffer.concat([nProtoVI, sizeVI, new Buffer('\n')]) - encode.write(buf) - protos.forEach((proto) => { - encode.write(new Buffer(proto + '\n')) - }) + const selectStream = agrmt.select(PROTOCOL_ID, (err, conn) => { + if (err) { + return callback(err) } - if (handlers[msg]) { - // Protocol supported, ACK back - encode.write(new Buffer(msg + '\n')) - return handlers[msg](conn) - } else { - // Protocol not supported, wait for new handshake - encode.write(new Buffer('na' + '\n')) - } + const hsConn = new Connection(conn, rawConn) - decode.once('data', incMsg) - } + const handlerSelector = agrmt.handlerSelector(hsConn, this.handlers) + + pull( + hsConn, + handlerSelector, + hsConn + ) + + callback() + }) + + pull( + rawConn, + selectStream, + rawConn + ) } // be ready for a given `protocol` - this.addHandler = (protocol, handlerFunc) => { - if ((typeof handlerFunc !== 'function')) { - throw new Error('handler function must be a function') + addHandler (protocol, handler) { + log('handling %s', protocol) + + assert(isFunction(handler), 'handler must be a function') + + if (this.handlers[protocol]) { + log('overwriting handler for %s', protocol) } - handlers[protocol] = handlerFunc + this.handlers[protocol] = handler + } + + // inner function - handler for `ls` + _ls (conn) { + const protos = Object.keys(this.handlers) + .filter((key) => key !== 'ls') + const nProtos = protos.length + // total size of the list of protocols, including varint and newline + const size = protos.reduce((size, proto) => { + const p = new Buffer(proto + '\n') + const el = varint.encodingLength(p.length) + return size + el + }, 0) + + const buf = Buffer.concat([ + new Buffer(varint.encode(nProtos)), + new Buffer(varint.encode(size)), + new Buffer('\n') + ]) + + const encodedProtos = protos.map((proto) => { + return new Buffer(proto + '\n') + }) + const values = [buf].concat(encodedProtos) + + pull( + pull.values(values), + lp.encode(), + conn + ) } } diff --git a/src/protocol-id.js b/src/protocol-id.js deleted file mode 100644 index 9bd6c32..0000000 --- a/src/protocol-id.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -module.exports = '/multistream/1.0.0' diff --git a/test/multistream.spec.js b/test/index.spec.js similarity index 60% rename from test/multistream.spec.js rename to test/index.spec.js index 41c3cf3..7fd456a 100644 --- a/test/multistream.spec.js +++ b/test/index.spec.js @@ -3,17 +3,59 @@ 'use strict' const expect = require('chai').expect -const streamPair = require('stream-pair') +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const pair = require('pull-pair/duplex') const multistream = require('../src') const parallel = require('run-parallel') const series = require('run-series') -const bl = require('bl') -describe('multistream normal mode', function () { - it('performs multistream handshake', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other +describe('multistream dialer', () => { + it('sends the multistream multicodec', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + + pull( + listenerConn, + lp.decode(), + pull.drain((data) => { + expect(data.toString()).to.equal('/multistream/1.0.0\n') + done() + }) + ) + + const msd = new multistream.Dialer() + expect(msd).to.exist + msd.handle(dialerConn, () => {}) + }) +}) +describe('multistream listener', () => { + it('sends the multistream multicodec', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + + pull( + dialerConn, + lp.decode(), + pull.drain((data) => { + expect(data.toString()).to.equal('/multistream/1.0.0\n') + done() + }) + ) + + const msl = new multistream.Listener() + expect(msl).to.exist + msl.handle(listenerConn, () => {}) + }) +}) + +describe('multistream handshake', () => { + it('performs the handshake handshake', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] parallel([ (cb) => { @@ -30,9 +72,9 @@ describe('multistream normal mode', function () { }) it('handle and select a protocol', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -53,30 +95,37 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - next() - })) - conn.write('banana') - conn.end() + + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + next() + }) + ) }) } ], done) }) - it('handle and select a protocol, respecting pause and resume ', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other - let handled = false + it('select non existing proto', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + let msl let msd series([ @@ -95,40 +144,18 @@ describe('multistream normal mode', function () { ], next) }, (next) => { - dialerConn.cork() - listenerConn.pause() - - msl.addHandler('/monkey/1.0.0', (conn) => { - handled = true - conn.pipe(conn) - }) - next() - }, - (next) => { - msd.select('/monkey/1.0.0', (err, conn) => { - expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - expect(handled).to.be.eql(true) - next() - })) - conn.write('banana') - conn.end() + msd.select('/panda/1.0.0', (err) => { + expect(err).to.exist + next() }) - setTimeout(() => { - expect(handled).to.be.eql(false) - dialerConn.uncork() - listenerConn.resume() - }, 100) } ], done) }) - it('select non existing proto', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + it('select a non existing proto and then select an existing proto', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -148,18 +175,42 @@ describe('multistream normal mode', function () { ], next) }, (next) => { - msd.select('/panda/1.0.0', (err) => { + msl.addHandler('/monkey/1.0.0', (conn) => { + pull(conn, conn) + }) + next() + }, + (next) => { + msd.select('/sadpanda/1.0.0', (err) => { expect(err).to.exist next() }) + }, + (next) => { + msd.select('/monkey/1.0.0', (err, conn) => { + expect(err).to.not.exist + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + next() + }) + ) + }) } ], done) }) it('ls', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -180,19 +231,19 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/giraffe/2.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/elephant/2.5.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, @@ -211,9 +262,9 @@ describe('multistream normal mode', function () { }) it('handler must be a function', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -233,20 +284,20 @@ describe('multistream normal mode', function () { ], next) }, (next) => { - try { - msd.addHandler('/monkey/1.0.0', 'potato') - } catch (err) { - expect(err).to.exist - next() - } + expect( + () => msl.addHandler('/monkey/1.0.0', 'potato') + ).to.throw( + /must be a function/ + ) + next() } ], done) }) it('racing condition resistent', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -262,7 +313,7 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() } @@ -274,13 +325,20 @@ describe('multistream normal mode', function () { expect(err).to.not.exist msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - cb() - })) - conn.write('banana') - conn.end() + + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + cb() + }) + ) }) }) }