Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Various floodsub issues #51

Merged
merged 19 commits into from
Nov 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
},
"dependencies": {
"async": "^2.6.0",
"bs58": "^4.0.1",
"debug": "^3.1.0",
"length-prefixed-stream": "^1.5.1",
"libp2p-crypto": "~0.10.3",
Expand Down
94 changes: 68 additions & 26 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,53 @@ class FloodSub extends EventEmitter {
this._dialPeer = this._dialPeer.bind(this)
}

_addPeer (peer) {
const id = peer.info.id.toB58String()

/*
Always use an existing peer.

What is happening here is: "If the other peer has already dialed to me, we already have
an establish link between the two, what might be missing is a
Connection specifically between me and that Peer"
*/
let existing = this.peers.get(id)
if (existing) {
log('already existing peer', id)
++existing._references
} else {
log('new peer', id)
this.peers.set(id, peer)
existing = peer
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if can be inverted for great readability (by this I mean, avoid do negative equals with things like undefined or null)

let existing = this.peers.get(id)
if (!existing) {
  log('new peer', id)
  this.peers.set(id, peer)
  existing = peer
} else {
  log('already existing peer', id)
  ++existing._references
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, positive logic is always better.

Copy link
Contributor Author

@richardschneider richardschneider Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even better

    if (existing) {
      log('already existing peer', id)
      ++existing._references
    } else {
      log('new peer', id)
      this.peers.set(id, peer)
      existing = peer
    }


return existing
}

_removePeer (peer) {
const id = peer.info.id.toB58String()

log('remove', id, peer._references)
// Only delete when no one else is referencing this peer.
if (--peer._references === 0) {
log('delete peer', id)
this.peers.delete(id)
}

return peer
}

_dialPeer (peerInfo, callback) {
callback = callback || function noop () {}
const idB58Str = peerInfo.id.toB58String()
log('dialing %s', idB58Str)

// If already have a PubSub conn, ignore
const peer = this.peers.get(idB58Str)
if (peer && peer.isConnected) {
return setImmediate(() => callback())
}

log('dialing %s', idB58Str)
this.libp2p.dial(peerInfo, multicodec, (err, conn) => {
if (err) {
log.err(err)
Expand All @@ -82,13 +118,9 @@ class FloodSub extends EventEmitter {

_onDial (peerInfo, conn, callback) {
const idB58Str = peerInfo.id.toB58String()
log('connected', idB58Str)

// If already had a dial to me, just add the conn
if (!this.peers.has(idB58Str)) {
this.peers.set(idB58Str, new Peer(peerInfo))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important logic.

What is happening here is: "If the other peer has already dialed to me, we already have an establish link between the two, what might be missing is a Connection specifically between me and that Peer"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copy and paste from original code. will add the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code was moved to _addPeer


const peer = this.peers.get(idB58Str)
const peer = this._addPeer(new Peer(peerInfo))
peer.attachConnection(conn)

// Immediately send my own subscriptions to the newly established conn
Expand All @@ -104,24 +136,20 @@ class FloodSub extends EventEmitter {
}

const idB58Str = peerInfo.id.toB58String()
const peer = this._addPeer(new Peer(peerInfo))

if (!this.peers.has(idB58Str)) {
log('new peer', idB58Str)
this.peers.set(idB58Str, new Peer(peerInfo))
}

this._processConnection(idB58Str, conn)
this._processConnection(idB58Str, conn, peer)
})
}

_processConnection (idB58Str, conn) {
_processConnection (idB58Str, conn, peer) {
pull(
conn,
lp.decode(),
pull.map((data) => pb.rpc.RPC.decode(data)),
pull.drain(
(rpc) => this._onRpc(idB58Str, rpc),
(err) => this._onConnectionEnd(idB58Str, err)
(err) => this._onConnectionEnd(idB58Str, peer, err)
)
)
}
Expand All @@ -131,11 +159,12 @@ class FloodSub extends EventEmitter {
return
}

log('rpc from', idB58Str)
const subs = rpc.subscriptions
const msgs = rpc.msgs

if (msgs && msgs.length) {
this._processRpcMessages(rpc.msgs)
this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs))
}

if (subs && subs.length) {
Expand Down Expand Up @@ -164,13 +193,14 @@ class FloodSub extends EventEmitter {
})
}

_onConnectionEnd (idB58Str, err) {
_onConnectionEnd (idB58Str, peer, err) {
// socket hang up, means the one side canceled
if (err && err.message !== 'socket hang up') {
log.err(err)
}

this.peers.delete(idB58Str)
log('connection ended', idB58Str, err ? err.message : '')
this._removePeer(peer)
}

_emitMessages (topics, messages) {
Expand All @@ -191,7 +221,7 @@ class FloodSub extends EventEmitter {
return
}

peer.sendMessages(messages)
peer.sendMessages(utils.normalizeOutRpcMessages(messages))

log('publish msgs on topics', topics, peer.info.id.toB58String())
})
Expand Down Expand Up @@ -241,11 +271,15 @@ class FloodSub extends EventEmitter {
this.libp2p.unhandle(multicodec)
this.libp2p.removeListener('peer:connect', this._dialPeer)

log('stopping')
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => {
if (err) {
return callback(err)
}

log('stopped')
this.peers = new Map()
this.subscriptions = new Set()
this.started = false
callback()
})
Expand Down Expand Up @@ -287,7 +321,7 @@ class FloodSub extends EventEmitter {
this._emitMessages(topics, msgObjects)

// send to all the other peers
this._forwardMessages(topics, messages.map(buildMessage))
this._forwardMessages(topics, msgObjects)
}

/**
Expand All @@ -303,14 +337,18 @@ class FloodSub extends EventEmitter {

topics.forEach((topic) => this.subscriptions.add(topic))

this.peers.forEach((peer) => checkIfReady(peer))
this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer))
// make sure that FloodSub is already mounted
function checkIfReady (peer) {
function sendSubscriptionsOnceReady (peer) {
if (peer && peer.isWritable) {
peer.sendSubscriptions(topics)
} else {
setImmediate(checkIfReady.bind(peer))
return peer.sendSubscriptions(topics)
}
const onConnection = () => {
peer.removeListener('connection', onConnection)
sendSubscriptionsOnceReady(peer)
}
peer.on('connection', onConnection)
peer.once('close', () => peer.removeListener('connection', onConnection))
}
}

Expand All @@ -321,7 +359,11 @@ class FloodSub extends EventEmitter {
* @returns {undefined}
*/
unsubscribe (topics) {
assert(this.started, 'FloodSub is not started')
// Avoid race conditions, by quietly ignoring unsub when shutdown.
if (!this.started) {
return
}

topics = ensureArray(topics)

topics.forEach((topic) => this.subscriptions.delete(topic))
Expand Down
4 changes: 2 additions & 2 deletions src/message/rpc.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ message RPC {
}

message Message {
optional string from = 1;
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4; // CID of topic descriptor object
repeated string topicIDs = 4;
}
}`
26 changes: 20 additions & 6 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ const lp = require('pull-length-prefixed')
const Pushable = require('pull-pushable')
const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const EventEmitter = require('events')

const rpc = require('./message').rpc.RPC

/**
* The known state of a connected peer.
*/
class Peer {
class Peer extends EventEmitter {
/**
* @param {PeerInfo} info
*/
constructor (info) {
super()

/**
* @type {PeerInfo}
*/
Expand All @@ -31,6 +34,8 @@ class Peer {
* @type {Pushable}
*/
this.stream = null

this._references = 1
}

/**
Expand Down Expand Up @@ -80,8 +85,15 @@ class Peer {
pull(
this.stream,
lp.encode(),
conn
conn,
pull.onEnd(() => {
this.conn = null
this.stream = null
this.emit('close')
})
)

this.emit('connection')
}

_sendRawSubscriptions (topics, subscribe) {
Expand Down Expand Up @@ -155,16 +167,18 @@ class Peer {
* @returns {undefined}
*/
close (callback) {
if (!this.conn || !this.stream) {
// no connection to close
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it never invoked the callback, read the rest of the code.

This is still WIP, reviews are fine but DO NOT merge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it back. This code does nothing.

// end the pushable pull-stream
// Force removal of peer
this._references = 1

// End the pushable
if (this.stream) {
this.stream.end()
}

setImmediate(() => {
this.conn = null
this.stream = null
this.emit('close')
callback()
})
}
Expand Down
27 changes: 27 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const crypto = require('libp2p-crypto')
const bs58 = require('bs58')

exports = module.exports

Expand Down Expand Up @@ -66,3 +67,29 @@ exports.ensureArray = (maybeArray) => {

return maybeArray
}

exports.normalizeInRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (Buffer.isBuffer(msg.from)) {
m.from = bs58.encode(msg.from)
}
return m
})
}

exports.normalizeOutRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (typeof msg.from === 'string' || msg.from instanceof String) {
m.from = bs58.decode(msg.from)
}
return m
})
}
30 changes: 27 additions & 3 deletions test/2-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ describe('basics between 2 nodes', () => {
_times(10, () => fsB.publish('Z', new Buffer('banana')))
})

it('Publish 10 msg to a topic:Z in nodeB as array', (done) => {
let counter = 0

fsB.once('Z', shouldNotHappen)

fsA.on('Z', receivedMsg)

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql(['Z'])

if (++counter === 10) {
fsA.removeListener('Z', receivedMsg)
done()
}
}

let msgs = []
_times(10, () => msgs.push(new Buffer('banana')))
fsB.publish('Z', msgs)
})

it('Unsubscribe from topic:Z in nodeA', (done) => {
fsA.unsubscribe('Z')
expect(fsA.subscriptions.size).to.equal(0)
Expand Down Expand Up @@ -291,11 +315,11 @@ describe('basics between 2 nodes', () => {
nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
setTimeout(() => {
expect(fsA.peers.size).to.equal(1)
expect(fsB.peers.size).to.equal(1)
expect(first(fsA.peers)._references).to.equal(2)
expect(first(fsB.peers)._references).to.equal(2)

fsA.stop(() => setTimeout(() => {
expect(fsB.peers.size).to.equal(0)
expect(first(fsB.peers)._references).to.equal(1)
done()
}, 250))
}, 1000)
Expand Down
Loading