Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
Merge f65d38f into a7bb72b
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Jan 11, 2017
2 parents a7bb72b + f65d38f commit 7101859
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 13 deletions.
22 changes: 12 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class FloodSub extends EventEmitter {
this._dialPeer = this._dialPeer.bind(this)
}

_dialPeer (peerInfo) {
_dialPeer (peerInfo, callback) {
callback = callback || function noop () {}
const idB58Str = peerInfo.id.toB58String()
log('dialing %s', idB58Str)

Expand All @@ -72,11 +73,11 @@ class FloodSub extends EventEmitter {
return log.err(err)
}

this._onDial(peerInfo, conn)
this._onDial(peerInfo, conn, callback)
})
}

_onDial (peerInfo, conn) {
_onDial (peerInfo, conn, callback) {
const idB58Str = peerInfo.id.toB58String()

// If already had a dial to me, just add the conn
Expand All @@ -89,6 +90,7 @@ class FloodSub extends EventEmitter {

// Immediately send my own subscriptions to the newly established conn
peer.sendSubscriptions(this.subscriptions)
setImmediate(() => callback())
}

_onConnection (protocol, conn) {
Expand Down Expand Up @@ -212,13 +214,13 @@ class FloodSub extends EventEmitter {
// Dial already connected peers
const peerInfos = values(this.libp2p.peerBook.getAll())

peerInfos.forEach((peerInfo) => {
this._dialPeer(peerInfo)
})

setImmediate(() => {
this.started = true
callback()
asyncEach(peerInfos, (peerInfo, cb) => {
this._dialPeer(peerInfo, cb)
}, (err) => {
setImmediate(() => {
this.started = true
callback(err)
})
})
}

Expand Down
4 changes: 3 additions & 1 deletion src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ class Peer {
// no connection to close
}
// end the pushable pull-stream
this.stream.end()
if (this.stream) {
this.stream.end()
}
setImmediate(() => {
this.conn = null
this.stream = null
Expand Down
52 changes: 50 additions & 2 deletions test/2-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ describe('basics between 2 nodes', () => {
})
})

describe('long running nodes (already have state)', () => {
describe('nodes send state on connection', () => {
let nodeA
let nodeB
let fsA
Expand Down Expand Up @@ -215,7 +215,7 @@ describe('basics between 2 nodes', () => {
], done)
})

it('Existing subscriptions are sent upon peer connection', (done) => {
it('existing subscriptions are sent upon peer connection', (done) => {
nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => {
expect(err).to.not.exist
setTimeout(() => {
Expand All @@ -242,6 +242,54 @@ describe('basics between 2 nodes', () => {
], done)
})
})

describe.only('dial the pubsub protocol on mount', () => {
let nodeA
let nodeB
let fsA
let fsB

before((done) => {
series([
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb),
(cb) => createNode('/ip4/127.0.0.1/tcp/0', cb)
], (cb, nodes) => {
nodeA = nodes[0]
nodeB = nodes[1]
nodeA.dialByPeerInfo(nodeB.peerInfo, () => setTimeout(done, 1000))
})
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

it('dial on floodsub on mount', (done) => {
fsA = new FloodSub(nodeA)
fsB = new FloodSub(nodeB)

parallel([
(cb) => fsA.start(cb),
(cb) => fsB.start(cb)
], next)

function next () {
expect(fsA.peers.size).to.equal(1)
expect(fsB.peers.size).to.equal(1)
done()
}
})

it('stop both FloodSubs', (done) => {
parallel([
(cb) => fsA.stop(cb),
(cb) => fsB.stop(cb)
], done)
})
})
})

function shouldNotHappen (msg) {
Expand Down

0 comments on commit 7101859

Please sign in to comment.