Skip to content

Commit

Permalink
PM-3323: Trace individual events in their original format. Return sto…
Browse files Browse the repository at this point in the history
…rages for inspection.
  • Loading branch information
aakoshh committed Jun 18, 2021
1 parent ef66c9e commit 970fc6a
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 26 deletions.
Expand Up @@ -81,10 +81,17 @@ trait RobotComposition {
type CTS = ConsensusTracers[Task, RobotAgreement]
type STS = SyncTracers[Task, RobotAgreement]

/** Storages to be returned so we can look at state in tests. */
case class Storages(
blockStorage: BlockStorage[NS, RobotAgreement],
viewStateStorage: ViewStateStorage[NS, RobotAgreement],
stateStorage: KVRingBuffer[NS, Hash, Robot.State]
)(implicit val storeRunner: KVStoreRunner[Task, NS])

def compose(
opts: RobotOptions,
config: RobotConfig
): Resource[Task, Unit] = {
): Resource[Task, Storages] = {

val genesisState = Robot
.State(
Expand Down Expand Up @@ -140,17 +147,17 @@ trait RobotComposition {

_ <- makeBlockPruner(config, blockStorage, viewStateStorage)

} yield ()
} yield Storages(blockStorage, viewStateStorage, stateStorage)
}

protected def makeNetworkTracers =
RobotNetworkTracers.networkTracers
RobotNetworkTracers.networkHybridLogTracers

protected def makeConsensusTracers =
RobotConsensusTracers.consensusTracers
RobotConsensusTracers.consensusHybridLogTracers

protected def makeSyncTracers =
RobotSyncTracers.syncTracers
RobotSyncTracers.syncHybridLogTracers

protected def makeConnectionProvider(
config: RobotConfig,
Expand Down
Expand Up @@ -120,9 +120,9 @@ object RobotConsensusTracers {
)
}

implicit val consensusEventTracer =
implicit val consensusEventHybridLogTracer =
LogTracer.hybrid[Task, RobotConsensusEvent]

implicit val consensusTracers =
ConsensusTracers(consensusEventTracer)
implicit val consensusHybridLogTracers =
ConsensusTracers(consensusEventHybridLogTracer)
}
Expand Up @@ -55,9 +55,9 @@ object RobotNetworkTracers {
)
}

implicit val networkEventTracer =
implicit val networkEventHybridLogTracer =
LogTracer.hybrid[Task, RobotNetworkEvent]

implicit val networkTracers =
NetworkTracers(networkEventTracer)
implicit val networkHybridLogTracers =
NetworkTracers(networkEventHybridLogTracer)
}
Expand Up @@ -83,9 +83,9 @@ object RobotSyncTracers {
)
}

implicit val syncEventTracer =
implicit val syncEventHybridLogTracer =
LogTracer.hybrid[Task, RobotSyncEvent]

implicit val syncTracers =
SyncTracers(syncEventTracer)
implicit val syncHybridLogTracers =
SyncTracers(syncEventHybridLogTracer)
}
Expand Up @@ -2,13 +2,15 @@ package io.iohk.metronome.examples.robot.app

import cats.implicits._
import cats.effect.{Blocker, Resource}
import cats.effect.concurrent.Ref
import io.iohk.metronome.crypto.{ECKeyPair, ECPublicKey}
import io.iohk.metronome.networking.{
RemoteConnectionManager,
ConnectionHandler,
NetworkTracers
}
import io.iohk.metronome.hotstuff.service.tracing.{
ConsensusEvent,
ConsensusTracers,
SyncTracers
}
Expand All @@ -19,6 +21,7 @@ import io.iohk.metronome.examples.robot.app.config.{
RobotOptions
}
import io.iohk.metronome.logging.{InMemoryLogTracer, HybridLog, HybridLogObject}
import io.iohk.metronome.tracer.Tracer
import java.nio.file.Files
import java.net.InetSocketAddress
import monix.eval.Task
Expand All @@ -29,6 +32,7 @@ import org.scalatest.compatible.Assertion
import scala.concurrent.duration._
import org.scalatest.Inspectors
import org.scalatest.matchers.should.Matchers
import scala.reflect.ClassTag

class RobotCompositionSpec extends AnyFlatSpec with Matchers {
import RobotCompositionSpec._
Expand Down Expand Up @@ -57,31 +61,62 @@ class RobotCompositionSpec extends AnyFlatSpec with Matchers {
}
}

