Skip to content

Commit

Permalink
[ETCM-76] network messages for checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
rtkaczyk committed Oct 26, 2020
1 parent 17358d2 commit cde4e85
Show file tree
Hide file tree
Showing 34 changed files with 541 additions and 106 deletions.
Expand Up @@ -19,7 +19,7 @@ import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{ForkResolver, PeerEventBusActor, PeerManagerActor}
import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, SecureRandomBuilder}
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatus}
import java.util.concurrent.atomic.AtomicReference

import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate
Expand Down Expand Up @@ -81,6 +81,7 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val nodeStatusHolder: AtomicReference[NodeStatus] = DumpChainApp.nodeStatusHolder
override val peerConfiguration: PeerConfiguration = peerConfig
override val blockchain: Blockchain = DumpChainApp.blockchain
override val blockchainConfig: BlockchainConfig = DumpChainApp.blockchainConfig
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
}

Expand Down
Expand Up @@ -35,7 +35,9 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {
}

private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
newBlock.block.header.number > peerInfo.maxBlockNumber || newBlock.totalDifficulty > peerInfo.totalDifficulty
newBlock.block.header.number > peerInfo.maxBlockNumber ||
newBlock.totalDifficulty > peerInfo.totalDifficulty ||
newBlock.latestCheckpointNumber > peerInfo.latestCheckpointNumber

