Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: disallow publishing pubsub messages to zero peers (#4286)
Browse files Browse the repository at this point in the history
Use the default gossipsub behaviour of not allowing publishing message
to zero peers.

If a user publishes a message they may be surprised to find it's not
been recieved by anyone, so instead an error should be thrown when there
are no peers that will receive a message.
  • Loading branch information
achingbrain committed Jan 11, 2023
1 parent 789ee58 commit fa578ba
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
16 changes: 16 additions & 0 deletions packages/interface-ipfs-core/src/name-pubsub/pubsub.js
Expand Up @@ -88,6 +88,14 @@ export function testPubsub (factory, options) {
const topic = `${namespace}${uint8ArrayToString(routingKey, 'base64url')}`

await nodeB.pubsub.subscribe(topic, () => {})

// wait for nodeA to see nodeB's subscription
await waitFor(async () => {
const peers = await nodeA.pubsub.peers(topic)

return peers.map(p => p.toString()).includes(idB.id.toString())
})

await nodeA.name.publish(ipfsRef, { resolve: false })
await delay(1000) // guarantee record is written

Expand Down Expand Up @@ -149,6 +157,14 @@ export function testPubsub (factory, options) {
const topic = `${namespace}${uint8ArrayToString(routingKey, 'base64url')}`

await nodeB.pubsub.subscribe(topic, checkMessage)

// wait for nodeA to see nodeB's subscription
await waitFor(async () => {
const peers = await nodeA.pubsub.peers(topic)

return peers.map(p => p.toString()).includes(idB.id.toString())
})

await nodeA.name.publish(ipfsRef, { resolve: false, key: testAccountName })
await waitFor(alreadySubscribed)

Expand Down
39 changes: 37 additions & 2 deletions packages/interface-ipfs-core/src/pubsub/publish.js
Expand Up @@ -5,11 +5,30 @@ import { nanoid } from 'nanoid'
import { getTopic } from './utils.js'
import { expect } from 'aegir/chai'
import { getDescribe, getIt } from '../utils/mocha.js'
import pWaitFor from 'p-wait-for'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
* @typedef {import('ipfs-core-types').IPFS} IPFS
*/

/**
* @param {string} topic
* @param {IPFS} ipfs
* @param {IPFS} remote
*/
async function waitForRemoteToBeSubscribed (topic, ipfs, remote) {
await remote.pubsub.subscribe(topic, () => {})
const remoteId = await remote.id()

// wait for remote to be subscribed to topic
await pWaitFor(async () => {
const peers = await ipfs.pubsub.peers(topic)

return peers.map(p => p.toString()).includes(remoteId.id.toString())
})
}

/**
* @param {Factory} factory
* @param {object} options
Expand All @@ -21,30 +40,46 @@ export function testPublish (factory, options) {
describe('.pubsub.publish', function () {
this.timeout(80 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
/** @type {IPFS} */
let ipfs

/** @type {IPFS} */
let remote

before(async () => {
ipfs = (await factory.spawn()).api
remote = (await factory.spawn()).api

// ensure we have peers to allow publishing
const remoteId = await remote.id()
await ipfs.swarm.connect(remoteId.addresses[0])
})

after(() => factory.clean())

it('should fail with undefined msg', async () => {
const topic = getTopic()

await waitForRemoteToBeSubscribed(topic, ipfs, remote)

// @ts-expect-error invalid parameter
await expect(ipfs.pubsub.publish(topic)).to.eventually.be.rejected()
})

it('should publish message from buffer', () => {
it('should publish message from buffer', async () => {
const topic = getTopic()

await waitForRemoteToBeSubscribed(topic, ipfs, remote)

return ipfs.pubsub.publish(topic, uint8ArrayFromString(nanoid()))
})

it('should publish 10 times within time limit', async () => {
const count = 10
const topic = getTopic()

await waitForRemoteToBeSubscribed(topic, ipfs, remote)

for (let i = 0; i < count; i++) {
await ipfs.pubsub.publish(topic, uint8ArrayFromString(nanoid()))
}
Expand Down
Expand Up @@ -5,7 +5,6 @@ import { gossipsub } from '@chainsafe/libp2p-gossipsub'
/** @type {() => Record<string, (components: any) => PubSub>}>} */
export const routers = () => ({
gossipsub: gossipsub({
allowPublishToZeroPeers: true,
fallbackToFloodsub: true,
emitSelf: true,
maxInboundStreams: 64,
Expand Down
1 change: 0 additions & 1 deletion packages/ipfs-core-config/src/libp2p-pubsub-routers.js
Expand Up @@ -6,7 +6,6 @@ import { floodsub } from '@libp2p/floodsub'
/** @type {() => Record<string, (components: any) => PubSub>}>} */
export const routers = () => ({
gossipsub: gossipsub({
allowPublishToZeroPeers: true,
fallbackToFloodsub: true,
emitSelf: true,
maxInboundStreams: 64,
Expand Down

0 comments on commit fa578ba

Please sign in to comment.