Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interfaces/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"libp2p-crypto": "^0.19.5",
"multiaddr": "^10.0.0",
"multiformats": "^9.1.2",
"p-queue": "^6.6.2",
"peer-id": "^0.15.0",
"protobufjs": "^6.10.2",
"uint8arrays": "^3.0.0"
Expand Down
39 changes: 31 additions & 8 deletions packages/interfaces/src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { EventEmitter } = require('events')
const errcode = require('err-code')

const { pipe } = require('it-pipe')
const { default: Queue } = require('p-queue')

const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
Expand Down Expand Up @@ -50,6 +51,7 @@ const {
* @property {SignaturePolicyType} [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @property {boolean} [canRelayMessage = false] - if can relay messages not subscribed
* @property {boolean} [emitSelf = false] - if publish should emit to self, if subscribed
* @property {number} [messageProcessingConcurrency = 10] - handle this many incoming pubsub messages concurrently
*/

/**
Expand All @@ -67,7 +69,8 @@ class PubsubBaseProtocol extends EventEmitter {
libp2p,
globalSignaturePolicy = SignaturePolicy.StrictSign,
canRelayMessage = false,
emitSelf = false
emitSelf = false,
messageProcessingConcurrency = 10
}) {
if (typeof debugName !== 'string') {
throw new Error('a debugname `string` is required')
Expand Down Expand Up @@ -162,6 +165,11 @@ class PubsubBaseProtocol extends EventEmitter {
*/
this.topicValidators = new Map()

/**
* @type {Queue}
*/
this.queue = new Queue({ concurrency: messageProcessingConcurrency })

this._registrarId = undefined
this._onIncomingStream = this._onIncomingStream.bind(this)
this._onPeerConnected = this._onPeerConnected.bind(this)
Expand Down Expand Up @@ -356,7 +364,17 @@ class PubsubBaseProtocol extends EventEmitter {
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
const rpcMsg = this._decodeRpc(rpcBytes)

await this._processRpc(idB58Str, peerStreams, rpcMsg)
// Since _processRpc may be overridden entirely in unsafe ways,
// the simplest/safest option here is to wrap in a function and capture all errors
// to prevent a top-level unhandled exception
// This processing of rpc messages should happen without awaiting full validation/execution of prior messages
;(async () => {
try {
await this._processRpc(idB58Str, peerStreams, rpcMsg)
} catch (err) {
this.log.err(err)
}
})()
}
}
)
Expand Down Expand Up @@ -392,15 +410,20 @@ class PubsubBaseProtocol extends EventEmitter {
}

if (msgs.length) {
// @ts-ignore RPC message is modified
for (const message of msgs) {
this.queue.addAll(msgs.map(message => async () => {
if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) {
this.log('received message we didn\'t subscribe to. Dropping.')
continue
return
}
const msg = utils.normalizeInRpcMessage(message, idB58Str)
await this._processRpcMessage(msg)
}

try {
const msg = utils.normalizeInRpcMessage(message, idB58Str)

await this._processRpcMessage(msg)
} catch (err) {
this.log.err(err)
}
}))
}
return true
}
Expand Down