Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
fix: revert to serialized pubsub operations (#319)
Browse files Browse the repository at this point in the history
* fix: re-serialise operations that were serial before refactor

During the refactor I took the opportunity to parallelise some pubsub operations that didn't explicitly depend on each other. This worked perfectly while testing locally, but on CI it is a different story. I found that tests for js-ipfs-api (which are run against go-ipfs) failed for seemingly random reasons.

After much investigation I finally tried re-serialising the operations I had refactored to be parallel and the tests started to pass. It seems that the pubsub implementation in go-ipfs has some issues with concurrency.

I also found two intermittent issues with `swarm.connect` in go-ipfs (seen significantly more often on CI):

1. Issuing two calls to this function from the same node might end up in the second not actually creating a connection and no error message reported to the user
2. Even after the response to the user it takes a few milliseconds for a connection to actually be connected

I intend to open issues on go-ipfs and write examples demonstrating these problems.

I created a utility function `connect` to temporarily mitigate these issues in one place. The utility serialises calls from a single node to another and pauses after each to allow the connection to properly establish.

License: MIT
Signed-off-by: Alan Shaw <alan@tableflip.io>

* fix: revert timout increase

License: MIT
Signed-off-by: Alan Shaw <alan@tableflip.io>

* Update swarm.js
  • Loading branch information
alanshaw authored and daviddias committed Jul 3, 2018
1 parent 7660e0f commit 4b5534e
Show file tree
Hide file tree
Showing 18 changed files with 98 additions and 75 deletions.
3 changes: 2 additions & 1 deletion js/src/bitswap/unwant.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const waterfall = require('async/waterfall')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { waitForWantlistKey } = require('./utils')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -33,7 +34,7 @@ module.exports = (createCommon, options) => {
// Add key to the wantlist for ipfsB
ipfsB.block.get(key, () => {})

ipfsA.swarm.connect(ipfsB.peerId.addresses[0], done)
connect(ipfsA, ipfsB.peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/bitswap/wantlist.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const waterfall = require('async/waterfall')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { waitForWantlistKey } = require('./utils')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -33,7 +34,7 @@ module.exports = (createCommon, options) => {
// Add key to the wantlist for ipfsB
ipfsB.block.get(key, () => {})

ipfsA.swarm.connect(ipfsB.peerId.addresses[0], done)
connect(ipfsA, ipfsB.peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/dht/findpeer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -29,7 +30,7 @@ module.exports = (createCommon, options) => {
nodeA = nodes[0]
nodeB = nodes[1]

nodeB.swarm.connect(nodeA.peerId.addresses[0], done)
connect(nodeB, nodeA.peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/dht/findprovs.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const waterfall = require('async/waterfall')
const CID = require('cids')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -29,7 +30,7 @@ module.exports = (createCommon, options) => {
nodeA = nodes[0]
nodeB = nodes[1]

nodeB.swarm.connect(nodeA.peerId.addresses[0], done)
connect(nodeB, nodeA.peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/dht/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const waterfall = require('async/waterfall')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -30,7 +31,7 @@ module.exports = (createCommon, options) => {
nodeA = nodes[0]
nodeB = nodes[1]

nodeA.swarm.connect(nodeB.peerId.addresses[0], done)
connect(nodeA, nodeB.peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/dht/provide.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const CID = require('cids')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -26,7 +27,7 @@ module.exports = (createCommon, options) => {
spawnNodesWithId(2, factory, (err, nodes) => {
expect(err).to.not.exist()
ipfs = nodes[0]
ipfs.swarm.connect(nodes[1].peerId.addresses[0], done)
connect(ipfs, nodes[1].peerId.addresses[0], done)
})
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/dht/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -29,7 +30,7 @@ module.exports = (createCommon, options) => {
nodeA = nodes[0]
nodeB = nodes[1]

nodeB.swarm.connect(nodeA.peerId.addresses[0], done)
connect(nodeB, nodeA.peerId.addresses[0], done)
})
})
})
Expand Down
4 changes: 2 additions & 2 deletions js/src/key/list.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-env mocha */
'use strict'

const times = require('async/times')
const timesSeries = require('async/timesSeries')
const hat = require('hat')
const { getDescribe, getIt, expect } = require('../utils/mocha')

Expand Down Expand Up @@ -33,7 +33,7 @@ module.exports = (createCommon, options) => {
it('should list all the keys', function (done) {
this.timeout(60 * 1000)

times(3, (n, cb) => {
timesSeries(3, (n, cb) => {
ipfs.key.gen(hat(), { type: 'rsa', size: 2048 }, cb)
}, (err, keys) => {
expect(err).to.not.exist()
Expand Down
3 changes: 2 additions & 1 deletion js/src/ping/ping-pull-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { expectIsPingResponse, isPong } = require('./utils')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -33,7 +34,7 @@ module.exports = (createCommon, options) => {
cb()
})
},
(cb) => ipfsA.swarm.connect(ipfsB.peerId.addresses[0], cb)
(cb) => connect(ipfsA, ipfsB.peerId.addresses[0], cb)
], done)
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/ping/ping-readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { expectIsPingResponse, isPong } = require('./utils')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -34,7 +35,7 @@ module.exports = (createCommon, options) => {
cb()
})
},
(cb) => ipfsA.swarm.connect(ipfsB.peerId.addresses[0], cb)
(cb) => connect(ipfsA, ipfsB.peerId.addresses[0], cb)
], done)
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/ping/ping.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { expectIsPingResponse, isPong } = require('./utils')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -32,7 +33,7 @@ module.exports = (createCommon, options) => {
cb()
})
},
(cb) => ipfsA.swarm.connect(ipfsB.peerId.addresses[0], cb)
(cb) => connect(ipfsA, ipfsB.peerId.addresses[0], cb)
], done)
})
})
Expand Down
3 changes: 2 additions & 1 deletion js/src/pubsub/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'use strict'

