From a68dc87bbdeb3905f30d620523c09c97c3e1a132 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 23 Jan 2019 15:06:11 +0000 Subject: [PATCH 1/3] feat: initial implementation --- .gitignore | 40 ++++ LICENSE | 21 ++ README.md | 74 +++++- ci/Jenkinsfile | 2 + package.json | 65 ++++++ src/index.js | 226 ++++++++++++++++++ src/message/index.js | 10 + src/message/rpc.proto.js | 18 ++ src/message/topic-descriptor.proto.js | 30 +++ src/peer.js | 187 +++++++++++++++ test/node.js | 3 + test/nodejs-bundle.js | 24 ++ test/pubsub.js | 321 ++++++++++++++++++++++++++ test/utils.js | 18 ++ 14 files changed, 1038 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 ci/Jenkinsfile create mode 100644 package.json create mode 100644 src/index.js create mode 100644 src/message/index.js create mode 100644 src/message/rpc.proto.js create mode 100644 src/message/topic-descriptor.proto.js create mode 100644 src/peer.js create mode 100644 test/node.js create mode 100644 test/nodejs-bundle.js create mode 100644 test/pubsub.js create mode 100644 test/utils.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..649e624d2e --- /dev/null +++ b/.gitignore @@ -0,0 +1,40 @@ +docs +**/node_modules/ +**/*.log +test/repo-tests* + +# Logs +logs +*.log + +coverage + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +build + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules + +dist + +docs + +package-lock.json +yarn.lock diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000..58b2056933 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019 Protocol Labs, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index bcfd8c96bb..efe74c1db7 100644 --- a/README.md +++ b/README.md @@ -1 +1,73 @@ -# js-libp2p-pubsub +js-libp2p-pubsub +================== + +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://libp2p.io/) +[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-pubsub/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-pubsub?branch=master) +[![Travis CI](https://travis-ci.org/libp2p/js-libp2p-pubsub.svg?branch=master)](https://travis-ci.org/libp2p/js-libp2p-pubsub) +[![Circle CI](https://circleci.com/gh/libp2p/js-libp2p-pubsub.svg?style=svg)](https://circleci.com/gh/libp2p/js-libp2p-pubsub) +[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-pubsub.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-pubsub) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) +[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) +[![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub) + +> libp2p-pubsub consits on the base protocol for libp2p pubsub implementation. This module is responsible for all the logic regarding peer connections. + +## Lead Maintainer + +[Vasco Santos](https://github.com/vasco-santos). + +## Table of Contents + +- [Install](#install) +- [Usage](#usage) +- [Contribute](#contribute) +- [License](#license) + +## Install + +```sh +> npm install libp2p-pubsub +``` + +## Usage + +Create your pubsub implementation extending the base protocol. + +```JavaScript +const Pubsub = require('libp2p-pubsub') + +class PubsubImplementation extends Pubsub { + constructor(libp2p) { + super('libp2p:pubsub', '/pubsub-implementation/1.0.0', libp2p) + } + + _processConnection(idB58Str, conn, peer) { + // Process each message accordingly + } + + publish() { + + } + + subscribe() { + + } + + unsubscribe() { + + } +} +``` + +## Contribute + +Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js-libp2p-pubsub/issues)! + +This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). + +[![](https://cdn.rawgit.com/jbenet/contribute-ipfs-gif/master/img/contribute.gif)](https://github.com/ipfs/community/blob/master/contributing.md) + +## License + +Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details. diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile new file mode 100644 index 0000000000..a7da2e54f3 --- /dev/null +++ b/ci/Jenkinsfile @@ -0,0 +1,2 @@ +// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. +javascript() diff --git a/package.json b/package.json new file mode 100644 index 0000000000..a3099ed15d --- /dev/null +++ b/package.json @@ -0,0 +1,65 @@ +{ + "name": "libp2p-pubsub", + "version": "0.0.0", + "description": "Pubsub base protocol for libp2p pubsub routers", + "leadMaintainer": "Vasco Santos ", + "main": "src/index.js", + "scripts": { + "lint": "aegir lint", + "test": "aegir test -t node", + "test:node": "aegir test -t node", + "build": "aegir build", + "docs": "aegir-docs", + "release": "aegir release --docs", + "release-minor": "aegir release --type minor --docs", + "release-major": "aegir release --type major --docs", + "coverage": "aegir coverage", + "coverage-publish": "aegir coverage --provider coveralls" + }, + "files": [ + "src", + "dist" + ], + "pre-push": [ + "lint" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p-pubsub.git" + }, + "keywords": [ + "IPFS", + "libp2p", + "pubsub", + "gossip", + "flood", + "flooding" + ], + "license": "MIT", + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-pubsub/issues" + }, + "homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme", + "devDependencies": { + "aegir": "^18.0.3", + "benchmark": "^2.1.4", + "chai": "^4.2.0", + "chai-spies": "^1.0.0", + "dirty-chai": "^2.0.1", + "libp2p": "~0.24.4", + "libp2p-secio": "~0.11.0", + "libp2p-spdy": "~0.13.1", + "libp2p-tcp": "~0.13.0", + "lodash": "^4.17.11", + "peer-id": "~0.12.2", + "peer-info": "~0.15.1" + }, + "dependencies": { + "async": "^2.6.1", + "debug": "^4.1.1", + "length-prefixed-stream": "^1.6.0", + "protons": "^1.0.1", + "pull-pushable": "^2.2.0" + }, + "contributors": [] +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000000..af7f03d247 --- /dev/null +++ b/src/index.js @@ -0,0 +1,226 @@ +'use strict' + +const EventEmitter = require('events') +const pull = require('pull-stream/pull') +const empty = require('pull-stream/sources/empty') +const asyncEach = require('async/each') +const debug = require('debug') + +const Peer = require('./peer') +const message = require('./message') + +const nextTick = require('async/nextTick') + +/** + * PubsubBaseProtocol handles the peers and connections logic for pubsub routers + */ +class PubsubBaseProtocol extends EventEmitter { + /** + * @param {String} debugName + * @param {String} multicodec + * @param {Object} libp2p + * @constructor + */ + constructor (debugName, multicodec, libp2p) { + super() + + this.log = debug(debugName) + this.log.err = debug(`${debugName}:error`) + this.multicodec = multicodec + this.libp2p = libp2p + this.started = false + + /** + * Map of peers. + * + * @type {Map} + */ + this.peers = new Map() + + // Dials that are currently in progress + this._dials = new Set() + + this._onConnection = this._onConnection.bind(this) + this._dialPeer = this._dialPeer.bind(this) + } + + _addPeer (peer) { + const id = peer.info.id.toB58String() + + /* + Always use an existing peer. + + What is happening here is: "If the other peer has already dialed to me, we already have + an establish link between the two, what might be missing is a + Connection specifically between me and that Peer" + */ + let existing = this.peers.get(id) + if (!existing) { + this.log('new peer', id) + this.peers.set(id, peer) + existing = peer + + peer.once('close', () => this._removePeer(peer)) + } + ++existing._references + + return existing + } + + _removePeer (peer) { + const id = peer.info.id.toB58String() + + this.log('remove', id, peer._references) + // Only delete when no one else is referencing this peer. + if (--peer._references === 0) { + this.log('delete peer', id) + this.peers.delete(id) + } + + return peer + } + + _dialPeer (peerInfo, callback) { + callback = callback || function noop () { } + const idB58Str = peerInfo.id.toB58String() + + // If already have a PubSub conn, ignore + const peer = this.peers.get(idB58Str) + if (peer && peer.isConnected) { + return nextTick(() => callback()) + } + + // If already dialing this peer, ignore + if (this._dials.has(idB58Str)) { + this.log('already dialing %s, ignoring dial attempt', idB58Str) + return nextTick(() => callback()) + } + this._dials.add(idB58Str) + + this.log('dialing %s', idB58Str) + this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { + this.log('dial to %s complete', idB58Str) + + // If the dial is not in the set, it means that floodsub has been + // stopped + const floodsubStopped = !this._dials.has(idB58Str) + this._dials.delete(idB58Str) + + if (err) { + this.log.err(err) + return callback() + } + + // Floodsub has been stopped, so we should just bail out + if (floodsubStopped) { + this.log('floodsub was stopped, not processing dial to %s', idB58Str) + return callback() + } + + this._onDial(peerInfo, conn, callback) + }) + } + + _onDial (peerInfo, conn, callback) { + const idB58Str = peerInfo.id.toB58String() + this.log('connected', idB58Str) + + const peer = this._addPeer(new Peer(peerInfo)) + peer.attachConnection(conn) + + nextTick(() => callback()) + } + + _onConnection (protocol, conn) { + conn.getPeerInfo((err, peerInfo) => { + if (err) { + this.log.err('Failed to identify incomming conn', err) + return pull(empty(), conn) + } + + const idB58Str = peerInfo.id.toB58String() + const peer = this._addPeer(new Peer(peerInfo)) + + this._processConnection(idB58Str, conn, peer) + }) + } + + _processConnection (idB58Str, conn, peer) { + throw new Error('_processConnection must be implemented by the subclass') + } + + _onConnectionEnd (idB58Str, peer, err) { + // socket hang up, means the one side canceled + if (err && err.message !== 'socket hang up') { + this.log.err(err) + } + + this.log('connection ended', idB58Str, err ? err.message : '') + this._removePeer(peer) + } + + /** + * Mounts the floodsub protocol onto the libp2p node and sends our + * subscriptions to every peer conneceted + * + * @param {Function} callback + * @returns {undefined} + * + */ + start (callback) { + if (this.started) { + return nextTick(() => callback(new Error('already started'))) + } + this.log('starting') + + this.libp2p.handle(this.multicodec, this._onConnection) + + // Speed up any new peer that comes in my way + this.libp2p.on('peer:connect', this._dialPeer) + + // Dial already connected peers + const peerInfos = Object.values(this.libp2p.peerBook.getAll()) + + asyncEach(peerInfos, (peer, cb) => this._dialPeer(peer, cb), (err) => { + nextTick(() => { + this.log('started') + this.started = true + callback(err) + }) + }) + } + + /** + * Unmounts the floodsub protocol and shuts down every connection + * + * @param {Function} callback + * @returns {undefined} + * + */ + stop (callback) { + if (!this.started) { + return nextTick(() => callback(new Error('not started yet'))) + } + + this.libp2p.unhandle(this.multicodec) + this.libp2p.removeListener('peer:connect', this._dialPeer) + + // Prevent any dials that are in flight from being processed + this._dials = new Set() + + this.log('stopping') + asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { + if (err) { + return callback(err) + } + + this.log('stopped') + this.peers = new Map() + this.started = false + callback() + }) + } +} + +module.exports = PubsubBaseProtocol +module.exports.message = message diff --git a/src/message/index.js b/src/message/index.js new file mode 100644 index 0000000000..ed860a60ef --- /dev/null +++ b/src/message/index.js @@ -0,0 +1,10 @@ +'use strict' + +const protons = require('protons') + +const rpcProto = protons(require('./rpc.proto.js')) +const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) + +exports = module.exports +exports.rpc = rpcProto +exports.td = topicDescriptorProto diff --git a/src/message/rpc.proto.js b/src/message/rpc.proto.js new file mode 100644 index 0000000000..50eb9506dc --- /dev/null +++ b/src/message/rpc.proto.js @@ -0,0 +1,18 @@ +'use strict' +module.exports = ` +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message msgs = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicCID = 2; + } + + message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + } +}` diff --git a/src/message/topic-descriptor.proto.js b/src/message/topic-descriptor.proto.js new file mode 100644 index 0000000000..6e829ca579 --- /dev/null +++ b/src/message/topic-descriptor.proto.js @@ -0,0 +1,30 @@ +'use strict' +module.exports = ` +// topicCID = cid(merkledag_protobuf(topicDescriptor)); (not the topic.name) +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 2; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +}` diff --git a/src/peer.js b/src/peer.js new file mode 100644 index 0000000000..c0af044273 --- /dev/null +++ b/src/peer.js @@ -0,0 +1,187 @@ +'use strict' + +const lp = require('pull-length-prefixed') +const Pushable = require('pull-pushable') +const pull = require('pull-stream') +const setImmediate = require('async/setImmediate') +const EventEmitter = require('events') + +const rpc = require('./message').rpc.RPC + +/** + * The known state of a connected peer. + */ +class Peer extends EventEmitter { + /** + * @param {PeerInfo} info + */ + constructor (info) { + super() + + /** + * @type {PeerInfo} + */ + this.info = info + /** + * @type {Connection} + */ + this.conn = null + /** + * @type {Set} + */ + this.topics = new Set() + /** + * @type {Pushable} + */ + this.stream = null + + this._references = 0 + } + + /** + * Is the peer connected currently? + * + * @type {boolean} + */ + get isConnected () { + return Boolean(this.conn) + } + + /** + * Do we have a connection to write on? + * + * @type {boolean} + */ + get isWritable () { + return Boolean(this.stream) + } + + /** + * Send a message to this peer. + * Throws if there is no `stream` to write to available. + * + * @param {Buffer} msg + * @returns {undefined} + */ + write (msg) { + if (!this.isWritable) { + const id = this.info.id.toB58String() + throw new Error('No writable connection to ' + id) + } + + this.stream.push(msg) + } + + /** + * Attach the peer to a connection and setup a write stream + * + * @param {Connection} conn + * @returns {undefined} + */ + attachConnection (conn) { + this.conn = conn + this.stream = new Pushable() + + pull( + this.stream, + lp.encode(), + conn, + pull.onEnd(() => { + this.conn = null + this.stream = null + this.emit('close') + }) + ) + + this.emit('connection') + } + + _sendRawSubscriptions (topics, subscribe) { + if (topics.size === 0) { + return + } + + const subs = [] + topics.forEach((topic) => { + subs.push({ + subscribe: subscribe, + topicCID: topic + }) + }) + + this.write(rpc.encode({ + subscriptions: subs + })) + } + + /** + * Send the given subscriptions to this peer. + * @param {Set|Array} topics + * @returns {undefined} + */ + sendSubscriptions (topics) { + this._sendRawSubscriptions(topics, true) + } + + /** + * Send the given unsubscriptions to this peer. + * @param {Set|Array} topics + * @returns {undefined} + */ + sendUnsubscriptions (topics) { + this._sendRawSubscriptions(topics, false) + } + + /** + * Send messages to this peer. + * + * @param {Array} msgs + * @returns {undefined} + */ + sendMessages (msgs) { + this.write(rpc.encode({ + msgs: msgs + })) + } + + /** + * Bulk process subscription updates. + * + * @param {Array} changes + * @returns {undefined} + */ + updateSubscriptions (changes) { + changes.forEach((subopt) => { + if (subopt.subscribe) { + this.topics.add(subopt.topicCID) + } else { + this.topics.delete(subopt.topicCID) + } + }) + } + + /** + * Closes the open connection to peer + * + * @param {Function} callback + * @returns {undefined} + */ + close (callback) { + // Force removal of peer + this._references = 1 + + // End the pushable + if (this.stream) { + this.stream.end() + } + + setImmediate(() => { + this.conn = null + this.stream = null + this.emit('close') + callback() + }) + } +} + +module.exports = Peer diff --git a/test/node.js b/test/node.js new file mode 100644 index 0000000000..4d8e1688dc --- /dev/null +++ b/test/node.js @@ -0,0 +1,3 @@ +'use strict' + +require('./pubsub.js') diff --git a/test/nodejs-bundle.js b/test/nodejs-bundle.js new file mode 100644 index 0000000000..c5e840a84d --- /dev/null +++ b/test/nodejs-bundle.js @@ -0,0 +1,24 @@ +'use strict' + +const TCP = require('libp2p-tcp') +const spdy = require('libp2p-spdy') +const secio = require('libp2p-secio') +const libp2p = require('libp2p') + +class Node extends libp2p { + constructor ({ peerInfo, peerBook }) { + const modules = { + transport: [TCP], + streamMuxer: [spdy], + connEncryption: [secio] + } + + super({ + modules, + peerInfo, + peerBook + }) + } +} + +module.exports = Node diff --git a/test/pubsub.js b/test/pubsub.js new file mode 100644 index 0000000000..726a8f715d --- /dev/null +++ b/test/pubsub.js @@ -0,0 +1,321 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect +const series = require('async/series') +const parallel = require('async/parallel') + +const PubsubBaseProtocol = require('../src') +const utils = require('./utils') +const createNode = utils.createNode + +class PubsubImplementation extends PubsubBaseProtocol { + constructor (libp2p) { + super('libp2p:floodsub', 'libp2p:pubsub-implementation', libp2p) + } + + _processConnection (idB58Str, conn, peer) { + // ... + } +} + +describe('pubsub base protocol', () => { + describe('fresh nodes', () => { + let nodeA + let nodeB + let psA + let psB + + before((done) => { + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) { + return done(err) + } + nodeA = nodes[0] + nodeB = nodes[1] + done() + }) + }) + + after((done) => { + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], done) + }) + + it('mount the pubsub protocol', (done) => { + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) + + setTimeout(() => { + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + done() + }, 50) + }) + + it('start both Pubsub', (done) => { + parallel([ + (cb) => psA.start(cb), + (cb) => psB.start(cb) + ], done) + }) + + it('Dial from nodeA to nodeB', (done) => { + series([ + (cb) => nodeA.dial(nodeB.peerInfo, cb), + (cb) => setTimeout(() => { + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + cb() + }, 1000) + ], done) + }) + }) + + describe('dial the pubsub protocol on mount', () => { + let nodeA + let nodeB + let psA + let psB + + before((done) => { + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (cb, nodes) => { + nodeA = nodes[0] + nodeB = nodes[1] + nodeA.dial(nodeB.peerInfo, () => setTimeout(done, 1000)) + }) + }) + + after((done) => { + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], done) + }) + + it('dial on pubsub on mount', (done) => { + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) + + parallel([ + (cb) => psA.start(cb), + (cb) => psB.start(cb) + ], next) + + function next () { + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + done() + } + }) + + it('stop both pubsubs', (done) => { + parallel([ + (cb) => psA.stop(cb), + (cb) => psB.stop(cb) + ], done) + }) + }) + + describe('prevent concurrent dials', () => { + let sandbox + let nodeA + let nodeB + let psA + let psB + + before((done) => { + sandbox = chai.spy.sandbox() + + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) return done(err) + + nodeA = nodes[0] + nodeB = nodes[1] + + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) + + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) + + psB.start(done) + }) + }) + + after((done) => { + sandbox.restore() + + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], (ignoreErr) => { + done() + }) + }) + + it('does not dial twice to same peer', (done) => { + sandbox.on(psA, ['_onDial']) + + // When node A starts, it will dial all peers in its peer book, which + // is just peer B + psA.start(startComplete) + + // Simulate a connection coming in from peer B at the same time. This + // causes pubsub to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + + function startComplete () { + // Check that only one dial was made + setTimeout(() => { + expect(psA._onDial).to.have.been.called.once() + done() + }, 1000) + } + }) + }) + + describe('allow dials even after error', () => { + let sandbox + let nodeA + let nodeB + let psA + let psB + + before((done) => { + sandbox = chai.spy.sandbox() + + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) return done(err) + + nodeA = nodes[0] + nodeB = nodes[1] + + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) + + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) + + psB.start(done) + }) + }) + + after((done) => { + sandbox.restore() + + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], (ignoreErr) => { + done() + }) + }) + + it('can dial again after error', (done) => { + let firstTime = true + const dialProtocol = psA.libp2p.dialProtocol.bind(psA.libp2p) + sandbox.on(psA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { + // Return an error for the first dial + if (firstTime) { + firstTime = false + return cb(new Error('dial error')) + } + + // Subsequent dials proceed as normal + dialProtocol(peerInfo, multicodec, cb) + }) + + // When node A starts, it will dial all peers in its peer book, which + // is just peer B + psA.start(startComplete) + + function startComplete () { + // Simulate a connection coming in from peer B. This causes pubsub + // to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + + // Check that both dials were made + setTimeout(() => { + expect(psA.libp2p.dialProtocol).to.have.been.called.twice() + done() + }, 1000) + } + }) + }) + + describe('prevent processing dial after stop', () => { + let sandbox + let nodeA + let nodeB + let psA + let psB + + before((done) => { + sandbox = chai.spy.sandbox() + + series([ + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) return done(err) + + nodeA = nodes[0] + nodeB = nodes[1] + + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) + + parallel([ + (cb) => psA.start(cb), + (cb) => psB.start(cb) + ], done) + }) + }) + + after((done) => { + sandbox.restore() + + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], (ignoreErr) => { + done() + }) + }) + + it('does not process dial after stop', (done) => { + sandbox.on(psA, ['_onDial']) + + // Simulate a connection coming in from peer B at the same time. This + // causes pubsub to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + + // Stop floodsub before the dial can complete + psA.stop(() => { + // Check that the dial was not processed + setTimeout(() => { + expect(psA._onDial).to.not.have.been.called() + done() + }, 1000) + }) + }) + }) +}) diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 0000000000..ee1915014c --- /dev/null +++ b/test/utils.js @@ -0,0 +1,18 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const Node = require('./nodejs-bundle') +const waterfall = require('async/waterfall') + +exports.createNode = (maddr, callback) => { + waterfall([ + (cb) => PeerId.create({ bits: 1024 }, cb), + (id, cb) => PeerInfo.create(id, cb), + (peerInfo, cb) => { + peerInfo.multiaddrs.add(maddr) + cb(null, new Node({ peerInfo })) + }, + (node, cb) => node.start((err) => cb(err, node)) + ], callback) +} From 7ca7f06674d04caabb68fac97d555f0af20d9e86 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 24 Jan 2019 10:49:18 +0000 Subject: [PATCH 2/3] fix: code review --- src/index.js | 14 +++++++------- test/pubsub.js | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/index.js b/src/index.js index af7f03d247..6c18db1515 100644 --- a/src/index.js +++ b/src/index.js @@ -101,9 +101,9 @@ class PubsubBaseProtocol extends EventEmitter { this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { this.log('dial to %s complete', idB58Str) - // If the dial is not in the set, it means that floodsub has been + // If the dial is not in the set, it means that pubsub has been // stopped - const floodsubStopped = !this._dials.has(idB58Str) + const pubsubStopped = !this._dials.has(idB58Str) this._dials.delete(idB58Str) if (err) { @@ -111,9 +111,9 @@ class PubsubBaseProtocol extends EventEmitter { return callback() } - // Floodsub has been stopped, so we should just bail out - if (floodsubStopped) { - this.log('floodsub was stopped, not processing dial to %s', idB58Str) + // pubsub has been stopped, so we should just bail out + if (pubsubStopped) { + this.log('pubsub was stopped, not processing dial to %s', idB58Str) return callback() } @@ -160,7 +160,7 @@ class PubsubBaseProtocol extends EventEmitter { } /** - * Mounts the floodsub protocol onto the libp2p node and sends our + * Mounts the pubsub protocol onto the libp2p node and sends our * subscriptions to every peer conneceted * * @param {Function} callback @@ -191,7 +191,7 @@ class PubsubBaseProtocol extends EventEmitter { } /** - * Unmounts the floodsub protocol and shuts down every connection + * Unmounts the pubsub protocol and shuts down every connection * * @param {Function} callback * @returns {undefined} diff --git a/test/pubsub.js b/test/pubsub.js index 726a8f715d..50a0789610 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -14,7 +14,7 @@ const createNode = utils.createNode class PubsubImplementation extends PubsubBaseProtocol { constructor (libp2p) { - super('libp2p:floodsub', 'libp2p:pubsub-implementation', libp2p) + super('libp2p:pubsub', 'libp2p:pubsub-implementation', libp2p) } _processConnection (idB58Str, conn, peer) { @@ -308,7 +308,7 @@ describe('pubsub base protocol', () => { // causes pubsub to dial peer B nodeA.emit('peer:connect', nodeB.peerInfo) - // Stop floodsub before the dial can complete + // Stop pubsub before the dial can complete psA.stop(() => { // Check that the dial was not processed setTimeout(() => { From 326d73d899517b7964d3058f4336f6b71299fe7a Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 24 Jan 2019 11:39:20 +0000 Subject: [PATCH 3/3] chore: improve docs --- README.md | 17 +++++++--- package.json | 1 + src/index.js | 90 ++++++++++++++++++++++++++++++++++++++++++++++++-- test/pubsub.js | 12 +++++++ 4 files changed, 114 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index efe74c1db7..753a070920 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ js-libp2p-pubsub ## Usage -Create your pubsub implementation extending the base protocol. +A pubsub implementation **MUST** override the `_processConnection`, `publish`, `subscribe` and `unsubscribe` functions. `add_peer` and `remove_peer` may be overwritten if the pubsub implementation needs to add custom logic when peers are added and remove. All the remaining functions **MUST NOT** be overwritten. + +The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. ```JavaScript const Pubsub = require('libp2p-pubsub') @@ -43,23 +45,30 @@ class PubsubImplementation extends Pubsub { } _processConnection(idB58Str, conn, peer) { + // Required to be implemented by the subclass // Process each message accordingly } publish() { - + // Required to be implemented by the subclass } subscribe() { - + // Required to be implemented by the subclass } unsubscribe() { - + // Required to be implemented by the subclass } } ``` +## Implementations using this base protocol + +You can use the following implementations as examples for building your own pubsub implementation. + +- [libp2p/js-libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub) + ## Contribute Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js-libp2p-pubsub/issues)! diff --git a/package.json b/package.json index a3099ed15d..3068b3f0c9 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "dependencies": { "async": "^2.6.1", "debug": "^4.1.1", + "err-code": "^1.1.2", "length-prefixed-stream": "^1.6.0", "protons": "^1.0.1", "pull-pushable": "^2.2.0" diff --git a/src/index.js b/src/index.js index 6c18db1515..a51c68ae7f 100644 --- a/src/index.js +++ b/src/index.js @@ -4,7 +4,9 @@ const EventEmitter = require('events') const pull = require('pull-stream/pull') const empty = require('pull-stream/sources/empty') const asyncEach = require('async/each') + const debug = require('debug') +const errcode = require('err-code') const Peer = require('./peer') const message = require('./message') @@ -18,7 +20,7 @@ class PubsubBaseProtocol extends EventEmitter { /** * @param {String} debugName * @param {String} multicodec - * @param {Object} libp2p + * @param {Object} libp2p libp2p implementation * @constructor */ constructor (debugName, multicodec, libp2p) { @@ -44,6 +46,12 @@ class PubsubBaseProtocol extends EventEmitter { this._dialPeer = this._dialPeer.bind(this) } + /** + * Add a new connected peer to the peers map. + * @private + * @param {PeerInfo} peer peer info + * @returns {PeerInfo} + */ _addPeer (peer) { const id = peer.info.id.toB58String() @@ -67,6 +75,12 @@ class PubsubBaseProtocol extends EventEmitter { return existing } + /** + * Remove a peer from the peers map if it has no references. + * @private + * @param {Peer} peer peer state + * @returns {PeerInfo} + */ _removePeer (peer) { const id = peer.info.id.toB58String() @@ -80,6 +94,13 @@ class PubsubBaseProtocol extends EventEmitter { return peer } + /** + * Dial a received peer. + * @private + * @param {PeerInfo} peerInfo peer info + * @param {function} callback + * @returns {void} + */ _dialPeer (peerInfo, callback) { callback = callback || function noop () { } const idB58Str = peerInfo.id.toB58String() @@ -121,6 +142,13 @@ class PubsubBaseProtocol extends EventEmitter { }) } + /** + * Dial a received peer. + * @private + * @param {PeerInfo} peerInfo peer info + * @param {Connection} conn connection to the peer + * @param {function} callback + */ _onDial (peerInfo, conn, callback) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) @@ -131,6 +159,12 @@ class PubsubBaseProtocol extends EventEmitter { nextTick(() => callback()) } + /** + * On successful connection event. + * @private + * @param {String} protocol connection protocol + * @param {Connection} conn connection to the peer + */ _onConnection (protocol, conn) { conn.getPeerInfo((err, peerInfo) => { if (err) { @@ -145,10 +179,27 @@ class PubsubBaseProtocol extends EventEmitter { }) } + /** + * Overriding the implementation of _processConnection should keep the connection and is + * responsible for processing each RPC message received by other peers. + * @abstract + * @param {string} idB58Str peer id string in base58 + * @param {Connection} conn connection + * @param {PeerInfo} peer peer info + * @returns {undefined} + * + */ _processConnection (idB58Str, conn, peer) { - throw new Error('_processConnection must be implemented by the subclass') + throw errcode('_processConnection must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') } + /** + * On connection end event. + * @private + * @param {string} idB58Str peer id string in base58 + * @param {PeerInfo} peer peer info + * @param {Error} err error for connection end + */ _onConnectionEnd (idB58Str, peer, err) { // socket hang up, means the one side canceled if (err && err.message !== 'socket hang up') { @@ -159,6 +210,41 @@ class PubsubBaseProtocol extends EventEmitter { this._removePeer(peer) } + /** + * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. + * For example, a Floodsub implementation might simply publish each message to each topic for every peer + * @abstract + * @param {Array|string} topics + * @param {Array|any} messages + * @returns {undefined} + * + */ + publish (topics, messages) { + throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + } + + /** + * Overriding the implementation of subscribe should handle the appropriate algorithms for the publish/subscriber implementation. + * For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics + * @abstract + * @param {Array|string} topics + * @returns {undefined} + */ + subscribe (topics) { + throw errcode('subscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + } + + /** + * Overriding the implementation of unsubscribe should handle the appropriate algorithms for the publish/subscriber implementation. + * For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics + * @abstract + * @param {Array|string} topics + * @returns {undefined} + */ + unsubscribe (topics) { + throw errcode('unsubscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + } + /** * Mounts the pubsub protocol onto the libp2p node and sends our * subscriptions to every peer conneceted diff --git a/test/pubsub.js b/test/pubsub.js index 50a0789610..5691d85607 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -17,6 +17,18 @@ class PubsubImplementation extends PubsubBaseProtocol { super('libp2p:pubsub', 'libp2p:pubsub-implementation', libp2p) } + publish (topics, messages) { + // ... + } + + subscribe (topics) { + // ... + } + + unsubscribe (topics) { + // ... + } + _processConnection (idB58Str, conn, peer) { // ... }