Skip to content

Commit

Permalink
[ETCM-211] Tracking of headers/bodies requests that will be ignored d…
Browse files Browse the repository at this point in the history
…ue to invalidation (#749)
  • Loading branch information
Nicolás Tallar committed Oct 27, 2020
1 parent 1667f33 commit ae398e1
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 31 deletions.
Expand Up @@ -9,6 +9,10 @@ import cats.instances.future._
import cats.instances.option._
import cats.syntax.either._
import io.iohk.ethereum.blockchain.sync.PeersClient._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain._
Expand Down Expand Up @@ -91,27 +95,51 @@ class BlockFetcher(

private def handleHeadersMessages(state: BlockFetcherState): Receive = {
case Response(peer, BlockHeaders(headers)) if state.isFetchingHeaders =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

val newState = state.validatedHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
state.fetchingHeaders(false)
case Right(validHeaders) =>
state.appendHeaders(validHeaders)
}
val newState =
if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) {
log.debug(
"Received {} headers starting from block {} that will be ignored",
headers.size,
headers.headOption.map(_.number)
)
state.withHeaderFetchReceived
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

state.validatedHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
state.withHeaderFetchReceived
case Right(validHeaders) =>
state.withHeaderFetchReceived.appendHeaders(validHeaders)
}
}

fetchBlocks(newState)
case RetryHeadersRequest if state.isFetchingHeaders =>
log.debug("Retrying request for headers")
fetchHeaders(state)
log.debug("Time-out occurred while waiting for headers")

val newState = state.withHeaderFetchReceived
fetchBlocks(newState)
}

private def handleBodiesMessages(state: BlockFetcherState): Receive = {
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
log.debug("Fetched {} block bodies", bodies.size)
state.addBodies(peer, bodies) |> fetchBlocks
case RetryBodiesRequest if state.isFetchingBodies => fetchBodies(state)
val newState =
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
log.debug("Received {} block bodies that will be ignored", bodies.size)
state.withBodiesFetchReceived
} else {
log.debug("Fetched {} block bodies", bodies.size)
state.withBodiesFetchReceived.addBodies(peer, bodies)
}

fetchBlocks(newState)
case RetryBodiesRequest if state.isFetchingBodies =>
log.debug("Time-out occurred while waiting for bodies")

val newState = state.withBodiesFetchReceived
fetchBlocks(newState)
}