const each = require('async/each')
const eachSeries = require('async/eachSeries')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')

Expand Down Expand Up @@ -68,7 +69,7 @@ module.exports = (createCommon, options) => {
handler () {}
}]

each(topics, (t, cb) => {
eachSeries(topics, (t, cb) => {
ipfs.pubsub.subscribe(t.name, t.handler, cb)
}, (err) => {
expect(err).to.not.exist()
Expand Down
44 changes: 20 additions & 24 deletions js/src/pubsub/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
'use strict'

const parallel = require('async/parallel')
const auto = require('async/auto')
const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { waitForPeers, getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand Down Expand Up @@ -46,9 +47,8 @@ module.exports = (createCommon, options) => {
const ipfs3Addr = ipfs3.peerId.addresses.find((a) => a.includes('127.0.0.1'))

parallel([
(cb) => ipfs1.swarm.connect(ipfs2Addr, cb),
(cb) => ipfs1.swarm.connect(ipfs3Addr, cb),
(cb) => ipfs2.swarm.connect(ipfs3Addr, cb)
(cb) => connect(ipfs1, [ipfs2Addr, ipfs3Addr], cb),
(cb) => connect(ipfs2, ipfs3Addr, cb)
], done)
})

Expand All @@ -73,7 +73,7 @@ module.exports = (createCommon, options) => {
const topic = getTopic()
const topicOther = topic + 'different topic'

parallel([
series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topicOther, sub3, cb)
Expand Down Expand Up @@ -101,14 +101,12 @@ module.exports = (createCommon, options) => {
const sub3 = (msg) => {}
const topic = getTopic()

auto({
sub1: (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
sub2: (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
sub3: (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
peers: ['sub1', 'sub2', 'sub3', (_, cb) => {
waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb)
}]
}, (err) => {
series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
Expand All @@ -125,17 +123,15 @@ module.exports = (createCommon, options) => {
const sub3 = (msg) => {}
const topic = getTopic()

auto({
sub1: (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
sub2: (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
sub3: (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
peers: ['sub1', 'sub2', 'sub3', (_, cb) => {
waitForPeers(ipfs1, topic, [
ipfs2.peerId.id,
ipfs3.peerId.id
], cb)
}]
}, (err) => {
series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [
ipfs2.peerId.id,
ipfs3.peerId.id
], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
Expand Down
4 changes: 2 additions & 2 deletions js/src/pubsub/publish.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-env mocha */
'use strict'

const times = require('async/times')
const timesSeries = require('async/timesSeries')
const hat = require('hat')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
Expand Down Expand Up @@ -50,7 +50,7 @@ module.exports = (createCommon, options) => {
const count = 10
const topic = getTopic()

times(count, (_, cb) => {
timesSeries(count, (_, cb) => {
ipfs.pubsub.publish(topic, Buffer.from(hat()), cb)
}, done)
})
Expand Down

0 comments on commit 4b5534e

Please sign in to comment.