Skip to content

Commit

Permalink
Merge branch 'fix/regularSyncAfterIncFastSync' into phase/daedalus
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Tallar committed Aug 16, 2017
2 parents 6a0e219 + 0e6e0dc commit 08c0cfe
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import io.iohk.ethereum.validators.BlockValidator

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

trait FastSync {
selfSyncController: SyncController =>
Expand Down Expand Up @@ -180,7 +181,10 @@ trait FastSync {

private var blockChainOnlyPeers = Seq.empty[Peer]

private val syncStatePersistCancellable = scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval, self, PersistSyncState)
//Delay before starting to persist snapshot. It should be 0, as the presence of it marks that fast sync was started
private val persistStateSnapshotDelay: FiniteDuration = 0.seconds

private val syncStatePersistCancellable = scheduler.schedule(persistStateSnapshotDelay, persistStateSnapshotInterval, self, PersistSyncState)
private val heartBeat = scheduler.schedule(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)

// scalastyle:off cyclomatic.complexity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerDis
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
import io.iohk.ethereum.network.p2p.messages.PV62.BlockBody
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.validators.Validators

class SyncController(
Expand All @@ -28,6 +28,7 @@ class SyncController(
val pendingTransactionsManager: ActorRef,
val ommersPool: ActorRef,
val etcPeerManager: ActorRef,
syncConfig: SyncConfig,
val externalSchedulerOpt: Option[Scheduler] = None)
extends Actor
with ActorLogging
Expand All @@ -36,7 +37,7 @@ class SyncController(
with RegularSync {

import BlacklistSupport._
import Config.Sync._
import syncConfig._
import SyncController._

override val supervisorStrategy: OneForOneStrategy =
Expand Down Expand Up @@ -65,8 +66,12 @@ class SyncController(
case (true, false) =>
startRegularSync()
case (false, false) =>
fastSyncStateStorage.purge()
startRegularSync()
//Check whether fast sync was started before
if (fastSyncStateStorage.getSyncState().isDefined) {
log.warning(s"do-fast-sync is set to $doFastSync but regular sync cannot start because fast sync hasn't completed")
startFastSync()
} else
startRegularSync()
}

case FastSyncDone =>
Expand Down Expand Up @@ -115,9 +120,10 @@ object SyncController {
peerEventBus: ActorRef,
pendingTransactionsManager: ActorRef,
ommersPool: ActorRef,
etcPeerManager: ActorRef):
etcPeerManager: ActorRef,
syncConfig: SyncConfig):
Props = Props(new SyncController(appStateStorage, blockchain, blockchainStorages, syncStateStorage, ledger, validators,
peerEventBus, pendingTransactionsManager, ommersPool, etcPeerManager))
peerEventBus, pendingTransactionsManager, ommersPool, etcPeerManager, syncConfig))

case class BlockHeadersToResolve(peer: Peer, headers: Seq[BlockHeader])

Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ trait SyncControllerBuilder {
OmmersPoolBuilder with
EtcPeerManagerActorBuilder =>


lazy val syncConfig = Config.Sync

lazy val syncController = actorSystem.actorOf(
SyncController.props(
Expand All @@ -381,7 +381,8 @@ trait SyncControllerBuilder {
peerEventBus,
pendingTransactionsManager,
ommersPool,
etcPeerManager), "sync-controller")
etcPeerManager,
syncConfig), "sync-controller")

}

Expand Down
26 changes: 25 additions & 1 deletion src/main/scala/io/iohk/ethereum/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,31 @@ object Config {

}

