Skip to content

Commit

Permalink
ETCM-972: Add skeleton for future ExecutionSync - fetching part
Browse files Browse the repository at this point in the history
Scalafmt
  • Loading branch information
AnastasiiaL authored and Leonor Boga committed Jul 29, 2021
1 parent 16703e8 commit 607f456
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/main/resources/conf/base.conf
Expand Up @@ -390,6 +390,9 @@ mantis {
# Number of peers used to reach consensus = min-peers-to-choose-pivot-block + peers-to-choose-pivot-block-margin
peers-to-choose-pivot-block-margin = 1

# Number of peers to fetch the blocks in parallel for execution sync
peers-to-fetch-from = 5

# During fast-sync when most up to date block is determined from peers, the actual target block number
# will be decreased by this value
pivot-block-offset = 32
Expand Down
@@ -1,5 +1,8 @@
package io.iohk.ethereum.blockchain.sync

import java.util.Collections.newSetFromMap

import scala.jdk.CollectionConverters._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
Expand All @@ -9,7 +12,6 @@ import akka.actor.Scheduler

import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag

import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
Expand All @@ -23,6 +25,8 @@ import io.iohk.ethereum.network.p2p.messages.ETH63.GetNodeData
import io.iohk.ethereum.network.p2p.messages.ETH63.NodeData
import io.iohk.ethereum.utils.Config.SyncConfig

import scala.collection.mutable

class PeersClient(
val etcPeerManager: ActorRef,
val peerEventBus: ActorRef,
Expand All @@ -39,6 +43,8 @@ class PeersClient(
val statusSchedule: Cancellable =
scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, self, PrintStatus)

val activeFetchingNodes: mutable.Set[Peer] = lruSet[Peer](syncConfig.peersToFetchFrom)

def receive: Receive = running(Map())

override def postStop(): Unit = {
Expand Down Expand Up @@ -129,6 +135,25 @@ class PeersClient(

log.debug(s"Handshaked peers status (number of peers: ${handshakedPeersStatus.size}): $handshakedPeersStatus")
}

//returns the next best peer after the one already returned previously
//TODO: make sure the next best peer has a different best block, so we don't fetch in parallel identical branches
def nextBestPeer(peersToDownloadFrom: Map[PeerId, PeerWithInfo]): Option[Peer] = {
val peersToUse = peersToDownloadFrom.values
.collect { case PeerWithInfo(peer, PeerInfo(_, chainWeight, true, _, _)) =>
(peer, chainWeight)
}

val peer =
peersToUse.filter(peerAndWeight => !activeFetchingNodes.contains(peerAndWeight._1)).maxByOption(_._2).map(_._1)
peer.foreach(activeFetchingNodes.add)
peer
}

private def lruSet[A](maxEntries: Int): mutable.Set[A] =
newSetFromMap[A](new java.util.LinkedHashMap[A, java.lang.Boolean]() {
override def removeEldestEntry(eldest: java.util.Map.Entry[A, java.lang.Boolean]): Boolean = size > maxEntries
}).asScala
}

object PeersClient {
Expand Down
Expand Up @@ -307,7 +307,7 @@ class BlockFetcher(
private def fetchHeaders(state: BlockFetcherState): Unit = {
val blockNr = state.nextBlockToFetch
val amount = syncConfig.blockHeadersPerRequest
headersFetcher ! HeadersFetcher.FetchHeaders(blockNr, amount)
headersFetcher ! HeadersFetcher.FetchHeaders(Left(blockNr), amount)
}

private def tryFetchBodies(fetcherState: BlockFetcherState): BlockFetcherState =
Expand Down
@@ -0,0 +1,14 @@
package io.iohk.ethereum.blockchain.sync.regular

import cats.data.NonEmptyList
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{BlockImportType, ImportFn, ResolvingBranch}
import io.iohk.ethereum.domain.Block

//not used atm, a part of the future ExecutionSync
class BlockImporterService() {

private def importBlocks(blocks: NonEmptyList[Block], blockImportType: BlockImportType) = ???

private def importBlock(block: Block) = ???

}
@@ -0,0 +1,28 @@
package io.iohk.ethereum.blockchain.sync.regular

import cats.data.NonEmptyList
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.BlockImportType
import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.network.Peer
import monix.eval.Task

//not used atm, a part of the future ExecutionSync
object ChainManagerService {

type ValidationError
type Branch

//saves blocks to the storage
def processBlocksFromFetcher(peer: Peer): Task[Either[ValidationError, Branch]] = ???

//saves blocks to the storage
def processMinedBlocks(): Task[Either[ValidationError, Branch]] = ???

private def resolvingMissingNode(blocksToRetry: NonEmptyList[Block], blockImportType: BlockImportType) = ???

private def buildBranch() = ???

private def resolveBranch() = ???

private def discriminateBranch() = ???
}
@@ -0,0 +1,86 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.util.ByteString
import cats.data.EitherT
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, BlockchainReader}
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.messages.ETH62.{BlockBodies, BlockHeaders}
import monix.eval.Task
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.RequestFailed
import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.utils.Config.SyncConfig
import cats.implicits._

import scala.annotation.tailrec

//not used atm, a part of the future ExecutionSync
class FetcherService(validator: BlockValidator, blockchainReader: BlockchainReader, syncConfig: SyncConfig) {
val batchSize = syncConfig.blockHeadersPerRequest

private def requestHeaders(
block: Either[BigInt, ByteString],
amount: BigInt
): Task[Either[RequestFailed, BlockHeaders]] = ???

private def requestBodies(hashes: Seq[ByteString]): Task[Either[RequestFailed, BlockBodies]] = ???

private def requestStateNode(hash: ByteString): Task[Either[RequestFailed, Seq[ByteString]]] = ???

private def placeBlockInQueue(block: Block, peer: Peer): Unit = ???

def fetchBlocksUntil(
peer: Peer,
startFromBlock: Either[BigInt, ByteString],
fetchUntilBlock: Either[BigInt, ByteString]
): EitherT[Task, RequestFailed, Unit] = {
val endNumber: Option[BigInt] =
fetchUntilBlock.fold(Some(_), blockchainReader.getBlockHeaderByHash(_).map(_.number))
val startNumber: Option[BigInt] =
startFromBlock.fold(Some(_), blockchainReader.getBlockHeaderByHash(_).map(_.number))

//TODO: make sure negatives are not possible
val startBatchBlocks: Option[Seq[BigInt]] =
for {
start <- startNumber
end <- endNumber
} yield start.to(end, batchSize)

startBatchBlocks match {
case None => EitherT.leftT(RequestFailed(peer, "couldn't find blocks to fetch"))
case Some(seq) => seq.traverse(num => fetchBlocks(peer, batchSize, Left(num))).map(_ => ())
}
}

private def fetchBlocks(
peer: Peer,
amount: BigInt,
block: Either[BigInt, ByteString]
): EitherT[Task, RequestFailed, Unit] =
for {
headers <- EitherT(requestHeaders(block, amount))
bodies <- EitherT(requestBodies(headers.headers.map(_.hash)))
blocks <- EitherT.fromOption[Task](
bodiesAreOrderedSubsetOfRequested(headers.headers, bodies.bodies),
RequestFailed(peer, "Unmatching bodies")
)
_ = blocks.foreach(placeBlockInQueue(_, peer))
} yield ()

// Checks that the received block bodies are an ordered subset of the ones requested
@tailrec
private def bodiesAreOrderedSubsetOfRequested(
requestedHeaders: Seq[BlockHeader],
respondedBodies: Seq[BlockBody],
matchedBlocks: Seq[Block] = Nil
): Option[Seq[Block]] =
(requestedHeaders, respondedBodies) match {
case (Seq(), _ +: _) => None
case (_, Seq()) => Some(matchedBlocks)
case (header +: remainingHeaders, body +: remainingBodies) =>
val doMatch = validator.validateHeaderAndBody(header, body).isRight
if (doMatch)
bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body))
else
bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
}
}
Expand Up @@ -5,15 +5,13 @@ import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.{ActorRef => ClassicActorRef}

import akka.util.ByteString
import monix.eval.Task
import monix.execution.Scheduler

import scala.util.Failure
import scala.util.Success

import org.slf4j.Logger

import io.iohk.ethereum.blockchain.sync.PeersClient.BestPeer
import io.iohk.ethereum.blockchain.sync.PeersClient.Request
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand
Expand Down Expand Up @@ -41,9 +39,9 @@ class HeadersFetcher(

override def onMessage(message: HeadersFetcherCommand): Behavior[HeadersFetcherCommand] =
message match {
case FetchHeaders(blockNumber: BigInt, amount: BigInt) =>
log.debug("Start fetching headers from block {}", blockNumber)
requestHeaders(blockNumber, amount)
case FetchHeaders(block: Either[BigInt, ByteString], amount: BigInt) =>
log.debug("Start fetching headers from block {}", block)
requestHeaders(block, amount)
Behaviors.same
case AdaptedMessage(peer, BlockHeaders(headers)) =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
Expand All @@ -55,9 +53,9 @@ class HeadersFetcher(
case _ => Behaviors.unhandled
}

private def requestHeaders(blockNr: BigInt, amount: BigInt): Unit = {
log.debug("Fetching headers from block {}", blockNr)
val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false)
private def requestHeaders(block: Either[BigInt, ByteString], amount: BigInt): Unit = {
log.debug("Fetching headers from block {}", block)
val msg = GetBlockHeaders(block, amount, skip = 0, reverse = false)

val resp = makeRequest(Request.create(msg, BestPeer), HeadersFetcher.RetryHeadersRequest)
.flatMap {
Expand All @@ -84,7 +82,7 @@ object HeadersFetcher {
Behaviors.setup(context => new HeadersFetcher(peersClient, syncConfig, supervisor, context))

sealed trait HeadersFetcherCommand
final case class FetchHeaders(blockNumber: BigInt, amount: BigInt) extends HeadersFetcherCommand
final case class FetchHeaders(block: Either[BigInt, ByteString], amount: BigInt) extends HeadersFetcherCommand
final case object RetryHeadersRequest extends HeadersFetcherCommand
final private case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends HeadersFetcherCommand
}
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
Expand Up @@ -120,6 +120,7 @@ object Config {
nodesPerRequest: Int,
minPeersToChoosePivotBlock: Int,
peersToChoosePivotBlockMargin: Int,
peersToFetchFrom: Int,
pivotBlockOffset: Int,
persistStateSnapshotInterval: FiniteDuration,
blocksBatchSize: Int,
Expand Down Expand Up @@ -166,6 +167,7 @@ object Config {
nodesPerRequest = syncConfig.getInt("nodes-per-request"),
minPeersToChoosePivotBlock = syncConfig.getInt("min-peers-to-choose-pivot-block"),
peersToChoosePivotBlockMargin = syncConfig.getInt("peers-to-choose-pivot-block-margin"),
peersToFetchFrom = syncConfig.getInt("peers-to-fetch-from"),
pivotBlockOffset = syncConfig.getInt("pivot-block-offset"),
persistStateSnapshotInterval = syncConfig.getDuration("persist-state-snapshot-interval").toMillis.millis,
blocksBatchSize = syncConfig.getInt("blocks-batch-size"),
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application.conf
Expand Up @@ -123,6 +123,7 @@ mantis {
nodes-per-request = 10
min-peers-to-choose-pivot-block = 2
peers-to-choose-pivot-block-margin = 1
peers-to-fetch-from = 5
pivot-block-offset = 500
state-sync-bloom-filter-size = 20000

Expand Down
Expand Up @@ -27,6 +27,7 @@ trait TestSyncConfig extends SyncConfigBuilder {
receiptsPerRequest = 10,
minPeersToChoosePivotBlock = 2,
peersToChoosePivotBlockMargin = 0,
peersToFetchFrom = 5,
peerResponseTimeout = 1.second,
peersScanInterval = 1.hour,
fastSyncThrottle = 100.milliseconds,
Expand Down

0 comments on commit 607f456

Please sign in to comment.