From b8de80d5c82552a3ffb8329285d2b466b6ebd7e6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 8 Aug 2016 15:14:28 +0200 Subject: [PATCH 1/4] feat(pull): migration to pull-streams --- package.json | 28 ++-- src/agreement.js | 77 ++++++++++ src/dialer.js | 118 ++++++++------- src/listener.js | 128 ++++++++-------- test/{multistream.spec.js => index.spec.js} | 155 ++++++++------------ 5 files changed, 281 insertions(+), 225 deletions(-) create mode 100644 src/agreement.js rename test/{multistream.spec.js => index.spec.js} (61%) diff --git a/package.json b/package.json index 5c4f54d..66e4f8c 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "multistream-select", "version": "0.10.0", "description": "JavaScript implementation of the multistream spec", - "main": "lib/index.js", + "main": "src/index.js", "jsnext:main": "src/index.js", "scripts": { "lint": "aegir-lint", @@ -41,19 +41,23 @@ "author": "David Dias ", "license": "MIT", "dependencies": { - "babel-runtime": "^6.6.1", - "length-prefixed-stream": "^1.5.0", - "lodash.range": "^3.1.5", - "run-series": "^1.1.4", - "varint": "^4.0.0" + "babel-runtime": "^6.11.6", + "debug": "^2.2.0", + "interface-connection": "^0.1.8", + "lodash.isfunction": "^3.0.8", + "lodash.range": "^3.1.7", + "pull-handshake": "^1.1.3", + "pull-length-prefixed": "^1.0.0", + "pull-stream": "^3.4.3", + "varint": "^4.0.1" }, "devDependencies": { - "aegir": "^3.1.0", - "run-parallel": "^1.1.6", - "bl": "^1.1.2", + "aegir": "^6.0.0", "chai": "^3.5.0", - "pre-commit": "^1.1.2", - "stream-pair": "^1.0.3" + "pre-commit": "^1.1.3", + "pull-pair": "^1.1.0", + "run-parallel": "^1.1.6", + "run-series": "^1.1.4" }, "contributors": [ "David Dias ", @@ -61,4 +65,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/agreement.js b/src/agreement.js new file mode 100644 index 0000000..6f6761c --- /dev/null +++ b/src/agreement.js @@ -0,0 +1,77 @@ +'use strict' + +const handshake = require('pull-handshake') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:multistream:agreement') +log.error = debug('libp2p:multistream:agreement:error') + +exports.dial = (header, cb) => { + const stream = handshake({timeout: 60 * 1000}, cb) + const shake = stream.handshake + + log('writing header %s', header) + writeEncoded(shake, new Buffer(header + '\n'), cb) + + lp.decodeFromReader(shake, (err, data) => { + if (err) return cb(err) + const protocol = data.toString().slice(0, -1) + if (protocol !== header) { + cb(new Error(`Unkown header: "${protocol}"`)) + } + + log('header ack') + cb(null, shake.rest()) + }) + + return stream +} + +exports.listen = (rawConn, handlersMap, defaultHandler) => { + const cb = (err) => { + // TODO: pass errors somewhere + log.error(err) + } + const stream = handshake({timeout: 60 * 1000}, cb) + const shake = stream.handshake + + lp.decodeFromReader(shake, (err, data) => { + if (err) return cb(err) + log('received: %s', data.toString()) + const protocol = data.toString().slice(0, -1) + const [key] = Object.keys(handlersMap).filter((id) => id === protocol) + + if (key) { + log('ack: %s', protocol) + writeEncoded(shake, data, cb) + handlersMap[key](new Connection(shake.rest(), rawConn)) + } else { + log('unkown protocol: %s', protocol) + defaultHandler(protocol, shake.rest()) + } + }) + + return stream +} + +function encode (msg, cb) { + const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] + + pull( + pull.values(values), + lp.encode(), + pull.collect((err, encoded) => { + if (err) return cb(err) + cb(null, encoded[0]) + }) + ) +} + +function writeEncoded (writer, msg, cb) { + encode(msg, (err, msg) => { + if (err) return cb(err) + writer.write(msg) + }) +} diff --git a/src/dialer.js b/src/dialer.js index 1a0b272..eecb70f 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,78 +1,88 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') const varint = require('varint') -const range = require('lodash.range') -const series = require('run-series') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:multistream:dialer') -exports = module.exports = Dialer +const PROTOCOL_ID = require('./protocol-id') +const agreement = require('./agreement') -function Dialer () { - if (!(this instanceof Dialer)) { - return new Dialer() +module.exports = class Dialer { + constructor () { + this.conn = null } - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - encode.write(new Buffer(PROTOCOL_ID + '\n')) - conn = _conn - callback() - } else { - callback(new Error('Incompatible multistream')) + handle (rawConn, cb) { + log('handling connection') + const ms = agreement.dial(PROTOCOL_ID, (err, conn) => { + if (err) { + return cb(err) } + log('handshake success') + + this.conn = new Connection(conn, rawConn) + + cb() }) + pull(rawConn, ms, rawConn) } - this.select = (protocol, callback) => { - if (!conn) { - return callback(new Error('multistream handshake has not finalized yet')) + select (protocol, cb) { + log('selecting %s', protocol) + if (!this.conn) { + return cb(new Error('multistream handshake has not finalized yet')) } - encode.write(new Buffer(protocol + '\n')) - decode.once('data', function (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - if (msg === protocol) { - return callback(null, conn) - } - if (msg === 'na') { - return callback(new Error(protocol + ' not supported')) + const stream = agreement.dial(protocol, (err, conn) => { + if (err) { + return cb(err) } + // TODO: handle 'na' + cb(null, new Connection(conn, this.conn)) }) + + pull(this.conn, stream, this.conn) } - this.ls = (callback) => { - encode.write(new Buffer('ls' + '\n')) - let protos = [] - decode.once('data', function (msgBuffer) { - const size = varint.decode(msgBuffer) // eslint-disable-line - const nProtos = varint.decode(msgBuffer, varint.decode.bytes) - - timesSeries(nProtos, (n, next) => { - decode.once('data', function (msgBuffer) { - protos.push(msgBuffer.toString().slice(0, -1)) - next() + ls (cb) { + const ls = agreement.dial('ls', (err, conn) => { + if (err) return cb(err) + + pull( + conn, + lp.decode(), + collectLs(conn), + pull.map(stringify), + pull.collect((err, list) => { + if (err) return cb(err) + return cb(null, list.slice(1)) }) - }, (err) => { - if (err) { - return callback(err) - } - callback(null, protos) - }) + ) }) + + pull(this.conn, ls, this.conn) } } -function timesSeries (i, work, callback) { - series(range(i).map((i) => (callback) => work(i, callback)), callback) +function stringify (buf) { + return buf.toString().slice(0, -1) +} + +function collectLs (conn) { + let first = true + let counter = 0 + + return pull.take((msg) => { + if (first) { + const size = varint.decode(msg) // eslint-disable-line + counter = varint.decode(msg, varint.decode.bytes) + return true + } + + return counter-- > 0 + }) } diff --git a/src/listener.js b/src/listener.js index ead8ee5..9bc8d62 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,83 +1,87 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') const varint = require('varint') +const isFunction = require('lodash.isfunction') +const assert = require('assert') +const debug = require('debug') +const log = debug('libp2p:multistream:listener') -exports = module.exports = Listener +const PROTOCOL_ID = require('./protocol-id') +const agreement = require('./agreement') -function Listener () { - if (!(this instanceof Listener)) { - return new Listener() +module.exports = class Listener { + constructor () { + this.handlers = { + ls: (conn) => this._ls(conn) + } } - const handlers = {} - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - encode.write(new Buffer(PROTOCOL_ID + '\n')) + handle (conn, cb) { + log('handling connection') + const ms = agreement.listen(conn, { + [PROTOCOL_ID]: (conn) => { + log('handshake success') + const msgHandler = agreement.listen(conn, this.handlers, (protocol, conn) => { + log('unkown protocol: %s', protocol) + pull( + pull.values([new Buffer('na')]), + conn + ) + }) + pull(conn, msgHandler, conn) - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - conn = _conn - decode.once('data', incMsg) - callback() - } else { - // TODO This would be where we try to support other versions - // of multistream (backwards compatible). Currently we have - // just one, so this never happens. - return callback(new Error('not supported version of multistream')) + cb() } + }, () => { + cb(new Error('unkown protocol')) }) - function incMsg (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - - if (msg === 'ls') { - const protos = Object.keys(handlers) - const nProtos = protos.length - // total size of the list of protocols, including varint and newline - const size = protos.reduce((size, proto) => { - var p = new Buffer(proto + '\n') - var el = varint.encodingLength(p.length) - return size + el - }, 0) + pull(conn, ms, conn) + } - var nProtoVI = new Buffer(varint.encode(nProtos)) - var sizeVI = new Buffer(varint.encode(size)) - var buf = Buffer.concat([nProtoVI, sizeVI, new Buffer('\n')]) - encode.write(buf) - protos.forEach((proto) => { - encode.write(new Buffer(proto + '\n')) - }) - } + // be ready for a given `protocol` + addHandler (protocol, handler) { + log('handling %s', protocol) - if (handlers[msg]) { - // Protocol supported, ACK back - encode.write(new Buffer(msg + '\n')) - return handlers[msg](conn) - } else { - // Protocol not supported, wait for new handshake - encode.write(new Buffer('na' + '\n')) - } + assert(isFunction(handler), 'handler must be a function') - decode.once('data', incMsg) + if (this.handlers[protocol]) { + // TODO: Do we want to handle this better? + log('overwriting handler for %s', protocol) } + + this.handlers[protocol] = handler } - // be ready for a given `protocol` - this.addHandler = (protocol, handlerFunc) => { - if ((typeof handlerFunc !== 'function')) { - throw new Error('handler function must be a function') - } + _ls (conn) { + const protos = Object.keys(this.handlers) + .filter((key) => key !== 'ls') + const nProtos = protos.length + // total size of the list of protocols, including varint and newline + const size = protos.reduce((size, proto) => { + const p = new Buffer(proto + '\n') + const el = varint.encodingLength(p.length) + return size + el + }, 0) + + const buf = Buffer.concat([ + new Buffer(varint.encode(nProtos)), + new Buffer(varint.encode(size)), + new Buffer('\n') + ]) + + const encodedProtos = protos.map((proto) => { + return new Buffer(proto + '\n') + }) + const values = [buf].concat(encodedProtos) - handlers[protocol] = handlerFunc + pull( + pull.values(values), + lp.encode(), + conn + ) } } diff --git a/test/multistream.spec.js b/test/index.spec.js similarity index 61% rename from test/multistream.spec.js rename to test/index.spec.js index 41c3cf3..e0e24eb 100644 --- a/test/multistream.spec.js +++ b/test/index.spec.js @@ -3,17 +3,17 @@ 'use strict' const expect = require('chai').expect -const streamPair = require('stream-pair') +const pull = require('pull-stream') +const pair = require('pull-pair/duplex') const multistream = require('../src') const parallel = require('run-parallel') const series = require('run-series') -const bl = require('bl') describe('multistream normal mode', function () { it('performs multistream handshake', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] parallel([ (cb) => { @@ -30,9 +30,9 @@ describe('multistream normal mode', function () { }) it('handle and select a protocol', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -53,82 +53,36 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - next() - })) - conn.write('banana') - conn.end() - }) - } - ], done) - }) - it('handle and select a protocol, respecting pause and resume ', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other - let handled = false - let msl - let msd - series([ - (next) => { - parallel([ - (cb) => { - msl = new multistream.Listener() - expect(msl).to.exist - msl.handle(listenerConn, cb) - }, - (cb) => { - msd = new multistream.Dialer() - expect(msd).to.exist - msd.handle(dialerConn, cb) - } - ], next) - }, - (next) => { - dialerConn.cork() - listenerConn.pause() - - msl.addHandler('/monkey/1.0.0', (conn) => { - handled = true - conn.pipe(conn) - }) - next() - }, - (next) => { - msd.select('/monkey/1.0.0', (err, conn) => { - expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - expect(handled).to.be.eql(true) - next() - })) - conn.write('banana') - conn.end() + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + next() + }) + ) }) - setTimeout(() => { - expect(handled).to.be.eql(false) - dialerConn.uncork() - listenerConn.resume() - }, 100) } ], done) }) it('select non existing proto', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -157,9 +111,9 @@ describe('multistream normal mode', function () { }) it('ls', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -180,19 +134,19 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/giraffe/2.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/elephant/2.5.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, @@ -211,9 +165,9 @@ describe('multistream normal mode', function () { }) it('handler must be a function', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -233,20 +187,20 @@ describe('multistream normal mode', function () { ], next) }, (next) => { - try { - msd.addHandler('/monkey/1.0.0', 'potato') - } catch (err) { - expect(err).to.exist - next() - } + expect( + () => msl.addHandler('/monkey/1.0.0', 'potato') + ).to.throw( + /must be a function/ + ) + next() } ], done) }) it('racing condition resistent', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -262,7 +216,7 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() } @@ -274,13 +228,20 @@ describe('multistream normal mode', function () { expect(err).to.not.exist msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - cb() - })) - conn.write('banana') - conn.end() + + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + cb() + }) + ) }) }) } From 47756513ea20929ce5406f14ca68d6b95588f307 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 4 Sep 2016 09:50:07 -0400 Subject: [PATCH 2/4] fix(protocol): fix the implementation of the protocol - add test to verify that both dialer and listener send multistream handshake right away - fix structure and func naming - not wait on listener - support multiple selects --- package.json | 1 + src/agreement.js | 84 +++++++++++++++++++++++-------------- src/constants.js | 5 +++ src/dialer.js | 54 ++++++++++++++---------- src/listener.js | 46 ++++++++++++--------- src/protocol-id.js | 3 -- test/index.spec.js | 101 ++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 217 insertions(+), 77 deletions(-) create mode 100644 src/constants.js delete mode 100644 src/protocol-id.js diff --git a/package.json b/package.json index 66e4f8c..99b8288 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "aegir": "^6.0.0", "chai": "^3.5.0", "pre-commit": "^1.1.3", + "pull-length-prefixed": "^1.1.0", "pull-pair": "^1.1.0", "run-parallel": "^1.1.6", "run-series": "^1.1.4" diff --git a/src/agreement.js b/src/agreement.js index 6f6761c..b329142 100644 --- a/src/agreement.js +++ b/src/agreement.js @@ -5,57 +5,75 @@ const lp = require('pull-length-prefixed') const pull = require('pull-stream') const Connection = require('interface-connection').Connection const debug = require('debug') -const log = debug('libp2p:multistream:agreement') -log.error = debug('libp2p:multistream:agreement:error') +const log = debug('multistream:agreement') +log.error = debug('multistream:agreement:error') + +exports.select = (multicodec, callback) => { + const stream = handshake({ + timeout: 60 * 1000 + }, callback) -exports.dial = (header, cb) => { - const stream = handshake({timeout: 60 * 1000}, cb) const shake = stream.handshake - log('writing header %s', header) - writeEncoded(shake, new Buffer(header + '\n'), cb) + log('writing multicodec %s', multicodec) + writeEncoded(shake, new Buffer(multicodec + '\n'), callback) lp.decodeFromReader(shake, (err, data) => { - if (err) return cb(err) + if (err) { + return callback(err) + } const protocol = data.toString().slice(0, -1) - if (protocol !== header) { - cb(new Error(`Unkown header: "${protocol}"`)) + + if (protocol !== multicodec) { + return callback(new Error(`"${multicodec}" not supported`), shake.rest()) } - log('header ack') - cb(null, shake.rest()) + log('multicodec ack') + callback(null, shake.rest()) }) return stream } -exports.listen = (rawConn, handlersMap, defaultHandler) => { +exports.handlerSelector = (rawConn, handlersMap) => { const cb = (err) => { - // TODO: pass errors somewhere + // incoming errors are irrelevant for the app log.error(err) } - const stream = handshake({timeout: 60 * 1000}, cb) + + const stream = handshake({ + timeout: 60 * 1000 + }, cb) + const shake = stream.handshake - lp.decodeFromReader(shake, (err, data) => { - if (err) return cb(err) - log('received: %s', data.toString()) - const protocol = data.toString().slice(0, -1) - const [key] = Object.keys(handlersMap).filter((id) => id === protocol) - - if (key) { - log('ack: %s', protocol) - writeEncoded(shake, data, cb) - handlersMap[key](new Connection(shake.rest(), rawConn)) - } else { - log('unkown protocol: %s', protocol) - defaultHandler(protocol, shake.rest()) - } - }) + next() + + function next () { + lp.decodeFromReader(shake, (err, data) => { + if (err) { + return cb(err) + } + log('received: %s', data.toString()) + const protocol = data.toString().slice(0, -1) + const [key] = Object.keys(handlersMap).filter((id) => id === protocol) + + if (key) { + log('ack: %s', protocol) + writeEncoded(shake, data, cb) + handlersMap[key](new Connection(shake.rest(), rawConn)) + } else { + log('received multicodec of not supported protocol: %s', protocol) + writeEncoded(shake, new Buffer('na\n')) + next() + } + }) + } return stream } +// prefixes a message with a varint function encode (msg, cb) { const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] @@ -63,7 +81,9 @@ function encode (msg, cb) { pull.values(values), lp.encode(), pull.collect((err, encoded) => { - if (err) return cb(err) + if (err) { + return cb(err) + } cb(null, encoded[0]) }) ) @@ -71,7 +91,9 @@ function encode (msg, cb) { function writeEncoded (writer, msg, cb) { encode(msg, (err, msg) => { - if (err) return cb(err) + if (err) { + return cb(err) + } writer.write(msg) }) } diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..fc0d0a2 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,5 @@ +'use strict' + +module.exports = { + PROTOCOL_ID: '/multistream/1.0.0' +} diff --git a/src/dialer.js b/src/dialer.js index eecb70f..0482ddd 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -5,10 +5,10 @@ const varint = require('varint') const pull = require('pull-stream') const Connection = require('interface-connection').Connection const debug = require('debug') -const log = debug('libp2p:multistream:dialer') +const log = debug('multistream:dialer') -const PROTOCOL_ID = require('./protocol-id') -const agreement = require('./agreement') +const PROTOCOL_ID = require('./constants').PROTOCOL_ID +const agrmt = require('./agreement') module.exports = class Dialer { constructor () { @@ -16,41 +16,47 @@ module.exports = class Dialer { } // perform the multistream handshake - handle (rawConn, cb) { + handle (rawConn, callback) { log('handling connection') - const ms = agreement.dial(PROTOCOL_ID, (err, conn) => { + const ms = agrmt.select(PROTOCOL_ID, (err, conn) => { if (err) { - return cb(err) + return callback(err) } log('handshake success') this.conn = new Connection(conn, rawConn) - cb() + callback() }) pull(rawConn, ms, rawConn) } - select (protocol, cb) { - log('selecting %s', protocol) + select (protocol, callback) { + log('dialer select %s', protocol) if (!this.conn) { - return cb(new Error('multistream handshake has not finalized yet')) + return callback(new Error('multistream handshake has not finalized yet')) } - const stream = agreement.dial(protocol, (err, conn) => { + const selectStream = agrmt.select(protocol, (err, conn) => { if (err) { - return cb(err) + this.conn = new Connection(conn, this.conn) + return callback(err) } - // TODO: handle 'na' - cb(null, new Connection(conn, this.conn)) + callback(null, new Connection(conn, this.conn)) }) - pull(this.conn, stream, this.conn) + pull( + this.conn, + selectStream, + this.conn + ) } - ls (cb) { - const ls = agreement.dial('ls', (err, conn) => { - if (err) return cb(err) + ls (callback) { + const lsStream = agrmt.select('ls', (err, conn) => { + if (err) { + return callback(err) + } pull( conn, @@ -58,13 +64,19 @@ module.exports = class Dialer { collectLs(conn), pull.map(stringify), pull.collect((err, list) => { - if (err) return cb(err) - return cb(null, list.slice(1)) + if (err) { + return callback(err) + } + callback(null, list.slice(1)) }) ) }) - pull(this.conn, ls, this.conn) + pull( + this.conn, + lsStream, + this.conn + ) } } diff --git a/src/listener.js b/src/listener.js index 9bc8d62..f2956f8 100644 --- a/src/listener.js +++ b/src/listener.js @@ -7,9 +7,10 @@ const isFunction = require('lodash.isfunction') const assert = require('assert') const debug = require('debug') const log = debug('libp2p:multistream:listener') +const Connection = require('interface-connection').Connection -const PROTOCOL_ID = require('./protocol-id') -const agreement = require('./agreement') +const PROTOCOL_ID = require('./constants').PROTOCOL_ID +const agrmt = require('./agreement') module.exports = class Listener { constructor () { @@ -19,27 +20,32 @@ module.exports = class Listener { } // perform the multistream handshake - handle (conn, cb) { + handle (rawConn, callback) { log('handling connection') - const ms = agreement.listen(conn, { - [PROTOCOL_ID]: (conn) => { - log('handshake success') - const msgHandler = agreement.listen(conn, this.handlers, (protocol, conn) => { - log('unkown protocol: %s', protocol) - pull( - pull.values([new Buffer('na')]), - conn - ) - }) - pull(conn, msgHandler, conn) - - cb() + + const selectStream = agrmt.select(PROTOCOL_ID, (err, conn) => { + if (err) { + return callback(err) } - }, () => { - cb(new Error('unkown protocol')) + + const hsConn = new Connection(conn, rawConn) + + const handlerSelector = agrmt.handlerSelector(hsConn, this.handlers) + + pull( + hsConn, + handlerSelector, + hsConn + ) + + callback() }) - pull(conn, ms, conn) + pull( + rawConn, + selectStream, + rawConn + ) } // be ready for a given `protocol` @@ -49,13 +55,13 @@ module.exports = class Listener { assert(isFunction(handler), 'handler must be a function') if (this.handlers[protocol]) { - // TODO: Do we want to handle this better? log('overwriting handler for %s', protocol) } this.handlers[protocol] = handler } + // inner function - handler for `ls` _ls (conn) { const protos = Object.keys(this.handlers) .filter((key) => key !== 'ls') diff --git a/src/protocol-id.js b/src/protocol-id.js deleted file mode 100644 index 9bd6c32..0000000 --- a/src/protocol-id.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -module.exports = '/multistream/1.0.0' diff --git a/test/index.spec.js b/test/index.spec.js index e0e24eb..7fd456a 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -4,13 +4,55 @@ const expect = require('chai').expect const pull = require('pull-stream') +const lp = require('pull-length-prefixed') const pair = require('pull-pair/duplex') const multistream = require('../src') const parallel = require('run-parallel') const series = require('run-series') -describe('multistream normal mode', function () { - it('performs multistream handshake', (done) => { +describe('multistream dialer', () => { + it('sends the multistream multicodec', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + + pull( + listenerConn, + lp.decode(), + pull.drain((data) => { + expect(data.toString()).to.equal('/multistream/1.0.0\n') + done() + }) + ) + + const msd = new multistream.Dialer() + expect(msd).to.exist + msd.handle(dialerConn, () => {}) + }) +}) +describe('multistream listener', () => { + it('sends the multistream multicodec', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + + pull( + dialerConn, + lp.decode(), + pull.drain((data) => { + expect(data.toString()).to.equal('/multistream/1.0.0\n') + done() + }) + ) + + const msl = new multistream.Listener() + expect(msl).to.exist + msl.handle(listenerConn, () => {}) + }) +}) + +describe('multistream handshake', () => { + it('performs the handshake handshake', (done) => { const p = pair() const dialerConn = p[0] const listenerConn = p[1] @@ -110,6 +152,61 @@ describe('multistream normal mode', function () { ], done) }) + it('select a non existing proto and then select an existing proto', (done) => { + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] + + let msl + let msd + series([ + (next) => { + parallel([ + (cb) => { + msl = new multistream.Listener() + expect(msl).to.exist + msl.handle(listenerConn, cb) + }, + (cb) => { + msd = new multistream.Dialer() + expect(msd).to.exist + msd.handle(dialerConn, cb) + } + ], next) + }, + (next) => { + msl.addHandler('/monkey/1.0.0', (conn) => { + pull(conn, conn) + }) + next() + }, + (next) => { + msd.select('/sadpanda/1.0.0', (err) => { + expect(err).to.exist + next() + }) + }, + (next) => { + msd.select('/monkey/1.0.0', (err, conn) => { + expect(err).to.not.exist + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + next() + }) + ) + }) + } + ], done) + }) + it('ls', (done) => { const p = pair() const dialerConn = p[0] From 5c1fc3bb3e5503639e7380cdfd0dce6528393197 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 15:55:16 -0400 Subject: [PATCH 3/4] docs(readme): pull-stream docs --- README.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3174ea3..7a655db 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,33 @@ ms.ls() `callback` is a function of type `function (err, protocols)` where `err` is an error object that gets passed if something wrong happend and `protocols` is an array of the supported protocols in the other end. +### This module uses `pull-streams` + +We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). + +You can learn more about pull-streams at: + +- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) +- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) +- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) +- [pull-streams documentation](https://pull-stream.github.io/) + +#### Converting `pull-streams` to Node.js Streams + +If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: + +```js +const pullToStream = require('pull-stream-to-stream') + +const nodeStreamInstance = pullToStream(pullStreamInstance) +// nodeStreamInstance is an instance of a Node.js Stream +``` + +To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. + + + + ## Maintainers Captain: [@diasdavid](https://github.com/diasdavid). @@ -161,4 +188,3 @@ Small note: If editing the Readme, please conform to the [standard-readme](https ## License [MIT](LICENSE) © David Dias - From bdc239f90b509940b22f9e7c44479b3991d59d1c Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 15:59:12 -0400 Subject: [PATCH 4/4] feat(deps, lint): update deps, remove unnecessary lint patch, remove unnsupported feature --- package.json | 9 ++++----- src/agreement.js | 3 ++- src/dialer.js | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 99b8288..9e9ca5b 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "multistream-select", "version": "0.10.0", "description": "JavaScript implementation of the multistream spec", - "main": "src/index.js", + "main": "lib/index.js", "jsnext:main": "src/index.js", "scripts": { "lint": "aegir-lint", @@ -43,19 +43,18 @@ "dependencies": { "babel-runtime": "^6.11.6", "debug": "^2.2.0", - "interface-connection": "^0.1.8", + "interface-connection": "^0.2.1", "lodash.isfunction": "^3.0.8", "lodash.range": "^3.1.7", "pull-handshake": "^1.1.3", - "pull-length-prefixed": "^1.0.0", + "pull-length-prefixed": "^1.1.0", "pull-stream": "^3.4.3", "varint": "^4.0.1" }, "devDependencies": { - "aegir": "^6.0.0", + "aegir": "^8.0.0", "chai": "^3.5.0", "pre-commit": "^1.1.3", - "pull-length-prefixed": "^1.1.0", "pull-pair": "^1.1.0", "run-parallel": "^1.1.6", "run-series": "^1.1.4" diff --git a/src/agreement.js b/src/agreement.js index b329142..4a925d5 100644 --- a/src/agreement.js +++ b/src/agreement.js @@ -56,7 +56,8 @@ exports.handlerSelector = (rawConn, handlersMap) => { } log('received: %s', data.toString()) const protocol = data.toString().slice(0, -1) - const [key] = Object.keys(handlersMap).filter((id) => id === protocol) + const result = Object.keys(handlersMap).filter((id) => id === protocol) + const key = result && result[0] if (key) { log('ack: %s', protocol) diff --git a/src/dialer.js b/src/dialer.js index 0482ddd..a94249d 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -90,7 +90,7 @@ function collectLs (conn) { return pull.take((msg) => { if (first) { - const size = varint.decode(msg) // eslint-disable-line + varint.decode(msg) counter = varint.decode(msg, varint.decode.bytes) return true }