Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into etcm-366/consensus…
Browse files Browse the repository at this point in the history
…-white-listing
  • Loading branch information
KonradStaniec committed Nov 19, 2020
2 parents 40d057b + aaf3c85 commit 793196d
Show file tree
Hide file tree
Showing 38 changed files with 1,250 additions and 259 deletions.
8 changes: 7 additions & 1 deletion build.sbt
Expand Up @@ -66,6 +66,11 @@ val root = {
val root = project
.in(file("."))
.configs(Integration, Benchmark, Evm, Ets, Snappy, Rpc)
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](name, version, git.gitHeadCommit),
buildInfoPackage := "io.iohk.ethereum.utils"
)
.settings(commonSettings: _*)
.settings(
libraryDependencies ++= dep
Expand Down Expand Up @@ -116,8 +121,9 @@ Test / parallelExecution := true
testOptions in Test += Tests.Argument("-oDG")

// protobuf compilation
// Into a subdirectory of src_managed to avoid it deleting other generated files; see https://github.com/sbt/sbt-buildinfo/issues/149
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
scalapb.gen() -> (sourceManaged in Compile).value / "protobuf"
)

// have the protobuf API version file as a resource
Expand Down
2 changes: 2 additions & 0 deletions project/plugins.sbt
Expand Up @@ -6,5 +6,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.5")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.25")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

libraryDependencies += "com.trueaccord.scalapb" %% "compilerplugin" % "0.6.6"
200 changes: 200 additions & 0 deletions project/repo.nix

Large diffs are not rendered by default.

Expand Up @@ -294,7 +294,6 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
val currentWolrd = getMptForBlock(block)
val (newBlock, newWeight, _) = createChildBlock(block, currentWeight, currentWolrd)(updateWorldForBlock)
bl.save(newBlock, Seq(), newWeight, saveAsBestBlock = true)
bl.persistCachedNodes()
broadcastBlock(newBlock, newWeight)
}.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock))
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.conf
@@ -1,6 +1,6 @@
mantis {
# Identifier used when connecting to other clients
client-id = "mantis"
# Optionally augment the client ID sent in Hello messages.
client-identity = null

# Version string (reported by an RPC method)
client-version = "mantis/v2.0"
Expand Down
Expand Up @@ -787,6 +787,7 @@ class FastSync(
val bestReceivedBlock = fullBlocks.maxBy(_.number)
val lastStoredBestBlockNumber = appStateStorage.getBestBlockNumber()
if (lastStoredBestBlockNumber < bestReceivedBlock.number) {
blockchain.saveBestKnownBlocks(bestReceivedBlock.number)
appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
}
syncState = syncState.copy(lastFullBlockNumber = bestReceivedBlock.number.max(lastStoredBestBlockNumber))
Expand Down
Expand Up @@ -114,12 +114,12 @@ class BlockFetcher(
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

state.validatedHeaders(headers) match {
state.appendHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
log.info("Dismissed received headers due to: {}", err)
state.withHeaderFetchReceived
case Right(validHeaders) =>
state.withHeaderFetchReceived.appendHeaders(validHeaders)
case Right(updatedState) =>
updatedState.withHeaderFetchReceived
}
}

Expand All @@ -144,7 +144,7 @@ class BlockFetcher(
state.withBodiesFetchReceived
} else {
log.debug("Fetched {} block bodies", bodies.size)
state.withBodiesFetchReceived.addBodies(peer, bodies)
state.withBodiesFetchReceived.addBodies(peer.id, bodies)
}

fetchBlocks(newState)
Expand Down Expand Up @@ -188,40 +188,24 @@ class BlockFetcher(
case Left(_) => state
case Right(validHashes) => state.withPossibleNewTopAt(validHashes.lastOption.map(_.number))
}

supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)

fetchBlocks(newState)
case MessageFromPeer(NewBlock(_, block, _), peerId) =>
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
log.debug("Received NewBlock {}", block.idTag)
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1

log.debug("Received NewBlock nr {}", newBlockNr)

// we're on top, so we can pass block directly to importer
if (newBlockNr == nextExpectedBlock && state.isOnTop) {
log.debug("Pass block directly to importer")
if (state.isOnTop && newBlockNr == nextExpectedBlock) {
log.debug("Passing block directly to importer")
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
state.importer ! OnTop
state.importer ! ImportNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
context become started(newState)
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
log.debug("Enqueue new block for import")
val newState = state.appendNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
context become started(newState)
// waiting for some bodies but we don't have this header yet - at least we can use new block header
} else if (newBlockNr == state.nextToLastBlock && !state.isFetchingHeaders) {
log.debug("Waiting for bodies. Add only headers")
val newState = state.appendHeaders(List(block.header))
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
// we're far from top
} else if (newBlockNr > nextExpectedBlock) {
log.debug("Far from top")
val newState = state.withKnownTopAt(newBlockNr)
} else {
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
val newState = state.withPossibleNewTopAt(block.number)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
}
Expand All @@ -236,17 +220,17 @@ class BlockFetcher(
//ex. After a successful handshake, fetcher will receive the info about the header of the peer best block
case MessageFromPeer(BlockHeaders(headers), _) =>
headers.lastOption.map { bh =>
log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}")
log.debug(s"Candidate for new top at block ${bh.number}, current known top ${state.knownTop}")
val newState = state.withPossibleNewTopAt(bh.number)
fetchBlocks(newState)
}
//keep fetcher state updated in case new checkpoint block or mined block was imported
case InternalLastBlockImport(blockNr) => {
case InternalLastBlockImport(blockNr) =>
log.debug(s"New last block $blockNr imported from the inside")
val newLastBlock = blockNr.max(state.lastBlock)
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)

fetchBlocks(newState)
}
}

