Skip to content

Commit

Permalink
[ETCM-76] network messages for checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
rtkaczyk committed Oct 28, 2020
1 parent ae398e1 commit f0befc1
Show file tree
Hide file tree
Showing 37 changed files with 604 additions and 121 deletions.
Expand Up @@ -158,6 +158,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
override val peerConfiguration: PeerConfiguration = peerConf
override val blockchain: Blockchain = bl
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
override val blockchainConfig = CommonFakePeer.this.blockchainConfig // FIXME: remove in ETCM-280
}

lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
Expand Down
Expand Up @@ -70,6 +70,7 @@ object RegularSyncItSpecUtils {
peerEventBus,
ledger,
bl,
blockchainConfig, // FIXME: remove in ETCM-280
testSyncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
Expand Up @@ -19,7 +19,7 @@ import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{ForkResolver, PeerEventBusActor, PeerManagerActor}
import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, SecureRandomBuilder}
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatus}
import java.util.concurrent.atomic.AtomicReference

import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
Expand Down Expand Up @@ -82,6 +82,7 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val nodeStatusHolder: AtomicReference[NodeStatus] = DumpChainApp.nodeStatusHolder
override val peerConfiguration: PeerConfiguration = peerConfig
override val blockchain: Blockchain = DumpChainApp.blockchain
override val blockchainConfig: BlockchainConfig = DumpChainApp.blockchainConfig
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
}

Expand Down
Expand Up @@ -35,7 +35,9 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
}

private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
newBlock.block.header.number > peerInfo.maxBlockNumber || newBlock.totalDifficulty > peerInfo.totalDifficulty
newBlock.block.header.number > peerInfo.maxBlockNumber ||
newBlock.totalDifficulty > peerInfo.totalDifficulty ||
newBlock.latestCheckpointNumber > peerInfo.latestCheckpointNumber

