Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
fix: reduce the number of concurrent requests in browser (#505)
Browse files Browse the repository at this point in the history
In the browser we can only make 5 concurrent requests to the same origin.

This also refactors the pubsub tests to use async/await and be less flakey.

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
alanshaw committed Aug 27, 2019
1 parent 3da6cd0 commit 7596634
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 431 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"ipld-dag-pb": "~0.17.3",
"is-ipfs": "~0.6.1",
"is-plain-object": "^3.0.0",
"it-pushable": "^1.2.1",
"libp2p-crypto": "~0.16.0",
"multiaddr": "^6.0.0",
"multibase": "~0.6.0",
Expand All @@ -63,6 +64,7 @@
"pull-stream": "^3.6.11",
"pump": "^3.0.0",
"readable-stream": "^3.1.1",
"streaming-iterables": "^4.1.0",
"through2": "^3.0.0"
},
"devDependencies": {
Expand Down
60 changes: 26 additions & 34 deletions src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* eslint-env mocha */
'use strict'

const eachSeries = require('async/eachSeries')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const delay = require('../utils/delay')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -14,6 +14,7 @@ module.exports = (createCommon, options) => {
this.timeout(80 * 1000)

let ipfs
let subscribedTopics = []

before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the
Expand All @@ -30,33 +31,32 @@ module.exports = (createCommon, options) => {
})
})

afterEach(async () => {
for (let i = 0; i < subscribedTopics.length; i++) {
await ipfs.pubsub.unsubscribe(subscribedTopics[i])
}
subscribedTopics = []
await delay(100)
})

after((done) => common.teardown(done))

it('should return an empty list when no topics are subscribed', (done) => {
ipfs.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics.length).to.equal(0)
done()
})
it('should return an empty list when no topics are subscribed', async () => {
const topics = await ipfs.pubsub.ls()
expect(topics.length).to.equal(0)
})

it('should return a list with 1 subscribed topic', (done) => {
const sub1 = (msg) => {}
it('should return a list with 1 subscribed topic', async () => {
const sub1 = () => {}
const topic = getTopic()
subscribedTopics = [topic]

ipfs.pubsub.subscribe(topic, sub1, (err) => {
expect(err).to.not.exist()

ipfs.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.eql([topic])

ipfs.pubsub.unsubscribe(topic, sub1, done)
})
})
await ipfs.pubsub.subscribe(topic, sub1)
const topics = await ipfs.pubsub.ls()
expect(topics).to.be.eql([topic])
})

it('should return a list with 3 subscribed topics', (done) => {
it('should return a list with 3 subscribed topics', async () => {
const topics = [{
name: 'one',
handler () {}
Expand All @@ -68,22 +68,14 @@ module.exports = (createCommon, options) => {
handler () {}
}]

eachSeries(topics, (t, cb) => {
ipfs.pubsub.subscribe(t.name, t.handler, cb)
}, (err) => {
expect(err).to.not.exist()
subscribedTopics = topics.map(t => t.name)

ipfs.pubsub.ls((err, list) => {
expect(err).to.not.exist()
for (let i = 0; i < topics.length; i++) {
await ipfs.pubsub.subscribe(topics[i].name, topics[i].handler)
}

expect(list.sort())
.to.eql(topics.map((t) => t.name).sort())

eachSeries(topics, (t, cb) => {
ipfs.pubsub.unsubscribe(t.name, t.handler, cb)
}, done)
})
})
const list = await ipfs.pubsub.ls()
expect(list.sort()).to.eql(topics.map(t => t.name).sort())
})
})
}
121 changes: 51 additions & 70 deletions src/pubsub/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
'use strict'

const parallel = require('async/parallel')
const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { waitForPeers, getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')
const delay = require('../utils/delay')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -19,6 +19,7 @@ module.exports = (createCommon, options) => {
let ipfs1
let ipfs2
let ipfs3
let subscribedTopics = []

before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the
Expand All @@ -40,6 +41,16 @@ module.exports = (createCommon, options) => {
})
})