private def broadcastNewBlock(newBlock: NewBlock, peers: Set[Peer]): Unit =
obtainRandomPeerSubset(peers).foreach { peer =>
Expand Down
Expand Up @@ -161,12 +161,12 @@ object PeersClient {

def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
val peersToUse = peersToDownloadFrom
.collect { case (ref, PeerInfo(_, totalDifficulty, true, _, _)) =>
(ref, totalDifficulty)
.collect { case (ref, PeerInfo(_, totalDifficulty, latestChkp, true, _, _)) =>
(ref, totalDifficulty, latestChkp)
}

if (peersToUse.nonEmpty) {
val (peer, _) = peersToUse.maxBy { case (_, td) => td }
val (peer, _, _) = peersToUse.maxBy { case (_, td, latestChkp) => latestChkp -> td }
Some(peer)
} else {
None
Expand Down
Expand Up @@ -173,8 +173,9 @@ class PivotBlockSelector(
}

private def collectVoters: ElectionDetails = {
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
case (peer, PeerInfo(_, _, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
}

val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
Expand Down
Expand Up @@ -7,11 +7,13 @@ import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.BlockchainConfig
import io.iohk.ethereum.utils.Config.SyncConfig

class SyncController(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
fastSyncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand Down Expand Up @@ -101,6 +103,7 @@ class SyncController(
peerEventBus,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand All @@ -120,6 +123,7 @@ object SyncController {
def props(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand All @@ -134,6 +138,7 @@ object SyncController {
new SyncController(
appStateStorage,
blockchain,
blockchainConfig,
syncStateStorage,
ledger,
validators,
Expand Down
Expand Up @@ -49,7 +49,9 @@ class BlockFetcher(

private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))
peerEventBus ! Subscribe(
MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)
)
}

def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus =>
Expand Down Expand Up @@ -149,7 +151,7 @@ class BlockFetcher(
}

fetchBlocks(newState)
case MessageFromPeer(NewBlock(block, _), peerId) =>
case MessageFromPeer(NewBlock(block, _, _), peerId) =>
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1

Expand Down
Expand Up @@ -14,23 +14,24 @@ import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, NewBlock63, NewBlock64}
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.FunctorOps._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// scalastyle:off cyclomatic.complexity
// scalastyle:off cyclomatic.complexity parameter.number
class BlockImporter(
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
Expand Down Expand Up @@ -249,8 +250,22 @@ class BlockImporter(
}
}

private def broadcastBlocks(blocks: List[Block], totalDifficulties: List[BigInt]): Unit = {
val newBlocks = (blocks, totalDifficulties).mapN(NewBlock.apply)
private def broadcastBlocks(
blocks: List[Block],
totalDifficulties: List[BigInt]
): Unit = {
val constructNewBlock = {
//FIXME: instead of choosing the message version based on block we should rely on the receiving
// peer's `Capability`. To be addressed in ETCM-280
if (blocks.lastOption.exists(_.number < blockchainConfig.ecip1097BlockNumber))
NewBlock63.apply _
else
//FIXME: we should use checkpoint number corresponding to the block we're broadcasting. This will be addressed
// in ETCM-263 by using ChainWeight for that block
NewBlock64.apply(_, _, blockchain.getLatestCheckpointBlockNumber())
}

val newBlocks = (blocks, totalDifficulties).mapN(constructNewBlock)
broadcastNewBlocks(newBlocks)
}

Expand Down Expand Up @@ -320,6 +335,7 @@ object BlockImporter {
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
Expand All @@ -331,6 +347,7 @@ object BlockImporter {
fetcher,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
broadcaster,
Expand Down
Expand Up @@ -7,7 +7,7 @@ import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.{Block, Blockchain}
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.{BlockchainConfig, ByteStringUtils}
import io.iohk.ethereum.utils.Config.SyncConfig

class RegularSync(
Expand All @@ -16,6 +16,7 @@ class RegularSync(
peerEventBus: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -38,6 +39,7 @@ class RegularSync(
fetcher,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
broadcaster,
Expand Down Expand Up @@ -91,6 +93,7 @@ object RegularSync {
peerEventBus: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -104,6 +107,7 @@ object RegularSync {
peerEventBus,
ledger,
blockchain,
blockchainConfig,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/io/iohk/ethereum/ledger/BlockImport.scala
Expand Up @@ -274,8 +274,11 @@ case object BlockEnqueued extends BlockImportResult

case object DuplicateBlock extends BlockImportResult

case class ChainReorganised(oldBranch: List[Block], newBranch: List[Block], totalDifficulties: List[BigInt])
extends BlockImportResult
case class ChainReorganised(
oldBranch: List[Block],
newBranch: List[Block],
totalDifficulties: List[BigInt]
) extends BlockImportResult

case class BlockImportFailed(error: String) extends BlockImportResult

Expand Down
28 changes: 20 additions & 8 deletions src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala
Expand Up @@ -143,7 +143,7 @@ class EtcPeerManagerActor(
* @return new updated peer info
*/
private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = {
(updateTotalDifficulty(message) _
(updateTotalDifficultyAndCheckpoint(message) _
andThen updateForkAccepted(message, initialPeerWithInfo.peer)
andThen updateMaxBlock(message))(initialPeerWithInfo.peerInfo)
}
Expand All @@ -155,11 +155,15 @@ class EtcPeerManagerActor(
* @param initialPeerInfo from before the message was processed
* @return new peer info with the total difficulty updated
*/
private def updateTotalDifficulty(message: Message)(initialPeerInfo: PeerInfo): PeerInfo = message match {
case newBlock: NewBlock =>
initialPeerInfo.withTotalDifficulty(newBlock.totalDifficulty)
case _ => initialPeerInfo
}
private def updateTotalDifficultyAndCheckpoint(message: Message)(initialPeerInfo: PeerInfo): PeerInfo =
message match {
case newBlock: NewBlock =>
initialPeerInfo.copy(
totalDifficulty = newBlock.totalDifficulty,
latestCheckpointNumber = newBlock.latestCheckpointNumber
)
case _ => initialPeerInfo
}

/**
* Processes the message and updates if the fork block was accepted from the peer
Expand Down Expand Up @@ -228,11 +232,12 @@ class EtcPeerManagerActor(

object EtcPeerManagerActor {

val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code, NewBlockHashes.code)
val msgCodesWithInfo: Set[Int] = Set(BlockHeaders.code, NewBlock.code63, NewBlock.code64, NewBlockHashes.code)

case class PeerInfo(
remoteStatus: Status, // Updated only after handshaking
totalDifficulty: BigInt,
latestCheckpointNumber: BigInt,
forkAccepted: Boolean,
maxBlockNumber: BigInt,
bestBlockHash: ByteString
Expand All @@ -257,7 +262,14 @@ object EtcPeerManagerActor {

object PeerInfo {
def apply(remoteStatus: Status, forkAccepted: Boolean): PeerInfo = {
PeerInfo(remoteStatus, remoteStatus.totalDifficulty, forkAccepted, 0, remoteStatus.bestHash)
PeerInfo(
remoteStatus,
remoteStatus.totalDifficulty,
remoteStatus.latestCheckpointNumber,
forkAccepted,
0,
remoteStatus.bestHash
)
}

def withForkAccepted(remoteStatus: Status): PeerInfo = PeerInfo(remoteStatus, forkAccepted = true)
Expand Down
Expand Up @@ -5,7 +5,7 @@ import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.network.ForkResolver
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.utils.NodeStatus
import io.iohk.ethereum.utils.{BlockchainConfig, NodeStatus}
import java.util.concurrent.atomic.AtomicReference

case class EtcHandshaker private (
Expand All @@ -31,6 +31,7 @@ object EtcHandshaker {
trait EtcHandshakerConfiguration {
val nodeStatusHolder: AtomicReference[NodeStatus]
val blockchain: Blockchain
val blockchainConfig: BlockchainConfig
val appStateStorage: AppStateStorage
val peerConfiguration: PeerConfiguration
val forkResolverOpt: Option[ForkResolver]
Expand Down
Expand Up @@ -52,10 +52,15 @@ case class EtcNodeStatusExchangeState(handshakerConfiguration: EtcHandshakerConf
private def createStatusMsg(): Status = {
val bestBlockHeader = getBestBlockHeader()
val totalDifficulty = blockchain.getTotalDifficultyByHash(bestBlockHeader.hash).get
val latestCheckpointNumber =
if (bestBlockHeader.number < blockchainConfig.ecip1097BlockNumber) None
else Some(blockchain.getLatestCheckpointBlockNumber())

val status = Status(
protocolVersion = Versions.PV63,
networkId = peerConfiguration.networkId,
totalDifficulty = totalDifficulty,
latestCheckpointNumber = latestCheckpointNumber,
bestHash = bestBlockHeader.hash,
genesisHash = blockchain.genesisHeader.hash
)
Expand Down
26 changes: 13 additions & 13 deletions src/main/scala/io/iohk/ethereum/network/p2p/MessageDecoders.scala
Expand Up @@ -25,28 +25,28 @@ import io.iohk.ethereum.network.p2p.messages.Versions._

object NetworkMessageDecoder extends MessageDecoder {

override def fromBytes(`type`: Int, payload: Array[Byte], protocolVersion: Version): Message =
(protocolVersion, `type`) match {
override def fromBytes(msgCode: Int, payload: Array[Byte], protocolVersion: Version): Message =
(protocolVersion, msgCode) match {
case (_, Disconnect.code) => payload.toDisconnect
case (_, Ping.code) => payload.toPing
case (_, Pong.code) => payload.toPong
case _ => throw new RuntimeException(s"Unknown message type: ${`type`}")
case _ => throw new RuntimeException(s"Unknown message type: ${msgCode}")
}

}

// scalastyle:off
object EthereumMessageDecoder extends MessageDecoder {

override def fromBytes(`type`: Int, payload: Array[Byte], protocolVersion: Version): Message =
(protocolVersion, `type`) match {
override def fromBytes(msgCode: Int, payload: Array[Byte], protocolVersion: Version): Message =
(protocolVersion, msgCode) match {
//wire protocol
case (_, Hello.code) => payload.toHello

//common
case (_, Status.code) => payload.toStatus
case (_, Status.code63 | Status.code64) => payload.toStatus(msgCode)
case (_, SignedTransactions.code) => payload.toSignedTransactions
case (_, NewBlock.code) => payload.toNewBlock
case (_, NewBlock.code63 | NewBlock.code64) => payload.toNewBlock(msgCode)

case (PV61, t) => handlePV61(t, payload)

Expand All @@ -58,23 +58,23 @@ object EthereumMessageDecoder extends MessageDecoder {

case (PV63, t) => handlePV63(t, payload)

case _ => throw new RuntimeException(s"Unknown message type: ${`type`}")
case _ => throw new RuntimeException(s"Unknown message type: ${msgCode}")
}

private def handlePV61(`type`: Int, payload: Array[Byte]): Message = {
private def handlePV61(msgCode: Int, payload: Array[Byte]): Message = {
import io.iohk.ethereum.network.p2p.messages.PV61.NewBlockHashes._
`type` match {
msgCode match {
case pv61.NewBlockHashes.code => payload.toNewBlockHashes
case pv61.BlockHashesFromNumber.code => payload.toBlockHashesFromNumber
case _ => throw new RuntimeException(s"Unknown message type: ${`type`}")
case _ => throw new RuntimeException(s"Unknown message type: ${msgCode}")
}
}

private def handlePV63(`type`: Int, payload: Array[Byte]): Message = `type` match {
private def handlePV63(msgCode: Int, payload: Array[Byte]): Message = msgCode match {
case pv63.GetNodeData.code => payload.toGetNodeData
case pv63.NodeData.code => payload.toNodeData
case pv63.GetReceipts.code => payload.toGetReceipts
case pv63.Receipts.code => payload.toReceipts
case _ => throw new RuntimeException(s"Unknown message type: ${`type`}")
case _ => throw new RuntimeException(s"Unknown message type: ${msgCode}")
}
}

0 comments on commit cde4e85

Please sign in to comment.