private def handleStateNodeMessages(state: BlockFetcherState): Receive = {
Expand Down Expand Up @@ -207,7 +235,7 @@ class BlockFetcher(
.filter(!_.hasFetchedTopHeader)
.filter(!_.hasReachedSize(syncConfig.maxFetcherQueueSize))
.tap(fetchHeaders)
.map(_.fetchingHeaders(true))
.map(_.withNewHeadersFetch)
.getOrElse(fetcherState)

private def fetchHeaders(state: BlockFetcherState): Unit = {
Expand All @@ -229,10 +257,12 @@ class BlockFetcher(
.filter(!_.isFetchingBodies)
.filter(_.waitingHeaders.nonEmpty)
.tap(fetchBodies)
.map(state => state.fetchingBodies(true))
.map(state => state.withNewBodiesFetch)
.getOrElse(fetcherState)

private def fetchBodies(state: BlockFetcherState): Unit = {
log.debug("Fetching bodies")

val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
requestBlockBodies(hashes) pipeTo self
}
Expand Down
Expand Up @@ -12,12 +12,32 @@ import cats.syntax.option._

import scala.collection.immutable.Queue

// scalastyle:off number.of.methods
/**
* State used by the BlockFetcher
*
* @param importer the BlockImporter actor reference
* @param readyBlocks
* @param waitingHeaders
* @param fetchingHeadersState the current state of the headers fetching, whether we
* - haven't fetched any yet
* - are awaiting a response
* - are awaiting a response but it should be ignored due to blocks being invalidated
* @param fetchingBodiesState the current state of the bodies fetching, whether we
* - haven't fetched any yet
* - are awaiting a response
* - are awaiting a response but it should be ignored due to blocks being invalidated
* @param stateNodeFetcher
* @param lastBlock
* @param knownTop
* @param blockProviders
*/
case class BlockFetcherState(
importer: ActorRef,
readyBlocks: Queue[Block],
waitingHeaders: Queue[BlockHeader],
isFetchingHeaders: Boolean,
isFetchingBodies: Boolean,
fetchingHeadersState: FetchingHeadersState,
fetchingBodiesState: FetchingBodiesState,
stateNodeFetcher: Option[StateNodeFetcher],
lastBlock: BigInt,
knownTop: BigInt,
Expand All @@ -28,7 +48,7 @@ case class BlockFetcherState(

def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined

def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty

def hasFetchedTopHeader: Boolean = lastBlock == knownTop

Expand All @@ -53,8 +73,7 @@ case class BlockFetcherState(
def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
fetchingHeaders(false)
.withPossibleNewTopAt(headers.lastOption.map(_.number))
withPossibleNewTopAt(headers.lastOption.map(_.number))
.copy(
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
Expand Down Expand Up @@ -86,9 +105,11 @@ case class BlockFetcherState(
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

fetchingBodies(false)
.withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(readyBlocks = readyBlocks.enqueue(blocks), waitingHeaders = waiting)
withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
}

def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
Expand Down Expand Up @@ -126,18 +147,25 @@ case class BlockFetcherState(

def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))

def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) =
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
// We can't start completely from scratch as requests could be in progress, we have to keep special track of them
val newFetchingHeadersState =
if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
val newFetchingBodiesState =
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState

(
toBlacklist.flatMap(blockProviders.get),
copy(
readyBlocks = Queue(),
waitingHeaders = Queue(),
lastBlock = (nr - 2).max(0),
isFetchingHeaders = false,
isFetchingBodies = false,
fetchingHeadersState = newFetchingHeadersState,
fetchingBodiesState = newFetchingBodiesState,
blockProviders = blockProviders - nr
)
)
}

def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)

Expand All @@ -154,9 +182,13 @@ case class BlockFetcherState(
def withPeerForBlocks(peerId: PeerId, blocks: Seq[BigInt]): BlockFetcherState =
copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))

def fetchingHeaders(isFetching: Boolean): BlockFetcherState = copy(isFetchingHeaders = isFetching)
def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NotFetchingHeaders)

def fetchingBodies(isFetching: Boolean): BlockFetcherState = copy(isFetchingBodies = isFetching)
def isFetchingBodies: Boolean = fetchingBodiesState != NotFetchingBodies
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)

def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
Expand Down Expand Up @@ -188,11 +220,31 @@ object BlockFetcherState {
importer = importer,
readyBlocks = Queue(),
waitingHeaders = Queue(),
isFetchingHeaders = false,
isFetchingBodies = false,
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
)

trait FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState

/**
* Headers request in progress but will be ignored due to invalidation
* State used to keep track of pending request to prevent multiple requests in parallel
*/
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

trait FetchingBodiesState
case object NotFetchingBodies extends FetchingBodiesState
case object AwaitingBodies extends FetchingBodiesState

/**
* Bodies request in progress but will be ignored due to invalidation
* State used to keep track of pending request to prevent multiple requests in parallel
*/
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
}
25 changes: 25 additions & 0 deletions src/test/scala/io/iohk/ethereum/BlockHelpers.scala
@@ -0,0 +1,25 @@
package io.iohk.ethereum

import akka.util.ByteString
import io.iohk.ethereum.domain.Block

object BlockHelpers {

def generateChain(amount: Int, parent: Block): Seq[Block] = {
(1 to amount).foldLeft[Seq[Block]](Nil){ case (acc, _) =>
val baseBlock = Fixtures.Blocks.ValidBlock.block

val parentHeader = acc.lastOption.getOrElse(parent)
val blockHeader = baseBlock.header.copy(
number = parentHeader.number + 1,
parentHash = parentHeader.hash,
// Random nonce used for having our blocks be different
nonce = ByteString(Math.random().toString)
)
val block = baseBlock.copy(header = blockHeader)

acc :+ block
}
}

}

0 comments on commit ae398e1

Please sign in to comment.