Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Various floodsub issues #51

Merged
merged 19 commits into from
Nov 22, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ class FloodSub extends EventEmitter {
_dialPeer (peerInfo, callback) {
callback = callback || function noop () {}
const idB58Str = peerInfo.id.toB58String()
log('dialing %s', idB58Str)

// If already have a PubSub conn, ignore
const peer = this.peers.get(idB58Str)
if (peer && peer.isConnected) {
return setImmediate(() => callback())
}

log('dialing %s', idB58Str)
this.libp2p.dial(peerInfo, multicodec, (err, conn) => {
if (err) {
log.err(err)
Expand All @@ -82,6 +82,7 @@ class FloodSub extends EventEmitter {

_onDial (peerInfo, conn, callback) {
const idB58Str = peerInfo.id.toB58String()
log('connected', idB58Str)

// If already had a dial to me, just add the conn
if (!this.peers.has(idB58Str)) {
Expand Down Expand Up @@ -121,7 +122,7 @@ class FloodSub extends EventEmitter {
pull.map((data) => pb.rpc.RPC.decode(data)),
pull.drain(
(rpc) => this._onRpc(idB58Str, rpc),
(err) => this._onConnectionEnd(idB58Str, err)
(err) => this._onConnectionEnd(idB58Str, conn, err)
)
)
}
Expand All @@ -131,6 +132,7 @@ class FloodSub extends EventEmitter {
return
}

log('rpc from', idB58Str, rpc)
const subs = rpc.subscriptions
const msgs = rpc.msgs

Expand All @@ -157,20 +159,25 @@ class FloodSub extends EventEmitter {
this.cache.put(seqno)

// 2. emit to self
this._emitMessages(msg.topicIDs, [msg])
this._emitMessages(msg.topicCIDs, [msg])

// 3. propagate msg to others
this._forwardMessages(msg.topicIDs, [msg])
this._forwardMessages(msg.topicCIDs, [msg])
})
}

_onConnectionEnd (idB58Str, err) {
_onConnectionEnd (idB58Str, conn, err) {
// socket hang up, means the one side canceled
if (err && err.message !== 'socket hang up') {
log.err(err)
}

this.peers.delete(idB58Str)
log('connection ended', idB58Str)
const peer = this.peers.get(idB58Str)
if (peer && peer.conn === conn) {
log('delete peer', idB58Str)
this.peers.delete(idB58Str)
}
}

_emitMessages (topics, messages) {
Expand All @@ -180,7 +187,14 @@ class FloodSub extends EventEmitter {
}

messages.forEach((message) => {
this.emit(topic, message)
// Convert RPC message to API message
const m = {
from: message.from,
data: message.data,
seqno: message.seqno,
topicIDs: message.topicCIDs
}
this.emit(topic, m)
})
})
}
Expand Down Expand Up @@ -277,7 +291,7 @@ class FloodSub extends EventEmitter {
from: from,
data: msg,
seqno: new Buffer(seqno),
topicIDs: topics
topicCIDs: topics
}
}

Expand All @@ -287,7 +301,7 @@ class FloodSub extends EventEmitter {
this._emitMessages(topics, msgObjects)

// send to all the other peers
this._forwardMessages(topics, messages.map(buildMessage))
this._forwardMessages(topics, msgObjects)
}

/**
Expand Down Expand Up @@ -321,7 +335,11 @@ class FloodSub extends EventEmitter {
* @returns {undefined}
*/
unsubscribe (topics) {
assert(this.started, 'FloodSub is not started')
// Avoid race conditions, by quietly ignoring unsub when shutdown.
if (!this.started) {
return
}

topics = ensureArray(topics)

topics.forEach((topic) => this.subscriptions.delete(topic))
Expand Down
2 changes: 1 addition & 1 deletion src/message/rpc.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ message RPC {
optional string from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4; // CID of topic descriptor object
repeated string topicCIDs = 4; // CID of topic descriptor object
}
}`
3 changes: 0 additions & 3 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ class Peer {
* @returns {undefined}
*/
close (callback) {
if (!this.conn || !this.stream) {
// no connection to close
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it never invoked the callback, read the rest of the code.

This is still WIP, reviews are fine but DO NOT merge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it back. This code does nothing.

// end the pushable pull-stream
if (this.stream) {
this.stream.end()
Expand Down