Skip to content

Commit

Permalink
fix: tag mesh peers on graft (ChainSafe#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Feb 7, 2023
1 parent bae1492 commit 061bf6e
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,10 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
return []
}

await this.components.peerStore.tagPeer(peerIdFromString(id), 'gossipsub-mesh-peer', {
value: 100 // value should be 0-100
})

return await Promise.all(prune.map((topic) => this.makePrune(id, topic, doPX)))
}

Expand Down Expand Up @@ -1551,6 +1555,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
await this.pxConnect(peers)
}
}

await this.components.peerStore.unTagPeer(peerIdFromString(id), 'gossipsub-mesh-peer')
}

/**
Expand Down
117 changes: 117 additions & 0 deletions test/e2e/go-gossipsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { mockNetwork } from '@libp2p/interface-mocks'
import { stop } from '@libp2p/interfaces/startable'
import { TopicScoreParams } from '../../src/score/peer-score-params.js'
import { awaitEvents, checkReceivedSubscription, checkReceivedSubscriptions } from '../utils/events.js'
import sinon, { SinonStubbedInstance } from 'sinon'
import { Tag } from '@libp2p/interface-peer-store'

/**
* These tests were translated from:
Expand Down Expand Up @@ -76,6 +78,121 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
mockNetwork.reset()
})

it.skip('should add the tag gossipsub-mesh-peer upon grafting', async function () {
// Create 20 gossipsub nodes
// Sparsely connect nodes
// Subscribe to the topic, all nodes, waiting for each subscription to propagate first
// Publish 100 messages, each from a random node
// Assert that the subscribed nodes receive every message
psubs = await createComponentsArray({
number: 20,
init: {
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
}
}
})
const topic = 'foobar'

await sparseConnect(psubs)

const nodeAGraftSpy = psubs[0].pubsub as Partial<GossipSub> as SinonStubbedInstance<{
handleGraft: GossipSub['handleGraft']
}>

const nodeATagPeerSpy = psubs[0].components.peerStore as Partial<GossipSubAndComponents> as SinonStubbedInstance<{
tagPeer: GossipSubAndComponents['components']['peerStore']['tagPeer']
}>

sinon.spy(nodeAGraftSpy, 'handleGraft')

// add subscriptions to each node
for (const ps of psubs) {
ps.pubsub.subscribe(topic)
// wait for announce to propagate
await delay(100)
}
// await mesh rebalancing
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const allTags: Tag[][] = []

psubs.forEach(async (subscribedPeer) => {
const tagArray = await subscribedPeer.components.peerStore.getTags(subscribedPeer.components.peerId)
allTags.push(tagArray)
})

expect(nodeAGraftSpy.handleGraft.callCount, 'should call handleGraft method').to.be.greaterThan(0)
expect(nodeATagPeerSpy.tagPeer.callCount, 'should call tagPeer fucntion').to.be.greaterThan(0)
expect(
allTags.map((tags) => {
tags.map((tag) => {
tag.name.toString()
})
})
).to.include('gossipsub-mesh-peer')
})

it.skip('should remove the tag gossipsub-mesh-peer upon pruning', async function () {
// Create 20 gossipsub nodes
// Sparsely connect nodes
// Subscribe to the topic, all nodes, waiting for each subscription to propagate first
// Publish 100 messages, each from a random node
// Assert that the subscribed nodes receive every message
psubs = await createComponentsArray({
number: 20,
init: {
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
}
}
})
const topic = 'foobar'
psubs.forEach((ps) => ps.pubsub.subscribe(topic))

const unsubscribedPeers: GossipSubAndComponents[] = []

// every node connected to every other
await denseConnect(psubs)
// wait a bit to take effect
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

// disconnect some peers from the mesh to get some PRUNEs
psubs.slice(0, 5).forEach((ps) => {
unsubscribedPeers.push(ps)
ps.pubsub.unsubscribe(topic)
})

// wait a bit to take effect
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const nodeAPruneSpy = unsubscribedPeers[0].pubsub as Partial<GossipSub> as SinonStubbedInstance<{
handlePrune: GossipSub['handlePrune']
}>
sinon.spy(nodeAPruneSpy, 'handlePrune')

// await mesh rebalancing
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const allTags: Tag[][] = []

unsubscribedPeers.forEach(async (unsubscribedPeer) => {
const tagArray = await unsubscribedPeer.components.peerStore.getTags(unsubscribedPeer.components.peerId)
allTags.push(tagArray)
})

expect(nodeAPruneSpy.handlePrune.callCount).to.be.greaterThan(0)
expect(
allTags.map((tags) => {
tags.map((tag) => {
tag.name.toString()
})
})
).not.to.include('gossipsub-mesh-peer')
})

it('test sparse gossipsub', async function () {
// Create 20 gossipsub nodes
// Subscribe to the topic, all nodes
Expand Down

0 comments on commit 061bf6e

Please sign in to comment.