object Sync {
trait SyncConfig {
val doFastSync: Boolean

val peersScanInterval: FiniteDuration
val blacklistDuration: FiniteDuration
val startRetryInterval: FiniteDuration
val syncRetryInterval: FiniteDuration
val peerResponseTimeout: FiniteDuration
val printStatusInterval: FiniteDuration

val maxConcurrentRequests: Int
val blockHeadersPerRequest: Int
val blockBodiesPerRequest: Int
val receiptsPerRequest: Int
val nodesPerRequest: Int
val minPeersToChooseTargetBlock: Int
val targetBlockOffset: Int
val persistStateSnapshotInterval: FiniteDuration

val checkForNewBlockInterval: FiniteDuration
val blockResolveDepth: Int
val blockChainOnlyPeersPoolSize: Int
}

object Sync extends SyncConfig {
private val syncConfig = config.getConfig("sync")

val doFastSync: Boolean = syncConfig.getBoolean("do-fast-sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.actor.{ActorSystem, Props}
import akka.testkit.{TestActorRef, TestProbe}
import akka.util.ByteString
import com.miguno.akka.testing.VirtualTime
import io.iohk.ethereum.{Fixtures, Mocks, Timeouts}
import io.iohk.ethereum.blockchain.sync.FastSync.{StateMptNodeHash, SyncState}
import io.iohk.ethereum.blockchain.sync.SyncController.MinedBlock
import io.iohk.ethereum.domain.{Account, Block, BlockHeader}
Expand All @@ -21,7 +22,8 @@ import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.{Mocks, Timeouts}
import io.iohk.ethereum.utils.Config.SyncConfig
import org.scalamock.scalatest.MockFactory
import org.scalatest.{FlatSpec, Matchers}
import org.spongycastle.util.encoders.Hex

Expand Down Expand Up @@ -164,6 +166,7 @@ class SyncControllerSpec extends FlatSpec with Matchers {
ledger,
new Mocks.MockValidatorsFailingOnBlockBodies,
peerMessageBus.ref, pendingTransactionsManager.ref, ommersPool.ref, etcPeerManager.ref,
syncConfig(Config.Sync.doFastSync),
externalSchedulerOpt = Some(time.scheduler))))


Expand Down Expand Up @@ -935,6 +938,38 @@ class SyncControllerSpec extends FlatSpec with Matchers {
pendingTransactionsManager.expectNoMsg()
}

it should "start fast sync after restart, if fast sync was partially ran and then regular sync started" in new TestSetup with MockFactory {
val peerTestProbe: TestProbe = TestProbe()(system)
val peer = Peer(new InetSocketAddress("127.0.0.1", 0), peerTestProbe.ref, false)
val peer1Status= Status(1, 1, 1, ByteString("peer1_bestHash"), ByteString("unused"))

//Save previous incomplete attempt to fast sync
val syncState = SyncState(targetBlock = Fixtures.Blocks.Block3125369.header, mptNodesQueue = Seq(StateMptNodeHash(ByteString("node_hash"))))
storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState)

//Attempt to start regular sync
val syncConfigWithRegularSync = syncConfig(enableFastSync = false)
val syncControllerWithRegularSync = TestActorRef(Props(new SyncController(
storagesInstance.storages.appStateStorage,
blockchain,
storagesInstance.storages,
storagesInstance.storages.fastSyncStateStorage,
ledger,
new Mocks.MockValidatorsAlwaysSucceed,
peerMessageBus.ref, pendingTransactionsManager.ref, ommersPool.ref, etcPeerManager.ref,
syncConfigWithRegularSync,
externalSchedulerOpt = Some(time.scheduler))))

etcPeerManager.send(syncControllerWithRegularSync, HandshakedPeers(Map(
peer -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0))))

syncControllerWithRegularSync ! SyncController.StartSync

//Fast sync node request should be received
etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetNodeData(Seq(ByteString("node_hash"))), peer.id))
}

class TestSetup(blocksForWhichLedgerFails: Seq[BigInt] = Nil) extends EphemBlockchainTestSetup {
implicit val system = ActorSystem("FastSyncControllerSpec_System")

Expand All @@ -952,6 +987,29 @@ class SyncControllerSpec extends FlatSpec with Matchers {
val pendingTransactionsManager = TestProbe()
val ommersPool = TestProbe()

def syncConfig(enableFastSync: Boolean): SyncConfig = new SyncConfig {
override val doFastSync: Boolean = enableFastSync

//unchanged
override val blockBodiesPerRequest: Int = Config.Sync.blockBodiesPerRequest
override val blacklistDuration: FiniteDuration = Config.Sync.blacklistDuration
override val peersScanInterval: FiniteDuration = Config.Sync.peersScanInterval
override val blockResolveDepth: Int = Config.Sync.blockResolveDepth
override val printStatusInterval: FiniteDuration = Config.Sync.printStatusInterval
override val targetBlockOffset: Int = Config.Sync.targetBlockOffset
override val syncRetryInterval: FiniteDuration = Config.Sync.syncRetryInterval
override val peerResponseTimeout: FiniteDuration = Config.Sync.peerResponseTimeout
override val maxConcurrentRequests: Int = Config.Sync.maxConcurrentRequests
override val startRetryInterval: FiniteDuration = Config.Sync.startRetryInterval
override val receiptsPerRequest: Int = Config.Sync.receiptsPerRequest
override val blockHeadersPerRequest: Int = Config.Sync.blockHeadersPerRequest
override val minPeersToChooseTargetBlock: Int = Config.Sync.minPeersToChooseTargetBlock
override val checkForNewBlockInterval: FiniteDuration = Config.Sync.checkForNewBlockInterval
override val blockChainOnlyPeersPoolSize: Int = Config.Sync.blockChainOnlyPeersPoolSize
override val persistStateSnapshotInterval: FiniteDuration = Config.Sync.persistStateSnapshotInterval
override val nodesPerRequest: Int = Config.Sync.nodesPerRequest
}

val syncController = TestActorRef(Props(new SyncController(
storagesInstance.storages.appStateStorage,
blockchain,
Expand All @@ -960,6 +1018,7 @@ class SyncControllerSpec extends FlatSpec with Matchers {
ledger,
new Mocks.MockValidatorsAlwaysSucceed,
peerMessageBus.ref, pendingTransactionsManager.ref, ommersPool.ref, etcPeerManager.ref,
syncConfig(Config.Sync.doFastSync),
externalSchedulerOpt = Some(time.scheduler))))

val EmptyTrieRootHash: ByteString = Account.EmptyStorageRootHash
Expand Down

0 comments on commit 08c0cfe

Please sign in to comment.