diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
index 3dace174ea..1d576a9df5 100644
--- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
+++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
@@ -233,52 +233,6 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
          .getBestBlockNumber() == peer3.blockchainReader.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset
      )
  }
-
-  it should "blacklist peer on Invalid batch last header number" in customTestCaseResourceM(
-    FakePeer.start3FakePeersRes()
-  ) { case (peer1, peer2, peer3) =>
-    for {
-      _ <- peer2.importBlocksUntil(1000)(IdentityUpdate)
-      _ <- peer3.importInvalidBlockNumbers(201, 1200)(IdentityUpdate)
-
-      _ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
-      _ <- peer1.startFastSync().delayExecution(50.milliseconds)
-      _ <- peer1.waitForFastSyncFinish()
-    } yield {
-      // Peer3 is blacklisted
-      val blacklistedPeer = PeerId(peer3.node.toUri.getUserInfo)
-      val blacklistReason = peer1.blacklist.cache.getIfPresent(blacklistedPeer)
-
-      assert(peer1.blacklist.isBlacklisted(blacklistedPeer))
-      assert(blacklistReason.get == BlacklistReasonType.BlockHeaderValidationFailedType)
-    }
-  }
-
-  it should "sync blockchain when peer responds with invalid batch last header hash" in customTestCaseResourceM(
-    FakePeer.start4FakePeersRes()
-  ) { case (peer1, peer2, peer3, peer4) =>
-    for {
-      _ <- peer1.importBlocksUntil(400)(IdentityUpdate)
-      _ <- peer2.importBlocksUntil(1000)(IdentityUpdate)
-
-      _ <- peer3.importInvalidBlocks(600, 800)(IdentityUpdate)
-      _ <- peer3.importBlocksUntil(1200)(updateStateAtBlock(1000))
-
-      _ <- peer4.importBlocksUntil(1100)(IdentityUpdate)
-
-      _ <- peer1.connectToPeers(Set(peer2.node, peer3.node, peer4.node)).delayExecution(5.seconds)
-      _ <- peer1.startFastSync().delayExecution(50.millis)
-      _ <- peer2.importBlocksUntil(1200)(IdentityUpdate)
-      _ <- peer1.waitForFastSyncFinish()
-    } yield {
-      // Peer3 is blacklisted
-      val blacklistedPeer = PeerId(peer3.node.toUri.getUserInfo)
-      val blacklistReason = peer1.blacklist.cache.getIfPresent(blacklistedPeer)
-
-      assert(peer1.blacklist.isBlacklisted(blacklistedPeer))
-      assert(blacklistReason.get == BlacklistReasonType.BlockHeaderValidationFailedType)
-    }
-  }
}

object FastSyncItSpec {
diff --git a/src/main/resources/conf/base.conf b/src/main/resources/conf/base.conf
index 2efb823e09..c683df7c4e 100644
--- a/src/main/resources/conf/base.conf
+++ b/src/main/resources/conf/base.conf
@@ -354,6 +354,9 @@ mantis {
    # Also retry interval in regular sync: for picking blocks batch and retrying requests
    sync-retry-interval = 0.5 seconds

+    # Delay between finishing fast sync and starting regular sync
+    sync-switch-delay = 0.5 seconds
+
    # Response time-out from peer during sync. If a peer fails to respond within this limit, it will be blacklisted
    peer-response-timeout = 30.seconds
@@ -475,10 +478,6 @@ mantis {
    # On reaching this limit, it will perform branch resolving.
    fast-sync-max-batch-retries = 5

-    # If the expected pivot block cannot be confirmed from `min-peers-to-choose-pivot-block`,
-    # the pivot block number is pushed back by the follwing number of blocks and the confirmation process repeats.
-    pivot-block-number-reset-delta = 50
-
    # Max number of times a pivot block is checked against available best peers before the whole process is restarted.
    max-pivot-block-failures-count = 5
  }
@@ -571,6 +570,9 @@ mantis {
    # Define which database to use [rocksdb]
    data-source = "rocksdb"
+
+    # Run database checks every 10 mins and shut down when an inconsistency is found
+    periodic-consistency-check = false
  }

  filter {
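The new `sync-switch-delay` setting above pairs with the FastSync change further down, where `syncController ! Done` is replaced by `scheduler.scheduleOnce(syncSwitchDelay, syncController, Done)`. A minimal sketch of that delayed hand-off, assuming the value lives under the `mantis.sync` config block and using hypothetical names (`SyncSwitchDelay`, `DelayedHandOff`, `FastSyncFinished`) that are not part of the codebase:

    import akka.actor.{Actor, ActorRef}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    // Sketch only: read the delay the same way SyncConfig does.
    object SyncSwitchDelay {
      private val syncConfig = ConfigFactory.load().getConfig("mantis.sync")
      val value: FiniteDuration = syncConfig.getDuration("sync-switch-delay").toMillis.millis
    }

    case object Done
    case object FastSyncFinished

    // Postpone the fast-sync -> regular-sync hand-off instead of sending Done immediately,
    // giving late fast-sync writes a moment to settle.
    class DelayedHandOff(syncController: ActorRef) extends Actor {
      import context.dispatcher

      def receive: Receive = { case FastSyncFinished =>
        context.system.scheduler.scheduleOnce(SyncSwitchDelay.value, syncController, Done)
      }
    }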
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
index 3f900e0f67..919765dfe5 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
@@ -26,7 +26,6 @@ import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync._
-import io.iohk.ethereum.blockchain.sync.fast.HeaderSkeleton._
import io.iohk.ethereum.blockchain.sync.fast.ReceiptsValidator.ReceiptsValidationResult
import io.iohk.ethereum.blockchain.sync.fast.SyncBlocksValidator.BlockBodyValidationResult
import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.RestartRequested
@@ -138,44 +137,22 @@ class FastSync(
  // scalastyle:off number.of.methods
  private class SyncingHandler(initialSyncState: SyncState, var masterPeer: Option[Peer] = None) {

+    private val BlockHeadersHandlerName = "block-headers-request-handler"
    //not part of syncstate as we do not want to persist is.
    private var stateSyncRestartRequested = false

-    private var requestedHeaders: Map[Peer, HeaderRange] = Map.empty
+    private var requestedHeaders: Map[Peer, BigInt] = Map.empty

    private var syncState = initialSyncState

    private var assignedHandlers: Map[ActorRef, Peer] = Map.empty
    private var peerRequestsTime: Map[Peer, Instant] = Map.empty
-
-    // TODO ETCM-701 get rid of state and move skeleton download to a separate actor
-    private val blockHeadersQueue: mutable.Queue[HeaderRange] = mutable.Queue.empty
-    private var currentSkeletonState: Option[HeaderSkeleton] = None
-    private var skeletonHandler: Option[ActorRef] = None
-    private var batchFailuresCount = 0
-
    private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
    private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty

    private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), s"$countActor-state-storage")
    syncStateStorageActor ! fastSyncStateStorage
-    private val branchResolver = context.actorOf(
-      FastSyncBranchResolverActor
-        .props(
-          self,
-          peerEventBus,
-          etcPeerManager,
-          blockchain,
-          blockchainReader,
-          blacklist,
-          syncConfig,
-          appStateStorage,
-          scheduler
-        ),
-      s"$countActor-fast-sync-branch-resolver"
-    )
-
    private val syncStateScheduler = context.actorOf(
      SyncStateSchedulerActor
        .props(
@@ -236,25 +213,25 @@ class FastSync(
      }
    }

-    // TODO ETCM-701 will be moved to separate actor and refactored
    private def handleResponses: Receive = {
      case ResponseReceived(peer, BlockHeaders(blockHeaders), timeTaken) =>
        log.info(
-          "*** Received {} block headers from peer [{}] in {} ms ***",
+          "Received {} block headers from peer [{}] in {} ms",
          blockHeaders.size,
          peer.id,
          timeTaken
        )
        FastSyncMetrics.setBlockHeadersDownloadTime(timeTaken)
-        currentSkeletonState match {
-          case Some(currentSkeleton) =>
-            if (skeletonHandler.contains(sender())) handleSkeletonResponse(peer, blockHeaders, currentSkeleton)
-            else handleHeaderBatchResponse(peer, blockHeaders, currentSkeleton)
-          case None =>
-            log.warning(
-              s"Received response to fill in header skeleton, but current header skeleton is not defined."
-            )
-            processSyncing()
+
+        requestedHeaders.get(peer).foreach { requestedNum =>
+          removeRequestHandler(sender())
+          requestedHeaders -= peer
+          if (
+            blockHeaders.nonEmpty && blockHeaders.size <= requestedNum && blockHeaders.head.number == syncState.bestBlockHeaderNumber + 1
+          )
+            handleBlockHeaders(peer, blockHeaders)
+          else
+            blacklist.add(peer.id, blacklistDuration, WrongBlockHeaders)
        }

      case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) =>
        log.info("Received {} block bodies from peer [{}] in {} ms", blockBodies.size, peer.id, timeTaken)
@@ -274,171 +251,6 @@ class FastSync(
      handleReceipts(peer, requestedHashes, receipts)
    }
-
-    private def handleSkeletonResponse(
-        peer: Peer,
-        blockHeaders: Seq[BlockHeader],
-        currentSkeleton: HeaderSkeleton
-    ): Unit = {
-      def validateDownloadedHeaders = blockHeaders.toList.traverse_(validateHeaderOnly)
-
-      log.info("Handling new received skeleton from peer [{}].", peer.id.value)
-
-      skeletonHandler.foreach(context.unwatch)
-      skeletonHandler = None
-
-      validateDownloadedHeaders match {
-        case Left(error) =>
-          log.info(s"Validation of skeleton from $peer failed: $error")
-          blockHeadersError(peer, BlacklistReason.BlockHeaderValidationFailed)
-        case Right(_) =>
-          currentSkeleton.setSkeletonHeaders(blockHeaders) match {
-            case Left(error) =>
-              // TODO ETCM-701 if this error keeps happening, switch master peer
-              log.warning("Failed to set skeleton headers from peer [{}]: [{}]", peer.id.value, error.msg)
-              requestSkeletonHeaders(peer)
-            case Right(updatedSkeleton) =>
-              log.debug(
-                "Updated current skeleton header. Included batches (starting numbers): [{}]",
-                updatedSkeleton.batchStartingHeaderNumbers.mkString(", ")
-              )
-              currentSkeletonState = Some(updatedSkeleton)
-
-              val blockHeadersToRequest =
-                updatedSkeleton.batchStartingHeaderNumbers.map { from =>
-                  HeaderRange(from, updatedSkeleton.batchSize)
-                }
-
-              blockHeadersQueue.enqueueAll(blockHeadersToRequest)
-          }
-      }
-    }
-
-    private def handleHeaderBatchResponse(
-        peer: Peer,
-        blockHeaders: Seq[BlockHeader],
-        currentSkeleton: HeaderSkeleton
-    ): Unit = {
-      def validHeadersChain(headers: Seq[BlockHeader], requestedNum: BigInt): Boolean =
-        headers.nonEmpty && headers.size <= requestedNum && checkHeadersChain(headers)
-
-      removeRequestHandler(sender())
-      requestedHeaders.get(peer) match {
-        case Some(requested) =>
-          log.debug("Validating [{}] received block headers from peer [{}]", blockHeaders.size, peer.id.value)
-          requestedHeaders -= peer
-          if (validHeadersChain(blockHeaders, requested.limit))
-            fillSkeletonGap(peer, requested, blockHeaders, currentSkeleton)
-          else {
-            handleHeaderResponseError(
-              InvalidDownloadedChain(blockHeaders),
-              requested,
-              peer,
-              BlacklistReason.WrongBlockHeaders
-            )
-          }
-        case None => log.warning("Received block headers from peer [{}] but weren't expecting any.", peer.id.value)
-      }
-    }
-
-    private def fillSkeletonGap(
-        peer: Peer,
-        request: HeaderRange,
-        blockHeaders: Seq[BlockHeader],
-        currentSkeleton: HeaderSkeleton
-    ): Unit = {
-      log.debug(
-        "Attempting to use [{}] block headers from peer [{}] to fill in header skeleton.",
-        blockHeaders.size,
-        peer.id
-      )
-      currentSkeleton.addBatch(blockHeaders) match {
-        case Right(skeleton) =>
-          log.debug("Successfully added headers from peer [{}] to current skeleton.", peer.id.value)
-          skeleton.fullChain match {
-            case Some(fullChain) =>
-              log.debug("Current header skeleton completed. Starting to request bodies and receipts.")
-              handleBlockHeadersChain(peer, fullChain)
-              currentSkeletonState = None
-            case None =>
-              log.debug("Skeleton is still incomplete. Waiting for remaining headers.")
-              currentSkeletonState = Some(skeleton)
-          }
-        case Left(error) =>
-          log.warning("Failed to add headers from peer [{}] to current skeleton. Error: [{}]", peer.id.value, error.msg)
-          handleHeaderResponseError(error, request, peer, BlacklistReason.BlockHeaderValidationFailed)
-      }
-    }
-
-    private def handleHeaderResponseError(
-        error: HeaderSkeletonError,
-        request: HeaderRange,
-        peer: Peer,
-        reason: BlacklistReason
-    ): Unit = {
-      def handleMasterPeerFailure(header: BlockHeader): Unit = {
-        batchFailuresCount += 1
-        if (batchFailuresCount > fastSyncMaxBatchRetries) {
-          log.info("Max number of allowed failures reached. Switching branch and master peer.")
-
-          blockHeadersQueue.dequeueAll(_ => true)
-
-          handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration, continueSyncing = false)
-
-          // Start branch resolution and wait for response from the FastSyncBranchResolver actor.
-          context.become(waitingForBranchResolution)
-          branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
-        }
-      }
-
-      blockHeadersQueue.enqueue(request)
-      error match {
-        // These are the reasons that make the master peer suspicious
-        case InvalidPenultimateHeader(_, header) => handleMasterPeerFailure(header)
-        case InvalidBatchHash(_, header) => handleMasterPeerFailure(header)
-        // Otherwise probably it's just this peer's fault
-        case _ =>
-          log.warning(error.msg)
-          blockHeadersError(peer, reason)
-      }
-    }
-
-    private def waitingForBranchResolution: Receive = handleStatus.orElse(handleRequestFailure).orElse {
-      case FastSyncBranchResolverActor.BranchResolvedSuccessful(firstCommonBlockNumber, newMasterPeer) =>
-        log.debug(
-          s"Resolved branch with first common block number $firstCommonBlockNumber for new master peer $newMasterPeer"
-        )
-        // Reset the batch failures count
-        batchFailuresCount = 0
-
-        context.children.foreach { child =>
-          log.debug(s"Unwatching and killing $child")
-          context.unwatch(child)
-          child ! PoisonPill
-        }
-
-        // Restart syncing from the valid block available in state.
-        log.debug("Starting with fresh SyncingHandler")
-        val syncingHandler = new SyncingHandler(
-          syncState.copy(
-            bestBlockHeaderNumber = firstCommonBlockNumber,
-            nextBlockToFullyValidate = firstCommonBlockNumber + 1,
-            pivotBlockUpdateFailures = 0
-          ),
-          masterPeer = Some(newMasterPeer)
-        )
-        context.become(syncingHandler.receive)
-        syncingHandler.processSyncing()
-
-      case _: FastSyncBranchResolverActor.BranchResolutionFailed =>
-        // there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
-        branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
-    }
-
-    private def blockHeadersError(peer: Peer, blacklistReason: BlacklistReason): Unit = {
-      blacklist.add(peer.id, blacklistDuration, blacklistReason)
-      processSyncing()
-    }
-
    def askForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Unit = {
      syncState = syncState.copy(updatingPivotBlock = true)
      log.info("Asking for new pivot block")
@@ -460,7 +272,6 @@ class FastSync(
        newPivot.number == currentState.pivotBlock.number && updateReason.isSyncRestart
        newPivot.number >= currentState.pivotBlock.number && !stalePivotAfterRestart
      }
-
    def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive =
      handlePeerListMessages.orElse(handleStatus).orElse(handleRequestFailure).orElse {
        case PivotBlockSelector.Result(pivotBlockHeader)
@@ -562,7 +373,6 @@ class FastSync(
    private def removeRequestHandler(handler: ActorRef): Unit = {
      log.debug(s"Removing request handler ${handler.path}")
      context.unwatch(handler)
-      skeletonHandler = skeletonHandler.filter(_ != handler)
      assignedHandlers -= handler
    }
@@ -635,7 +445,7 @@ class FastSync(
    }

    // scalastyle:off method.length
-    private def handleBlockHeadersChain(peer: Peer, headers: Seq[BlockHeader]): Unit = {
+    private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]): Unit = {
      def processHeader(header: BlockHeader): Either[HeaderProcessingResult, (BlockHeader, ChainWeight)] =
        for {
          validatedHeader <- validateHeader(header, peer)
@@ -662,6 +472,11 @@ class FastSync(
        } else
          HeadersProcessingFinished

+      if (!checkHeadersChain(headers)) {
+        blacklist.add(peer.id, blacklistDuration, ErrorInBlockHeaders)
+        return processSyncing() // scalastyle:off return
+      }
+
      processHeaders(headers) match {
        case ParentChainWeightNotFound(header) =>
          // We could end in wrong fork and get blocked so we should rewind our state a little
@@ -742,12 +557,8 @@
    }

    private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
-      if (skeletonHandler == Some(handler))
-        currentSkeletonState = None
-
      removeRequestHandler(handler)
-      requestedHeaders.get(peer).foreach(blockHeadersQueue.enqueue)
      syncState = syncState
        .enqueueBlockBodies(requestedBlockBodies.getOrElse(handler, Nil))
        .enqueueReceipts(requestedReceipts.getOrElse(handler, Nil))
@@ -912,7 +723,7 @@
      appStateStorage.fastSyncDone().commit()
      context.become(idle)
      peerRequestsTime = Map.empty
-      syncController ! Done
+      scheduler.scheduleOnce(syncSwitchDelay, syncController, Done)
    }

    def cleanup(): Unit = {
@@ -941,30 +752,27 @@
    }

    def assignBlockchainWork(peerWithInfo: PeerWithInfo): Unit = {
-      val PeerWithInfo(peer, _) = peerWithInfo
+      val PeerWithInfo(peer, peerInfo) = peerWithInfo
      log.debug(s"Assigning blockchain work for peer [{}]", peer.id.value)

      if (syncState.receiptsQueue.nonEmpty) {
        requestReceipts(peer)
      } else if (syncState.blockBodiesQueue.nonEmpty) {
        requestBlockBodies(peer)
-      } else if (blockHeadersQueue.nonEmpty) {
+      } else if (
+        requestedHeaders.isEmpty &&
+        context.child(BlockHeadersHandlerName).isEmpty &&
+        syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget &&
+        peerInfo.maxBlockNumber >= syncState.pivotBlock.number
+      ) {
        requestBlockHeaders(peer)
-      } else if (shouldRequestNewSkeleton()) {
-        requestSkeletonHeaders(peer)
      } else {
        log.debug(
-          "Nothing to request. Waiting for responses from: {} and/or {}",
-          assignedHandlers.keys,
-          skeletonHandler
+          "Nothing to request. Waiting for responses from: {}",
+          assignedHandlers.keys
        )
      }
    }

-    private def shouldRequestNewSkeleton(): Boolean =
-      currentSkeletonState.isEmpty &&
-        skeletonHandler.isEmpty &&
-        syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget
-
    private def requestReceipts(peer: Peer): Unit = {
      val (receiptsToGet, remainingReceipts) = syncState.receiptsQueue.splitAt(receiptsPerRequest)
@@ -1013,102 +821,29 @@
      requestedBlockBodies += handler -> blockBodiesToGet
    }

-    private def requestBlockHeaders(peer: Peer): Unit =
-      Try(blockHeadersQueue.dequeue()) match {
-        case Success(toRequest) =>
-          log.debug(
-            "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
-            toRequest.limit,
-            toRequest.from,
-            peer.id.value
-          )
-
-          val handler = context.actorOf(
-            PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
-              peer,
-              peerResponseTimeout,
-              etcPeerManager,
-              peerEventBus,
-              requestMsg = GetBlockHeaders(Left(toRequest.from), toRequest.limit, skip = 0, reverse = false),
-              responseMsgCode = Codes.BlockHeadersCode
-            ),
-            s"$countActor-peer-request-handler-block-headers"
-          )
-
-          context.watch(handler)
-          assignedHandlers += (handler -> peer)
-          requestedHeaders += (peer -> toRequest)
-          peerRequestsTime += (peer -> Instant.now())
-        case _ => log.warning("Tried to request more block headers but work queue was empty.")
-      }
-
-    private def requestSkeletonHeaders(peerCandidate: Peer): Unit = {
-      val skeleton =
-        HeaderSkeleton(syncState.bestBlockHeaderNumber + 1, syncState.safeDownloadTarget, blockHeadersPerRequest)
-
-      val masterPeerBestBlock =
-        masterPeer.flatMap(mp => peersToDownloadFrom.get(mp.id).map(_.peerInfo.maxBlockNumber))
-
-      val (peerToRequestFrom, peerBestBlock) = (masterPeer, masterPeerBestBlock) match {
-        case (Some(mp), Some(bestBlock)) if bestBlock >= syncState.safeDownloadTarget =>
-          (Some(mp), Some(bestBlock))
-        case _ => // switch to new peer as master peer if best block is high enough
-          val peerBestBlock = peersToDownloadFrom.get(peerCandidate.id).map(_.peerInfo.maxBlockNumber)
-          peerBestBlock match {
-            case Some(bestBlock) if bestBlock >= syncState.safeDownloadTarget =>
-              masterPeer = Some(peerCandidate)
-              (Some(peerCandidate), Some(bestBlock))
-            case _ => (None, peerBestBlock)
-          }
-      }
+    private def requestBlockHeaders(peer: Peer): Unit = {
+      val limit: BigInt =
+        if (blockHeadersPerRequest < (syncState.safeDownloadTarget - syncState.bestBlockHeaderNumber))
+          blockHeadersPerRequest
+        else
+          syncState.safeDownloadTarget - syncState.bestBlockHeaderNumber
-      log.debug(
-        "Attempting to request header skeleton for range [{}-{}] from master peer [{}] with best known block [{}]",
-        skeleton.from,
-        skeleton.lastSkeletonHeaderNumber,
-        peerToRequestFrom.map(_.id.value),
-        peerBestBlock
-      )
-      log.debug(
-        "Request details: [firstSkeletonHeader={}], [limit={}], [gapSize={}] and [safeDownloadTarget={}]",
-        skeleton.firstSkeletonHeaderNumber,
-        skeleton.limit,
-        skeleton.gapSize,
-        syncState.safeDownloadTarget
+      val handler = context.actorOf(
+        PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
+          peer,
+          peerResponseTimeout,
+          etcPeerManager,
+          peerEventBus,
+          requestMsg = GetBlockHeaders(Left(syncState.bestBlockHeaderNumber + 1), limit, skip = 0, reverse = false),
+          responseMsgCode = Codes.BlockHeadersCode
+        ),
+        BlockHeadersHandlerName
      )
-      peerToRequestFrom match {
-        case Some(peer) =>
-          val msg = GetBlockHeaders(
-            Left(skeleton.firstSkeletonHeaderNumber),
-            skeleton.limit,
-            skeleton.gapSize,
-            reverse = false
-          )
-
-          val handler = context.actorOf(
-            PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
-              peer,
-              peerResponseTimeout,
-              etcPeerManager,
-              peerEventBus,
-              requestMsg = msg,
-              responseMsgCode = Codes.BlockHeadersCode
-            ),
-            s"$countActor-peer-request-handler-block-headers-skeleton"
-          )
-
-          context.watch(handler)
-          skeletonHandler = Some(handler)
-          currentSkeletonState = Some(skeleton)
-          peerRequestsTime += (peer -> Instant.now())
-        case None =>
-          log.warning(
-            "Attempted to download new skeleton headers but neither master peer [{}] nor peer candidate [{}] had a high enough best block.",
-            masterPeer.map(_.id),
-            peerCandidate.id
-          )
-      }
+      context.watch(handler)
+      assignedHandlers += (handler -> peer)
+      requestedHeaders += (peer -> limit)
+      peerRequestsTime += (peer -> Instant.now())
    }

    private def unassignedPeers: List[PeerWithInfo] = {
@@ -1274,6 +1009,4 @@ object FastSync {
  case object ImportedLastBlock extends PivotBlockUpdateReason
  case object LastBlockValidationFailed extends PivotBlockUpdateReason
  case object SyncRestart extends PivotBlockUpdateReason
-
-  final private[fast] case class HeaderRange(from: BigInt, limit: BigInt)
}
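With the skeleton machinery removed, `requestBlockHeaders` is back to a single linear request: ask for at most `blockHeadersPerRequest` headers, capped by what is still missing up to `safeDownloadTarget`, and (per the `handleResponses` change above) accept the response only if it is non-empty, no larger than requested and contiguous with the current best header. A self-contained sketch of that batching and acceptance logic, using plain block numbers instead of `BlockHeader`s; the type and object names here are illustrative only:

    // Sketch with simplified types; the real code operates on BlockHeader values and actor state.
    final case class SyncWindow(bestBlockHeaderNumber: BigInt, safeDownloadTarget: BigInt)

    object HeaderBatching {
      // Size of the next request: a full batch unless fewer headers remain below the target.
      def nextRequestLimit(window: SyncWindow, blockHeadersPerRequest: BigInt): BigInt = {
        val remaining = window.safeDownloadTarget - window.bestBlockHeaderNumber
        if (blockHeadersPerRequest < remaining) blockHeadersPerRequest else remaining
      }

      // Mirror of the response check: non-empty, not larger than requested,
      // and starting right after the best header we already have.
      def isAcceptable(headerNumbers: Seq[BigInt], requested: BigInt, window: SyncWindow): Boolean =
        headerNumbers.nonEmpty &&
          headerNumbers.size <= requested &&
          headerNumbers.head == window.bestBlockHeaderNumber + 1
    }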
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala
index a4e3ab072d..646d779732 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala
@@ -227,11 +227,11 @@ class PivotBlockSelector(
      .map { case (_, bestPeerBestBlockNumber) => bestPeerBestBlockNumber }
      .getOrElse(BigInt(0))

-    // The current best block number is pushed back by `pivotBlockNumberResetDelta`
+    // The current best block number is set to the highest peer-reported best block number below the previous one,
    // if this request is issued by the retry logic.
    val currentBestBlockNumber: BigInt = previousBestBlockNumber
-      .map(_ - pivotBlockNumberResetDelta.max(0))
+      .flatMap(previous => peersSortedByBestNumber.collectFirst { case (_, number) if number < previous => number })
      .getOrElse(bestPeerBestBlockNumber)

    val expectedPivotBlock = (currentBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
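With `pivot-block-number-reset-delta` gone, a retry no longer subtracts a fixed delta; it re-anchors on the highest best block number reported by any peer that is strictly below the previously used one, falling back to the best peer's number. A small sketch of that selection, assuming (as in `PivotBlockSelector`) that the peer list is already sorted by best block number in descending order; the wrapper object and peer-id type are placeholders:

    object PivotRetry {
      // Sketch: choose the block number to base the next pivot attempt on.
      def nextBestBlockNumber(
          peersSortedByBestNumber: List[(String, BigInt)], // (peer id, reported best block), descending
          previousBestBlockNumber: Option[BigInt]
      ): BigInt = {
        val bestPeerBestBlockNumber =
          peersSortedByBestNumber.headOption.map { case (_, number) => number }.getOrElse(BigInt(0))

        previousBestBlockNumber
          // Retry: highest peer-reported number strictly below the previous choice.
          .flatMap(previous => peersSortedByBestNumber.collectFirst { case (_, number) if number < previous => number })
          // First attempt, or no lower candidate: fall back to the best peer's number.
          .getOrElse(bestPeerBestBlockNumber)
      }
    }

    // Example: peers report 1000, 900 and 800; a retry after anchoring on 1000 picks 900.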
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncStateSchedulerActor.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncStateSchedulerActor.scala
index 0cf721a750..41654f7830 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncStateSchedulerActor.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncStateSchedulerActor.scala
@@ -56,7 +56,7 @@ class SyncStateSchedulerActor(
    }.toList

  private def requestNodes(request: PeerRequest): ActorRef = {
-    log.info("Requesting {} from peer {}", request.nodes.size, request.peer)
+    log.info("Requesting {} from peer {}", request.nodes.size, request.peer.id)
    val handler = context.actorOf(
      PeerRequestHandler.props[GetNodeData, NodeData](
        request.peer,
diff --git a/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala b/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
index fd095b08bc..19b8640361 100644
--- a/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
+++ b/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
@@ -100,15 +100,16 @@ abstract class BaseNode extends Node {
    if (jsonRpcConfig.ipcServerConfig.enabled) jsonRpcIpcServer.run()

  def startPeriodicDBConsistencyCheck(): Unit =
-    ActorSystem(
-      PeriodicConsistencyCheck.start(
-        storagesInstance.storages.appStateStorage,
-        storagesInstance.storages.blockNumberMappingStorage,
-        storagesInstance.storages.blockHeadersStorage,
-        shutdown
-      ),
-      "PeriodicDBConsistencyCheck"
-    )
+    if (Config.Db.periodicConsistencyCheck)
+      ActorSystem(
+        PeriodicConsistencyCheck.start(
+          storagesInstance.storages.appStateStorage,
+          storagesInstance.storages.blockNumberMappingStorage,
+          storagesInstance.storages.blockHeadersStorage,
+          shutdown
+        ),
+        "PeriodicDBConsistencyCheck"
+      )

  override def shutdown: () => Unit = () => {
    def tryAndLogFailure(f: () => Any): Unit = Try(f()) match {
diff --git a/src/main/scala/io/iohk/ethereum/utils/Config.scala b/src/main/scala/io/iohk/ethereum/utils/Config.scala
index 3c1c1cf3e9..e39584b6d3 100644
--- a/src/main/scala/io/iohk/ethereum/utils/Config.scala
+++ b/src/main/scala/io/iohk/ethereum/utils/Config.scala
@@ -111,6 +111,7 @@ object Config {
      criticalBlacklistDuration: FiniteDuration,
      startRetryInterval: FiniteDuration,
      syncRetryInterval: FiniteDuration,
+      syncSwitchDelay: FiniteDuration,
      peerResponseTimeout: FiniteDuration,
      printStatusInterval: FiniteDuration,
      maxConcurrentRequests: Int,
@@ -144,7 +145,6 @@ object Config {
      pivotBlockReScheduleInterval: FiniteDuration,
      maxPivotBlockAge: Int,
      fastSyncMaxBatchRetries: Int,
-      pivotBlockNumberResetDelta: Int,
      maxPivotBlockFailuresCount: Int
  )

@@ -158,6 +158,7 @@ object Config {
        criticalBlacklistDuration = syncConfig.getDuration("critical-blacklist-duration").toMillis.millis,
        startRetryInterval = syncConfig.getDuration("start-retry-interval").toMillis.millis,
        syncRetryInterval = syncConfig.getDuration("sync-retry-interval").toMillis.millis,
+        syncSwitchDelay = syncConfig.getDuration("sync-switch-delay").toMillis.millis,
        peerResponseTimeout = syncConfig.getDuration("peer-response-timeout").toMillis.millis,
        printStatusInterval = syncConfig.getDuration("print-status-interval").toMillis.millis,
        maxConcurrentRequests = syncConfig.getInt("max-concurrent-requests"),
@@ -191,7 +192,6 @@ object Config {
        pivotBlockReScheduleInterval = syncConfig.getDuration("pivot-block-reschedule-interval").toMillis.millis,
        maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age"),
        fastSyncMaxBatchRetries = syncConfig.getInt("fast-sync-max-batch-retries"),
-        pivotBlockNumberResetDelta = syncConfig.getInt("pivot-block-number-reset-delta"),
        maxPivotBlockFailuresCount = syncConfig.getInt("max-pivot-block-failures-count")
      )
  }

@@ -203,6 +203,7 @@ object Config {
    private val rocksDbConfig = dbConfig.getConfig("rocksdb")

    val dataSource: String = dbConfig.getString("data-source")
+    val periodicConsistencyCheck: Boolean = dbConfig.getBoolean("periodic-consistency-check")

    object RocksDb extends RocksDbConfig {
      override val createIfMissing: Boolean = rocksDbConfig.getBoolean("create-if-missing")
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index 9af787bf8e..748c24f487 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -113,6 +113,7 @@ mantis {
    start-retry-interval = 500.millis
    fastsync-throttle = 100.millis
    sync-retry-interval = 1.second
+    sync-switch-delay = 0
    peer-response-timeout = 1.second

    print-status-interval = 1.hour
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala
index daf465fd38..137a9c92e5 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala
@@ -452,57 +452,31 @@ class PivotBlockSelectorSpec
    peerMessageBus.expectMsgAllOf(
      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
+      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer3.id))),
      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer4.id)))
    )

    etcPeerManager.expectMsgAllOf(
-      EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(1400), 1, 0, reverse = false), peer1.id),
-      EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(1400), 1, 0, reverse = false), peer4.id)
+      EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(900), 1, 0, reverse = false), peer1.id),
+      EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(900), 1, 0, reverse = false), peer3.id),
+      EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(900), 1, 0, reverse = false), peer4.id)
    )
    etcPeerManager.expectNoMessage()

    // Collecting pivot block (for voting)
-    pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 1400))), peer1.id)
-    pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 1400))), peer4.id)
+    pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 900))), peer1.id)
+    pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 900))), peer3.id)
+    pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 900))), peer4.id)
    peerMessageBus.expectMsgAllOf(
      Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
+      Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer3.id))),
      Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer4.id))),
      Unsubscribe()
    )
    peerMessageBus.expectNoMessage()

-    fastSync.expectMsg(Result(baseBlockHeader.copy(number = 1400)))
-  }
-
-  it should "restart pivot block selection after `maxPivotBlockFailuresCount` is reached" in new TestSetup {
-
-    override val minPeersToChoosePivotBlock = 2
-    override val peersToChoosePivotBlockMargin = 1
-
-    updateHandshakedPeers(
-      HandshakedPeers(
-        allPeers
-          .updated(peer1, allPeers(peer1).copy(maxBlockNumber = 2000))
-          .updated(peer2, allPeers(peer2).copy(maxBlockNumber = 800))
-          .updated(peer3, allPeers(peer3).copy(maxBlockNumber = 900))
-          .updated(peer4, allPeers(peer4).copy(maxBlockNumber = 1000))
-      )
-    )
-
-    pivotBlockSelector ! SelectPivotBlock
-
-    peerMessageBus.expectNoMessage()
-
-    updateHandshakedPeers(HandshakedPeers(threeAcceptedPeers))
-
-    time.advance(syncConfig.startRetryInterval)
-
-    peerMessageBus.expectMsgAllOf(
-      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
-      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer2.id))),
-      Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer3.id)))
-    )
+    fastSync.expectMsg(Result(baseBlockHeader.copy(number = 900)))
  }

  class TestSetup extends TestSyncConfig {
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
index abe92cfba7..33f76e0105 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
@@ -169,12 +169,8 @@ class SyncControllerSpec
      eventually {
        someTimePasses()
        val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
-
-        // Header validation failed when downloading skeleton headers.
-        // Sync state remains the same and the peer is blacklisted.
-        syncState.pivotBlock shouldBe defaultPivotBlockHeader
-        syncState.bestBlockHeaderNumber shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber)
-        syncState.nextBlockToFullyValidate shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 1)
+        syncState.bestBlockHeaderNumber shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN)
+        syncState.nextBlockToFullyValidate shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN + 1)
        syncState.blockBodiesQueue.isEmpty shouldBe true
        syncState.receiptsQueue.isEmpty shouldBe true
      }
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
index d47ac30407..f2d6d27c93 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
@@ -14,6 +14,7 @@ trait TestSyncConfig extends SyncConfigBuilder {
    blacklistDuration = 5.seconds,
    criticalBlacklistDuration = 10.seconds,
    syncRetryInterval = 1.second,
+    syncSwitchDelay = 0.5.second,
    checkForNewBlockInterval = 1.milli,
    startRetryInterval = 500.milliseconds,
    blockChainOnlyPeersPoolSize = 100,
@@ -46,7 +47,6 @@ trait TestSyncConfig extends SyncConfigBuilder {
    pivotBlockReScheduleInterval = 1.second,
    maxPivotBlockAge = 96,
    fastSyncMaxBatchRetries = 3,
-    pivotBlockNumberResetDelta = 50,
    maxPivotBlockFailuresCount = 3
  )
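Lastly, the periodic DB consistency check started by `BaseNode` is now opt-in: `periodic-consistency-check` defaults to `false` in `base.conf`, it is exposed as `Config.Db.periodicConsistencyCheck`, and `startPeriodicDBConsistencyCheck` only spawns the checker when the flag is set. A minimal sketch of that gating, with a hypothetical `DbSettings` object standing in for `Config.Db` and the config path assumed to be `mantis.db`:

    import com.typesafe.config.ConfigFactory

    // Sketch: read the new flag once at startup (config path assumed, not taken from the diff).
    object DbSettings {
      private val dbConfig = ConfigFactory.load().getConfig("mantis.db")
      val periodicConsistencyCheck: Boolean = dbConfig.getBoolean("periodic-consistency-check")
    }

    object ConsistencyCheckGate {
      // Only start the (potentially expensive) checker when explicitly enabled.
      def startPeriodicDBConsistencyCheck(startChecker: () => Unit): Unit =
        if (DbSettings.periodicConsistencyCheck) startChecker()
    }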