Skip to content

Commit

Permalink
ETCM-207: Test fetchEnr
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Oct 15, 2020
1 parent 611df2b commit 0bf7c42
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 27 deletions.
Expand Up @@ -112,17 +112,31 @@ object DiscoveryService {
// Last time a peer responded with a Pong to our Ping.
lastPongTimestampMap: Map[Peer[A], Timestamp],
// Deferred results so we can ensure there's only one concurrent Ping to a given peer.
bondingResultsMap: Map[Peer[A], BondingResults]
bondingResultsMap: Map[Peer[A], BondingResults],
// Deferred ENR fetches so we only do one at a time to a given peer.
fetchEnrMap: Map[Peer[A], Deferred[Task, Unit]]
) {
def withLastPongTimestamp(peer: Peer[A], timestamp: Timestamp): State[A] =
copy(lastPongTimestampMap = lastPongTimestampMap.updated(peer, timestamp))

def withBondingResults(peer: Peer[A], results: BondingResults): State[A] =
copy(bondingResultsMap = bondingResultsMap.updated(peer, results))

def withEnrAndAddress(peer: Peer[A], enr: EthereumNodeRecord, address: Node.Address): State[A] =
copy(
enrMap = enrMap.updated(peer.id, enr),
nodeMap = nodeMap.updated(peer.id, Node(peer.id, address))
)

def clearBondingResults(peer: Peer[A]): State[A] =
copy(bondingResultsMap = bondingResultsMap - peer)

def withEnrFetch(peer: Peer[A], result: Deferred[Task, Unit]): State[A] =
copy(fetchEnrMap = fetchEnrMap.updated(peer, result))

def clearEnrFetch(peer: Peer[A]): State[A] =
copy(fetchEnrMap = fetchEnrMap - peer)

def removePeer(peer: Peer[A]): State[A] =
copy(
nodeMap = nodeMap - peer.id,
Expand All @@ -144,7 +158,8 @@ object DiscoveryService {
nodeMap = Map(node.id -> node),
enrMap = Map(node.id -> enr),
lastPongTimestampMap = Map.empty[Peer[A], Timestamp],
bondingResultsMap = Map.empty[Peer[A], BondingResults]
bondingResultsMap = Map.empty[Peer[A], BondingResults],
fetchEnrMap = Map.empty[Peer[A], Deferred[Task, Unit]]
)
}

Expand Down Expand Up @@ -346,24 +361,50 @@ object DiscoveryService {
} yield ()

/** Fetch a fresh ENR from the peer and store it. */
protected[v4] def fetchEnr(peer: Peer[A]): Task[Unit] =
rpc.enrRequest(peer)(()).flatMap {
case None =>
Task.unit
protected[v4] def fetchEnr(peer: Peer[A]): Task[Unit] = {
val waitOrFetch =
for {
d <- Deferred[Task, Unit]
decision <- stateRef.modify { state =>
state.fetchEnrMap.get(peer) match {
case Some(d) =>
state -> Left(d)
case None =>
state.withEnrFetch(peer, d) -> Right(d)
}
}
} yield decision

case Some(enr) =>
EthereumNodeRecord.validateSignature(enr, publicKey = peer.id) match {
case Attempt.Successful(true) =>
tryUpdateEnr(peer, enr)
waitOrFetch.flatMap {
case Left(wait) =>
wait.get

case Attempt.Successful(false) =>
Task(logger.debug("Could not validate ENR signature!")) >>
removePeer(peer)
case Right(fetch) =>
rpc
.enrRequest(peer)(())
.flatMap {
case None =>
Task.unit

case Attempt.Failure(err) =>
Task(logger.error(s"Error validateing ENR: $err"))
}
case Some(enr) =>
EthereumNodeRecord.validateSignature(enr, publicKey = peer.id) match {
case Attempt.Successful(true) =>
tryUpdateEnr(peer, enr)

case Attempt.Successful(false) =>
Task(logger.debug("Could not validate ENR signature!")) >>
removePeer(peer)

case Attempt.Failure(err) =>
Task(logger.error(s"Error validateing ENR: $err"))
}
}
.guarantee {
stateRef.update(_.clearEnrFetch(peer)) >>
fetch.complete(())
}
}
}

/** Try to extract the node address from the ENR record and update the node database,
* otherwise if there's no address we can use remove the peer.
Expand All @@ -373,12 +414,7 @@ object DiscoveryService {
case None =>
Task(logger.debug(s"Could not extract node address from $enr")) >> removePeer(peer)
case Some(address) =>
stateRef.update { state =>
state.copy(
enrMap = state.enrMap.updated(peer.id, enr),
nodeMap = state.nodeMap.updated(peer.id, Node(peer.id, address))
)
}
stateRef.update(_.withEnrAndAddress(peer, enr, address))
}

/** Forget everything about this peer. */
Expand Down
@@ -1,6 +1,7 @@
package io.iohk.scalanet.discovery.ethereum.v4

import cats.effect.concurrent.Ref
import io.iohk.scalanet.discovery.crypto.{PublicKey, Signature}
import io.iohk.scalanet.discovery.ethereum.{EthereumNodeRecord, Node}
import io.iohk.scalanet.discovery.ethereum.codecs.DefaultCodecs
import io.iohk.scalanet.discovery.ethereum.v4.mocks.MockSigAlg
Expand All @@ -9,9 +10,9 @@ import io.iohk.scalanet.NetUtils.aRandomAddress
import java.net.InetSocketAddress
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.atomic.AtomicInt
import org.scalatest._
import scala.concurrent.duration._
import io.iohk.scalanet.discovery.crypto.PublicKey

class DiscoveryServiceSpec extends AsyncFlatSpec with Matchers {
import DiscoveryService.{State, BondingResults}
Expand Down Expand Up @@ -240,7 +241,7 @@ class DiscoveryServiceSpec extends AsyncFlatSpec with Matchers {

trait BondingFixture extends Fixture {
override lazy val config = defaultConfig.copy(
requestTimeout = 100.millis
requestTimeout = 100.millis // To not wait for pings during bonding.
)
override lazy val rpc = unimplementedRPC.copy(
ping = _ => _ => Task.pure(Some(None)),
Expand Down Expand Up @@ -307,10 +308,76 @@ class DiscoveryServiceSpec extends AsyncFlatSpec with Matchers {
}
}

behavior of "maybeFetchEnr"

it should "not fetch if the record we have is at least as new" in test {
new Fixture {
override val test = for {
_ <- stateRef.update(_.withEnrAndAddress(remotePeer, remoteENR, remoteNode.address))
_ <- service.maybeFetchEnr(remotePeer, Some(remoteENR.content.seq))
} yield {
succeed // Would have failed if it called the RPC.
}
}
}

behavior of "fetchEnr"
it should "only initiate one fetch at a time" in (pending)
it should "validate that the packet sender signed the ENR" in (pending)
it should "remove the node if the validation fails" in (pending)

it should "only initiate one fetch at a time" in test {
new Fixture {
val callCount = AtomicInt(0)

override lazy val rpc = unimplementedRPC.copy(
enrRequest = _ =>
_ =>
Task {
callCount.increment()
Some(remoteENR)
}.delayExecution(100.millis) // Delay so the first is still running when the second is started.
)

override val test = for {
_ <- Task.parSequenceUnordered(
List.fill(5)(service.fetchEnr(remotePeer))
)
} yield {
callCount.get shouldBe 1
}
}
}

it should "update the ENR and node maps" in test {
new Fixture {
override lazy val rpc = unimplementedRPC.copy(
enrRequest = _ => _ => Task(Some(remoteENR))
)
override val test = for {
_ <- service.fetchEnr(remotePeer)
state <- stateRef.get
} yield {
state.fetchEnrMap should not contain key(remotePeer)
state.nodeMap should contain key (remotePeer.id)
state.enrMap(remotePeer.id) shouldBe remoteENR
}
}
}

it should "remove the node if the ENR signature validation fails" in test {
new Fixture {
override lazy val rpc = unimplementedRPC.copy(
enrRequest = _ => _ => Task(Some(remoteENR.copy(signature = Signature(remoteENR.signature.reverse))))
)
override val test = for {
_ <- stateRef.update(_.withEnrAndAddress(remotePeer, remoteENR, remoteNode.address))
_ <- service.fetchEnr(remotePeer)
state <- stateRef.get
} yield {
state.fetchEnrMap should not contain key(remotePeer)
state.nodeMap should not contain key(remotePeer.id)
state.enrMap should not contain key(remotePeer.id)
}
}
}

behavior of "ping"
it should "complete bonding processes waiting for that ping" in (pending)
Expand Down

0 comments on commit 0bf7c42

Please sign in to comment.