Skip to content
Permalink
Browse files

Implement kademlia PING RPC.

  • Loading branch information...
jtownson committed Sep 11, 2019
1 parent 45e08dd commit e8736390fa42b7bea6b1170a999501bb3ad8c464
@@ -16,11 +16,15 @@ object KMessage {

object KRequest {
case class FindNodes[A](requestId: UUID, nodeRecord: NodeRecord[A], targetNodeId: BitVector) extends KRequest[A]

case class Ping[A](requestId: UUID, nodeRecord: NodeRecord[A]) extends KRequest[A]
}

sealed trait KResponse[A] extends KMessage[A]

object KResponse {
case class Nodes[A](requestId: UUID, nodeRecord: NodeRecord[A], nodes: Seq[NodeRecord[A]]) extends KResponse[A]

case class Pong[A](requestId: UUID, nodeRecord: NodeRecord[A]) extends KResponse[A]
}
}
@@ -1,7 +1,8 @@
package io.iohk.scalanet.peergroup.kademlia

import io.iohk.scalanet.peergroup.kademlia.KMessage.KRequest.FindNodes
import io.iohk.scalanet.peergroup.kademlia.KMessage.KResponse.Nodes
import io.iohk.scalanet.peergroup.kademlia.KMessage.{KRequest, KResponse}
import io.iohk.scalanet.peergroup.kademlia.KMessage.KRequest.{FindNodes, Ping}
import io.iohk.scalanet.peergroup.kademlia.KMessage.KResponse.{Nodes, Pong}
import io.iohk.scalanet.peergroup.kademlia.KRouter.NodeRecord
import io.iohk.scalanet.peergroup.{Channel, PeerGroup}
import monix.eval.Task
@@ -25,6 +26,22 @@ trait KNetwork[A] {
* @return the future response
*/
def findNodes(to: NodeRecord[A], request: FindNodes[A]): Task[Nodes[A]]

/**
* Server side PING handler.
* @return An Observable for receiving PING requests.
* Each element contains a tuple consisting of a PING request
* with a function for accepting the require PONG response.
*/
def ping: Observable[(Ping[A], Pong[A] => Task[Unit])]

/**
* Send a PING message to another peer.
* @param to the peer to send the message to
* @param request the PING request
* @return the future response
*/
def ping(to: NodeRecord[A], request: Ping[A]): Task[Pong[A]]
}

object KNetwork {
@@ -37,37 +54,49 @@ object KNetwork {
)(implicit scheduler: Scheduler)
extends KNetwork[A] {

override def findNodes(to: NodeRecord[A], message: FindNodes[A]): Task[Nodes[A]] = {
override def findNodes: Observable[(FindNodes[A], Nodes[A] => Task[Unit])] = serverTemplate {
case f @ FindNodes(_, _, _) => f
}

override def ping: Observable[(Ping[A], Pong[A] => Task[Unit])] = serverTemplate {
case p @ Ping(_, _) => p
}

override def findNodes(to: NodeRecord[A], request: FindNodes[A]): Task[Nodes[A]] = {
requestTemplate(to, request, { case n @ Nodes(_, _, _) => n })
}

override def ping(to: NodeRecord[A], request: Ping[A]): Task[Pong[A]] = {
requestTemplate(to, request, { case p @ Pong(_, _) => p })
}

private def requestTemplate[Request <: KRequest[A], Response <: KResponse[A]](
to: NodeRecord[A],
message: Request,
pf: PartialFunction[KMessage[A], Response]
): Task[Response] = {
peerGroup
.client(to.routingAddress)
.bracket { clientChannel =>
makeFindNodesRequest(message, clientChannel)
sendRequest(message, clientChannel, pf)
} { clientChannel =>
clientChannel.close()
}
}

override def findNodes: Observable[(FindNodes[A], Nodes[A] => Task[Unit])] = {
peerGroup.server().collectChannelCreated.mapTask { channel: Channel[A, KMessage[A]] =>
channel.in
.collect {
case f @ FindNodes(_, _, _) =>
(f, nodesTask(channel))
}
.headL
.timeout(requestTimeout)
.doOnFinish(closeIfAnError(channel))
}
}

private def makeFindNodesRequest(message: FindNodes[A], clientChannel: Channel[A, KMessage[A]]): Task[Nodes[A]] = {
private def sendRequest[Request <: KRequest[A], Response <: KResponse[A]](
message: Request,
clientChannel: Channel[A, KMessage[A]],
pf: PartialFunction[KMessage[A], Response]
): Task[Response] = {
for {
_ <- clientChannel.sendMessage(message).timeout(requestTimeout)
nodes <- clientChannel.in
.collect { case n @ Nodes(_, _, _) => n }
response <- clientChannel.in
.collect(pf)
.headL
.timeout(requestTimeout)
} yield nodes
} yield response
}

private def closeIfAnError(
@@ -76,13 +105,26 @@ object KNetwork {
maybeError.fold(Task.unit)(_ => channel.close())
}

private def nodesTask(
private def sendResponse(
channel: Channel[A, KMessage[A]]
): Nodes[A] => Task[Unit] = { nodes =>
): KMessage[A] => Task[Unit] = { message =>
channel
.sendMessage(nodes)
.sendMessage(message)
.timeout(requestTimeout)
.doOnFinish(_ => channel.close())
}

private def serverTemplate[Request <: KRequest[A], Response <: KResponse[A]](
pf: PartialFunction[KMessage[A], Request]
): Observable[(Request, KMessage[A] => Task[Unit])] = {
peerGroup.server().collectChannelCreated.mapTask { channel: Channel[A, KMessage[A]] =>
channel.in
.collect(pf)
.map(request => (request, sendResponse(channel)))
.headL
.timeout(requestTimeout)
.doOnFinish(closeIfAnError(channel))
}
}
}
}

0 comments on commit e873639

Please sign in to comment.
You can’t perform that action at this time.