diff --git a/README.md b/README.md index 067a220..94467f6 100644 --- a/README.md +++ b/README.md @@ -35,12 +35,17 @@ const FloodSub = require('libp2p-floodsub') const fsub = new FloodSub(libp2pNodeInstance) -fsub.on('fruit', (data) => { - console.log(data) +fsub.start((err) => { + if (err) { + console.log('Upsy', err) + } + fsub.on('fruit', (data) => { + console.log(data) + }) + fsub.subscribe('fruit') + + fsub.publish('fruit', new Buffer('banana')) }) -fsub.subscribe('fruit') - -fsub.publish('fruit', new Buffer('banana')) ``` ## API diff --git a/src/index.js b/src/index.js index df78e3c..612eb9d 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,8 @@ const TimeCache = require('time-cache') const values = require('lodash.values') const pull = require('pull-stream') const lp = require('pull-length-prefixed') +const assert = require('assert') +const asyncEach = require('async/each') const Peer = require('./peer') const utils = require('./utils') @@ -16,20 +18,20 @@ const multicodec = config.multicodec const ensureArray = utils.ensureArray /** - * PubSubGossip, also known as pubsub-flood or just dumbsub, - * this implementation of pubsub focused on delivering an API - * for Publish/Subscribe, but with no CastTree Forming + * FloodSub (aka dumbsub is an implementation of pubsub focused on + * delivering an API for Publish/Subscribe, but with no CastTree Forming * (it just floods the network). */ class FloodSub extends EventEmitter { /** * @param {Object} libp2p - * @returns {PubSubGossip} + * @returns {FloodSub} */ constructor (libp2p) { super() this.libp2p = libp2p + this.started = false /** * Time based cache for sequence numbers. @@ -51,18 +53,8 @@ class FloodSub extends EventEmitter { */ this.subscriptions = new Set() - const onConnection = this._onConnection.bind(this) - this.libp2p.handle(multicodec, onConnection) - - // Speed up any new peer that comes in my way - this.libp2p.swarm.on('peer-mux-established', (p) => { - this._dialPeer(p) - }) - - // Dial already connected peers - values(this.libp2p.peerBook.getAll()).forEach((p) => { - this._dialPeer(p) - }) + this._onConnection = this._onConnection.bind(this) + this._dialPeer = this._dialPeer.bind(this) } _dialPeer (peerInfo) { @@ -199,6 +191,62 @@ class FloodSub extends EventEmitter { }) } + /** + * Mounts the floodsub protocol onto the libp2p node and sends our + * subscriptions to every peer conneceted + * + * @param {Function} callback + * @returns {undefined} + * + */ + start (callback) { + if (this.started) { + return setImmediate(() => callback(new Error('already started'))) + } + + this.libp2p.handle(multicodec, this._onConnection) + + // Speed up any new peer that comes in my way + this.libp2p.swarm.on('peer-mux-established', this._dialPeer) + + // Dial already connected peers + const peerInfos = values(this.libp2p.peerBook.getAll()) + + peerInfos.forEach((peerInfo) => { + this._dialPeer(peerInfo) + }) + + setImmediate(() => { + this.started = true + callback() + }) + } + + /** + * Unmounts the floodsub protocol and shuts down every connection + * + * @param {Function} callback + * @returns {undefined} + * + */ + stop (callback) { + if (!this.started) { + return setImmediate(() => callback(new Error('not started yet'))) + } + + this.libp2p.unhandle(multicodec) + this.libp2p.swarm.removeListener('peer-mux-established', this._dialPeer) + + asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { + if (err) { + return callback(err) + } + this.peers = new Map() + this.started = false + callback() + }) + } + /** * Publish messages to the given topics. * @@ -208,6 +256,8 @@ class FloodSub extends EventEmitter { * */ publish (topics, messages) { + assert(this.started, 'FloodSub is not started') + log('publish', topics, messages) topics = ensureArray(topics) @@ -243,6 +293,8 @@ class FloodSub extends EventEmitter { * @returns {undefined} */ subscribe (topics) { + assert(this.started, 'FloodSub is not started') + topics = ensureArray(topics) topics.forEach((topic) => { @@ -261,6 +313,7 @@ class FloodSub extends EventEmitter { * @returns {undefined} */ unsubscribe (topics) { + assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) topics.forEach((topic) => { diff --git a/src/peer.js b/src/peer.js index 16880e4..df8fcfd 100644 --- a/src/peer.js +++ b/src/peer.js @@ -146,6 +146,25 @@ class Peer { } }) } + + /** + * Closes the open connection to peer + * + * @param {Function} callback + * @returns {undefined} + */ + close (callback) { + if (!this.conn || !this.stream) { + // no connection to close + } + // end the pushable pull-stream + this.stream.end() + setImmediate(() => { + this.conn = null + this.stream = null + callback() + }) + } } module.exports = Peer diff --git a/test/2-nodes.js b/test/2-nodes.js index e8e5938..1a8d7f4 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -7,19 +7,19 @@ const parallel = require('async/parallel') const series = require('async/series') const _times = require('lodash.times') -const PSG = require('../src') +const FloodSub = require('../src') const utils = require('./utils') const first = utils.first const createNode = utils.createNode const expectSet = utils.expectSet -describe('basics', () => { - let nodeA - let nodeB - let psA - let psB - +describe('basics between 2 nodes', () => { describe('fresh nodes', () => { + let nodeA + let nodeB + let fsA + let fsB + before((done) => { series([ (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), @@ -42,124 +42,143 @@ describe('basics', () => { }) it('Mount the pubsub protocol', (done) => { - psA = new PSG(nodeA) - psB = new PSG(nodeB) + fsA = new FloodSub(nodeA) + fsB = new FloodSub(nodeB) setTimeout(() => { - expect(psA.peers.size).to.be.eql(0) - expect(psA.subscriptions.size).to.eql(0) - expect(psB.peers.size).to.be.eql(0) - expect(psB.subscriptions.size).to.eql(0) + expect(fsA.peers.size).to.be.eql(0) + expect(fsA.subscriptions.size).to.eql(0) + expect(fsB.peers.size).to.be.eql(0) + expect(fsB.subscriptions.size).to.eql(0) done() }, 50) }) + it('start both FloodSubs', (done) => { + parallel([ + (cb) => fsA.start(cb), + (cb) => fsB.start(cb) + ], done) + }) + it('Dial from nodeA to nodeB', (done) => { series([ (cb) => nodeA.dialByPeerInfo(nodeB.peerInfo, cb), (cb) => setTimeout(() => { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) + expect(fsA.peers.size).to.equal(1) + expect(fsB.peers.size).to.equal(1) cb() }, 250) ], done) }) it('Subscribe to a topic:Z in nodeA', (done) => { - psA.subscribe('Z') + fsA.subscribe('Z') setTimeout(() => { - expectSet(psA.subscriptions, ['Z']) - expect(psB.peers.size).to.equal(1) - expectSet(first(psB.peers).topics, ['Z']) + expectSet(fsA.subscriptions, ['Z']) + expect(fsB.peers.size).to.equal(1) + expectSet(first(fsB.peers).topics, ['Z']) done() }, 100) }) it('Publish to a topic:Z in nodeA', (done) => { - psB.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) function shouldNotHappen (msg) { expect.fail() } - psA.once('Z', (msg) => { + fsA.once('Z', (msg) => { expect(msg.data.toString()).to.equal('hey') - psB.removeListener('Z', shouldNotHappen) + fsB.removeListener('Z', shouldNotHappen) done() }) - psB.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) - psA.publish('Z', new Buffer('hey')) + fsA.publish('Z', new Buffer('hey')) }) it('Publish to a topic:Z in nodeB', (done) => { - psB.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) - psA.once('Z', (msg) => { - psA.once('Z', shouldNotHappen) + fsA.once('Z', (msg) => { + fsA.once('Z', shouldNotHappen) expect(msg.data.toString()).to.equal('banana') setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) + fsA.removeListener('Z', shouldNotHappen) + fsB.removeListener('Z', shouldNotHappen) done() }, 100) }) - psB.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) - psB.publish('Z', new Buffer('banana')) + fsB.publish('Z', new Buffer('banana')) }) it('Publish 10 msg to a topic:Z in nodeB', (done) => { let counter = 0 - psB.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) - psA.on('Z', receivedMsg) + fsA.on('Z', receivedMsg) function receivedMsg (msg) { expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(psB.libp2p.peerInfo.id.toB58String()) + expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) expect(Buffer.isBuffer(msg.seqno)).to.be.true expect(msg.topicCIDs).to.be.eql(['Z']) if (++counter === 10) { - psA.removeListener('Z', receivedMsg) + fsA.removeListener('Z', receivedMsg) done() } } _times(10, () => { - psB.publish('Z', new Buffer('banana')) + fsB.publish('Z', new Buffer('banana')) }) }) it('Unsubscribe from topic:Z in nodeA', (done) => { - psA.unsubscribe('Z') - expect(psA.subscriptions.size).to.equal(0) + fsA.unsubscribe('Z') + expect(fsA.subscriptions.size).to.equal(0) setTimeout(() => { - expect(psB.peers.size).to.equal(1) - expectSet(first(psB.peers).topics, []) + expect(fsB.peers.size).to.equal(1) + expectSet(first(fsB.peers).topics, []) done() }, 100) }) it('Publish to a topic:Z in nodeA nodeB', (done) => { - psA.once('Z', shouldNotHappen) - psB.once('Z', shouldNotHappen) + fsA.once('Z', shouldNotHappen) + fsB.once('Z', shouldNotHappen) setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) + fsA.removeListener('Z', shouldNotHappen) + fsB.removeListener('Z', shouldNotHappen) done() }, 100) - psB.publish('Z', new Buffer('banana')) - psA.publish('Z', new Buffer('banana')) + fsB.publish('Z', new Buffer('banana')) + fsA.publish('Z', new Buffer('banana')) + }) + + it('stop both FloodSubs', (done) => { + parallel([ + (cb) => fsA.stop(cb), + (cb) => fsB.stop(cb) + ], done) }) }) describe('long running nodes (already have state)', () => { + let nodeA + let nodeB + let fsA + let fsB + before((done) => { series([ (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), @@ -168,20 +187,24 @@ describe('basics', () => { nodeA = nodes[0] nodeB = nodes[1] - psA = new PSG(nodeA) - psB = new PSG(nodeB) + fsA = new FloodSub(nodeA) + fsB = new FloodSub(nodeB) - psA.subscribe('Za') - psB.subscribe('Zb') + parallel([ + (cb) => fsA.start(cb), + (cb) => fsB.start(cb) + ], next) - setTimeout(() => { - expect(psA.peers.size).to.equal(0) - expectSet(psA.subscriptions, ['Za']) - expect(psB.peers.size).to.equal(0) - expectSet(psB.subscriptions, ['Zb']) + function next () { + fsA.subscribe('Za') + fsB.subscribe('Zb') + expect(fsA.peers.size).to.equal(0) + expectSet(fsA.subscriptions, ['Za']) + expect(fsB.peers.size).to.equal(0) + expectSet(fsB.subscriptions, ['Zb']) done() - }, 50) + } }) }) @@ -196,21 +219,28 @@ describe('basics', () => { nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { expect(err).to.not.exist setTimeout(() => { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) + expect(fsA.peers.size).to.equal(1) + expect(fsB.peers.size).to.equal(1) - expectSet(psA.subscriptions, ['Za']) - expect(psB.peers.size).to.equal(1) - expectSet(first(psB.peers).topics, ['Za']) + expectSet(fsA.subscriptions, ['Za']) + expect(fsB.peers.size).to.equal(1) + expectSet(first(fsB.peers).topics, ['Za']) - expectSet(psB.subscriptions, ['Zb']) - expect(psA.peers.size).to.equal(1) - expectSet(first(psA.peers).topics, ['Zb']) + expectSet(fsB.subscriptions, ['Zb']) + expect(fsA.peers.size).to.equal(1) + expectSet(first(fsA.peers).topics, ['Zb']) done() }, 250) }) }) + + it('stop both FloodSubs', (done) => { + parallel([ + (cb) => fsA.stop(cb), + (cb) => fsB.stop(cb) + ], done) + }) }) }) diff --git a/test/multiple-nodes.js b/test/multiple-nodes.js index e139603..6f7ed07 100644 --- a/test/multiple-nodes.js +++ b/test/multiple-nodes.js @@ -5,13 +5,13 @@ const expect = require('chai').expect const parallel = require('async/parallel') -const PSG = require('../src') +const FloodSub = require('../src') const utils = require('./utils') const first = utils.first const createNode = utils.createNode const expectSet = utils.expectSet -describe('multiple nodes', () => { +describe('multiple nodes (more than 2)', () => { describe('every peer subscribes to the topic', () => { describe('line', () => { // line @@ -330,10 +330,15 @@ function spawnPubSubNode (callback) { if (err) { return callback(err) } - - callback(null, { - libp2p: node, - ps: new PSG(node) + const ps = new FloodSub(node) + ps.start((err) => { + if (err) { + return callback(err) + } + callback(null, { + libp2p: node, + ps: ps + }) }) }) }