Merge branch 'develop' into chore/update-config-testnet-internal
Nicolás Tallar committed Nov 19, 2020
2 parents 41075d3 + aaf3c85 commit 6609bcf
Showing 29 changed files with 1,052 additions and 147 deletions.
8 changes: 7 additions & 1 deletion build.sbt
@@ -66,6 +66,11 @@ val root = {
val root = project
.in(file("."))
.configs(Integration, Benchmark, Evm, Ets, Snappy, Rpc)
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](name, version, git.gitHeadCommit),
buildInfoPackage := "io.iohk.ethereum.utils"
)
.settings(commonSettings: _*)
.settings(
libraryDependencies ++= dep
@@ -116,8 +121,9 @@ Test / parallelExecution := true
testOptions in Test += Tests.Argument("-oDG")

// protobuf compilation
// Into a subdirectory of src_managed to avoid it deleting other generated files; see https://github.com/sbt/sbt-buildinfo/issues/149
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
scalapb.gen() -> (sourceManaged in Compile).value / "protobuf"
)

// have the protobuf API version file as a resource
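For context on the build.sbt change above: sbt-buildinfo generates a `BuildInfo` object from the configured `buildInfoKeys` into the `io.iohk.ethereum.utils` package at compile time, and sbt-git supplies the `git.gitHeadCommit` value. A minimal sketch of consuming the generated object (the `ClientVersionInfo` helper is hypothetical, not part of this commit; the field names follow sbt-buildinfo's default key-to-field mapping):

    package io.iohk.ethereum.utils

    // Hypothetical consumer (not in this commit). For the keys configured above,
    // sbt-buildinfo emits roughly:
    //   case object BuildInfo {
    //     val name: String = ...
    //     val version: String = ...
    //     val gitHeadCommit: scala.Option[String] = ... // from sbt-git
    //   }
    object ClientVersionInfo {
      // e.g. "mantis/v3.0-6609bcf" when built from a git checkout
      def full: String =
        s"${BuildInfo.name}/v${BuildInfo.version}" +
          BuildInfo.gitHeadCommit.fold("")(commit => s"-${commit.take(7)}")
    }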
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -6,5 +6,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.5")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.25")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

libraryDependencies += "com.trueaccord.scalapb" %% "compilerplugin" % "0.6.6"
200 changes: 200 additions & 0 deletions project/repo.nix

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/resources/application.conf
@@ -1,6 +1,6 @@
mantis {
# Identifier used when connecting to other clients
client-id = "mantis"
# Optionally augment the client ID sent in Hello messages.
client-identity = null

# Version string (reported by an RPC method)
client-version = "mantis/v2.0"
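A sketch of the new key in use (the value is hypothetical; per the comment above, a non-null identity augments the client ID sent in Hello messages):

    mantis {
      # Hypothetical operator-chosen identity for this node.
      client-identity = "testnet-internal-node-0"
    }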
src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

@@ -114,12 +114,12 @@ class BlockFetcher(
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

state.validatedHeaders(headers) match {
state.appendHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
log.info("Dismissed received headers due to: {}", err)
state.withHeaderFetchReceived
case Right(validHeaders) =>
state.withHeaderFetchReceived.appendHeaders(validHeaders)
case Right(updatedState) =>
updatedState.withHeaderFetchReceived
}
}

@@ -144,7 +144,7 @@ class BlockFetcher(
state.withBodiesFetchReceived
} else {
log.debug("Fetched {} block bodies", bodies.size)
state.withBodiesFetchReceived.addBodies(peer, bodies)
state.withBodiesFetchReceived.addBodies(peer.id, bodies)
}

fetchBlocks(newState)
@@ -188,40 +188,24 @@ class BlockFetcher(
case Left(_) => state
case Right(validHashes) => state.withPossibleNewTopAt(validHashes.lastOption.map(_.number))
}

supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)

fetchBlocks(newState)
case MessageFromPeer(NewBlock(_, block, _), peerId) =>
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
log.debug("Received NewBlock {}", block.idTag)
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1

log.debug("Received NewBlock nr {}", newBlockNr)

// we're on top, so we can pass block directly to importer
if (newBlockNr == nextExpectedBlock && state.isOnTop) {
log.debug("Pass block directly to importer")
if (state.isOnTop && newBlockNr == nextExpectedBlock) {
log.debug("Passing block directly to importer")
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
state.importer ! OnTop
state.importer ! ImportNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
context become started(newState)
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
log.debug("Enqueue new block for import")
val newState = state.appendNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
context become started(newState)
// waiting for some bodies but we don't have this header yet - at least we can use new block header
} else if (newBlockNr == state.nextToLastBlock && !state.isFetchingHeaders) {
log.debug("Waiting for bodies. Add only headers")
val newState = state.appendHeaders(List(block.header))
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
// we're far from top
} else if (newBlockNr > nextExpectedBlock) {
log.debug("Far from top")
val newState = state.withKnownTopAt(newBlockNr)
} else {
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
val newState = state.withPossibleNewTopAt(block.number)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
fetchBlocks(newState)
}
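`withPossibleNewTopAt` is not shown in this diff; a minimal standalone model of what it presumably does, based solely on its usage here (an assumption, not the committed code):

    // Assumed semantics: the known top only ever moves forward.
    final case class TopTracker(knownTop: BigInt) {
      def withKnownTopAt(block: BigInt): TopTracker = copy(knownTop = block)

      def withPossibleNewTopAt(block: BigInt): TopTracker =
        if (block > knownTop) withKnownTopAt(block) else this

      // Overload used with e.g. `validHashes.lastOption.map(_.number)` above.
      def withPossibleNewTopAt(block: Option[BigInt]): TopTracker =
        block.fold(this)(b => withPossibleNewTopAt(b))
    }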
@@ -236,17 +220,17 @@ class BlockFetcher(
//ex. After a successful handshake, the fetcher will receive the info about the header of the peer's best block
case MessageFromPeer(BlockHeaders(headers), _) =>
headers.lastOption.map { bh =>
log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}")
log.debug(s"Candidate for new top at block ${bh.number}, current known top ${state.knownTop}")
val newState = state.withPossibleNewTopAt(bh.number)
fetchBlocks(newState)
}
//keep fetcher state updated in case a new checkpoint block or mined block was imported
case InternalLastBlockImport(blockNr) => {
case InternalLastBlockImport(blockNr) =>
log.debug(s"New last block $blockNr imported from the inside")
val newLastBlock = blockNr.max(state.lastBlock)
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)

fetchBlocks(newState)
}
}

private def handlePickedBlocks(
@@ -277,7 +261,7 @@ class BlockFetcher(
.getOrElse(fetcherState)

private def fetchHeaders(state: BlockFetcherState): Unit = {
val blockNr = state.nextToLastBlock
val blockNr = state.nextBlockToFetch
val amount = syncConfig.blockHeadersPerRequest

fetchHeadersFrom(blockNr, amount) pipeTo self
src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

@@ -3,12 +3,11 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorRef
import akka.util.ByteString
import cats.data.NonEmptyList
import io.iohk.ethereum.domain.{Block, BlockHeader, BlockBody, HeadersSeq}
import io.iohk.ethereum.network.{Peer, PeerId}
import cats.implicits._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import BlockFetcherState._
import cats.syntax.either._
import cats.syntax.option._

import scala.collection.immutable.Queue

@@ -52,7 +51,7 @@ case class BlockFetcherState(

def hasFetchedTopHeader: Boolean = lastBlock == knownTop

def isOnTop: Boolean = !isFetching && hasFetchedTopHeader && hasEmptyBuffer
def isOnTop: Boolean = hasFetchedTopHeader && hasEmptyBuffer

def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size

@@ -68,26 +67,43 @@
.orElse(waitingHeaders.headOption.map(_.number))
.getOrElse(lastBlock)

def nextToLastBlock: BigInt = lastBlock + 1
/**
* Next block number to be fetched, calculated so as to keep the local queues consistent
* even if the `lastBlock` property is much higher - it's more important to keep this consistency
* here and let the standard rollback/reorganization mechanisms kick in if we get too far with mining;
* therefore `lastBlock` is used here only when both the blocks and headers queues are empty
*/
def nextBlockToFetch: BigInt = waitingHeaders.lastOption
.map(_.number)
.orElse(readyBlocks.lastOption.map(_.number))
.getOrElse(lastBlock) + 1
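A worked example with hypothetical numbers, mirroring the `orElse` chain above:

    // Headers up to 105 are waiting for bodies, but a mined block has bumped
    // lastBlock to 120; the queues win, so block 106 is fetched next and the
    // local queues stay gapless.
    val waitingHeadersLastNumber: Option[BigInt] = Some(BigInt(105))
    val readyBlocksLastNumber: Option[BigInt] = None
    val lastBlock: BigInt = BigInt(120)

    val nextBlockToFetch: BigInt =
      waitingHeadersLastNumber
        .orElse(readyBlocksLastNumber)
        .getOrElse(lastBlock) + 1 // == 106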

def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
withPossibleNewTopAt(headers.lastOption.map(_.number))
.copy(
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
)
def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
val lastNumber = HeadersSeq.lastNumber(validHeaders)
withPossibleNewTopAt(lastNumber)
.copy(
waitingHeaders = waitingHeaders ++ validHeaders,
lastBlock = lastNumber.getOrElse(lastBlock)
)
})

def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
/**
* Validates the received headers' consistency and their compatibility with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
*/
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
if (headers.isEmpty) {
Right(headers)
} else {
headers
.asRight[String]
.ensure("Given headers are not sequence with already fetched ones")(_.head.number <= nextToLastBlock)
.ensure("Given headers aren't better than already fetched ones")(_.last.number > lastBlock)
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
.ensure("Given headers do not form a chain with already stored ones")(headers =>
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
)
}
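For readers unfamiliar with the cats syntax used above: each `.ensure(msg)(predicate)` turns a `Right` into `Left(msg)` when the predicate fails, so the first violated check short-circuits the rest. A self-contained sketch of the pattern with a simplified `Header` type (not the real checks, just their shape):

    import cats.syntax.either._

    final case class Header(number: BigInt)

    // Assumes `headers` is non-empty, as guarded by the isEmpty branch above.
    def validated(headers: Seq[Header], lastBlock: BigInt): Either[String, Seq[Header]] =
      headers
        .asRight[String]
        .ensure("headers aren't better than already fetched ones")(_.last.number > lastBlock)
        .ensure("headers should form a sequence without gaps")(hs =>
          hs.zip(hs.tail).forall { case (a, b) => b.number == a.number + 1 }
        )

    // validated(Seq(Header(5), Header(7)), 4)
    //   == Left("headers should form a sequence without gaps")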

def validateNewBlockHashes(hashes: Seq[BlockHash]): Either[String, Seq[BlockHash]] =
@@ -100,15 +116,26 @@ case class BlockFetcherState(
}
)

def addBodies(peer: Peer, bodies: Seq[BlockBody]): BlockFetcherState = {
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
/**
* Matches bodies with headers in the queue and adds the paired blocks to the ready queue.
* If `bodies` is an empty collection, the headers in the queue are removed, as the cause is either:
* - the headers are from a rejected fork, and therefore it won't be possible to resolve bodies for them
* - the given peer is still syncing (quite unlikely, given the preference for peers with the best total
* difficulty when making a request)
*/
def addBodies(peerId: PeerId, bodies: Seq[BlockBody]): BlockFetcherState = {
if (bodies.isEmpty) {
copy(waitingHeaders = Queue.empty)
} else {
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

withPeerForBlocks(peerId, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
}
}
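The pairing logic above reduces to `splitAt` plus `zip`; a standalone sketch with placeholder strings in place of real headers and bodies:

    import scala.collection.immutable.Queue

    // Bodies arrive for the oldest waiting headers; unmatched headers keep waiting.
    val waitingHeaders = Queue("header100", "header101", "header102")
    val bodies = Seq("body100", "body101")

    val (matching, stillWaiting) = waitingHeaders.splitAt(bodies.length)
    val blocks = matching.zip(bodies)
    // blocks       == Queue(("header100", "body100"), ("header101", "body101"))
    // stillWaiting == Queue("header102")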

def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

@@ -70,11 +70,13 @@ class BlockImporter(
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
importBlocks(blocks)(state)

//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
case MinedBlock(block) =>
if (!state.importing) {
importMinedBlock(block, state)
}

//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
case nc @ NewCheckpoint(parentHash, signatures) =>
if (state.importing) {
//We don't want to lose a checkpoint
2 changes: 1 addition & 1 deletion src/main/scala/io/iohk/ethereum/domain/Block.scala
@@ -23,7 +23,7 @@ case class Block(header: BlockHeader, body: BlockBody) {

val hasCheckpoint: Boolean = header.hasCheckpoint

def isParentOf(child: Block): Boolean = number + 1 == child.number && child.header.parentHash == hash
def isParentOf(child: Block): Boolean = header.isParentOf(child.header)
}

object Block {
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/domain/BlockHeader.scala
@@ -47,6 +47,8 @@ case class BlockHeader(

val hasCheckpoint: Boolean = checkpoint.isDefined

def isParentOf(child: BlockHeader): Boolean = number + 1 == child.number && child.parentHash == hash

override def toString: String = {
val (treasuryOptOutString: String, checkpointString: String) = extraFields match {
case HefPostEcip1097(definedOptOut, maybeCheckpoint) =>
20 changes: 14 additions & 6 deletions src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala
@@ -1,13 +1,13 @@
package io.iohk.ethereum.faucet

import com.typesafe.config.{ConfigFactory, Config => TypesafeConfig}
import com.typesafe.config.{ConfigFactory, Config}
import io.iohk.ethereum.domain.Address

import scala.concurrent.duration.{FiniteDuration, _}

trait FaucetConfigBuilder {
lazy val rawConfig: TypesafeConfig = ConfigFactory.load()
lazy val rawMantisConfig: TypesafeConfig = rawConfig.getConfig("mantis")
lazy val rawConfig: Config = ConfigFactory.load()
lazy val rawMantisConfig: Config = rawConfig.getConfig("mantis")
lazy val faucetConfig: FaucetConfig = FaucetConfig(rawConfig)
}

@@ -19,11 +19,15 @@ case class FaucetConfig(
txValue: BigInt,
rpcAddress: String,
keyStoreDir: String,
minRequestInterval: FiniteDuration
minRequestInterval: FiniteDuration,
handlerTimeout: FiniteDuration,
responseTimeout: FiniteDuration,
supervisor: SupervisorConfig,
shutdownTimeout: FiniteDuration
)

object FaucetConfig {
def apply(typesafeConfig: TypesafeConfig): FaucetConfig = {
def apply(typesafeConfig: Config): FaucetConfig = {
val faucetConfig = typesafeConfig.getConfig("faucet")

FaucetConfig(
@@ -34,7 +38,11 @@ object FaucetConfig {
txValue = faucetConfig.getLong("tx-value"),
rpcAddress = faucetConfig.getString("rpc-address"),
keyStoreDir = faucetConfig.getString("keystore-dir"),
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis,
handlerTimeout = faucetConfig.getDuration("handler-timeout").toMillis.millis,
responseTimeout = faucetConfig.getDuration("response-timeout").toMillis.millis,
supervisor = SupervisorConfig(faucetConfig),
shutdownTimeout = faucetConfig.getDuration("shutdown-timeout").toMillis.millis
)
}
}
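The new case-class fields map one-to-one onto the keys read in `FaucetConfig.apply`; a sketch of a matching `faucet` config block (values are hypothetical, and keys elided from this diff, including the `supervisor` sub-config, are omitted):

    faucet {
      tx-value             = 1000000000000000000  # in wei
      rpc-address          = "http://127.0.0.1:8546"
      keystore-dir         = "/tmp/faucet-keystore"
      min-request-interval = 1 minute
      handler-timeout      = 1 second
      response-timeout     = 3 seconds
      shutdown-timeout     = 15 seconds
    }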