private def broadcastNewBlock(newBlock: NewBlock, peers: Set[Peer]): Unit =
obtainRandomPeerSubset(peers).foreach { peer =>
Expand Down
Expand Up @@ -161,12 +161,12 @@ object PeersClient {

def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
val peersToUse = peersToDownloadFrom
.collect { case (ref, PeerInfo(_, totalDifficulty, true, _, _)) =>
(ref, totalDifficulty)
.collect { case (ref, PeerInfo(_, totalDifficulty, latestChkp, true, _, _)) =>
(ref, totalDifficulty, latestChkp)
}

if (peersToUse.nonEmpty) {
val (peer, _) = peersToUse.maxBy { case (_, td) => td }
val (peer, _, _) = peersToUse.maxBy { case (_, td, latestChkp) => latestChkp -> td }
Some(peer)
} else {
None
Expand Down
Expand Up @@ -173,8 +173,9 @@ class PivotBlockSelector(
}

private def collectVoters: ElectionDetails = {
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
case (peer, PeerInfo(_, _, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
}

val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
Expand Down
Expand Up @@ -7,11 +7,13 @@ import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.BlockchainConfig
import io.iohk.ethereum.utils.Config.SyncConfig

class SyncController(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
fastSyncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand Down Expand Up @@ -101,6 +103,7 @@ class SyncController(
peerEventBus,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand All @@ -120,6 +123,7 @@ object SyncController {
def props(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand All @@ -134,6 +138,7 @@ object SyncController {
new SyncController(
appStateStorage,
blockchain,
blockchainConfig,
syncStateStorage,
ledger,
validators,
Expand Down
Expand Up @@ -53,7 +53,9 @@ class BlockFetcher(

private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))
peerEventBus ! Subscribe(
MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)
)
}

def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus =>
Expand Down Expand Up @@ -177,7 +179,7 @@ class BlockFetcher(
}

fetchBlocks(newState)
case MessageFromPeer(NewBlock(block, _), peerId) =>
case MessageFromPeer(NewBlock(block, _, _), peerId) =>
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1

Expand Down
Expand Up @@ -14,23 +14,24 @@ import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, NewBlock63, NewBlock64}
import io.iohk.ethereum.ommers.OmmersPool.AddOmmers
import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.FunctorOps._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// scalastyle:off cyclomatic.complexity
// scalastyle:off cyclomatic.complexity parameter.number
class BlockImporter(
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig, //FIXME: this should not be needed after ETCM-280
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
Expand Down Expand Up @@ -248,8 +249,22 @@ class BlockImporter(
}
}

private def broadcastBlocks(blocks: List[Block], totalDifficulties: List[BigInt]): Unit = {
val newBlocks = (blocks, totalDifficulties).mapN(NewBlock.apply)
private def broadcastBlocks(
blocks: List[Block],
totalDifficulties: List[BigInt]
): Unit = {
val constructNewBlock = {
//FIXME: instead of choosing the message version based on block we should rely on the receiving
// peer's `Capability`. To be addressed in ETCM-280
if (blocks.lastOption.exists(_.number < blockchainConfig.ecip1097BlockNumber))
NewBlock63.apply _
else
//FIXME: we should use checkpoint number corresponding to the block we're broadcasting. This will be addressed
// in ETCM-263 by using ChainWeight for that block
NewBlock64.apply(_, _, blockchain.getLatestCheckpointBlockNumber())
}

val newBlocks = (blocks, totalDifficulties).mapN(constructNewBlock)
broadcastNewBlocks(newBlocks)
}

Expand Down Expand Up @@ -316,6 +331,7 @@ object BlockImporter {
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
Expand All @@ -327,6 +343,7 @@ object BlockImporter {
fetcher,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
broadcaster,
Expand Down
Expand Up @@ -7,7 +7,7 @@ import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.{Block, Blockchain}
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
import io.iohk.ethereum.utils.Config.SyncConfig

class RegularSync(
Expand All @@ -16,6 +16,7 @@ class RegularSync(
peerEventBus: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -38,6 +39,7 @@ class RegularSync(
fetcher,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
broadcaster,
Expand Down Expand Up @@ -91,6 +93,7 @@ object RegularSync {
peerEventBus: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -104,6 +107,7 @@ object RegularSync {
peerEventBus,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Expand Up @@ -274,8 +274,11 @@ case object BlockEnqueued extends BlockImportResult

case object DuplicateBlock extends BlockImportResult

case class ChainReorganised(oldBranch: List[Block], newBranch: List[Block], totalDifficulties: List[BigInt])
extends BlockImportResult
case class ChainReorganised(
oldBranch: List[Block],
newBranch: List[Block],
totalDifficulties: List[BigInt]
) extends BlockImportResult

case class BlockImportFailed(error: String) extends BlockImportResult

Expand Down
28 changes: 20 additions & 8 deletions src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala
Expand Up @@ -143,7 +143,7 @@ class EtcPeerManagerActor(
* @return new updated peer info
*/
private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = {
(updateTotalDifficulty(message) _
(updateTotalDifficultyAndCheckpoint(message) _
andThen updateForkAccepted(message, initialPeerWithInfo.peer)
andThen updateMaxBlock(message))(initialPeerWithInfo.peerInfo)
}
Expand All @@ -155,11 +155,15 @@ class EtcPeerManagerActor(
* @param initialPeerInfo from before the message was processed
* @return new peer info with the total difficulty updated
*/
private def updateTotalDifficulty(message: Message)(initialPeerInfo: PeerInfo): PeerInfo = message match {
case newBlock: NewBlock =>
initialPeerInfo.withTotalDifficulty(newBlock.totalDifficulty)
case _ => initialPeerInfo
}
private def updateTotalDifficultyAndCheckpoint(message: Message)(initialPeerInfo: PeerInfo): PeerInfo =
message match {
case newBlock: NewBlock =>
initialPeerInfo.copy(
totalDifficulty = newBlock.totalDifficulty,
latestCheckpointNumber = newBlock.latestCheckpointNumber
)
case _ => initialPeerInfo
}

/**
* Processes the message and updates if the fork block was accepted from the peer
Expand Down Expand Up @@ -228,11 +232,12 @@ class EtcPeerManagerActor(

object EtcPeerManagerActor {

val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code, NewBlockHashes.code)
val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code63, NewBlock.code64, NewBlockHashes.code)

case class PeerInfo(
remoteStatus: Status, // Updated only after handshaking
totalDifficulty: BigInt,
latestCheckpointNumber: BigInt,
forkAccepted: Boolean,
maxBlockNumber: BigInt,
bestBlockHash: ByteString
Expand All @@ -257,7 +262,14 @@ object EtcPeerManagerActor {

object PeerInfo {
def apply(remoteStatus: Status, forkAccepted: Boolean): PeerInfo = {
PeerInfo(remoteStatus, remoteStatus.totalDifficulty, forkAccepted, 0, remoteStatus.bestHash)
PeerInfo(
remoteStatus,
remoteStatus.totalDifficulty,
remoteStatus.latestCheckpointNumber,
forkAccepted,
0,
remoteStatus.bestHash
)
}

def withForkAccepted(remoteStatus: Status): PeerInfo = PeerInfo(remoteStatus, forkAccepted = true)
Expand Down
Expand Up @@ -5,7 +5,7 @@ import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.network.ForkResolver
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.utils.NodeStatus
import io.iohk.ethereum.utils.{BlockchainConfig, NodeStatus}
import java.util.concurrent.atomic.AtomicReference

case class EtcHandshaker private (
Expand All @@ -31,6 +31,7 @@ object EtcHandshaker {
trait EtcHandshakerConfiguration {
val nodeStatusHolder: AtomicReference[NodeStatus]
val blockchain: Blockchain
val blockchainConfig: BlockchainConfig
val appStateStorage: AppStateStorage
val peerConfiguration: PeerConfiguration
val forkResolverOpt: Option[ForkResolver]
Expand Down
Expand Up @@ -52,10 +52,15 @@ case class EtcNodeStatusExchangeState(handshakerConfiguration: EtcHandshakerConf
private def createStatusMsg(): Status = {
val bestBlockHeader = getBestBlockHeader()
val totalDifficulty = blockchain.getTotalDifficultyByHash(bestBlockHeader.hash).get
val latestCheckpointNumber =
if (bestBlockHeader.number < blockchainConfig.ecip1097BlockNumber) None
else Some(blockchain.getLatestCheckpointBlockNumber())

val status = Status(
protocolVersion = Versions.PV63,
networkId = peerConfiguration.networkId,
totalDifficulty = totalDifficulty,
latestCheckpointNumber = latestCheckpointNumber,
bestHash = bestBlockHeader.hash,
genesisHash = blockchain.genesisHeader.hash
)
Expand Down

0 comments on commit f0befc1

Please sign in to comment.