Skip to content

Commit

Permalink
PM-3133: Execute batches of blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 5, 2021
1 parent ad5a754 commit 196d8e1
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 17 deletions.
Expand Up @@ -14,6 +14,9 @@ trait ApplicationService[F[_], A <: Agreement] {
// TODO (PM-3132, PM-3133): Block validation.
def validateBlock(block: A#Block): F[Boolean]

// TODO (PM-3108, PM-3107, PM-3137, PM-3110): Tell the application to execute a block.
def executeBlock(block: A#Block): F[Unit]

// TODO (PM-3135): Tell the application to sync any state of the block, i.e. the Ledger.
// The `sources` are peers who most probably have this state.
// The full `block` is given because it may not be persisted yet.
Expand Down
Expand Up @@ -450,7 +450,7 @@ class ConsensusService[F[
}

process.handleErrorWith { case NonFatal(ex) =>
tracers.error(ex)
tracers.error(s"Error processing effect $effect", ex)
}
}

Expand Down
Expand Up @@ -7,13 +7,21 @@ import io.iohk.metronome.hotstuff.service.storage.{
BlockStorage,
ViewStateStorage
}
import io.iohk.metronome.hotstuff.consensus.basic.{Agreement, Effect}
import io.iohk.metronome.hotstuff.consensus.basic.{
Agreement,
Effect,
QuorumCertificate
}
import io.iohk.metronome.hotstuff.service.tracing.ConsensusTracers
import io.iohk.metronome.storage.KVStoreRunner
import monix.catnap.ConcurrentQueue

/** The `BlockExecutor` receives ranges of committed blocks from the
* `ConsensusService` and carries out their effects, marking the last
* executed block in the `ViewStateStorage`. It delegates other state
* updates to the `ApplicationService`.
* executed block in the `ViewStateStorage`, so that we can resume
* from where we left off last time after a restart.
*
* It delegates other state updates to the `ApplicationService`.
*
* The `BlockExecutor` is prepared for gaps to appear in the ranges,
* which happens if the node is out of sync with the federation and
Expand All @@ -24,23 +32,124 @@ class BlockExecutor[F[_]: Sync, N, A <: Agreement](
blockStorage: BlockStorage[N, A],
viewStateStorage: ViewStateStorage[N, A],
executionQueue: ConcurrentQueue[F, Effect.ExecuteBlocks[A]]
) {
)(implicit tracers: ConsensusTracers[F, A], storeRunner: KVStoreRunner[F, N]) {
def enqueue(effect: Effect.ExecuteBlocks[A]): F[Unit] =
executionQueue.offer(effect)

/** Execute blocks in order, updating pesistent storage along the way. */
private def executeBlocks: F[Unit] = {
executionQueue.poll.flatMap {
case Effect.ExecuteBlocks(lastExecutedBlockHash, commitQC) =>
// Retrieve the blocks from the storage from the last executed
// to the one in the Quorum Certificate and tell the application
// to execute them one by one. Update the persistent view state
// after reach execution to remember which blocks we have truly
// done.
def loop(lastExecutedBlockHash: A#Hash): F[Unit] =
executionQueue.poll
.flatMap {
case Effect.ExecuteBlocks(lastCommittedBlockHash, commitQC) =>
// Retrieve the blocks from the storage from the last executed
// to the one in the Quorum Certificate and tell the application
// to execute them one by one. Update the persistent view state
// after reach execution to remember which blocks we have truly
// done.
getBlockPath(
lastExecutedBlockHash,
lastCommittedBlockHash,
commitQC
).flatMap {
case _ :: newBlockHashes =>
tryExecuteBatch(newBlockHashes)

case Nil =>
none[A#Hash].pure[F]
}.map {
_ getOrElse lastExecutedBlockHash
}
}
.flatMap(loop)

storeRunner
.runReadOnly {
viewStateStorage.getBundle.map(_.lastExecutedBlockHash)
}
.flatMap(loop)
}

/** Get the more complete path. We may not have the last executed block any more.
*
* The first hash in the return value is a block that has already been executed.
*/
private def getBlockPath(
lastExecutedBlockHash: A#Hash,
lastCommittedBlockHash: A#Hash,
commitQC: QuorumCertificate[A]
): F[List[A#Hash]] = {
def readPath(ancestorBlockHash: A#Hash) =
storeRunner
.runReadOnly {
blockStorage.getPathFromAncestor(
lastExecutedBlockHash,
commitQC.blockHash
)
}

readPath(lastExecutedBlockHash)
.flatMap {
case Nil =>
readPath(lastCommittedBlockHash)
case path =>
path.pure[F]
}
}

// TODO (PM-3133): Execute block
???
} >> executeBlocks
/** Try to execute a batch of newly committed blocks.
*
* Return the last successfully executed hash, if any.
*/
private def tryExecuteBatch(
newBlockHashes: List[A#Hash]
): F[Option[A#Hash]] = {
def loop(
newBlockHashes: List[A#Hash],
lastExecutedBlockHash: Option[A#Hash]
): F[Option[A#Hash]] =
newBlockHashes match {
case Nil =>
lastExecutedBlockHash.pure[F]

case blockHash :: newBlockHashes =>
executeBlock(blockHash).attempt.flatMap {
case Left(ex) =>
// If a block fails, return what we managed to do so far,
// so we can re-attempt it next time if the block is still
// available in the storage.
tracers
.error(s"Error executiong block $blockHash", ex)
.as(lastExecutedBlockHash)

case Right(()) =>
loop(newBlockHashes, blockHash.some)
}
}

loop(newBlockHashes, none)
}

/** Execute the next block in line and update the view state.
* Be prepared that it may not exist, if execution took so long that
* the `SyncService` skipped ahead to the latest Commit Q.C.
*/
private def executeBlock(blockHash: A#Hash): F[Unit] = {
storeRunner.runReadOnly {
blockStorage.get(blockHash)
} flatMap {
case None =>
tracers.executionSkipped(blockHash)

case Some(block) =>
for {
_ <- appService.executeBlock(block)
_ <- storeRunner.runReadWrite {
viewStateStorage.setLastExecutedBlockHash(blockHash)
}
_ <- tracers.blockExecuted(blockHash)
} yield ()
}
}
}

Expand All @@ -49,6 +158,9 @@ object BlockExecutor {
appService: ApplicationService[F, A],
blockStorage: BlockStorage[N, A],
viewStateStorage: ViewStateStorage[N, A]
)(implicit
tracers: ConsensusTracers[F, A],
storeRunner: KVStoreRunner[F, N]
): Resource[F, BlockExecutor[F, N, A]] = for {
executionQueue <- Resource.liftF {
ConcurrentQueue[F].unbounded[Effect.ExecuteBlocks[A]](None)
Expand Down
Expand Up @@ -55,8 +55,19 @@ object ConsensusEvent {
error: ProtocolError[A]
) extends ConsensusEvent[A]

/** A block has been removed from storage by the time it was to be executed. */
case class ExecutionSkipped[A <: Agreement](
blockHash: A#Hash
) extends ConsensusEvent[A]

/** A block has been executed. */
case class BlockExecuted[A <: Agreement](
blockHash: A#Hash
) extends ConsensusEvent[A]

/** An unexpected error in one of the background tasks. */
case class Error(
message: String,
error: Throwable
) extends ConsensusEvent[Nothing]
}
Expand Up @@ -22,7 +22,9 @@ case class ConsensusTracers[F[_], A <: Agreement](
fromFuture: Tracer[F, Event.MessageReceived[A]],
stashed: Tracer[F, ProtocolError.TooEarly[A]],
rejected: Tracer[F, ProtocolError[A]],
error: Tracer[F, Throwable]
executionSkipped: Tracer[F, A#Hash],
blockExecuted: Tracer[F, A#Hash],
error: Tracer[F, (String, Throwable)]
)

object ConsensusTracers {
Expand All @@ -43,6 +45,8 @@ object ConsensusTracers {
fromFuture = tracer.contramap[Event.MessageReceived[A]](FromFuture(_)),
stashed = tracer.contramap[ProtocolError.TooEarly[A]](Stashed(_)),
rejected = tracer.contramap[ProtocolError[A]](Rejected(_)),
error = tracer.contramap[Throwable](Error(_))
executionSkipped = tracer.contramap[A#Hash](ExecutionSkipped(_)),
blockExecuted = tracer.contramap[A#Hash](BlockExecuted(_)),
error = tracer.contramap[(String, Throwable)]((Error.apply _).tupled)
)
}

0 comments on commit 196d8e1

Please sign in to comment.