From 2d3a433068a0ba50c033b3b387ca06d78899abbf Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Fri, 18 Jun 2021 09:59:51 +0100
Subject: [PATCH] PM-3133: Move syncState to the BlockExecutor so we can guard
 it with a Semaphore.

---
 .../hotstuff/service/ApplicationService.scala |   4 +-
 .../hotstuff/service/ConsensusService.scala   |   7 +-
 .../hotstuff/service/HotStuffService.scala    |   9 ++
 .../hotstuff/service/SyncService.scala        |  47 ++-----
 .../service/execution/BlockExecutor.scala     | 132 +++++++++++++-----
 .../execution/BlockExecutorProps.scala        |   2 +-
 6 files changed, 116 insertions(+), 85 deletions(-)

diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ApplicationService.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ApplicationService.scala
index 491bc8ee..deeda9d3 100644
--- a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ApplicationService.scala
+++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ApplicationService.scala
@@ -31,5 +31,7 @@ trait ApplicationService[F[_], A <: Agreement] {
   // TODO (PM-3135): Tell the application to sync any state of the block, i.e. the Ledger.
   // The `sources` are peers who most probably have this state.
   // The full `block` is given because it may not be persisted yet.
-  def syncState(sources: NonEmptyVector[A#PKey], block: A#Block): F[Unit]
+  // Return `true` if earlier blocks can be pruned from the block storage after this operation,
+  // which may not be the case if the application syncs by downloading all the blocks.
+  def syncState(sources: NonEmptyVector[A#PKey], block: A#Block): F[Boolean]
 }
diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ConsensusService.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ConsensusService.scala
index dd4fd860..cd92ed66 100644
--- a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ConsensusService.scala
+++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/ConsensusService.scala
@@ -535,6 +535,7 @@ object ConsensusService {
       publicKey: A#PKey,
       network: Network[F, A#PKey, Message[A]],
       appService: ApplicationService[F, A],
+      blockExecutor: BlockExecutor[F, N, A],
       blockStorage: BlockStorage[N, A],
       viewStateStorage: ViewStateStorage[N, A],
       syncPipe: SyncPipe[F, A]#Left,
@@ -547,12 +548,6 @@ object ConsensusService {
     for {
       fiberSet <- FiberSet[F]

-      blockExecutor <- BlockExecutor[F, N, A](
-        appService,
-        blockStorage,
-        viewStateStorage
-      )
-
       service <- Resource.liftF(
         build[F, N, A](
           publicKey,
diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/HotStuffService.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/HotStuffService.scala
index df7c2bf0..1aa875e2 100644
--- a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/HotStuffService.scala
+++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/HotStuffService.scala
@@ -9,6 +9,7 @@ import io.iohk.metronome.hotstuff.consensus.basic.{
   Block,
   Signing
 }
+import io.iohk.metronome.hotstuff.service.execution.BlockExecutor
 import io.iohk.metronome.hotstuff.service.messages.{
   HotStuffMessage,
   SyncMessage
@@ -60,10 +61,17 @@
       syncPipe <- Resource.liftF { SyncPipe[F, A] }

+      blockExecutor <- BlockExecutor[F, N, A](
+        appService,
+        blockStorage,
+        viewStateStorage
+      )
+
       consensusService <- ConsensusService(
         initState.publicKey,
         consensusNetwork,
         appService,
+        blockExecutor,
         blockStorage,
         viewStateStorage,
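+        // Shared with the SyncService below, so the executor's single
+        // semaphore can serialize block execution with state sync.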
         syncPipe.left,
@@ -75,6 +83,7 @@
         initState.federation,
         syncNetwork,
         appService,
+        blockExecutor,
         blockStorage,
         viewStateStorage,
         syncPipe.right,
diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/SyncService.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/SyncService.scala
index d450baef..9be8e304 100644
--- a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/SyncService.scala
+++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/SyncService.scala
@@ -16,6 +16,7 @@ import io.iohk.metronome.hotstuff.consensus.basic.{
   Block,
   Signing
 }
+import io.iohk.metronome.hotstuff.service.execution.BlockExecutor
 import io.iohk.metronome.hotstuff.service.messages.SyncMessage
 import io.iohk.metronome.hotstuff.service.pipes.SyncPipe
 import io.iohk.metronome.hotstuff.service.storage.{
@@ -28,7 +29,7 @@ import io.iohk.metronome.hotstuff.service.sync.{
 }
 import io.iohk.metronome.hotstuff.service.tracing.SyncTracers
 import io.iohk.metronome.networking.{ConnectionHandler, Network}
-import io.iohk.metronome.storage.{KVStoreRunner, KVStore}
+import io.iohk.metronome.storage.KVStoreRunner
 import scala.util.control.NonFatal
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
@@ -47,6 +48,7 @@ class SyncService[F[_]: Concurrent: ContextShift, N, A <: Agreement: Block](
     publicKey: A#PKey,
     network: Network[F, A#PKey, SyncMessage[A]],
     appService: ApplicationService[F, A],
+    blockExecutor: BlockExecutor[F, N, A],
     blockStorage: BlockStorage[N, A],
    viewStateStorage: ViewStateStorage[N, A],
    syncPipe: SyncPipe[F, A]#Right,
@@ -298,48 +300,13 @@
           )
           .rethrow

-      // Sync any application-specific state, e.g. a ledger.
-      // Do this before we prune the existing blocks and set the new root.
-      _ <- appService.syncState(federationStatus.sources, block)
-
-      // Prune the block store from earlier blocks that are no longer traversable.
-      _ <- fastForwardStorage(status, block)
+      // Sync any application-specific state, e.g. a ledger,
+      // then potentially prune old blocks from the storage.
+      _ <- blockExecutor.syncState(federationStatus.sources, block)

       // Tell the ConsensusService about the new Status.
       _ <- syncPipe.send(SyncPipe.StatusResponse(status))
     } yield status.viewNumber
-
-  /** Replace the state we have persisted with what we synced with the federation.
-    *
-    * Prunes old blocks; the Commit Q.C. will be the new root.
-    */
-  private def fastForwardStorage(status: Status[A], block: A#Block): F[Unit] = {
-    val blockHash = Block[A].blockHash(block)
-    assert(blockHash == status.commitQC.blockHash)
-
-    val query: KVStore[N, Unit] =
-      for {
-        viewState <- viewStateStorage.getBundle.lift
-        // Insert the new block.
-        _ <- blockStorage.put(block)
-
-        // Prune old data, but keep the new block.
-        ds <- blockStorage
-          .getDescendants(
-            viewState.rootBlockHash,
-            skip = Set(blockHash)
-          )
-          .lift
-        _ <- ds.traverse(blockStorage.deleteUnsafe(_))
-
-        // Considering the committed block as executed, we have its state already.
-        _ <- viewStateStorage.setLastExecutedBlockHash(blockHash)
-        _ <- viewStateStorage.setRootBlockHash(blockHash)
-        // The rest of the fields will be set by the ConsensusService.
-      } yield ()
-
-    storeRunner.runReadWrite(query)
-  }
 }

 object SyncService {
@@ -357,6 +324,7 @@ object SyncService {
       federation: Federation[A#PKey],
       network: Network[F, A#PKey, SyncMessage[A]],
       appService: ApplicationService[F, A],
+      blockExecutor: BlockExecutor[F, N, A],
       blockStorage: BlockStorage[N, A],
       viewStateStorage: ViewStateStorage[N, A],
       syncPipe: SyncPipe[F, A]#Right,
@@ -376,6 +344,7 @@
           publicKey,
           network,
           appService,
+          blockExecutor,
           blockStorage,
           viewStateStorage,
           syncPipe,
diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutor.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutor.scala
index 79c62233..31581cee 100644
--- a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutor.scala
+++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutor.scala
@@ -1,8 +1,9 @@
 package io.iohk.metronome.hotstuff.service.execution

 import cats.implicits._
-import cats.data.NonEmptyList
+import cats.data.{NonEmptyList, NonEmptyVector}
 import cats.effect.{Sync, Concurrent, ContextShift, Resource}
+import cats.effect.concurrent.Semaphore
 import io.iohk.metronome.hotstuff.service.ApplicationService
 import io.iohk.metronome.hotstuff.service.storage.{
   BlockStorage,
@@ -10,6 +11,7 @@ import io.iohk.metronome.hotstuff.service.storage.{
 }
 import io.iohk.metronome.hotstuff.consensus.basic.{
   Agreement,
+  Block,
   Effect,
   QuorumCertificate
 }
@@ -28,17 +30,37 @@ import monix.catnap.ConcurrentQueue
   * which happens if the node is out of sync with the federation and
   * needs to jump ahead.
   */
-class BlockExecutor[F[_]: Sync, N, A <: Agreement](
+class BlockExecutor[F[_]: Sync, N, A <: Agreement: Block](
     appService: ApplicationService[F, A],
     blockStorage: BlockStorage[N, A],
     viewStateStorage: ViewStateStorage[N, A],
-    executionQueue: ConcurrentQueue[F, Effect.ExecuteBlocks[A]]
+    executionQueue: ConcurrentQueue[F, Effect.ExecuteBlocks[A]],
+    executionSemaphore: Semaphore[F]
 )(implicit tracers: ConsensusTracers[F, A], storeRunner: KVStoreRunner[F, N]) {

   /** Add a newly committed batch of blocks to the execution queue. */
   def enqueue(effect: Effect.ExecuteBlocks[A]): F[Unit] =
     executionQueue.offer(effect)

+  /** Fast-forward the state to a given block.
+    *
+    * This operation is delegated to the `BlockExecutor` so that it can
+    * make sure it's not executing other blocks at the same time.
+    */
+  def syncState(
+      sources: NonEmptyVector[A#PKey],
+      block: A#Block
+  ): F[Unit] =
+    executionSemaphore.withPermit {
+      for {
+        // Sync any application-specific state, e.g. a ledger.
+        // Do this before we prune the existing blocks and set the new root.
+        canPrune <- appService.syncState(sources, block)
+        // Prune the block store from earlier blocks that are no longer traversable.
+        _ <- fastForwardStorage(block, canPrune)
+      } yield ()
+    }
+
   /** Execute blocks in order, updating persistent storage along the way. */
   private def executeBlocks: F[Unit] = {
     executionQueue.poll
@@ -48,35 +70,38 @@
         // to execute them one by one. Update the persistent view state
         // after each execution to remember which blocks we have truly
         // done.
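+        // Read `lastExecutedBlockHash` under the same permit below, otherwise
+        // a concurrent `syncState` could move it between the read and the
+        // execution of the batch.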
-        for {
-          lastExecutedBlockHash <- getLastExecutedBlockHash
-          blockHashes <- getBlockPath(
-            lastExecutedBlockHash,
-            lastCommittedBlockHash,
-            commitQC
-          )
-          _ <- blockHashes match {
-            case _ :: newBlockHashes =>
-              tryExecuteBatch(newBlockHashes, commitQC, lastExecutedBlockHash)
-            case Nil =>
-              ().pure[F]
-          }
-        } yield ()
+        // Protect the whole thing with the semaphore, so that `syncState`
+        // cannot be carried out at the same time.
+        executionSemaphore.withPermit {
+          for {
+            lastExecutedBlockHash <- getLastExecutedBlockHash
+            blockHashes <- getBlockPath(
+              lastExecutedBlockHash,
+              lastCommittedBlockHash,
+              commitQC
+            )
+            _ <- blockHashes match {
+              case _ :: newBlockHashes =>
+                tryExecuteBatch(newBlockHashes, commitQC, lastExecutedBlockHash)
+              case Nil =>
+                ().pure[F]
+            }
+          } yield ()
+        }
       } >> executeBlocks
   }

-  /** Read whatever was the last executed block that we persisted,
-    * either here or by the fast-forward synchronizer.
+  /** Read whatever was the last executed block that we persisted,
+    * either by doing individual execution or state sync.
     */
   private def getLastExecutedBlockHash: F[A#Hash] =
     storeRunner.runReadOnly {
       viewStateStorage.getLastExecutedBlockHash
     }

-  /** Update the last executed block hash, unless the jump synchronizer did so
-    * while we were executing blocks. NOTE: Would be good to stop executions
-    * if that's happening, currently we expect that some blocks will just be
-    * missing and the next batch will jump ahead. This is to avoid a race condition.
+  /** Update the last executed block hash, unless something else updated it
+    * while we were executing blocks. This shouldn't happen as long as we use
+    * the executor to carry out the state sync within the semaphore.
    */
   private def setLastExecutedBlockHash(
       blockHash: A#Hash,
@@ -125,17 +150,9 @@
    *
    * In general we cannot expect to be able to cancel an ongoing execution;
    * it may be in the middle of carrying out some real-world effects that
-   * don't support cancellation. To protect against race conditions between
-   * executing blocks here and the fast-forward synchroniser making changes
-   * to state, the `ApplicationService` implementation can use an exclusive
-   * lock in `executeBlock` and `syncState`.
-   *
-   * The `BlockExecutor` may not detect perfectly that it should stop executing
-   * a batch because the `lastExecutedBlockHash` changed, it might try to execute
-   * one extra block. If that is unacceptable, the `ApplicationService` can
-   * track in memory what the last executed/synced block was and check that
-   * it was indeed the parent of the next block it's asked to execute, raise
-   * an error if it's not, to stop the batch.
+   * don't support cancellation. We use the semaphore to protect against
+   * race conditions between executing blocks here and the fast-forward
+   * synchroniser making changes to state.
    */
   private def tryExecuteBatch(
       newBlockHashes: List[A#Hash],
@@ -179,8 +196,6 @@
   }

   /** Execute the next block in line and update the view state.
-    * Be prepared that it may not exist, if execution took so long that
-    * the `SyncService` skipped ahead to the latest Commit Q.C.
    *
    * The last executed block hash is only updated if the application
    * indicates that it has persisted the results, and if no other
@@ -213,6 +228,11 @@
              // Keep the last for the next compare and set below.
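+              // The application indicated that the result of the execution
+              // isn't persisted yet, so the pointer must stay where it was.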
              lastExecutedBlockHash.some.pure[F]
            } else {
+             // Check that nothing else has changed the view state, which
+             // should hold as long as we use the semaphore. Otherwise it
+             // would be up to the `ApplicationService` to take care of
+             // isolation and to check that the block being executed is
+             // the one we expected.
              setLastExecutedBlockHash(blockHash, lastExecutedBlockHash).map {
                case true  => blockHash.some
                case false => none
              }
       } yield maybeLastExecutedBlockHash
     }
   }
+
+  /** Replace the state we have persisted with what we synced with the federation.
+    *
+    * Prunes old blocks; the Commit Q.C. will be the new root.
+    */
+  private def fastForwardStorage(
+      block: A#Block,
+      canPrune: Boolean
+  ): F[Unit] = {
+    val blockHash = Block[A].blockHash(block)
+
+    val prune = for {
+      viewState <- viewStateStorage.getBundle.lift
+      // Prune old data, but keep the new block.
+      ds <- blockStorage
+        .getDescendants(
+          viewState.rootBlockHash,
+          skip = Set(blockHash)
+        )
+        .lift
+      _ <- ds.traverse(blockStorage.deleteUnsafe(_))
+      _ <- viewStateStorage.setRootBlockHash(blockHash)
+    } yield ()
+
+    val query = for {
+      // Insert the new block.
+      _ <- blockStorage.put(block)
+      _ <- prune.whenA(canPrune)
+      // Considering the committed block as executed, we have its state already.
+      _ <- viewStateStorage.setLastExecutedBlockHash(blockHash)
+    } yield ()
+
+    storeRunner.runReadWrite(query)
+  }
 }

 object BlockExecutor {
-  def apply[F[_]: Concurrent: ContextShift, N, A <: Agreement](
+  def apply[F[_]: Concurrent: ContextShift, N, A <: Agreement: Block](
      appService: ApplicationService[F, A],
      blockStorage: BlockStorage[N, A],
      viewStateStorage: ViewStateStorage[N, A]
@@ -235,11 +289,13 @@
      executionQueue <- Resource.liftF {
        ConcurrentQueue[F].unbounded[Effect.ExecuteBlocks[A]](None)
      }
+     executionSemaphore <- Resource.liftF(Semaphore[F](1))
      executor = new BlockExecutor[F, N, A](
        appService,
        blockStorage,
        viewStateStorage,
-       executionQueue
+       executionQueue,
+       executionSemaphore
      )
      _ <- Concurrent[F].background {
        executor.executeBlocks
diff --git a/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutorProps.scala b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutorProps.scala
index 73d355b7..168bbebf 100644
--- a/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutorProps.scala
+++ b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/execution/BlockExecutorProps.scala
@@ -81,7 +81,7 @@ object BlockExecutorProps extends Properties("BlockExecutor") {
     def syncState(
         sources: NonEmptyVector[Int],
         block: TestBlock
-    ): Task[Unit] = ???
+    ): Task[Boolean] = ???

     def executeBlock(
         block: TestBlock,
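Reviewer sketch: the whole patch rests on `executeBlocks` and `syncState` sharing the single-permit `executionSemaphore`, so their critical sections can never interleave. Below is a minimal, self-contained illustration of that pattern using only cats-effect 2.x primitives; `SemaphoreDemo`, `runExecution` and `runStateSync` are hypothetical stand-ins for the executor's two entry points, not code from this repository.

import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Semaphore
import cats.implicits._
import scala.concurrent.duration._

object SemaphoreDemo extends IOApp {

  // Stand-in for BlockExecutor.executeBlocks: holds the permit while executing.
  def runExecution(semaphore: Semaphore[IO]): IO[Unit] =
    semaphore.withPermit {
      IO(println("executing blocks")) >>
        IO.sleep(1.second) >>
        IO(println("execution done"))
    }

  // Stand-in for BlockExecutor.syncState: takes the same permit, so it can
  // never observe (or prune away) the state of a half-finished execution.
  def runStateSync(semaphore: Semaphore[IO]): IO[Unit] =
    semaphore.withPermit {
      IO(println("fast-forwarding state")) >>
        IO.sleep(1.second) >>
        IO(println("state sync done"))
    }

  override def run(args: List[String]): IO[ExitCode] =
    for {
      // A single permit makes the semaphore behave like a mutex.
      semaphore <- Semaphore[IO](1)
      // Started concurrently, but the bodies run strictly one after the other.
      _ <- (runExecution(semaphore), runStateSync(semaphore)).parMapN((_, _) => ())
    } yield ExitCode.Success
}

Whichever task wins the race for the permit, the printed lines always come out pairwise in order, because `withPermit` acquires before running its argument and releases afterwards, even if the action fails.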