Skip to content

Commit

Permalink
Gossipsub scoring fixes (#508)
Browse files Browse the repository at this point in the history
* Fix some problematics when running with full scoring

* more fixes
  • Loading branch information
sinkingsugar committed Jan 25, 2021
1 parent 0959877 commit 1d77d37
Showing 1 changed file with 127 additions and 57 deletions.
184 changes: 127 additions & 57 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ type

disconnectBadPeers*: bool

BackoffTable = Table[string, Table[PeerID, Moment]]

GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: PeerTable # peers that are subscribed to a topic
explicit*: PeerTable # directpeers that we keep alive explicitly
backingOff*: Table[PeerID, Moment] # explicit (always connected/forward) peers
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages
Expand Down Expand Up @@ -200,6 +202,13 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
Expand Down Expand Up @@ -365,16 +374,21 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
g.grafted(p, topic)

proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())
.mgetOrPut(p.peerId, backoff) = backoff

g.peerStats.withValue(p.peerId, stats):
if topic in stats.topicInfos:
var info = stats.topicInfos[topic]
let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())

# penalize a peer that delivered no message
let threshold = topicParams.meshMessageDeliveriesThreshold
if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
let deficit = threshold - info.meshMessageDeliveries
info.meshFailurePenalty += deficit * deficit
if topic in g.topicParams:
let topicParams = g.topicParams[topic]
# penalize a peer that delivered no message
let threshold = topicParams.meshMessageDeliveriesThreshold
if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
let deficit = threshold - info.meshMessageDeliveries
info.meshFailurePenalty += deficit * deficit

info.inMesh = false

Expand Down Expand Up @@ -462,7 +476,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin g.backingOff
it.peerId notin g.backingOff.getOrDefault(topic)
)

# shuffle anyway, score might be not used
Expand Down Expand Up @@ -507,7 +521,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin g.backingOff
it.peerId notin g.backingOff.getOrDefault(topic)
)

# shuffle anyway, score might be not used
Expand Down Expand Up @@ -606,7 +620,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# don't pick explicit peers
x.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
x.peerId notin g.backingOff
x.peerId notin g.backingOff.getOrDefault(topic)

# by spec, grab only 2
if avail.len > 2:
Expand Down Expand Up @@ -832,6 +846,32 @@ proc updateScores(g: GossipSub) = # avoid async

peer.score += topicScore * topicParams.topicWeight

# Score metrics
when defined(libp2p_agents_metrics):
let agent =
block:
if peer.shortAgent.len > 0:
peer.shortAgent
else:
if peer.sendConn != nil:
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent
else:
peer.shortAgent = "unknown"
peer.shortAgent
else:
"unknown"
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent])
else:
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"])
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"])
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"])
libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"])

# Score decay
info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
if info.firstMessageDeliveries < g.parameters.decayToZero:
Expand All @@ -857,7 +897,32 @@ proc updateScores(g: GossipSub) = # avoid async

peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight

peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight
let colocationFactor = g.colocationFactor(peer)
peer.score += colocationFactor * g.parameters.ipColocationFactorWeight

# Score metrics
when defined(libp2p_agents_metrics):
let agent =
block:
if peer.shortAgent.len > 0:
peer.shortAgent
else:
if peer.sendConn != nil:
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent
else:
peer.shortAgent = "unknown"
peer.shortAgent
else:
"unknown"
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
else:
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"])
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"])
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"])

# decay behaviourPenalty
peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
Expand All @@ -876,20 +941,6 @@ proc updateScores(g: GossipSub) = # avoid async
asyncSpawn g.disconnectPeer(peer)

when defined(libp2p_agents_metrics):
let agent =
block:
if peer.shortAgent.len > 0:
peer.shortAgent
else:
if peer.sendConn != nil:
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent
else:
peer.shortAgent = "unknown"
peer.shortAgent
else:
"unknown"
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
else:
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
Expand All @@ -899,20 +950,19 @@ proc updateScores(g: GossipSub) = # avoid async

trace "updated scores", peers = g.peers.len

proc handleBackingOff(t: var BackoffTable, topic: string) =
let now = Moment.now()
var expired = toSeq(t.getOrDefault(topic).pairs())
expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
now >= pair.expire
for (peer, _) in expired:
t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)

proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning:
try:
trace "running heartbeat", instance = cast[int](g)

# remove expired backoffs
block:
let now = Moment.now()
var expired = toSeq(g.backingOff.pairs())
expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
now >= pair.expire
for (peer, _) in expired:
g.backingOff.del(peer)

