diff --git a/.aegir.js b/.aegir.js index 53d3b66..a0b5c3d 100644 --- a/.aegir.js +++ b/.aegir.js @@ -60,6 +60,9 @@ const esbuild = { /** @type {import('aegir').PartialOptions} */ module.exports = { + build: { + bundlesizeMax: '143KB', + }, test: { before, after, diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f0ca77f..e9c3ef2 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -39,14 +39,14 @@ jobs: steps: - uses: actions/checkout@v2 - run: npm install - - run: npx aegir test -t browser -t webworker --bail + - run: npx aegir test -t browser -t webworker --bail --timeout 60000 test-firefox: needs: check runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - run: npm install - - run: npx aegir test -t browser -t webworker --bail -- --browser firefox + - run: npx aegir test -t browser -t webworker --bail --timeout 60000 -- --browser firefox test-webkit: needs: check runs-on: ubuntu-latest @@ -54,18 +54,18 @@ jobs: - uses: actions/checkout@v2 - uses: microsoft/playwright-github-action@v1 - run: npm install - - run: npx aegir test -t browser -t webworker --bail -- --browser webkit + - run: npx aegir test -t browser -t webworker --bail --timeout 60000 -- --browser webkit test-electron-main: needs: check runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - run: npm install - - run: npx xvfb-maybe aegir test -t electron-main --bail + - run: npx xvfb-maybe aegir test -t electron-main --bail --timeout 60000 test-electron-renderer: needs: check runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - run: npm install - - run: npx xvfb-maybe aegir test -t electron-renderer --bail \ No newline at end of file + - run: npx xvfb-maybe aegir test -t electron-renderer --bail --timeout 60000 diff --git a/package.json b/package.json index 651596d..4d9d368 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "main": "src/index.js", "types": "dist/src/index.d.ts", "scripts": { - "lint": "aegir lint", + "lint": "aegir ts -p check && aegir lint", "test": "aegir test", "test:node": "aegir test -t node", "test:browser": "aegir test -t browser", @@ -50,20 +50,22 @@ "chai": "^4.3.4", "ipfs-utils": "^8.1.0", "it-pair": "^1.0.0", - "libp2p": "^0.31.6", + "libp2p": "next", + "libp2p-interfaces-compliance-tests": "^1.0.1", "libp2p-mplex": "^0.10.3", - "libp2p-noise": "^3.0.0", - "libp2p-websockets": "^0.15.7", - "multiaddr": "^9.0.1", + "libp2p-noise": "^4.0.0", + "libp2p-websockets": "^0.16.0", + "multiaddr": "^10.0.0", + "multiformats": "^9.2.0", "os": "^0.1.1", "p-wait-for": "^3.1.0", - "peer-id": "^0.14.8", + "peer-id": "^0.15.0", "sinon": "^11.1.1", "util": "^0.12.3" }, "dependencies": { "debug": "^4.2.0", - "libp2p-interfaces": "^0.11.0", + "libp2p-interfaces": "^1.0.1", "time-cache": "^0.3.0", "uint8arrays": "^2.1.5" }, diff --git a/src/index.js b/src/index.js index 7e007be..73b15df 100644 --- a/src/index.js +++ b/src/index.js @@ -53,7 +53,7 @@ class FloodSub extends BaseProtocol { */ async _processRpcMessage (message) { // Check if I've seen the message, if yes, ignore - const seqno = this.getMsgId(message) + const seqno = await this.getMsgId(message) const msgIdStr = toString(seqno, 'base64') if (this.seenCache.has(msgIdStr)) { @@ -90,7 +90,7 @@ class FloodSub extends BaseProtocol { } peers.forEach((id) => { this.log('publish msgs on topics', message.topicIDs, id) - if (id !== this.peerId.toB58String()) { + if (id !== this.peerId.toB58String() && id !== message.receivedFrom) { this._sendRpc(id, { msgs: [utils.normalizeOutRpcMessage(message)] }) } }) diff --git a/test/compliance.spec.js b/test/compliance.spec.js index f652b9f..15d0a56 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -1,7 +1,7 @@ /* eslint-env mocha */ 'use strict' -const tests = require('libp2p-interfaces/src/pubsub/tests') +const tests = require('libp2p-interfaces-compliance-tests/src/pubsub') const Floodsub = require('../src') const { createPeers } = require('./utils/create-peer') diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js index 4bfebd4..37dc440 100644 --- a/test/floodsub.spec.js +++ b/test/floodsub.spec.js @@ -4,107 +4,137 @@ const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const uint8ArrayFromString = require('uint8arrays/from-string') - +const uint8ArrayToString = require('uint8arrays/to-string') +const { sha256 } = require('multiformats/hashes/sha2') const { utils } = require('libp2p-interfaces/src/pubsub') -const pWaitFor = require('p-wait-for') +const { SignaturePolicy } = require('libp2p-interfaces/src/pubsub/signature-policy') +const PeerStreams = require('libp2p-interfaces/src/pubsub/peer-streams') +const PeerId = require('peer-id') const Floodsub = require('../src') -const { createPeers } = require('./utils/create-peer') const defOptions = { - emitSelf: true + emitSelf: true, + globalSignaturePolicy: SignaturePolicy.StrictNoSign } const topic = 'my-topic' const message = uint8ArrayFromString('a neat message') describe('floodsub', () => { - let floodsub1, floodsub2 - let peer1, peer2 + let floodsub before(async () => { expect(Floodsub.multicodec).to.exist() - ;[peer1, peer2] = await createPeers({ number: 2 }) - floodsub1 = new Floodsub(peer1, defOptions) - floodsub2 = new Floodsub(peer2, defOptions) + const libp2p = { + peerId: await PeerId.create(), + registrar: { + handle: () => {}, + register: () => {}, + unregister: () => {} + } + } + + floodsub = new Floodsub(libp2p, defOptions) }) beforeEach(() => { - return Promise.all([ - floodsub1.start(), - floodsub2.start() - ]) + floodsub.start() }) - afterEach(async () => { + afterEach(() => { sinon.restore() - await floodsub1.stop() - await floodsub2.stop() - await peer1.stop() - await peer2.stop() + floodsub.stop() }) - it('checks cache when processing incoming message', async () => { - sinon.spy(floodsub2.seenCache, 'has') - sinon.spy(floodsub2.seenCache, 'put') - sinon.spy(floodsub2, '_processRpcMessage') - sinon.spy(floodsub2, '_publish') + it('checks cache when processing incoming message', async function () { + const otherPeer = await PeerId.create() + const sig = await sha256.encode(message) + const key = uint8ArrayToString(sig, 'base64') + let callCount = 0 - let messageReceived = false - function checkMessage (msg) { - messageReceived = true + const peerStream = new PeerStreams({ + id: otherPeer, + protocol: 'test' + }) + const rpc = { + subscriptions: [], + msgs: [{ + receivedFrom: peerStream.id.toB58String(), + data: message, + topicIDs: [topic] + }] } - // connect peers - await floodsub1._libp2p.dial(floodsub2._libp2p.peerId) - - // subscribe and wait for subscription to be received in the other peer - floodsub2.subscribe(topic) - floodsub2.on(topic, checkMessage) - await pWaitFor(() => { - const subs = floodsub1.getSubscribers(topic) - - return subs.length === 1 + floodsub.subscribe(topic) + floodsub.on(topic, () => { + callCount++ }) - await floodsub1.publish(topic, message) - await pWaitFor(() => messageReceived === true) + // the message should not be in the cache + expect(floodsub.seenCache.has(key)).to.be.false() + + // receive the message once + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) - expect(floodsub2.seenCache.has.callCount).to.eql(2) // Put also calls .has - expect(floodsub2.seenCache.put.callCount).to.eql(1) - expect(floodsub2._publish.callCount).to.eql(1) // Forward message + // should have received the message + expect(callCount).to.equal(1) - const [msgProcessed] = floodsub2._processRpcMessage.getCall(0).args + // should be in the cache now + expect(floodsub.seenCache.has(key)).to.be.true() - // Force a second process for the message - await floodsub2._processRpcMessage(msgProcessed) + // receive the message multiple times + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) - expect(floodsub2.seenCache.has.callCount).to.eql(3) - expect(floodsub2.seenCache.put.callCount).to.eql(1) // No new put - expect(floodsub2._publish.callCount).to.eql(1) // Not forwarded + // should only have emitted the message once + expect(callCount).to.equal(1) }) it('forwards normalized messages on publish', async () => { - sinon.spy(floodsub1, '_forwardMessage') - sinon.spy(utils, 'randomSeqno') + sinon.spy(floodsub, '_forwardMessage') - await floodsub1.publish(topic, message) - expect(floodsub1._forwardMessage.callCount).to.eql(1) - const [messageToEmit] = floodsub1._forwardMessage.getCall(0).args - - const computedSeqno = utils.randomSeqno.getCall(0).returnValue - utils.randomSeqno.restore() - sinon.stub(utils, 'randomSeqno').returns(computedSeqno) + await floodsub.publish(topic, message) + expect(floodsub._forwardMessage.callCount).to.equal(1) + const [messageToEmit] = floodsub._forwardMessage.getCall(0).args const expected = utils.normalizeInRpcMessage( - await floodsub1._buildMessage({ - receivedFrom: peer1.peerId.toB58String(), - from: peer1.peerId.toB58String(), + await floodsub._buildMessage({ + receivedFrom: floodsub.peerId.toB58String(), data: message, topicIDs: [topic] })) expect(messageToEmit).to.eql(expected) }) + + it('does not send received message back to original sender', async () => { + sinon.spy(floodsub, '_sendRpc') + + const sender = await PeerId.create() + + const peerStream = new PeerStreams({ + id: sender, + protocol: 'test' + }) + const rpc = { + subscriptions: [], + msgs: [{ + receivedFrom: peerStream.id.toB58String(), + data: message, + topicIDs: [topic] + }] + } + + // otherPeer is subscribed to the topic + floodsub.topics.set(topic, new Set([sender.toB58String()])) + + // receive the message + await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) + + // should not forward back to the sender + expect(floodsub._sendRpc.called).to.be.false() + }) })