[ETCM-102] Fix bugs found during integration testing
KonradStaniec committed Sep 16, 2020
1 parent 1238df4 commit 01584d5
Showing 21 changed files with 188 additions and 135 deletions.
2 changes: 1 addition & 1 deletion src/it/resources/logback-test.xml
@@ -25,7 +25,7 @@
</encoder>
</appender>

<root level="DEBUG">
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
97 changes: 82 additions & 15 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
@@ -1,6 +1,7 @@
package io.iohk.ethereum.sync

import java.net.{InetSocketAddress, ServerSocket}
import java.nio.file.Files
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference

@@ -9,9 +10,12 @@ import akka.testkit.TestProbe
import akka.util.{ByteString, Timeout}
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSyncConfig}
import io.iohk.ethereum.db.components.{SharedEphemDataSources, Storages}
import io.iohk.ethereum.db.storage.AppStateStorage
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig}
import io.iohk.ethereum.db.components.{SharedRocksDbDataSources, Storages}
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource}
import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces}
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
import io.iohk.ethereum.domain.{Account, Address, Block, Blockchain, BlockchainImpl}
import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
@@ -22,6 +26,7 @@ import io.iohk.ethereum.network.discovery.Node
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor}
@@ -51,8 +56,8 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
_ <- peer1.startFastSync()
_ <- peer1.waitForFastSyncFinish()
} yield {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
}
}

@@ -66,11 +71,25 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter {
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
assert(trie.isDefined)
}
}


it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
for {
_ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate)
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer2.syncUntil(2000)(IdentityUpdate).startAndForget
_ <- peer1.startFastSync()
_ <- peer1.waitForFastSyncFinish()
} yield {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
}
}
}
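
The new test above leans on Monix's startAndForget: peer2 keeps producing blocks in the background while peer1 fast-syncs, which forces the target block to advance mid-sync. A minimal sketch of the pattern, with a hypothetical producer standing in for syncUntil:

import monix.eval.Task
import scala.concurrent.duration._

// Hypothetical stand-in for peer2.syncUntil(...): an endless block producer.
def produceBlocks(n: Int): Task[Unit] =
  Task(println(s"mined block $n")) >> Task.sleep(10.millis) >> produceBlocks(n + 1)

val testFlow: Task[Unit] = for {
  _ <- produceBlocks(0).startAndForget // detaches onto its own fiber, returns immediately
  _ <- Task.sleep(50.millis)           // the main flow (here: fast sync) continues concurrently
} yield ()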

object FastSyncItSpec {
@@ -108,7 +127,7 @@ object FastSyncItSpec {

val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world

def updateWorldWithNRandomAcounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
def updateWorldWithNRandomAccounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
val randomBalance = num
val randomAddress = Address(num)
@@ -125,7 +144,7 @@

def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) =>
if (blockNr == blockWithUpdate) {
updateWorldWithNRandomAcounts(1000, world)
updateWorldWithNRandomAccounts(1000, world)
} else {
IdentityUpdate(blockNr, world)
}
@@ -151,12 +170,30 @@ object FastSyncItSpec {
discoveryStatus = ServerStatus.NotListening
)

lazy val tempDir = Files.createTempDirectory("temp-fast-sync")

def getRocksDbTestConfig(dbPath: String) = {
new RocksDbConfig {
override val createIfMissing: Boolean = true
override val paranoidChecks: Boolean = false
override val path: String = dbPath
override val maxThreads: Int = 1
override val maxOpenFiles: Int = 32
override val verifyChecksums: Boolean = false
override val levelCompaction: Boolean = true
override val blockSize: Long = 16384
override val blockCacheSize: Long = 33554432
}
}
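
The integration tests now run against a real RocksDB instance in a throw-away temp directory instead of the in-memory SharedEphemDataSources. A sketch of the intended lifecycle, using only names visible in this diff:

// Open a disposable RocksDB for one test run and make sure it is destroyed
// afterwards (FakePeer.shutdown() below does the same).
val dbPath = Files.createTempDirectory("temp-fast-sync").toAbsolutePath.toString
val dataSource = RocksDbDataSource(getRocksDbTestConfig(dbPath), Namespaces.nsSeq)
try {
  // ... exercise storages backed by dataSource ...
} finally {
  dataSource.destroy()
}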

sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder {
override lazy val pruningMode: PruningMode = ArchivePruning
}

lazy val nodeStatusHolder = new AtomicReference(nodeStatus)
lazy val storagesInstance = new SharedEphemDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages
lazy val storagesInstance = new SharedRocksDbDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages {
override lazy val dataSource: RocksDbDataSource = RocksDbDataSource(getRocksDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq)
}
lazy val blockchainConfig = Config.blockchains.blockchainConfig
/**
* Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when
@@ -252,15 +289,21 @@ object FastSyncItSpec {

val testSyncConfig = syncConfig.copy(
minPeersToChooseTargetBlock = 1,
peersScanInterval = 1.second,
peersScanInterval = 5.milliseconds,
blockHeadersPerRequest = 200,
blockBodiesPerRequest = 50,
receiptsPerRequest = 50,
fastSyncThrottle = 10.milliseconds,
startRetryInterval = 50.milliseconds,
nodesPerRequest = 200
nodesPerRequest = 200,
maxTargetDifference = 1,
syncRetryInterval = 50.milliseconds
)
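
Two new knobs appear here: maxTargetDifference lets fast sync keep its current target when a freshly resolved one is only marginally ahead, and syncRetryInterval tightens the retry loop for tests. A hypothetical sketch of how such a threshold is typically applied; the real FastSync logic may differ:

// Switch targets only when the newly resolved best block is further ahead
// than the tolerated difference; small advances are not worth re-pivoting for.
def shouldSwitchTarget(current: BigInt, fresh: BigInt, maxTargetDifference: BigInt): Boolean =
  fresh - current > maxTargetDifference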

lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig)

lazy val broadcasterActor = system.actorOf(BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler))

lazy val fastSync = system.actorOf(FastSync.props(
storagesInstance.storages.fastSyncStateStorage,
storagesInstance.storages.appStateStorage,
@@ -282,6 +325,10 @@
)
}

private def broadcastBlock(block: Block, td: BigInt) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, td))
}

def getCurrentState(): BlockchainState = {
val bestBlock = bl.getBestBlock()
val currentWorldState = getMptForBlock(bestBlock)
@@ -302,7 +349,10 @@
}

def shutdown(): Task[Unit] = {
Task.deferFuture(system.terminate()).map(_ => ())
for {
_ <- Task.deferFuture(system.terminate())
_ <- Task(storagesInstance.dataSource.destroy())
} yield ()
}

def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = {
@@ -343,12 +393,29 @@ object FastSyncItSpec {
go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n)
}

def syncUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = {
Task(bl.getBestBlock()).flatMap { block =>
if (block.number >= n) {
Task(())
} else {
Task {
val currentTd = bl.getTotalDifficultyByHash(block.hash).get
val currentWorld = getMptForBlock(block)
val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWorld)(updateWorldForBlock)
bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true)
bl.persistCachedNodes()
broadcastBlock(newBlock, newTd)
}.flatMap(_ => syncUntil(n)(updateWorldForBlock))
}
}
}
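
syncUntil above recurses once per produced block; because the recursive call happens inside flatMap, Monix trampolines it and the loop is stack-safe. The same pattern in isolation:

import monix.eval.Task

// Re-evaluate `step` until `done` yields true; each iteration is a fresh
// Task chained via flatMap, so no stack frames accumulate.
def repeatUntil(done: Task[Boolean], step: Task[Unit]): Task[Unit] =
  done.flatMap {
    case true  => Task.unit
    case false => step.flatMap(_ => repeatUntil(done, step))
  }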

def startFastSync(): Task[Unit] = Task {
fastSync ! FastSync.Start
}

def waitForFastSyncFinish(): Task[Boolean] = {
retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 30){ isDone =>
retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 90){ isDone =>
isDone
}
}
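
The retry budget grows from 30 to 90 one-second polls because a RocksDB-backed fast sync takes longer than the old in-memory runs. A plausible shape for retryUntilWithDelay, assuming Monix; the real helper in the test utilities may differ:

import java.util.concurrent.TimeoutException
import monix.eval.Task
import scala.concurrent.duration.FiniteDuration

// Poll `source` until `predicate` holds, waiting `delay` between attempts
// and failing after `maxRetries` unsuccessful polls.
def retryUntilWithDelay[A](source: Task[A], delay: FiniteDuration, maxRetries: Int)(
    predicate: A => Boolean
): Task[A] =
  source.flatMap { a =>
    if (predicate(a)) Task.now(a)
    else if (maxRetries <= 0) Task.raiseError(new TimeoutException("condition not fulfilled"))
    else retryUntilWithDelay(source, delay, maxRetries - 1)(predicate).delayExecution(delay)
  }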
@@ -357,7 +424,7 @@
def getBestBlockTrie(): Option[MptNode] = {
Try {
val bestBlock = bl.getBestBlock()
val bestStateRoot =bestBlock.header.stateRoot
val bestStateRoot = bestBlock.header.stateRoot
MptTraversals.parseTrieIntoMemory(HashNode(bestStateRoot.toArray), storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number))
}.toOption
}
@@ -34,9 +34,9 @@ class FastSyncTargetBlockSelector(
val peersUsedToChooseTarget = peersToDownloadFrom.filter(_._2.forkAccepted)

if (peersUsedToChooseTarget.size >= minPeersToChooseTargetBlock) {
peersUsedToChooseTarget.foreach { case (peer, PeerInfo(status, _, _, _)) =>
peersUsedToChooseTarget.foreach { case (peer, PeerInfo(_, _, _, _, bestBlockHash)) =>
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id)))
etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(status.bestHash), 1, 0, reverse = false), peer.id)
etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(bestBlockHash), 1, 0, reverse = false), peer.id)
}
log.debug("Asking {} peers for block headers", peersUsedToChooseTarget.size)
val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, BlockHeadersTimeout)
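
This hunk pairs with the PeerInfo change in EtcPeerManagerActor.scala below: the selector used to request the header of status.bestHash, a value captured once during the handshake, so the resolved target could never advance while peers kept mining. It now anchors the request at the continuously updated bestBlockHash. A reduced, illustrative model:

import akka.util.ByteString

final case class TrackedPeer(handshakeBestHash: ByteString, bestBlockHash: ByteString)

// New behaviour: anchor the header request at the latest observed best block.
def headerRequestAnchor(peer: TrackedPeer): ByteString =
  peer.bestBlockHash // was: peer.handshakeBestHash (i.e. status.bestHash)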
@@ -134,7 +134,7 @@ object PeersClient {
def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
val peersToUse = peersToDownloadFrom
.collect {
case (ref, PeerInfo(_, totalDifficulty, true, _)) =>
case (ref, PeerInfo(_, totalDifficulty, true, _, _)) =>
(ref, totalDifficulty)
}

@@ -559,7 +559,7 @@ class OldRegularSync(

private def bestPeer: Option[Peer] = {
val peersToUse = peersToDownloadFrom
.collect{ case (ref, PeerInfo(_, totalDifficulty, true, _)) => (ref, totalDifficulty) }
.collect{ case (ref, PeerInfo(_, totalDifficulty, true, _, _)) => (ref, totalDifficulty) }

if (peersToUse.nonEmpty) {
val (peer, _) = peersToUse.maxBy{ case (_, td) => td }
@@ -9,20 +9,23 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou
* key.drop to remove namespace prefix from the key
* @return key-value pairs from this storage
*/
def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] =
def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] = synchronized {
storage.toSeq.map{case (key, value) => (key.array().drop(namespace.length).toIndexedSeq, value.toIndexedSeq)}
}

override def get(namespace: Namespace, key: Key): Option[Value] = storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq)
override def get(namespace: Namespace, key: Key): Option[Value] = synchronized {
storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq)
}

override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = {
override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = synchronized {
val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap((namespace ++ key).toArray))
val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) =>
storage + (ByteBuffer.wrap((namespace ++ toUpdate._1).toArray) -> toUpdate._2.toArray))
storage = afterUpdate
this
}

override def clear: DataSource = {
override def clear: DataSource = synchronized {
storage = Map()
this
}
Expand All @@ -31,7 +34,7 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou

override def destroy(): Unit = ()

override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = {
override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = synchronized {
val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap(key))
val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) =>
storage + (ByteBuffer.wrap(toUpdate._1) -> toUpdate._2))
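
EphemDataSource.storage is a plain var holding an immutable Map, so every update is a read-modify-write; without a lock, two concurrent writers can both read the old map and one write silently overwrites the other. A minimal sketch of the hazard and of the fix applied above:

import java.nio.ByteBuffer

final class Store {
  private var storage: Map[ByteBuffer, Array[Byte]] = Map.empty

  // Lost-update race: threads A and B both read the same old map, each adds
  // its own key, and whichever assignment runs last wins.
  def unsafePut(k: ByteBuffer, v: Array[Byte]): Unit =
    storage = storage + (k -> v)

  // The fix: serialize read-modify-write cycles on the instance monitor,
  // exactly as the synchronized blocks above do.
  def safePut(k: ByteBuffer, v: Array[Byte]): Unit = synchronized {
    storage = storage + (k -> v)
  }
}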
45 changes: 29 additions & 16 deletions src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala
@@ -1,6 +1,7 @@
package io.iohk.ethereum.network

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.ByteString
import io.iohk.ethereum.db.storage.AppStateStorage
import io.iohk.ethereum.network.PeerActor.{DisconnectPeer, SendMessage}
import io.iohk.ethereum.network.EtcPeerManagerActor._
@@ -37,16 +38,21 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
def handleMessages(peersWithInfo: PeersWithInfo): Receive =
handleCommonMessages(peersWithInfo) orElse handlePeersInfoEvents(peersWithInfo)

private def peerHasUpdatedBestBlock(peerInfo: PeerInfo): Boolean = {
val peerBestBlockIsItsGenesisBlock = peerInfo.bestBlockHash == peerInfo.remoteStatus.genesisHash
peerBestBlockIsItsGenesisBlock || (!peerBestBlockIsItsGenesisBlock && peerInfo.maxBlockNumber > 0)
}
/**
* Processes both requests to send messages to peers and requests for peer information
*
* @param peersWithInfo, which has the peer and peer information for each handshaked peer (identified by its id)
*/
private def handleCommonMessages(peersWithInfo: PeersWithInfo): Receive = {
case GetHandshakedPeers =>
// Provide only peers which already responded to request for best block hash
// Provide only peers which already responded to the request for best block hash, and whose best block hash is different
// from their genesis block
sender() ! HandshakedPeers(peersWithInfo.collect {
case (_, PeerWithInfo(peer, peerInfo)) if peerInfo.maxBlockNumber > 0 => peer -> peerInfo
case (_, PeerWithInfo(peer, peerInfo)) if peerHasUpdatedBestBlock(peerInfo) => peer -> peerInfo
})

case PeerInfoRequest(peerId) =>
@@ -112,7 +118,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
* @return new updated peer info
*/
private def handleSentMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo =
updateMaxBlock(message)(initialPeerWithInfo.peerInfo)
initialPeerWithInfo.peerInfo

/**
* Processes the message and the old peer info and returns the peer info
@@ -121,11 +127,12 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
* @param initialPeerWithInfo from before the message was processed
* @return new updated peer info
*/
private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo =
private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = {
(updateTotalDifficulty(message) _
andThen updateForkAccepted(message, initialPeerWithInfo.peer)
andThen updateMaxBlock(message)
)(initialPeerWithInfo.peerInfo)
}


/**
@@ -178,24 +185,28 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe
* @return new peer info with the max block number updated
*/
private def updateMaxBlock(message: Message)(initialPeerInfo: PeerInfo): PeerInfo = {
def update(ns: Seq[BigInt]): PeerInfo = {
val maxBlockNumber = ns.fold(0: BigInt) { case (a, b) => if (a > b) a else b }
if (maxBlockNumber > appStateStorage.getEstimatedHighestBlock())
appStateStorage.putEstimatedHighestBlock(maxBlockNumber)

if (maxBlockNumber > initialPeerInfo.maxBlockNumber)
initialPeerInfo.withMaxBlockNumber(maxBlockNumber)
else
def update(ns: Seq[(BigInt, ByteString)]): PeerInfo = {
if (ns.isEmpty) {
initialPeerInfo
} else {
val (maxBlockNumber, maxBlockHash) = ns.maxBy(_._1)
if (maxBlockNumber > appStateStorage.getEstimatedHighestBlock())
appStateStorage.putEstimatedHighestBlock(maxBlockNumber)

if (maxBlockNumber > initialPeerInfo.maxBlockNumber)
initialPeerInfo.withMaxBlockNumber(maxBlockNumber).withMaxBlockHash(maxBlockHash)
else
initialPeerInfo
}
}

message match {
case m: BlockHeaders =>
update(m.headers.map(_.number))
update(m.headers.map(header => (header.number, header.hash)))
case m: NewBlock =>
update(Seq(m.block.header.number))
update(Seq((m.block.header.number, m.block.header.hash)))
case m: NewBlockHashes =>
update(m.hashes.map(_.number))
update(m.hashes.map(h => (h.number, h.hash)))
case _ => initialPeerInfo
}
}
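
The bookkeeping change in miniature: observed blocks are folded in as (number, hash) pairs and only the highest pair wins, so the tracked bestBlockHash always corresponds to maxBlockNumber. It also guards the empty case explicitly instead of folding from zero. Illustrative names below:

import akka.util.ByteString

final case class Tracked(maxBlockNumber: BigInt, bestBlockHash: ByteString)

def updateMax(peer: Tracked, seen: Seq[(BigInt, ByteString)]): Tracked =
  if (seen.isEmpty) peer // BlockHeaders / NewBlockHashes may carry nothing
  else {
    val (number, hash) = seen.maxBy(_._1)
    if (number > peer.maxBlockNumber) Tracked(number, hash) else peer
  }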
@@ -209,14 +220,16 @@ object EtcPeerManagerActor {
case class PeerInfo(remoteStatus: Status,
totalDifficulty: BigInt,
forkAccepted: Boolean,
maxBlockNumber: BigInt) extends HandshakeResult {
maxBlockNumber: BigInt,
bestBlockHash: ByteString) extends HandshakeResult {

def withTotalDifficulty(totalDifficulty: BigInt): PeerInfo = copy(totalDifficulty = totalDifficulty)

def withForkAccepted(forkAccepted: Boolean): PeerInfo = copy(forkAccepted = forkAccepted)

def withMaxBlockNumber(maxBlockNumber: BigInt): PeerInfo = copy(maxBlockNumber = maxBlockNumber)

def withMaxBlockHash(bestBlockHash: ByteString): PeerInfo = copy(bestBlockHash = bestBlockHash)
}

private case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo)