# reset IWANT budget
# reset IHAVE cap
block:
Expand All @@ -925,6 +975,10 @@ proc heartbeat(g: GossipSub) {.async.} =
var meshMetrics = MeshMetrics()

for t in toSeq(g.topics.keys):
# remove expired backoffs
block:
handleBackingOff(g.backingOff, t)

# prune every negative score peer
# do this before relance
# in order to avoid grafted -> pruned in the same cycle
Expand Down Expand Up @@ -1057,13 +1111,11 @@ method subscribeTopic*(g: GossipSub,

trace "gossip peers", peers = g.gossipsub.peers(topic), topic

proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
proc punishInvalidMessage(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
for t in topics:
if t notin g.topics:
continue

# ensure we init a new topic if unknown
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
# update stats
g.peerStats.withValue(peer.peerId, stats):
stats[].topicInfos.withValue(t, tstats):
Expand Down Expand Up @@ -1092,29 +1144,40 @@ proc handleGraft(g: GossipSub,
# It is an error to GRAFT on a explicit peer
if peer.peerId in g.parameters.directPeers:
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
warn "attempt to graft an explicit peer", peer=peer.id,
topicID=graft.topicID
warn "attempt to graft an explicit peer", peer=peer.peerId,
topic
# and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune(
topicID: graft.topicID,
topicID: topic,
peers: @[], # omitting heavy computation here as the remote did something illegal
backoff: g.parameters.pruneBackoff.seconds.uint64))

g.punishPeer(peer, @[topic])
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())
.mgetOrPut(peer.peerId, backoff) = backoff

peer.behaviourPenalty += 0.1

continue

if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now():
trace "attempt to graft a backingOff peer", peer=peer.id,
topicID=graft.topicID,
expire=g.backingOff[peer.peerId]
if g.backingOff
.getOrDefault(topic)
.getOrDefault(peer.peerId) > Moment.now():
warn "attempt to graft a backingOff peer", peer=peer.peerId,
topic
# and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune(
topicID: graft.topicID,
topicID: topic,
peers: @[], # omitting heavy computation here as the remote did something illegal
backoff: g.parameters.pruneBackoff.seconds.uint64))

g.punishPeer(peer, @[topic])
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())
.mgetOrPut(peer.peerId, backoff) = backoff

peer.behaviourPenalty += 0.1

continue

Expand Down Expand Up @@ -1150,18 +1213,23 @@ proc handleGraft(g: GossipSub,

proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
trace "peer pruned topic", peer, topic = prune.topicID
let topic = prune.topicID

trace "peer pruned topic", peer, topic

# add peer backoff
if prune.backoff > 0:
let backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
let current = g.backingOff.getOrDefault(peer.peerId)
let
backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
if backoff > current:
g.backingOff[peer.peerId] = backoff
g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())
.mgetOrPut(peer.peerId, backoff) = backoff

trace "pruning rpc received peer", peer, score = peer.score
g.pruned(peer, prune.topicID)
g.mesh.removePeer(prune.topicID, peer)
g.pruned(peer, topic)
g.mesh.removePeer(topic, peer)

# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
# another option could be to implement signed peer records
Expand Down Expand Up @@ -1264,14 +1332,14 @@ method rpcHandler*(g: GossipSub,
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
msgId = shortLog(msgId), peer
g.punishPeer(peer, msg.topicIDs)
g.punishInvalidMessage(peer, msg.topicIDs)
continue

if msg.seqno.len > 0 and msg.seqno.len != 8:
# if we have seqno should be 8 bytes long
debug "Dropping message due to invalid seqno length",
msgId = shortLog(msgId), peer
g.punishPeer(peer, msg.topicIDs)
g.punishInvalidMessage(peer, msg.topicIDs)
continue

# g.anonymize needs no evaluation when receiving messages
Expand All @@ -1282,7 +1350,7 @@ method rpcHandler*(g: GossipSub,
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishPeer(peer, msg.topicIDs)
g.punishInvalidMessage(peer, msg.topicIDs)
continue
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
Expand Down Expand Up @@ -1410,6 +1478,8 @@ proc unsubscribe*(g: GossipSub, topic: string) =
else:
g.broadcast(toSeq(gpeers), msg)

g.topicParams.del(topic)

method unsubscribeAll*(g: GossipSub, topic: string) =
g.unsubscribe(topic)
# finally let's remove from g.topics, do that by calling PubSub
Expand Down

0 comments on commit 1d77d37

Please sign in to comment.