Skip to content

Commit

Permalink
[ETCM-213] Properly handle restart while loading bloom filter
Browse files Browse the repository at this point in the history
  • Loading branch information
KonradStaniec committed Oct 19, 2020
1 parent 1bdc6b9 commit 116782f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
Expand Up @@ -53,6 +53,9 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
val initStats = ProcessingStatistics().addSaved(result.writtenElements)
val initState = startSyncing(startSignal.stateRoot, startSignal.blockNumber)
context become (syncing(initState, initStats, startSignal.blockNumber, sender))
case Some((restartSignal: RestartRequested.type, sender)) =>
sender ! WaitingForNewTargetBlock
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
case _ =>
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
}
Expand Down Expand Up @@ -104,7 +107,6 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
log.debug(s"Received {} new nodes to process", nodes.size)
// Current SyncStateDownloaderActor makes sure that there is no not requested or duplicated values in its response.
// so we can ignore those errors.
// TODO make processing async as sometimes downloader sits idle
sync.processResponses(currentState, nodes) match {
case Left(value) =>
log.error(s"Critical error while state syncing ${value}, stopping state sync")
Expand Down
@@ -1,6 +1,7 @@
package io.iohk.ethereum.blockchain.sync

import java.net.InetSocketAddress
import java.util.concurrent.ThreadLocalRandom

import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestActor.AutoPilot
Expand All @@ -14,12 +15,12 @@ import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
}
import io.iohk.ethereum.domain.BlockchainImpl
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo, SendMessage}
import io.iohk.ethereum.network.{Peer, PeerId}
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status
import io.iohk.ethereum.network.p2p.messages.PV63.GetNodeData.GetNodeDataEnc
import io.iohk.ethereum.network.p2p.messages.PV63.NodeData
import io.iohk.ethereum.network.p2p.messages.Versions
import io.iohk.ethereum.network.{Peer, PeerId}
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.{Fixtures, ObjectGenerators, WithActorSystemShutDown}
import org.scalactic.anyvals.PosInt
Expand All @@ -29,7 +30,6 @@ import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

import scala.concurrent.duration._
import scala.util.Random

class StateSyncSpec
extends TestKit(ActorSystem("MySpec"))
Expand Down Expand Up @@ -131,7 +131,7 @@ class StateSyncSpec
}

val maxMptNodeRequest = 50

val minMptNodeRequest = 20
val partialResponseConfig: PeerConfig = peersMap.map { case (peer, _) =>
peer.id -> PartialResponse
}
Expand Down Expand Up @@ -162,7 +162,8 @@ class StateSyncSpec
sender ! MessageFromPeer(responseMsg, peer)
this
case PartialResponse =>
val elementsToServe = Random.nextInt(maxMptNodeRequest)
val random: ThreadLocalRandom = ThreadLocalRandom.current()
val elementsToServe = random.nextInt(minMptNodeRequest, maxMptNodeRequest + 1)
val toGet = msg.underlyingMsg.mptElementsHashes.toList.take(elementsToServe)
val responseMsg = NodeData(trieProvider.getNodes(toGet).map(_.data))
sender ! MessageFromPeer(responseMsg, peer)
Expand Down

0 comments on commit 116782f

Please sign in to comment.