Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get negotiated Gossip protocol version from either inbound or outbound stream #160

Merged
merged 2 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ fun P2PService.PeerHandler.getIP(): String? =
streamHandler.stream.connection.remoteAddress().getStringComponent(Protocol.IP4)

fun P2PService.PeerHandler.isOutbound() = streamHandler.stream.connection.isInitiator
fun P2PService.PeerHandler.getOutboundProtocol() = getOutboundHandler()?.stream?.getProtocol()?.getNow(null)
?: throw InternalErrorException("Outbound gossip stream not initialized or protocol is missing")

fun P2PService.PeerHandler.getOutboundGossipProtocol() = PubsubProtocol.fromProtocol(getOutboundProtocol())
fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol {
fun P2PService.StreamHandler.getProtocol(): String? = stream.getProtocol().getNow(null)
val proto =
getOutboundHandler()?.getProtocol()
?: getInboundHandler()?.getProtocol()
?: throw InternalErrorException("Couldn't get peer gossip protocol")
return PubsubProtocol.fromProtocol(proto)
}

/**
* Router implementing this protocol: https://github.com/libp2p/specs/tree/master/pubsub/gossipsub
Expand Down Expand Up @@ -111,7 +116,11 @@ open class GossipRouter @JvmOverloads constructor(
score.notifyUnseenMessage(peer, msg)
}

override fun notifySeenMessage(peer: PeerHandler, msg: PubsubMessage, validationResult: Optional<ValidationResult>) {
override fun notifySeenMessage(
peer: PeerHandler,
msg: PubsubMessage,
validationResult: Optional<ValidationResult>
) {
score.notifySeenMessage(peer, msg, validationResult)
if (validationResult.isPresent && validationResult.get() != ValidationResult.Invalid) {
notifyAnyValidMessage(peer, msg)
Expand Down Expand Up @@ -449,7 +458,7 @@ open class GossipRouter @JvmOverloads constructor(

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
val pruneBuilder = Rpc.ControlPrune.newBuilder().setTopicID(topic)
if (peer.getOutboundGossipProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) {
if (peer.getPeerProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) {
// add v1.1 specific fields
pruneBuilder.backoff = params.pruneBackoff.seconds
(getTopicPeers(topic) - peer)
Expand Down
44 changes: 44 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,48 @@ class GossipPubsubRouterTest : PubsubRouterTest({
Assertions.assertFalse(testLogAppender.hasAnyWarns())
}
}

@Test
fun `test that no warn when trying to respond prune without outbound gossip stream`() {
// when remote gossip makes connection and immediately send GRAFT
// the situation when we fail to send PRUNE (as not outbound stream yet)
// shouldn't be treated as internal error and no WARN logs should be printed
val fuzz = DeterministicFuzz()

val router1 = fuzz.createTestRouter(MockRouter())

// when isDirect the gossip router should reply with PRUNE to GRAFT
// this would reproduce the case
val gossipScoreParams = GossipScoreParams(GossipPeerScoreParams(isDirect = { true }))

val router2 = fuzz.createTestRouter(GossipRouter(scoreParams = gossipScoreParams))
val mockRouter = router1.router as MockRouter

router2.router.subscribe("topic1")
router1.connect(router2, LogLevel.INFO, LogLevel.INFO)

TestLogAppender().install().use { testLogAppender ->
val msg1 = Rpc.RPC.newBuilder()
.setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Rpc.ControlGraft.newBuilder().setTopicID("topic1")
)
).build()

mockRouter.sendToSingle(msg1)

Assertions.assertFalse(testLogAppender.hasAnyWarns())
}

router2.connect(router1, LogLevel.INFO, LogLevel.INFO)

val msg1 = Rpc.RPC.newBuilder()
.setControl(
Rpc.ControlMessage.newBuilder().addIhave(
Rpc.ControlIHave.newBuilder().addMessageIDs("messageId".toByteArray().toProtobuf())
)
).build()

mockRouter.sendToSingle(msg1)
Nashatyrev marked this conversation as resolved.
Show resolved Hide resolved
}
}