Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/phase/3/txExecution' into featur…
Browse files Browse the repository at this point in the history
…e/precompiledContracts
  • Loading branch information
rtkaczyk committed Apr 19, 2017
2 parents e8301e1 + a737877 commit ced6cce
Show file tree
Hide file tree
Showing 21 changed files with 436 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ trait FastSync {
val (nonMptNodesToGet, remainingNonMptNodes) = nonMptNodesQueue.splitAt(nodesPerRequest)
val (mptNodesToGet, remainingMptNodes) = mptNodesQueue.splitAt(nodesPerRequest - nonMptNodesToGet.size)
val nodesToGet = nonMptNodesToGet ++ mptNodesToGet
val handler = context.actorOf(FastSyncNodesRequestHandler.props(peer, nodesToGet, blockchain, mptNodeStorage))
val handler = context.actorOf(FastSyncNodesRequestHandler.props(peer, nodesToGet, blockchain, blockchainStorages.mptNodeStorage))
context watch handler
assignedHandlers += (handler -> peer)
nonMptNodesQueue = remainingNonMptNodes
Expand Down
93 changes: 59 additions & 34 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/RegularSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import akka.actor._
import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlacklistPeer
import io.iohk.ethereum.blockchain.sync.SyncRequestHandler.Done
import io.iohk.ethereum.blockchain.sync.SyncController.{BlockBodiesReceived, BlockHeadersReceived, BlockHeadersToResolve, PrintStatus}
import io.iohk.ethereum.domain.BlockHeader
import io.iohk.ethereum.domain.{Block, BlockHeader}
import io.iohk.ethereum.ledger.BlockExecutionError
import io.iohk.ethereum.network.PeerActor.Status.Handshaked
import io.iohk.ethereum.network.PeerActor._
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.p2p.messages.PV62._
import io.iohk.ethereum.utils.Config
import org.spongycastle.util.encoders.Hex

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext.Implicits.global

trait RegularSync {
Expand Down Expand Up @@ -112,57 +114,80 @@ trait RegularSync {

private def handleBlockBodies(peer: ActorRef, m: Seq[BlockBody]) = {
if (m.nonEmpty) {
val result = headersQueue.zip(m).map { case (h, b) => blockValidator(h, b) }

if (!result.exists(_.isLeft) && result.nonEmpty) {
val blocks = result.collect { case Right(b) => b }

blockchain.getBlockHeaderByHash(blocks.head.header.parentHash)
.flatMap(b => blockchain.getTotalDifficultyByHash(b.hash)) match {
case Some(td) =>
var currentTd = td
val newBlocks = blocks.map { b =>
val blockHashToDelete = blockchain.getBlockHeaderByNumber(b.header.number).map(_.hash).filter(_ != b.header.hash)
blockchain.save(b)
appStateStorage.putBestBlockNumber(b.header.number)
currentTd += b.header.difficulty
blockchain.save(b.header.hash, currentTd)
blockHashToDelete.foreach(blockchain.removeBlock)

NewBlock(b, currentTd)
}
val blocks = headersQueue.zip(m).map{ case (header, body) => Block(header, body) }

blockchain.getBlockHeaderByHash(blocks.head.header.parentHash)
.flatMap(b => blockchain.getTotalDifficultyByHash(b.hash)) match {
case Some(blockParentTd) =>
val (newBlocks, error) = processBlocks(blocks, blockParentTd)

if(newBlocks.nonEmpty){
context.self ! BroadcastBlocks(newBlocks)
log.info(s"got new blocks up till block: ${newBlocks.last.block.header.number} " +
s"with hash ${Hex.toHexString(newBlocks.last.block.header.hash.toArray[Byte])}")
case None =>
log.error("no total difficulty for latest block")
}
}

if(error.isDefined){
val numberBlockFailed = blocks.head.header.number + newBlocks.length
resumeWithDifferentPeer(peer, reason = s"a block execution error: ${error.get.toString}, in block $numberBlockFailed")
}
case None =>
log.error("no total difficulty for latest block") //FIXME: How do we handle this error on our blockchain?
}

headersQueue = headersQueue.drop(result.length)
if (headersQueue.nonEmpty) {
val hashes = headersQueue.take(blockBodiesPerRequest).map(_.hash)
waitingForActor = Some(context.actorOf(SyncBlockBodiesRequestHandler.props(peer, hashes)))
} else {
context.self ! ResumeRegularSync
}
headersQueue = headersQueue.drop(blocks.length)
if (headersQueue.nonEmpty) {
val hashes = headersQueue.take(blockBodiesPerRequest).map(_.hash)
waitingForActor = Some(context.actorOf(SyncBlockBodiesRequestHandler.props(peer, hashes)))
} else {
//blacklist for errors in blocks
resumeWithDifferentPeer(peer)
context.self ! ResumeRegularSync
}
} else {
//we got empty response for bodies from peer but we got block headers earlier
resumeWithDifferentPeer(peer)
}
}

/**
* Inserts and executes all the blocks, up to the point to which one of them fails (or we run out of blocks).
* If the execution of any block were to fail, newBlocks only contains the NewBlock msgs for all the blocks executed before it,
* and only the blocks successfully executed are inserted into the blockchain.
*
* @param blocks to execute
* @param blockParentTd, td of the parent of the blocks.head block
* @param newBlocks which, after adding the corresponding NewBlock msg for blocks, will be broadcasted
* @return list of NewBlocks to broadcast (one per block successfully executed) and an error if one happened during execution
*/
@tailrec
private def processBlocks(blocks: Seq[Block], blockParentTd: BigInt,
newBlocks: Seq[NewBlock] = Nil): (Seq[NewBlock], Option[BlockExecutionError]) = blocks match {
case Nil =>
newBlocks -> None

case Seq(block, otherBlocks@_*) =>
val blockHashToDelete = blockchain.getBlockHeaderByNumber(block.header.number).map(_.hash).filter(_ != block.header.hash)
val blockExecResult = ledger.executeBlock(block, blockchainStorages, validators)
blockExecResult match {
case Right(_) =>
blockchain.save(block)
appStateStorage.putBestBlockNumber(block.header.number)
val newTd = blockParentTd + block.header.difficulty
blockchain.save(block.header.hash, newTd)
blockHashToDelete.foreach(blockchain.removeBlock)
processBlocks(otherBlocks, newTd, newBlocks :+ NewBlock(block, newTd))

case Left(error) =>
newBlocks -> Some(error)
}
}

private def scheduleResume() = {
headersQueue = Nil
scheduler.scheduleOnce(checkForNewBlockInterval, context.self, ResumeRegularSync)
}

private def resumeWithDifferentPeer(currentPeer: ActorRef) = {
self ! BlacklistPeer(currentPeer, "because of error in response")
private def resumeWithDifferentPeer(currentPeer: ActorRef, reason: String = "error in response") = {
self ! BlacklistPeer(currentPeer, "because of " + reason)
headersQueue = Nil
context.self ! ResumeRegularSync
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.util.ByteString
import io.iohk.ethereum.db.storage._
import io.iohk.ethereum.domain.{Block, BlockHeader, Blockchain}
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.network.PeerActor.{Status => PeerStatus}
import io.iohk.ethereum.network.p2p.messages.PV62.BlockBody
import io.iohk.ethereum.validators.BlockValidator.BlockError
import io.iohk.ethereum.network.{PeerActor, PeerManagerActor}
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.validators.Validators

class SyncController(
val peerManager: ActorRef,
val appStateStorage: AppStateStorage,
val blockchain: Blockchain,
val mptNodeStorage: MptNodeStorage,
val blockchainStorages: BlockchainStorages,
val fastSyncStateStorage: FastSyncStateStorage,
val blockValidator: (BlockHeader, BlockBody) => Either[BlockError, Block],
val ledger: Ledger,
val validators: Validators,
externalSchedulerOpt: Option[Scheduler] = None)
extends Actor
with ActorLogging
Expand Down Expand Up @@ -102,10 +104,11 @@ object SyncController {
def props(peerManager: ActorRef,
appStateStorage: AppStateStorage,
blockchain: Blockchain,
mptNodeStorage: MptNodeStorage,
blockchainStorages: BlockchainStorages,
syncStateStorage: FastSyncStateStorage,
blockValidator: (BlockHeader, BlockBody) => Either[BlockError, Block]):
Props = Props(new SyncController(peerManager, appStateStorage, blockchain, mptNodeStorage, syncStateStorage, blockValidator))
ledger: Ledger,
validators: Validators):
Props = Props(new SyncController(peerManager, appStateStorage, blockchain, blockchainStorages, syncStateStorage, ledger, validators))

case class BlockHeadersToResolve(peer: ActorRef, headers: Seq[BlockHeader])

Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/domain/BlockHeader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ case class BlockHeader(
* @return - hash that can be used to get block bodies / receipts
*/
lazy val hash: ByteString = ByteString(kec256(rlpEncode[BlockHeader](this)))

lazy val hashAsHexString: String = Hex.toHexString(hash.toArray)
}

object BlockHeader {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/io/iohk/ethereum/domain/Blockchain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ trait BlockchainStorages {
val receiptStorage: ReceiptStorage
val evmCodeStorage: EvmCodeStorage
val mptNodeStorage: MptNodeStorage
val nodeStorage: NodeStorage
val totalDifficultyStorage: TotalDifficultyStorage
}

Expand Down

0 comments on commit ced6cce

Please sign in to comment.