private def handlePickedBlocks(
Expand Down Expand Up @@ -277,7 +261,7 @@ class BlockFetcher(
.getOrElse(fetcherState)

private def fetchHeaders(state: BlockFetcherState): Unit = {
val blockNr = state.nextToLastBlock
val blockNr = state.nextBlockToFetch
val amount = syncConfig.blockHeadersPerRequest

fetchHeadersFrom(blockNr, amount) pipeTo self
Expand Down
Expand Up @@ -3,12 +3,11 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorRef
import akka.util.ByteString
import cats.data.NonEmptyList
import io.iohk.ethereum.domain.{Block, BlockHeader, BlockBody, HeadersSeq}
import io.iohk.ethereum.network.{Peer, PeerId}
import cats.implicits._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import BlockFetcherState._
import cats.syntax.either._
import cats.syntax.option._

import scala.collection.immutable.Queue

Expand Down Expand Up @@ -52,7 +51,7 @@ case class BlockFetcherState(

def hasFetchedTopHeader: Boolean = lastBlock == knownTop

def isOnTop: Boolean = !isFetching && hasFetchedTopHeader && hasEmptyBuffer
def isOnTop: Boolean = hasFetchedTopHeader && hasEmptyBuffer

def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size

Expand All @@ -68,26 +67,43 @@ case class BlockFetcherState(
.orElse(waitingHeaders.headOption.map(_.number))
.getOrElse(lastBlock)

def nextToLastBlock: BigInt = lastBlock + 1
/**
* Next block number to be fetched, calculated in a way to maintain local queues consistency,
* even if `lastBlock` property is much higher - it's more important to have this consistency
* here and allow standard rollback/reorganization mechanisms to kick in if we get too far with mining,
* therefore `lastBlock` is used here only if blocks and headers queues are empty
*/
def nextBlockToFetch: BigInt = waitingHeaders.lastOption
.map(_.number)
.orElse(readyBlocks.lastOption.map(_.number))
.getOrElse(lastBlock) + 1

def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
withPossibleNewTopAt(headers.lastOption.map(_.number))
.copy(
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
)
def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
val lastNumber = HeadersSeq.lastNumber(validHeaders)
withPossibleNewTopAt(lastNumber)
.copy(
waitingHeaders = waitingHeaders ++ validHeaders,
lastBlock = lastNumber.getOrElse(lastBlock)
)
})

def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
/**
* Validates received headers consistency and their compatibilty with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
*/
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
if (headers.isEmpty) {
Right(headers)
} else {
headers
.asRight[String]
.ensure("Given headers are not sequence with already fetched ones")(_.head.number <= nextToLastBlock)
.ensure("Given headers aren't better than already fetched ones")(_.last.number > lastBlock)
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
.ensure("Given headers do not form a chain with already stored ones")(headers =>
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
)
}

def validateNewBlockHashes(hashes: Seq[BlockHash]): Either[String, Seq[BlockHash]] =
Expand All @@ -100,15 +116,26 @@ case class BlockFetcherState(
}
)

def addBodies(peer: Peer, bodies: Seq[BlockBody]): BlockFetcherState = {
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
/**
* Matches bodies with headers in queue and adding matched bodies to the blocks.
* If bodies is empty collection - headers in queue are removed as the cause is:
* - the headers are from rejected fork and therefore it won't be possible to resolve bodies for them
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
* when making a request)
*/
def addBodies(peerId: PeerId, bodies: Seq[BlockBody]): BlockFetcherState = {
if (bodies.isEmpty) {
copy(waitingHeaders = Queue.empty)
} else {
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

withPeerForBlocks(peerId, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
}
}

def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
Expand Down
Expand Up @@ -70,11 +70,13 @@ class BlockImporter(
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
importBlocks(blocks)(state)

//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
case MinedBlock(block) =>
if (!state.importing) {
importMinedBlock(block, state)
}

//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
case nc @ NewCheckpoint(parentHash, signatures) =>
if (state.importing) {
//We don't want to lose a checkpoint
Expand Down
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
import io.iohk.ethereum.db.cache.{LruCache, MapCache}
import io.iohk.ethereum.db.dataSource.{DataSource, EphemDataSource}
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad, RollBackFlush}
import io.iohk.ethereum.db.storage.StateStorage.{FlushSituation, GenesisDataLoad}
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
import io.iohk.ethereum.mpt.MptNode
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
Expand Down Expand Up @@ -119,7 +119,6 @@ class CachedReferenceCountedStateStorage(
override def forcePersist(reason: FlushSituation): Boolean = {
reason match {
case GenesisDataLoad => CachedReferenceCountedStorage.persistCache(lruCache, nodeStorage, forced = true)
case RollBackFlush => false
}
}

Expand Down Expand Up @@ -194,7 +193,6 @@ object StateStorage {
}

sealed abstract class FlushSituation
case object RollBackFlush extends FlushSituation
case object GenesisDataLoad extends FlushSituation

}
2 changes: 1 addition & 1 deletion src/main/scala/io/iohk/ethereum/domain/Block.scala
Expand Up @@ -23,7 +23,7 @@ case class Block(header: BlockHeader, body: BlockBody) {

val hasCheckpoint: Boolean = header.hasCheckpoint

def isParentOf(child: Block): Boolean = number + 1 == child.number && child.header.parentHash == hash
def isParentOf(child: Block): Boolean = header.isParentOf(child.header)
}

object Block {
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/domain/BlockHeader.scala
Expand Up @@ -53,6 +53,8 @@ case class BlockHeader(

val hasCheckpoint: Boolean = checkpoint.isDefined

def isParentOf(child: BlockHeader): Boolean = number + 1 == child.number && child.parentHash == hash

override def toString: String = {
val (treasuryOptOutString: String, checkpointString: String) = extraFields match {
case HefPostEcip1097(definedOptOut, maybeCheckpoint) =>
Expand Down

0 comments on commit 793196d

Please sign in to comment.