Skip to content

Commit

Permalink
[ETCM-316] Add more tests and fix binary search logic
Browse files Browse the repository at this point in the history
  • Loading branch information
1015bit committed Feb 28, 2021
1 parent 3c9e4d6 commit 8834451
Show file tree
Hide file tree
Showing 6 changed files with 450 additions and 92 deletions.
Expand Up @@ -7,23 +7,24 @@ import cats.Applicative
import cats.implicits._
import io.iohk.ethereum.domain.BlockHeader
import cats.data.NonEmptyList
import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolverActor.SearchState

import com.typesafe.scalalogging
import org.slf4j.LoggerFactory
import io.iohk.ethereum.utils.Logger
import io.iohk.ethereum.network.Peer

trait FastSyncBranchResolver {

protected def blockchain: Blockchain

def parentOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber - 1
import FastSyncBranchResolver._

def childOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber + 1
protected def blockchain: Blockchain

// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
def discardBlocksAfter(lastValidBlock: BigInt): Unit =
discardBlocks(lastValidBlock, blockchain.getBestBlockNumber())

def discardBlocks(fromBlock: BigInt, toBlock: BigInt): Unit = {
// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
private def discardBlocks(fromBlock: BigInt, toBlock: BigInt): Unit = {
val blocksToBeRemoved = childOf(fromBlock).to(toBlock).reverse.toList
blocksToBeRemoved.foreach { toBeRemoved =>
blockchain
Expand All @@ -34,19 +35,28 @@ trait FastSyncBranchResolver {

}

object FastSyncBranchResolver {

/**
* Stores the current search state for binary search.
* Meaning we know the first common block lies between minBlockNumber and maxBlockNumber.
*/
final case class SearchState(minBlockNumber: BigInt, maxBlockNumber: BigInt, masterPeer: Peer)

def parentOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber - 1
def childOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber + 1
}

/**
* Attempt to find last common block within recent blocks by looking for a parent/child
* relationship between our block headers and remote peer's block headers.
*/
trait RecentBlocksSearchSupport {
self: FastSyncBranchResolver =>

protected def recentHeadersSize: Int
class RecentBlocksSearch(blockchain: Blockchain) {

/**
* Find the first common block by trying to find a block so that our block n is the parent of remote block n + 1
* Find the highest common block by trying to find a block so that our block n is the parent of remote candidate block n + 1
*/
def getFirstCommonBlock(
def getHighestCommonBlock(
candidateHeaders: Seq[BlockHeader],
bestBlockNumber: BigInt
): Option[BigInt] = {
Expand All @@ -63,35 +73,65 @@ trait RecentBlocksSearchSupport {

}

trait BinarySearchSupport {
self: FastSyncBranchResolver =>
object BinarySearchSupport extends Logger {
import FastSyncBranchResolver._

sealed trait BinarySearchResult
final case class BinarySearchCompleted(highestCommonBlockNumber: BigInt) extends BinarySearchResult
final case class ContinueBinarySearch(searchState: SearchState) extends BinarySearchResult
case object NoCommonBlock extends BinarySearchResult

import BinarySearchSupport._
/**
* Returns the block number in the middle between min and max.
* If there is no middle, it will return the lower value.
*
* E.g. calling this method with min = 3 and max = 6 will return 4
*/
def middleBlockNumber(min: BigInt, max: BigInt): BigInt = (min + max) / 2

private val logger: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
def blockHeaderNumberToRequest(min: BigInt, max: BigInt): BigInt =
childOf(middleBlockNumber(min, max))

protected def validateBlockHeaders(
def validateBlockHeadersFoo(
parentBlockHeader: BlockHeader,
childBlockHeader: BlockHeader,
searchState: SearchState
): BinarySearchResult = {
logger.debug(s"$searchState")
log.debug(s"$searchState")
if (parentBlockHeader.isParentOf(childBlockHeader)) {
if (parentBlockHeader.number == searchState.minBlockNumber)
BinarySearchCompleted(parentBlockHeader.number)
else ContinueBinarySearch(searchState.copy(minBlockNumber = childOf(parentBlockHeader.number)))
} else if (parentBlockHeader.number == searchState.minBlockNumber)
else ContinueBinarySearch(searchState.copy(minBlockNumber = parentBlockHeader.number))
} else if (parentBlockHeader.number == searchState.minBlockNumber && searchState.minBlockNumber == 0)
NoCommonBlock
else if (parentBlockHeader.number == searchState.minBlockNumber)
BinarySearchCompleted(parentOf(parentBlockHeader.number))
else
ContinueBinarySearch(searchState.copy(maxBlockNumber = parentBlockHeader.number))
}

}

object BinarySearchSupport {

sealed trait BinarySearchResult
final case class BinarySearchCompleted(firstCommonBlockNumber: BigInt) extends BinarySearchResult
final case class ContinueBinarySearch(searchState: SearchState) extends BinarySearchResult
def validateBlockHeaders(
parentBlockHeader: BlockHeader,
childBlockHeader: BlockHeader,
searchState: SearchState
): BinarySearchResult = {
val childNum = childBlockHeader.number
val parentNum = parentBlockHeader.number
val min = searchState.minBlockNumber
val max = searchState.maxBlockNumber

log.debug(s"$searchState")

if (parentBlockHeader.isParentOf(childBlockHeader)) { // chains are still aligned but there might be an even better block
if (parentNum == max) BinarySearchCompleted(parentNum)
else if (parentNum == min && childNum == max) ContinueBinarySearch(searchState.copy(minBlockNumber = childNum))
else ContinueBinarySearch(searchState.copy(minBlockNumber = parentNum))
} else { // no parent/child -> chains have diverged before parent block
if (min == 1 && max == 2) NoCommonBlock
else if (min == max) BinarySearchCompleted(parentOf(parentNum))
// else if (parentNum == max) BinarySearchCompleted(parentNum)
else ContinueBinarySearch(searchState.copy(maxBlockNumber = parentOf(parentNum)))
}
}

}
Expand Up @@ -32,14 +32,16 @@ class FastSyncBranchResolverActor(
) extends Actor
with ActorLogging
with FastSyncBranchResolver
with RecentBlocksSearchSupport
with BinarySearchSupport
with PeerListSupportNg
with Timers {

import FastSyncBranchResolverActor._
import FastSyncBranchResolver._
import BinarySearchSupport._

override protected val recentHeadersSize: Int = syncConfig.blockHeadersPerRequest
private val recentHeadersSize: Int = syncConfig.blockHeadersPerRequest

private val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(blockchain)

override def receive: Receive = waitingForPeerWithHighestBlock

Expand All @@ -61,7 +63,7 @@ class FastSyncBranchResolverActor(
case ResponseReceived(peer, BlockHeaders(headers), timeTaken) if peer == masterPeer =>
if (headers.size == recentHeadersSize) {
log.debug("Received {} block headers from peer {} in {} ms", headers.size, masterPeer.id, timeTaken)
verifyRecentBlockHeadersResponse(headers, masterPeer, bestBlockNumber)
handleRecentBlockHeadersResponse(headers, masterPeer, bestBlockNumber)
} else {
handleInvalidResponse(peer, requestHandler)
}
Expand All @@ -74,7 +76,6 @@ class FastSyncBranchResolverActor(
blockHeaderNumberToSearch: BigInt,
requestHandler: ActorRef
): Receive = {
import BinarySearchSupport._
handlePeerListMessages orElse {
case ResponseReceived(peer, BlockHeaders(headers), durationMs) if peer == searchState.masterPeer =>
context.unwatch(requestHandler)
Expand All @@ -83,20 +84,10 @@ class FastSyncBranchResolverActor(
log.debug(
s"Received requested block header [$blockHeaderNumberToSearch] from peer [${peer.id}] in $durationMs ms"
)
blockchain.getBlockHeaderByNumber(parentOf(childHeader.number)) match {
case Some(parentHeader) =>
validateBlockHeaders(parentHeader, childHeader, searchState) match {
case BinarySearchCompleted(firstCommonBlockNumber) =>
finalizeBranchResolver(firstCommonBlockNumber, searchState.masterPeer)
case ContinueBinarySearch(newSearchState) =>
log.debug(s"Continuing binary search with new search state: $newSearchState")
requestBlockHeader(newSearchState)
}
case None => stopWithFailure(BranchResolutionFailed.blockHeaderNotFound(blockHeaderNumberToSearch))
}
handleBinarySearchBlockHeaderResponse(searchState, childHeader)
case _ =>
log.warning(
s"Received invalid block header response when searching block header [$blockHeaderNumberToSearch]: $headers"
s"Received invalid response when requesting block header [$blockHeaderNumberToSearch]: $headers"
)
handleInvalidResponse(peer, requestHandler)
}
Expand All @@ -119,33 +110,48 @@ class FastSyncBranchResolverActor(
* Searches recent blocks for a valid parent/child relationship.
* If we dont't find one, we switch to binary search.
*/
private def verifyRecentBlockHeadersResponse(
private def handleRecentBlockHeadersResponse(
blockHeaders: Seq[BlockHeader],
masterPeer: Peer,
bestBlockNumber: BigInt
): Unit = {
getFirstCommonBlock(blockHeaders, bestBlockNumber) match {
case Some(firstCommonBlockNumber) =>
finalizeBranchResolver(firstCommonBlockNumber, masterPeer)
recentBlocksSearch.getHighestCommonBlock(blockHeaders, bestBlockNumber) match {
case Some(highestCommonBlockNumber) =>
finalizeBranchResolver(highestCommonBlockNumber, masterPeer)
case None =>
log.info(SwitchToBinarySearchLog, recentHeadersSize)
requestBlockHeader(
SearchState(minBlockNumber = 0, maxBlockNumber = bestBlockNumber, masterPeer)
requestBlockHeaderForBinarySearch(
SearchState(minBlockNumber = 1, maxBlockNumber = bestBlockNumber, masterPeer)
)
}
}

private def requestBlockHeader(searchState: SearchState): Unit = {
val middleBlockHeaderNumber: BigInt = (searchState.minBlockNumber + searchState.maxBlockNumber) / 2
val blockHeaderNumberToRequest: BigInt = childOf(middleBlockHeaderNumber)
val handler = sendGetBlockHeadersRequest(searchState.masterPeer, blockHeaderNumberToRequest, 1)
context.become(waitingForBinarySearchBlock(searchState, blockHeaderNumberToRequest, handler))
private def requestBlockHeaderForBinarySearch(searchState: SearchState): Unit = {
val headerNumberToRequest = blockHeaderNumberToRequest(searchState.minBlockNumber, searchState.maxBlockNumber)
val handler = sendGetBlockHeadersRequest(searchState.masterPeer, headerNumberToRequest, 1)
context.become(waitingForBinarySearchBlock(searchState, headerNumberToRequest, handler))
}

private def handleBinarySearchBlockHeaderResponse(searchState: SearchState, childHeader: BlockHeader): Unit = {
import BinarySearchSupport._
blockchain.getBlockHeaderByNumber(parentOf(childHeader.number)) match {
case Some(parentHeader) =>
validateBlockHeaders(parentHeader, childHeader, searchState) match {
case NoCommonBlock => stopWithFailure(BranchResolutionFailed.noCommonBlock)
case BinarySearchCompleted(highestCommonBlockNumber) =>
finalizeBranchResolver(highestCommonBlockNumber, searchState.masterPeer)
case ContinueBinarySearch(newSearchState) =>
log.debug(s"Continuing binary search with new search state: $newSearchState")
requestBlockHeaderForBinarySearch(newSearchState)
}
case None => stopWithFailure(BranchResolutionFailed.blockHeaderNotFound(childHeader.number))
}
}

private def finalizeBranchResolver(firstCommonBlockNumber: BigInt, masterPeer: Peer): Unit = {
discardBlocksAfter(firstCommonBlockNumber)
log.info(s"Branch resolution completed with first common block number [$firstCommonBlockNumber]")
fastSync ! BranchResolvedSuccessful(firstCommonBlockNumber = firstCommonBlockNumber, masterPeer = masterPeer)
fastSync ! BranchResolvedSuccessful(highestCommonBlockNumber = firstCommonBlockNumber, masterPeer = masterPeer)
context.stop(self)
}

Expand Down Expand Up @@ -243,17 +249,11 @@ object FastSyncBranchResolverActor {
)
)

/**
* Stores the current search state for binary search.
* Meaning we know the first common block lies between minBlockNumber and maxBlockNumber.
*/
final case class SearchState(minBlockNumber: BigInt, maxBlockNumber: BigInt, masterPeer: Peer)

sealed trait BranchResolverRequest
case object StartBranchResolver extends BranchResolverRequest

sealed trait BranchResolverResponse
final case class BranchResolvedSuccessful(firstCommonBlockNumber: BigInt, masterPeer: Peer)
final case class BranchResolvedSuccessful(highestCommonBlockNumber: BigInt, masterPeer: Peer)
extends BranchResolverResponse
import BranchResolutionFailed._
final case class BranchResolutionFailed(failure: BranchResolutionFailure)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/io/iohk/ethereum/utils/Logger.scala
Expand Up @@ -4,11 +4,11 @@ import com.typesafe.scalalogging
import org.slf4j.{LoggerFactory, MDC}

trait Logger {
val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
protected val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
}

trait LazyLogger {
lazy val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
protected lazy val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
}

trait LoggingContext {
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/logback-test.xml
Expand Up @@ -8,6 +8,7 @@
</encoder>
</appender>

<logger name="io.iohk.ethereum" level="DEBUG" />
<logger name="io.iohk.ethereum.vm.VM" level="OFF" />

<root level="INFO">
Expand Down

0 comments on commit 8834451

Please sign in to comment.