Skip to content

Commit

Permalink
[ETCM-211] Rename fetching states; make block helpers an object and a…
Browse files Browse the repository at this point in the history
…dded extra test for the block fetcher
  • Loading branch information
Nicolas Tallar committed Oct 27, 2020
1 parent 5f49352 commit 6fbad03
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ case class BlockFetcherState(
def withPeerForBlocks(peerId: PeerId, blocks: Seq[BigInt]): BlockFetcherState =
copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))

def isFetchingHeaders: Boolean = fetchingHeadersState != NoHeadersFetched
def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NoHeadersFetched)
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NotFetchingHeaders)

def isFetchingBodies: Boolean = fetchingBodiesState != NoBodiesFetched
def isFetchingBodies: Boolean = fetchingBodiesState != NotFetchingBodies
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NoBodiesFetched)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)

def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
Expand Down Expand Up @@ -220,16 +220,16 @@ object BlockFetcherState {
importer = importer,
readyBlocks = Queue(),
waitingHeaders = Queue(),
fetchingHeadersState = NoHeadersFetched,
fetchingBodiesState = NoBodiesFetched,
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
)

trait FetchingHeadersState
case object NoHeadersFetched extends FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState

/**
Expand All @@ -239,7 +239,7 @@ object BlockFetcherState {
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

trait FetchingBodiesState
case object NoBodiesFetched extends FetchingBodiesState
case object NotFetchingBodies extends FetchingBodiesState
case object AwaitingBodies extends FetchingBodiesState

/**
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/io/iohk/ethereum/BlockHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.iohk.ethereum
import akka.util.ByteString
import io.iohk.ethereum.domain.Block

trait BlockHelpers {
object BlockHelpers {

def generateChain(amount: Int, parent: Block): Seq[Block] = {
(1 to amount).foldLeft[Seq[Block]](Nil){ case (acc, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, BlockHeaders, Ge
import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.should.Matchers

class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) with AnyFreeSpecLike with Matchers with BlockHelpers {
import scala.concurrent.duration._

"BlockFetcher" - {
class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) with AnyFreeSpecLike with Matchers {

"should not requests headers upon invalidation if a request is already in progress" in new TestSetup {
blockFetcher ! BlockFetcher.Start(importer.ref, 0)
"BlockFetcher" - {

peerEventBus.expectMsg(Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers)))
"should not requests headers upon invalidation while a request is already in progress, should resume after response" in new TestSetup {
startFetcher()

// First headers request
val firstGetBlockHeadersRequest = GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }

val firstBlocksBatch = generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block)
val firstBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block)
val firstGetBlockHeadersResponse = BlockHeaders(firstBlocksBatch.map(_.header))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockHeadersResponse))

Expand All @@ -44,20 +44,65 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse))

// Trigger sync
val farAwayBlockTd = 100000
val farAwayBlock = Block(FixtureBlocks.ValidBlock.header.copy(number = 1000), FixtureBlocks.ValidBlock.body)
// Trigger sync (to be improved with ETCM-248)
triggerFetching()

blockFetcher ! MessageFromPeer(NewBlock(farAwayBlock, farAwayBlockTd), fakePeer.id)
// Second headers request with response pending
val secondGetBlockHeadersRequest = GetBlockHeaders(Left(firstBlocksBatch.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
// Save the reference to respond to the ask pattern on fetcher
val refExpectingReply = peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender }

// Mark first blocks as invalid, no further request should be done
blockFetcher ! InvalidateBlocksFrom(1, "")
peersClient.expectMsgClass(classOf[BlacklistPeer])

peersClient.expectNoMessage()

// Respond to the second request should make the fetcher resume with his requests
val secondBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, firstBlocksBatch.last)
val secondGetBlockHeadersResponse = BlockHeaders(secondBlocksBatch.map(_.header))
peersClient.send(refExpectingReply, PeersClient.Response(fakePeer, secondGetBlockHeadersResponse))

peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
}

"should not requests headers upon invalidation while a request is already in progress, should resume after failure in response" in new TestSetup {
startFetcher()

// First headers request
val firstGetBlockHeadersRequest = GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }

val firstBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block)
val firstGetBlockHeadersResponse = BlockHeaders(firstBlocksBatch.map(_.header))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockHeadersResponse))

// First bodies request
val firstGetBlockBodiesRequest = GetBlockBodies(firstBlocksBatch.map(_.hash))
peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == firstGetBlockBodiesRequest => () }

val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse))

// Trigger sync (to be improved with ETCM-248)
triggerFetching()

// Second headers request with response pending
val secondGetBlockHeadersRequest = GetBlockHeaders(Left(firstBlocksBatch.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => () }
// Save the reference to respond to the ask pattern on fetcher
val refExpectingReply = peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender }

// Mark first blocks as invalid, no further request should be done
blockFetcher ! InvalidateBlocksFrom(1, "")
peersClient.expectMsgClass(classOf[BlacklistPeer])

peersClient.expectNoMessage()

// Failure of the second request should make the fetcher resume with his requests
peersClient.send(refExpectingReply, PeersClient.RequestFailed(fakePeer, ""))

peersClient.expectMsgClass(classOf[BlacklistPeer])
peersClient.expectMsgPF(){ case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
}
}

Expand All @@ -68,10 +113,12 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
val peerEventBus: TestProbe = TestProbe()
val importer: TestProbe = TestProbe()

// Same request size was selected for simplification purposes of the flow
override lazy val syncConfig = defaultSyncConfig.copy(
// Same request size was selected for simplification purposes of the flow
blockHeadersPerRequest = 10,
blockBodiesPerRequest = 10
blockBodiesPerRequest = 10,
// Huge timeout on ask pattern
peerResponseTimeout = 5.minutes
)

val fakePeerActor: TestProbe = TestProbe()
Expand All @@ -86,6 +133,21 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
time.scheduler
)
)

def startFetcher(): Unit = {
blockFetcher ! BlockFetcher.Start(importer.ref, 0)

peerEventBus.expectMsg(Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers)))
}

// Sending a far away block as a NewBlock message
// Currently BlockFetcher only downloads first block-headers-per-request blocks without this
def triggerFetching(): Unit = {
val farAwayBlockTd = 100000
val farAwayBlock = Block(FixtureBlocks.ValidBlock.header.copy(number = 1000), FixtureBlocks.ValidBlock.body)

blockFetcher ! MessageFromPeer(NewBlock(farAwayBlock, farAwayBlockTd), fakePeer.id)
}
}

}

0 comments on commit 6fbad03

Please sign in to comment.