def eventCount[A: ClassTag](events: Vector[_]): Int =
events.collect { case e: A =>
e
}.size

behavior of "RobotComposition"

it should "compose components that can run and stay in sync" in test {
new Fixture(10.minutes) {
// Wait with the test result to keep the resources working.
override def test(envs: List[Env]) =
for {
_ <- Task.sleep(duration - 1.minute)
logs <- envs.traverse(_.logTracer.getLogs)
_ <- Task.sleep(duration - 1.minute)
logs <- envs.traverse(_.logTracer.getLogs)
consensusEvents <- envs.traverse(_.consensusEventTracer.getEvents)
syncEvents <- envs.traverse(_.syncEventTracer.getEvents)
} yield {
Inspectors.forAll(logs) { logs =>
// printLogs(logs)
Inspectors.forAll(consensusEvents) { events =>
// Networking isn't yet implemented, so it should just warn about timeouts.
logs.count(_.level == HybridLogObject.Level.Warn) should be > 0
eventCount[ConsensusEvent.Timeout](events) should be > 0
eventCount[ConsensusEvent.Quorum[_]](events) shouldBe 0
}
}
}
}
}

object RobotCompositionSpec {
import RobotConsensusTracers.RobotConsensusEvent
import RobotSyncTracers.RobotSyncEvent

class EventTracer[A](eventLogRef: Ref[Task, Vector[A]])
extends Tracer[Task, A] {

override def apply(a: => A): Task[Unit] =
eventLogRef.update(_ :+ a)

val clear = eventLogRef.set(Vector.empty)
val getEvents = eventLogRef.get
}
object EventTracer {
def apply[A] =
new EventTracer[A](Ref.unsafe[Task, Vector[A]](Vector.empty))
}

/** Things we may want to access in tests. */
case class Env(
logTracer: InMemoryLogTracer.HybridLogTracer[Task]
)
storages: TestComposition#Storages,
logTracer: InMemoryLogTracer.HybridLogTracer[Task],
consensusEventTracer: EventTracer[RobotConsensusEvent],
syncEventTracer: EventTracer[RobotSyncEvent]
) {
val clear =
logTracer.clear >> consensusEventTracer.clear >> syncEventTracer.clear
}

abstract class Fixture(val duration: FiniteDuration)
extends RobotComposition {
Expand Down Expand Up @@ -130,14 +165,22 @@ object RobotCompositionSpec {
val resources =
for {
config <- config

nodeEnvs <- (0 until config.network.nodes.size).toList.map { i =>
val opts = RobotOptions(nodeIndex = i)
val comp = makeComposition(scheduler)
val env = Env(comp.logTracer)
comp.compose(opts, config).as(env)
comp.compose(opts, config).map { storages =>
Env(
storages,
comp.logTracer,
comp.consensusEventTracer,
comp.syncEventTracer
)
}
}.sequence

_ <- Resource.pure[Task, Unit](()).onFinalize {
nodeEnvs.traverse(_.logTracer.clear).void
nodeEnvs.traverse(_.clear).void
}
} yield nodeEnvs

Expand All @@ -152,7 +195,9 @@ object RobotCompositionSpec {
*
* If the composer is reused, this should be cleared between tests.
*/
val logTracer = InMemoryLogTracer.hybrid[Task]
val logTracer = InMemoryLogTracer.hybrid[Task]
val consensusEventTracer = EventTracer[RobotConsensusEvent]
val syncEventTracer = EventTracer[RobotSyncEvent]

private def makeLogTracer[T: HybridLog] =
InMemoryLogTracer.hybrid[Task, T](logTracer)
Expand All @@ -164,12 +209,14 @@ object RobotCompositionSpec {

override protected def makeConsensusTracers = {
import RobotConsensusTracers._
ConsensusTracers(makeLogTracer[RobotConsensusEvent])
ConsensusTracers(
makeLogTracer[RobotConsensusEvent] |+| consensusEventTracer
)
}

override protected def makeSyncTracers = {
import RobotSyncTracers._
SyncTracers(makeLogTracer[RobotSyncEvent])
SyncTracers(makeLogTracer[RobotSyncEvent] |+| syncEventTracer)
}

// Use the `TestScheduler` to block on queries, otherwise the test hangs.
Expand Down

0 comments on commit 970fc6a

Please sign in to comment.