This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

chore: update dependencies (#116)
* chore: update dependencies

BREAKING CHANGE: uses new multiaddr, libp2p-interfaces, etc
achingbrain committed Jul 9, 2021
1 parent 9b62572 commit 7103cf2
Showing 6 changed files with 109 additions and 74 deletions.
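
For consumers tracking this release, the headline is that the pubsub interfaces now come from the 1.x line of libp2p-interfaces (with the compliance suites split out into libp2p-interfaces-compliance-tests) and multiaddr moves to ^10.0.0. The sketch below shows the require surface these bumped packages expose; it is an assumption drawn from the diffs that follow rather than an official migration guide, and the example address is made up.

// Hedged sketch: require paths under the bumped dependencies (see the package.json diff below).
const { Multiaddr } = require('multiaddr')                // multiaddr ^10.0.0, class-based API
const PeerId = require('peer-id')                         // peer-id ^0.15.0
const { utils } = require('libp2p-interfaces/src/pubsub') // libp2p-interfaces ^1.0.1
// Compliance suites now ship as their own package:
const tests = require('libp2p-interfaces-compliance-tests/src/pubsub')

// Example only: constructing an address with the class-based API
const addr = new Multiaddr('/ip4/127.0.0.1/tcp/0/ws')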
.aegir.js (3 additions, 0 deletions)
@@ -60,6 +60,9 @@ const esbuild = {

/** @type {import('aegir').PartialOptions} */
module.exports = {
+ build: {
+   bundlesizeMax: '143KB',
+ },
test: {
before,
after,
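
For context, here is a minimal sketch of how the new build.bundlesizeMax option sits in the resulting .aegir.js; the before/after hooks and the esbuild block referenced above are defined earlier in the file and are assumed here, not shown in this hunk.

/** @type {import('aegir').PartialOptions} */
module.exports = {
  build: {
    // fail CI if the browser bundle grows beyond this size
    bundlesizeMax: '143KB'
  },
  test: {
    before, // hooks defined earlier in .aegir.js (outside this hunk)
    after
  }
}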
.github/workflows/main.yml (5 additions, 5 deletions)
@@ -39,33 +39,33 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx aegir test -t browser -t webworker --bail
- run: npx aegir test -t browser -t webworker --bail --timeout 60000
test-firefox:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx aegir test -t browser -t webworker --bail -- --browser firefox
- run: npx aegir test -t browser -t webworker --bail --timeout 60000 -- --browser firefox
test-webkit:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: microsoft/playwright-github-action@v1
- run: npm install
- run: npx aegir test -t browser -t webworker --bail -- --browser webkit
- run: npx aegir test -t browser -t webworker --bail --timeout 60000 -- --browser webkit
test-electron-main:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx xvfb-maybe aegir test -t electron-main --bail
- run: npx xvfb-maybe aegir test -t electron-main --bail --timeout 60000
test-electron-renderer:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx xvfb-maybe aegir test -t electron-renderer --bail
- run: npx xvfb-maybe aegir test -t electron-renderer --bail --timeout 60000
package.json (9 additions, 7 deletions)
@@ -6,7 +6,7 @@
"main": "src/index.js",
"types": "dist/src/index.d.ts",
"scripts": {
"lint": "aegir lint",
"lint": "aegir ts -p check && aegir lint",
"test": "aegir test",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser",
@@ -50,20 +50,22 @@
"chai": "^4.3.4",
"ipfs-utils": "^8.1.0",
"it-pair": "^1.0.0",
"libp2p": "^0.31.6",
"libp2p": "next",
"libp2p-interfaces-compliance-tests": "^1.0.1",
"libp2p-mplex": "^0.10.3",
"libp2p-noise": "^3.0.0",
"libp2p-websockets": "^0.15.7",
"multiaddr": "^9.0.1",
"libp2p-noise": "^4.0.0",
"libp2p-websockets": "^0.16.0",
"multiaddr": "^10.0.0",
"multiformats": "^9.2.0",
"os": "^0.1.1",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.8",
"peer-id": "^0.15.0",
"sinon": "^11.1.1",
"util": "^0.12.3"
},
"dependencies": {
"debug": "^4.2.0",
"libp2p-interfaces": "^0.11.0",
"libp2p-interfaces": "^1.0.1",
"time-cache": "^0.3.0",
"uint8arrays": "^2.1.5"
},
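
The dev-dependency bumps travel together: libp2p from the next tag, the ^0.16 websockets transport, noise ^4 and peer-id ^0.15 are what the test peers are built from. The sketch below shows one plausible shape for such a helper; the actual test/utils/create-peer is not part of this diff, so treat it as an assumption grounded only in the libp2p 0.3x config format.

// Hedged sketch of a test-peer factory using the bumped devDependencies.
const Libp2p = require('libp2p')                // "next" dist-tag at the time of this commit
const Websockets = require('libp2p-websockets') // ^0.16.0
const Mplex = require('libp2p-mplex')           // ^0.10.3
const { NOISE } = require('libp2p-noise')       // ^4.0.0
const PeerId = require('peer-id')               // ^0.15.0

async function createPeer () {
  const peerId = await PeerId.create()
  const node = await Libp2p.create({
    peerId,
    addresses: { listen: ['/ip4/127.0.0.1/tcp/0/ws'] },
    modules: {
      transport: [Websockets],
      streamMuxer: [Mplex],
      connEncryption: [NOISE]
    }
  })
  await node.start()
  return node
}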
src/index.js (2 additions, 2 deletions)
@@ -53,7 +53,7 @@ class FloodSub extends BaseProtocol {
*/
async _processRpcMessage (message) {
// Check if I've seen the message, if yes, ignore
- const seqno = this.getMsgId(message)
+ const seqno = await this.getMsgId(message)
const msgIdStr = toString(seqno, 'base64')

if (this.seenCache.has(msgIdStr)) {
@@ -90,7 +90,7 @@ class FloodSub extends BaseProtocol {
}
peers.forEach((id) => {
this.log('publish msgs on topics', message.topicIDs, id)
- if (id !== this.peerId.toB58String()) {
+ if (id !== this.peerId.toB58String() && id !== message.receivedFrom) {
this._sendRpc(id, { msgs: [utils.normalizeOutRpcMessage(message)] })
}
})
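
Two behavioural notes on the hunks above: getMsgId is now awaited because message ids can be derived asynchronously under the new interfaces (the updated tests hash the payload with multiformats' sha256), and the publish loop now also skips the peer a message was received from, so a message is never echoed straight back to its sender. The snippet below is an illustrative async id function of that shape, not the exact implementation used by the interfaces package.

// Illustrative only: an async message-id derivation of the kind the new interface allows.
const { sha256 } = require('multiformats/hashes/sha2')

async function getMsgId (message) {
  // Under StrictNoSign there is no signed from/seqno pair to key on, so hash the payload.
  return sha256.encode(message.data)
}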
test/compliance.spec.js (1 addition, 1 deletion)
@@ -1,7 +1,7 @@
/* eslint-env mocha */
'use strict'

- const tests = require('libp2p-interfaces/src/pubsub/tests')
+ const tests = require('libp2p-interfaces-compliance-tests/src/pubsub')

const Floodsub = require('../src')
const { createPeers } = require('./utils/create-peer')
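
Only the require path changes here: the pubsub compliance suite now ships in libp2p-interfaces-compliance-tests instead of inside libp2p-interfaces itself. A rough sketch of how such a suite is typically driven is below; the setup/teardown shape is an assumption for illustration, not taken from this diff.

// Hedged sketch: driving the relocated compliance suite (setup/teardown shape assumed).
const tests = require('libp2p-interfaces-compliance-tests/src/pubsub')
const Floodsub = require('../src')
const { createPeers } = require('./utils/create-peer')

describe('interface compliance', () => {
  let peers = []

  tests({
    async setup () {
      peers = await createPeers({ number: 1 })
      return new Floodsub(peers[0], { emitSelf: true })
    },
    async teardown () {
      await Promise.all(peers.map((peer) => peer.stop()))
    }
  })
})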
test/floodsub.spec.js (89 additions, 59 deletions)
@@ -4,107 +4,137 @@
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const uint8ArrayFromString = require('uint8arrays/from-string')

const uint8ArrayToString = require('uint8arrays/to-string')
const { sha256 } = require('multiformats/hashes/sha2')
const { utils } = require('libp2p-interfaces/src/pubsub')
const pWaitFor = require('p-wait-for')
const { SignaturePolicy } = require('libp2p-interfaces/src/pubsub/signature-policy')
const PeerStreams = require('libp2p-interfaces/src/pubsub/peer-streams')
const PeerId = require('peer-id')

const Floodsub = require('../src')
const { createPeers } = require('./utils/create-peer')

const defOptions = {
emitSelf: true
emitSelf: true,
globalSignaturePolicy: SignaturePolicy.StrictNoSign
}

const topic = 'my-topic'
const message = uint8ArrayFromString('a neat message')

describe('floodsub', () => {
let floodsub1, floodsub2
let peer1, peer2
let floodsub

before(async () => {
expect(Floodsub.multicodec).to.exist()

;[peer1, peer2] = await createPeers({ number: 2 })
floodsub1 = new Floodsub(peer1, defOptions)
floodsub2 = new Floodsub(peer2, defOptions)
const libp2p = {
peerId: await PeerId.create(),
registrar: {
handle: () => {},
register: () => {},
unregister: () => {}
}
}

floodsub = new Floodsub(libp2p, defOptions)
})

beforeEach(() => {
return Promise.all([
floodsub1.start(),
floodsub2.start()
])
floodsub.start()
})

afterEach(async () => {
afterEach(() => {
sinon.restore()
await floodsub1.stop()
await floodsub2.stop()
await peer1.stop()
await peer2.stop()
floodsub.stop()
})

it('checks cache when processing incoming message', async () => {
sinon.spy(floodsub2.seenCache, 'has')
sinon.spy(floodsub2.seenCache, 'put')
sinon.spy(floodsub2, '_processRpcMessage')
sinon.spy(floodsub2, '_publish')
it('checks cache when processing incoming message', async function () {
const otherPeer = await PeerId.create()
const sig = await sha256.encode(message)
const key = uint8ArrayToString(sig, 'base64')
let callCount = 0

let messageReceived = false
function checkMessage (msg) {
messageReceived = true
const peerStream = new PeerStreams({
id: otherPeer,
protocol: 'test'
})
const rpc = {
subscriptions: [],
msgs: [{
receivedFrom: peerStream.id.toB58String(),
data: message,
topicIDs: [topic]
}]
}

// connect peers
await floodsub1._libp2p.dial(floodsub2._libp2p.peerId)

// subscribe and wait for subscription to be received in the other peer
floodsub2.subscribe(topic)
floodsub2.on(topic, checkMessage)
await pWaitFor(() => {
const subs = floodsub1.getSubscribers(topic)

return subs.length === 1
floodsub.subscribe(topic)
floodsub.on(topic, () => {
callCount++
})

await floodsub1.publish(topic, message)
await pWaitFor(() => messageReceived === true)
// the message should not be in the cache
expect(floodsub.seenCache.has(key)).to.be.false()

// receive the message once
await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)

expect(floodsub2.seenCache.has.callCount).to.eql(2) // Put also calls .has
expect(floodsub2.seenCache.put.callCount).to.eql(1)
expect(floodsub2._publish.callCount).to.eql(1) // Forward message
// should have received the message
expect(callCount).to.equal(1)

const [msgProcessed] = floodsub2._processRpcMessage.getCall(0).args
// should be in the cache now
expect(floodsub.seenCache.has(key)).to.be.true()

// Force a second process for the message
await floodsub2._processRpcMessage(msgProcessed)
// receive the message multiple times
await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)
await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)
await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)

expect(floodsub2.seenCache.has.callCount).to.eql(3)
expect(floodsub2.seenCache.put.callCount).to.eql(1) // No new put
expect(floodsub2._publish.callCount).to.eql(1) // Not forwarded
// should only have emitted the message once
expect(callCount).to.equal(1)
})

it('forwards normalized messages on publish', async () => {
sinon.spy(floodsub1, '_forwardMessage')
sinon.spy(utils, 'randomSeqno')
sinon.spy(floodsub, '_forwardMessage')

await floodsub1.publish(topic, message)
expect(floodsub1._forwardMessage.callCount).to.eql(1)
const [messageToEmit] = floodsub1._forwardMessage.getCall(0).args

const computedSeqno = utils.randomSeqno.getCall(0).returnValue
utils.randomSeqno.restore()
sinon.stub(utils, 'randomSeqno').returns(computedSeqno)
await floodsub.publish(topic, message)
expect(floodsub._forwardMessage.callCount).to.equal(1)
const [messageToEmit] = floodsub._forwardMessage.getCall(0).args

const expected = utils.normalizeInRpcMessage(
await floodsub1._buildMessage({
receivedFrom: peer1.peerId.toB58String(),
from: peer1.peerId.toB58String(),
await floodsub._buildMessage({
receivedFrom: floodsub.peerId.toB58String(),
data: message,
topicIDs: [topic]
}))

expect(messageToEmit).to.eql(expected)
})

it('does not send received message back to original sender', async () => {
sinon.spy(floodsub, '_sendRpc')

const sender = await PeerId.create()

const peerStream = new PeerStreams({
id: sender,
protocol: 'test'
})
const rpc = {
subscriptions: [],
msgs: [{
receivedFrom: peerStream.id.toB58String(),
data: message,
topicIDs: [topic]
}]
}

// otherPeer is subscribed to the topic
floodsub.topics.set(topic, new Set([sender.toB58String()]))

// receive the message
await floodsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)

// should not forward back to the sender
expect(floodsub._sendRpc.called).to.be.false()
})
})
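
The reworked tests no longer dial two real nodes; they feed RPCs straight into _processRpc against a stubbed libp2p registrar and check the seen-cache directly. The helper below restates how those tests derive the expected cache key for a StrictNoSign message; the names come from the spec above, while the wrapper function itself is just for illustration.

// Illustration: deriving the seen-cache key the way the spec above does.
const { sha256 } = require('multiformats/hashes/sha2')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')

async function expectedCacheKey (data) {
  const digest = await sha256.encode(data)    // same as `await sha256.encode(message)` in the spec
  return uint8ArrayToString(digest, 'base64') // same base64 key passed to seenCache.has()
}

// expectedCacheKey(uint8ArrayFromString('a neat message')) resolves to the key
// asserted with `floodsub.seenCache.has(key)` in the first test.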
