Skip to content

Commit

Permalink
ETCM-196: DiscoveryConfig to hold misc settings.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Oct 15, 2020
1 parent e817579 commit 04d3469
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 41 deletions.
@@ -0,0 +1,23 @@
package io.iohk.scalanet.discovery.ethereum.v4

import scala.concurrent.duration._

case class DiscoveryConfig(
// How long in the future to set message expiration.
messageExpiration: FiniteDuration,
// Timeout for individual requests.
requestTimeout: FiniteDuration,
// Timeout for collecting multiple potential Neighbors responses.
kademliaTimeout: FiniteDuration,
// Max number of neighbours to expect.
kademliaBucketSize: Int
)

object DiscoveryConfig {
val default = DiscoveryConfig(
messageExpiration = 60.seconds,
requestTimeout = 3.seconds,
kademliaTimeout = 7.seconds,
kademliaBucketSize = 16
)
}
Expand Up @@ -42,20 +42,14 @@ object DiscoveryNetwork {
peerGroup: PeerGroup[A, Packet],
privateKey: PrivateKey,
toNodeAddress: A => Node.Address,
messageExpiration: FiniteDuration = 60.seconds,
// Timeout for individual requests.
requestTimeout: FiniteDuration = 3.seconds,
// Timeout for collecting multiple potential Neighbors responses.
kademliaTimeout: FiniteDuration = 7.seconds,
// Max number of neighbours to expect.
kademliaBucketSize: Int = 16
config: DiscoveryConfig
)(implicit codec: Codec[Payload], sigalg: SigAlg, clock: Clock[Task]): Task[DiscoveryNetwork[A]] = Task {
new DiscoveryNetwork[A] with LazyLogging {

import DiscoveryRPC.ENRSeq
import Payload._

private val expirationMillis = messageExpiration.toMillis
private val expirationMillis = config.messageExpiration.toMillis

private val currentTimeMillis = clock.realTime(MILLISECONDS)

Expand Down Expand Up @@ -102,7 +96,7 @@ object DiscoveryNetwork {
channel
.nextMessage()
.withCancelToken(cancelToken)
.timeout(messageExpiration) // Messages older than this would be ignored anyway.
.timeout(config.messageExpiration) // Messages older than this would be ignored anyway.
.toIterant
.mapEval {
case MessageReceived(packet) =>
Expand Down Expand Up @@ -159,7 +153,7 @@ object DiscoveryNetwork {
handler.findNode(caller)(target)
} { nodes =>
nodes
.take(kademliaBucketSize)
.take(config.kademliaBucketSize)
.grouped(maxNeighborsPerPacket)
.toList
.traverse { group =>
Expand Down Expand Up @@ -226,11 +220,11 @@ object DiscoveryNetwork {
(target: PublicKey) =>
peerGroup.client(to).use { channel =>
channel.send(FindNode(target, 0)).flatMap { _ =>
channel.collectAndFoldResponses(kademliaTimeout, Vector.empty[Node]) {
channel.collectAndFoldResponses(config.kademliaTimeout, Vector.empty[Node]) {
case Neighbors(nodes, _) => nodes
} { (acc, nodes) =>
val found = (acc ++ nodes).take(kademliaBucketSize)
if (found.size < kademliaBucketSize) Left(found) else Right(found)
val found = (acc ++ nodes).take(config.kademliaBucketSize)
if (found.size < config.kademliaBucketSize) Left(found) else Right(found)
}
}
}
Expand Down Expand Up @@ -268,7 +262,7 @@ object DiscoveryNetwork {
)(pf: PartialFunction[Payload.Response, T]): Iterant[Task, T] =
channel
.nextMessage()
.timeoutL(Task(requestTimeout.min(deadline.timeLeft)))
.timeoutL(Task(config.requestTimeout.min(deadline.timeLeft)))
.toIterant
.collect {
case MessageReceived(packet) => packet
Expand Down Expand Up @@ -304,7 +298,7 @@ object DiscoveryNetwork {
/** Collect the first response that matches the partial function or return None if one cannot be found */
def collectFirstResponse[T](pf: PartialFunction[Payload.Response, T]): Task[Option[T]] =
channel
.collectResponses(requestTimeout.fromNow)(pf)
.collectResponses(config.requestTimeout.fromNow)(pf)
.headOptionL
.onErrorRecover {
case NonFatal(ex) => None
Expand Down
Expand Up @@ -185,7 +185,9 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

it should "return Some Nodes if the peer responds" in test {
new Fixture {
override val kademliaTimeout: FiniteDuration = 250.millis
override lazy val config = defaultConfig.copy(
kademliaTimeout = 250.millis
)

override val test = for {
finding <- network.findNode(remoteAddress)(remotePublicKey).start
Expand All @@ -205,10 +207,12 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

it should "collect responses up to the timeout" in test {
new Fixture {
override val kademliaTimeout: FiniteDuration = 500.millis
override val kademliaBucketSize: Int = 16
override lazy val config = defaultConfig.copy(
kademliaTimeout = 500.millis,
kademliaBucketSize = 16
)

val randomNodes = List.fill(kademliaBucketSize)(randomNode)
val randomNodes = List.fill(config.kademliaBucketSize)(randomNode)

override val test = for {
finding <- network.findNode(remoteAddress)(remotePublicKey).start
Expand All @@ -222,7 +226,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

_ <- send(randomNodes.take(3))
_ <- send(randomNodes.drop(3).take(7))
_ <- send(randomNodes.drop(10)).delayExecution(kademliaTimeout + 50.millis)
_ <- send(randomNodes.drop(10)).delayExecution(config.kademliaTimeout + 50.millis)

nodes <- finding.join
} yield {
Expand All @@ -233,10 +237,12 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

it should "collect responses up to the bucket size" in test {
new Fixture {
override val kademliaTimeout: FiniteDuration = 7.seconds
override val kademliaBucketSize: Int = 16
override lazy val config = defaultConfig.copy(
kademliaTimeout = 7.seconds,
kademliaBucketSize = 16
)

val randomGroups = List.fill(kademliaBucketSize + 6)(randomNode).grouped(6).toList
val randomGroups = List.fill(config.kademliaBucketSize + 6)(randomNode).grouped(6).toList

override val test = for {
finding <- network.findNode(remoteAddress)(remotePublicKey).start
Expand All @@ -253,16 +259,18 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {
nodes <- finding.join
} yield {
nodes should not be empty
nodes.get should have size kademliaBucketSize
nodes.get shouldBe randomGroups.flatten.take(kademliaBucketSize)
nodes.get should have size config.kademliaBucketSize
nodes.get shouldBe randomGroups.flatten.take(config.kademliaBucketSize)
}
}
}

it should "ignore expired neighbors" in test {
new Fixture {
override val kademliaTimeout: FiniteDuration = 7.seconds
override val kademliaBucketSize: Int = 16
override lazy val config = defaultConfig.copy(
kademliaTimeout = 7.seconds,
kademliaBucketSize = 16
)

override val test = for {
finding <- network.findNode(remoteAddress)(remotePublicKey).start
Expand Down Expand Up @@ -434,12 +442,14 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

it should "close idle channels" in test {
new Fixture {
override val messageExpiration: FiniteDuration = 500.millis
override lazy val config = defaultConfig.copy(
messageExpiration = 500.millis
)

override val test = for {
_ <- network.startHandling(StubDiscoveryRPC())
channel <- peerGroup.createServerChannel(from = remoteAddress)
_ <- Task.sleep(messageExpiration + 100.millis)
_ <- Task.sleep(config.messageExpiration + 100.millis)
} yield {
channel.isClosed shouldBe true
}
Expand Down Expand Up @@ -494,7 +504,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

it should "respond with multiple unexpired Neighbors each within the packet size limit, in total no more than the bucket size, if the handler returns Some Nodes" in test {
new Fixture {
val randomNodes = List.fill(kademliaBucketSize * 2)(randomNode)
val randomNodes = List.fill(config.kademliaBucketSize * 2)(randomNode)

override val test = for {
_ <- network.startHandling {
Expand Down Expand Up @@ -528,7 +538,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {
nodes
}
}
nodes.flatten shouldBe randomNodes.take(kademliaBucketSize)
nodes.flatten shouldBe randomNodes.take(config.kademliaBucketSize)
}
}
}
Expand Down Expand Up @@ -729,14 +739,18 @@ object DiscoveryNetworkSpec extends Matchers {
tcpPort = address.getPort
)

val defaultConfig = DiscoveryConfig(
requestTimeout = 100.millis,
messageExpiration = 60.seconds,
kademliaTimeout = 250.millis,
kademliaBucketSize = 16
)

trait Fixture {
// Implement `test` to assert something.
def test: Task[Assertion]

val requestTimeout = 100.millis
val messageExpiration = 60.seconds
val kademliaTimeout = 250.millis
val kademliaBucketSize = 16
lazy val config = defaultConfig

lazy val localAddress = aRandomAddress
// Keys for the System Under Test.
Expand Down Expand Up @@ -776,17 +790,14 @@ object DiscoveryNetworkSpec extends Matchers {
peerGroup = peerGroup,
privateKey = privateKey,
toNodeAddress = toNodeAddress,
messageExpiration = messageExpiration,
requestTimeout = requestTimeout,
kademliaTimeout = kademliaTimeout,
kademliaBucketSize = kademliaBucketSize
config = config
).runSyncUnsafe()

def assertExpirationSet(expiration: Long) =
expiration shouldBe (System.currentTimeMillis + messageExpiration.toMillis) +- 3000
expiration shouldBe (System.currentTimeMillis + config.messageExpiration.toMillis) +- 3000

def validExpiration =
System.currentTimeMillis + messageExpiration.toMillis
System.currentTimeMillis + config.messageExpiration.toMillis

// Anything in the past is invalid.
def invalidExpiration =
Expand Down

0 comments on commit 04d3469

Please sign in to comment.