Skip to content

Commit

Permalink
PM-3134: Testing the BlockSynchronizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 11, 2021
1 parent 5385d9a commit 8d0172e
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 34 deletions.
4 changes: 2 additions & 2 deletions metronome/core/src/io/iohk/metronome/core/Pipe.scala
Expand Up @@ -6,8 +6,8 @@ import monix.tail.Iterant
import monix.catnap.ConcurrentQueue

/** A `Pipe` is a connection between two components where
* messages of type `L` are going from left to right, and
* message of type `R` are going from right to left.
* messages of type `L` are going from left to right and
* messages of type `R` are going from right to left.
*/
trait Pipe[F[_], L, R] {
type Left = Pipe.Side[F, L, R]
Expand Down
Expand Up @@ -165,7 +165,7 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
BlockSyncPipe.Request(sender, prepare)
)

/** Process the synchronization. result queue. */
/** Process the synchronization result queue. */
private def processBlockSyncPipe: F[Unit] =
blockSyncPipe.receive
.mapEval[Unit] { case BlockSyncPipe.Response(request, isValid) =>
Expand Down
Expand Up @@ -3,7 +3,6 @@ package io.iohk.metronome.hotstuff.service.storage
import cats.implicits._
import io.iohk.metronome.storage.{KVCollection, KVStoreState}
import io.iohk.metronome.hotstuff.consensus.basic.{Agreement, Block => BlockOps}
import java.util.UUID
import org.scalacheck._
import org.scalacheck.Arbitrary.arbitrary
import org.scalacheck.Prop.{all, forAll, propBoolean}
Expand All @@ -17,22 +16,22 @@ object BlockStorageProps extends Properties("BlockStorage") {
def isGenesis = parentId.isEmpty
}

object TestAggreement extends Agreement {
object TestAgreement extends Agreement {
type Block = TestBlock
type Hash = String
type PSig = Nothing
type GSig = Nothing
type PKey = Nothing
type GSig = Unit
type PKey = Int
type SKey = Nothing

implicit val block = new BlockOps[TestAggreement] {
implicit val block = new BlockOps[TestAgreement] {
override def blockHash(b: TestBlock) = b.id
override def parentBlockHash(b: TestBlock) = b.parentId
override def isValid(b: Block) = true
}
}
type TestAggreement = TestAggreement.type
type Hash = TestAggreement.Hash
type TestAgreement = TestAgreement.type
type Hash = TestAgreement.Hash

implicit def `Codec[Set[T]]`[T: Codec] =
implicitly[Codec[List[T]]].xmap[Set[T]](_.toSet, _.toList)
Expand All @@ -45,7 +44,7 @@ object BlockStorageProps extends Properties("BlockStorage") {
}

object TestBlockStorage
extends BlockStorage[Namespace, TestAggreement](
extends BlockStorage[Namespace, TestAgreement](
new KVCollection[Namespace, Hash, TestBlock](Namespace.Blocks),
new KVCollection[Namespace, Hash, Hash](Namespace.BlockToParent),
new KVCollection[Namespace, Hash, Set[Hash]](Namespace.BlockToChildren)
Expand Down Expand Up @@ -96,7 +95,7 @@ object BlockStorageProps extends Properties("BlockStorage") {
}

def genBlockId: Gen[Hash] =
Gen.delay(UUID.randomUUID().toString)
Gen.uuid.map(_.toString)

/** Generate a block with a given parent, using the next available ID. */
def genBlock(parentId: Hash): Gen[TestBlock] =
Expand Down
Expand Up @@ -22,21 +22,21 @@ object ViewStateStorageProps extends Properties("ViewStateStorage") {
}

object ViewStateStorageCommands extends Commands {
object TestAggreement extends Agreement {
object TestAgreement extends Agreement {
type Block = Nothing
type Hash = String
type PSig = Unit
type GSig = List[String]
type PKey = Nothing
type SKey = Nothing
}
type TestAggreement = TestAggreement.type
type TestAgreement = TestAgreement.type

type Namespace = String

object TestKVStoreState extends KVStoreState[Namespace]

type TestViewStateStorage = ViewStateStorage[Namespace, TestAggreement]
type TestViewStateStorage = ViewStateStorage[Namespace, TestAgreement]

class StorageWrapper(
viewStateStorage: TestViewStateStorage,
Expand All @@ -53,16 +53,17 @@ object ViewStateStorageCommands extends Commands {
def read[A](
f: TestViewStateStorage => KVStoreRead[Namespace, A]
): A = {
val b = scodec.bits.ByteVector.empty
TestKVStoreState.compile(f(viewStateStorage)).run(store)
}
}

type State = ViewStateStorage.Bundle[TestAggreement]
type State = ViewStateStorage.Bundle[TestAgreement]
type Sut = StorageWrapper

val genesisState = ViewStateStorage.Bundle
.fromGenesisQC[TestAggreement] {
QuorumCertificate[TestAggreement](
.fromGenesisQC[TestAgreement] {
QuorumCertificate[TestAgreement](
Phase.Prepare,
ViewNumber(1),
"",
Expand All @@ -89,7 +90,7 @@ object ViewStateStorageCommands extends Commands {

override def newSut(state: State): Sut = {
val init = TestKVStoreState.compile(
ViewStateStorage[Namespace, TestAggreement]("test-namespace", state)
ViewStateStorage[Namespace, TestAgreement]("test-namespace", state)
)
val (store, storage) = init.run(Map.empty).value
new StorageWrapper(storage, store)
Expand All @@ -116,9 +117,9 @@ object ViewStateStorageCommands extends Commands {
def genSetQuorumCertificate(state: State) =
for {
p <- Gen.oneOf(Phase.Prepare, Phase.PreCommit, Phase.Commit)
h <- arbitrary[TestAggreement.Hash]
s <- arbitrary[TestAggreement.GSig]
qc = QuorumCertificate[TestAggreement](
h <- arbitrary[TestAgreement.Hash]
s <- arbitrary[TestAgreement.GSig]
qc = QuorumCertificate[TestAgreement](
p,
state.viewNumber,
h,
Expand Down Expand Up @@ -147,7 +148,7 @@ object ViewStateStorageCommands extends Commands {
override def postCondition(state: State, success: Boolean): Prop = success
}

case class SetQuorumCertificateCommand(qc: QuorumCertificate[TestAggreement])
case class SetQuorumCertificateCommand(qc: QuorumCertificate[TestAgreement])
extends UnitCommand {
override def run(sut: Sut): Result =
sut.write(_.setQuorumCertificate(qc))
Expand All @@ -165,7 +166,7 @@ object ViewStateStorageCommands extends Commands {
override def postCondition(state: State, success: Boolean): Prop = success
}

case class SetLastExecutedBlockHashCommand(blockHash: TestAggreement.Hash)
case class SetLastExecutedBlockHashCommand(blockHash: TestAgreement.Hash)
extends UnitCommand {
override def run(sut: Sut): Result =
sut.write(_.setLastExecutedBlockHash(blockHash))
Expand All @@ -182,7 +183,7 @@ object ViewStateStorageCommands extends Commands {
}

case object GetBundleCommand extends Command {
type Result = ViewStateStorage.Bundle[TestAggreement]
type Result = ViewStateStorage.Bundle[TestAgreement]

override def run(sut: Sut): Result = sut.read(_.getBundle)
override def nextState(state: State): State = state
Expand Down
@@ -0,0 +1,189 @@
package io.iohk.metronome.hotstuff.service.sync

import cats.effect.concurrent.{Ref, Semaphore}
import io.iohk.metronome.crypto.GroupSignature
import io.iohk.metronome.hotstuff.consensus.ViewNumber
import io.iohk.metronome.hotstuff.consensus.basic.{QuorumCertificate, Phase}
import io.iohk.metronome.hotstuff.service.storage.BlockStorageProps
import io.iohk.metronome.storage.InMemoryKVStore
import org.scalacheck.{Properties, Arbitrary, Gen}, Arbitrary.arbitrary
import org.scalacheck.Prop.{all, forAll, propBoolean}
import monix.eval.Task
import monix.execution.schedulers.TestScheduler
import scala.util.Random
import scala.concurrent.duration._

object BlockSynchronizerProps extends Properties("BlockSynchronizer") {
import BlockStorageProps.{
TestAgreement,
TestBlock,
TestBlockStorage,
TestKVStore,
Namespace,
genNonEmptyBlockTree,
genBlockTree
}

// Insert the prefix three into "persistent" storage,
// then start multiple concurrent download processes
// from random federation members pointing at various
// nodes in the subtree.
//
// In the end all synced subtree elements should be
// persisted and the ephemeral storage left empty.
// At no point during the process should the persistent
// storage contain a forest.
case class TestFixture(
ancestorTree: List[TestBlock],
descendantTree: List[TestBlock],
requests: List[(TestAgreement.PKey, QuorumCertificate[TestAgreement])]
) {
val persistentRef = Ref.unsafe[Task, TestKVStore.Store] {
TestKVStore.build(ancestorTree)
}
val ephemeralRef = Ref.unsafe[Task, TestKVStore.Store](Map.empty)

val persistentStore = InMemoryKVStore[Task, Namespace](persistentRef)
val inMemoryStore = InMemoryKVStore[Task, Namespace](ephemeralRef)

val blockMap = (ancestorTree ++ descendantTree).map { block =>
block.id -> block
}.toMap

def getBlock(
from: TestAgreement.PKey,
blockHash: TestAgreement.Hash
): Task[Option[TestAgreement.Block]] = {
val timeout = 5000
val delay = Random.nextDouble() * 1000
val isLost = Random.nextDouble() < 0.1
val isCorrupt = Random.nextDouble() < 0.1

val block = if (isCorrupt) {
blockMap(blockHash).copy(id = TestFixture.CorruptId)
} else {
blockMap(blockHash)
}

if (isLost) {
Task.pure(None).delayResult(timeout.millis)
} else {
Task.pure(Some(block)).delayResult(delay.millis)
}
}

implicit val storeRunner = persistentStore

val synchronizer = new BlockSynchronizer[Task, Namespace, TestAgreement](
blockStorage = TestBlockStorage,
getBlock = getBlock,
inMemoryStore = inMemoryStore,
semaphore = makeSemapshore()
)

private def makeSemapshore() = {
import monix.execution.Scheduler.Implicits.global
Semaphore[Task](1).runSyncUnsafe()
}
}
object TestFixture {
val CorruptId = "corrupt"

implicit val arb: Arbitrary[TestFixture] = Arbitrary {
for {
ancestorTree <- genNonEmptyBlockTree
leaf = ancestorTree.last
descendantTree <- genBlockTree(parentId = leaf.id)

federationSize <- Gen.choose(1, 10)
federationKeys = Range(0, federationSize).toVector

existingPrepares <- Gen.someOf(ancestorTree)
newPrepares <- Gen.someOf(descendantTree)

prepares = (existingPrepares ++ newPrepares).toList
proposerKeys <- Gen.listOfN(prepares.size, Gen.oneOf(federationKeys))

requests = (prepares zip proposerKeys).zipWithIndex.map {
case ((parent, publicKey), idx) =>
publicKey -> QuorumCertificate[TestAgreement](
phase = Phase.Prepare,
viewNumber = ViewNumber(100L + idx),
blockHash = parent.id,
signature = GroupSignature(())
)
}

} yield TestFixture(ancestorTree, descendantTree, requests)
}
}

property("persists") = forAll { (fixture: TestFixture) =>
implicit val scheduler = TestScheduler()

val test = for {
fibers <- Task.parTraverse(fixture.requests) { case (publicKey, qc) =>
fixture.synchronizer.sync(publicKey, qc).start
}
_ <- Task.traverse(fibers)(_.join)
persistent <- fixture.persistentRef.get
ephemeral <- fixture.ephemeralRef.get
} yield {
all(
"ephermeral empty" |: ephemeral.isEmpty,
"persistent contains all" |: fixture.requests.forall { case (_, qc) =>
persistent(Namespace.Blocks).contains(qc.blockHash)
},
"all uncorrupted" |: persistent(Namespace.Blocks).forall {
case (blockHash, block: TestBlock) =>
blockHash == block.id && blockHash != TestFixture.CorruptId
}
)
}

// Schedule the execution, using a Future so we can check the value.
val testFuture = test.runToFuture

// Simulate a long time, which should be enough for all downloads to finish.
scheduler.tick(1.day)

testFuture.value.get.get
}

property("no forest") = forAll(
for {
fixture <- arbitrary[TestFixture]
duration <- arbitrary[Int].map(_.seconds)
} yield (fixture, duration)
) { case (fixture: TestFixture, duration: FiniteDuration) =>
implicit val scheduler = TestScheduler()

// Schedule the downloads in the background.
Task
.parTraverse(fixture.requests) { case (publicKey, qc) =>
fixture.synchronizer.sync(publicKey, qc).startAndForget
}
.runAsyncAndForget

// Simulate a some random time, which may or may not be enough to finish the downloads.
scheduler.tick(duration)

// Check now that there the persistent store has just one tree.
val test = for {
persistent <- fixture.persistentRef.get
} yield {
persistent(Namespace.Blocks).forall { case (_, block: TestBlock) =>
block.parentId.isEmpty || persistent(Namespace.Blocks).contains(
block.parentId
)
}
}

val testFuture = test.runToFuture

// Just simulate the immediate tasks.
scheduler.tick()

testFuture.value.get.get
}
}
Expand Up @@ -7,15 +7,18 @@ import cats.effect.concurrent.Ref
/** Simple in-memory key-value store based on `KVStoreState` and `KVStoreRunner`. */
object InMemoryKVStore {
def apply[F[_]: Sync, N]: F[KVStoreRunner[F, N]] =
Ref.of[F, KVStoreState[N]#Store](Map.empty).map { storeRef =>
new KVStoreState[N] with KVStoreRunner[F, N] {
def runReadOnly[A](query: KVStoreRead[N, A]): F[A] =
storeRef.get.map(compile(query).run)
Ref.of[F, KVStoreState[N]#Store](Map.empty).map(apply(_))

def runReadWrite[A](query: KVStore[N, A]): F[A] =
storeRef.modify { store =>
compile(query).run(store).value
}
}
def apply[F[_]: Sync, N](
storeRef: Ref[F, KVStoreState[N]#Store]
): KVStoreRunner[F, N] =
new KVStoreState[N] with KVStoreRunner[F, N] {
def runReadOnly[A](query: KVStoreRead[N, A]): F[A] =
storeRef.get.map(compile(query).run)

def runReadWrite[A](query: KVStore[N, A]): F[A] =
storeRef.modify { store =>
compile(query).run(store).value
}
}
}

0 comments on commit 8d0172e

Please sign in to comment.