afterEach(async () => {
const nodes = [ipfs1, ipfs2, ipfs3]
for (let i = 0; i < subscribedTopics.length; i++) {
const topic = subscribedTopics[i]
await Promise.all(nodes.map(ipfs => ipfs.pubsub.unsubscribe(topic)))
}
subscribedTopics = []
await delay(100)
})

after((done) => common.teardown(done))

before((done) => {
Expand All @@ -52,94 +63,64 @@ module.exports = (createCommon, options) => {
], done)
})

it('should not error when not subscribed to a topic', (done) => {
it('should not error when not subscribed to a topic', async () => {
const topic = getTopic()
ipfs1.pubsub.peers(topic, (err, peers) => {
expect(err).to.not.exist()
// Should be empty() but as mentioned below go-ipfs returns more than it should
// expect(peers).to.be.empty()

done()
})
const peers = await ipfs1.pubsub.peers(topic)
expect(peers).to.exist()
// Should be empty() but as mentioned below go-ipfs returns more than it should
// expect(peers).to.be.empty()
})

it('should not return extra peers', (done) => {
it('should not return extra peers', async () => {
// Currently go-ipfs returns peers that have not been
// subscribed to the topic. Enable when go-ipfs has been fixed
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}

const topic = getTopic()
const topicOther = topic + 'different topic'

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topicOther, sub3, cb)
], (err) => {
expect(err).to.not.exist()

ipfs1.pubsub.peers(topic, (err, peers) => {
expect(err).to.not.exist()
expect(peers).to.be.empty()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topicOther, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topicOther, sub3, cb)
], done)
})
})
subscribedTopics = [topic, topicOther]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topicOther, sub2)
await ipfs3.pubsub.subscribe(topicOther, sub3)

const peers = await ipfs1.pubsub.peers(topic)
expect(peers).to.be.empty()
})

it('should return peers for a topic - one peer', (done) => {
it('should return peers for a topic - one peer', async () => {
// Currently go-ipfs returns peers that have not been
// subscribed to the topic. Enable when go-ipfs has been fixed
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
], done)
})
subscribedTopics = [topic]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topic, sub2)
await ipfs3.pubsub.subscribe(topic, sub3)

await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000)
})

it('should return peers for a topic - multiple peers', (done) => {
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
it('should return peers for a topic - multiple peers', async () => {
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [
ipfs2.peerId.id,
ipfs3.peerId.id
], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
], done)
})
subscribedTopics = [topic]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topic, sub2)
await ipfs3.pubsub.subscribe(topic, sub3)

await waitForPeers(ipfs1, topic, [ipfs2.peerId.id, ipfs3.peerId.id], 30000)
})
})
}
24 changes: 13 additions & 11 deletions src/pubsub/publish.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-env mocha */
'use strict'

const timesSeries = require('async/timesSeries')
const hat = require('hat')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
Expand Down Expand Up @@ -33,26 +32,29 @@ module.exports = (createCommon, options) => {

after((done) => common.teardown(done))

it('should error on string messags', (done) => {
it('should error on string messags', async () => {
const topic = getTopic()
ipfs.pubsub.publish(topic, 'hello friend', (err) => {
try {
await ipfs.pubsub.publish(topic, 'hello friend')
} catch (err) {
expect(err).to.exist()
done()
})
return
}
throw new Error('did not error on string message')
})

it('should publish message from buffer', (done) => {
it('should publish message from buffer', () => {
const topic = getTopic()
ipfs.pubsub.publish(topic, Buffer.from(hat()), done)
return ipfs.pubsub.publish(topic, Buffer.from(hat()))
})

it('should publish 10 times within time limit', (done) => {
it('should publish 10 times within time limit', async () => {
const count = 10
const topic = getTopic()

timesSeries(count, (_, cb) => {
ipfs.pubsub.publish(topic, Buffer.from(hat()), cb)
}, done)
for (let i = 0; i < count; i++) {
await ipfs.pubsub.publish(topic, Buffer.from(hat()))
}
})
})
}

0 comments on commit 7596634

Please sign in to comment.