Skip to content

Commit

Permalink
PM-3187: Emit traces from ConsensusService.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 11, 2021
1 parent 892dcbb commit dcfe2a8
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 20 deletions.
Expand Up @@ -19,12 +19,13 @@ import io.iohk.metronome.hotstuff.consensus.basic.{
}
import io.iohk.metronome.hotstuff.service.pipes.BlockSyncPipe
import io.iohk.metronome.hotstuff.service.storage.BlockStorage
import io.iohk.metronome.hotstuff.service.tracing.ConsensusTracers
import io.iohk.metronome.networking.ConnectionHandler
import io.iohk.metronome.storage.KVStoreRunner
import monix.catnap.ConcurrentQueue
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import io.iohk.metronome.hotstuff.service.tracing.ConsensusTracers
import scala.util.control.NonFatal

/** An effectful executor wrapping the pure HotStuff ProtocolState.
*
Expand Down Expand Up @@ -83,15 +84,13 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
.as(none)

case Right(valid) if valid.message.viewNumber < state.viewNumber =>
// TODO: Trace that obsolete message was received.
// TODO: Also collect these for the round so we can realise if we're out of sync.
none.pure[F]
// TODO (PM-3063): Also collect these for the round so we can realise if we're out of sync.
tracers.fromPast(valid).as(none)

case Right(valid)
if valid.message.viewNumber > state.viewNumber + maxEarlyViewNumberDiff =>
// TODO: Trace that a message from view far ahead in the future was received.
// TODO: Also collect these for the round so we can realise if we're out of sync.
none.pure[F]
// TODO (PM-3063): Also collect these for the round so we can realise if we're out of sync.
tracers.fromFuture(valid).as(none)

case Right(valid) =>
// We know that the message is to/from the leader and it's properly signed,
Expand Down Expand Up @@ -140,12 +139,11 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
}
}

/** Report an invalid message. */
/** Trace an invalid message. Could include other penalties as well to the sender. */
private def protocolError(
error: ProtocolError[A]
): F[Unit] =
// TODO: Trace
().pure[F]
tracers.rejected(error)

/** Add a Prepare message to the synchronisation and validation queue.
*
Expand Down Expand Up @@ -193,9 +191,14 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
eventQueue.poll.flatMap { event =>
stateRef.get.flatMap { state =>
val handle: F[Unit] = event match {
case e @ Event.NextView(_) =>
case e @ Event.NextView(viewNumber)
if viewNumber < state.viewNumber =>
().pure[F]

case e @ Event.NextView(viewNumber) =>
// TODO (PM-3063): Check whether we have timed out because we are out of sync
handleTransition(state.handleNextView(e))
tracers.timeout(viewNumber) >>
handleTransition(state.handleNextView(e))

case e @ Event.MessageReceived(_, _) =>
handleTransitionAttempt(
Expand Down Expand Up @@ -245,7 +248,10 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](

requeue.whenA(
nextState.viewNumber != state.viewNumber || nextState.phase != state.phase
)
) >>
tracers
.newView(nextState.viewNumber)
.whenA(nextState.viewNumber != state.viewNumber)
}

/** Carry out local effects before anything else,
Expand Down Expand Up @@ -302,8 +308,8 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
transitionAttempt: ProtocolState.TransitionAttempt[A]
): F[Unit] = transitionAttempt match {
case Left(error @ ProtocolError.TooEarly(_, _, _)) =>
// TODO: Trace too early message.
stashRef.update { _.stash(error) }
tracers.stashed(error) >>
stashRef.update { _.stash(error) }

case Left(error) =>
protocolError(error)
Expand All @@ -328,8 +334,7 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
import Event._
import Effect._

// TODO: Trace errors.
effect match {
val process = effect match {
case ScheduleNextView(viewNumber, timeout) =>
val event = validated(NextView(viewNumber))
Timer[F].sleep(timeout) >> enqueueEvent(event)
Expand Down Expand Up @@ -358,11 +363,16 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
case SendMessage(recipient, message) =>
network.sendMessage(recipient, message)
}

process.handleErrorWith { case NonFatal(ex) =>
tracers.error(ex)
}
}

/** Update the view state with the last Commit Quorum Certificate. */
private def saveCommitQC(qc: QuorumCertificate[A]): F[Unit] = {
assert(qc.phase == Phase.Commit)
tracers.quorum(qc)
// TODO (PM-3112): Persist View State.
???
}
Expand Down
@@ -1,5 +1,47 @@
package io.iohk.metronome.hotstuff.service.tracing

import io.iohk.metronome.hotstuff.consensus.basic.Agreement
import io.iohk.metronome.hotstuff.consensus.ViewNumber
import io.iohk.metronome.hotstuff.consensus.basic.{
Agreement,
Event,
ProtocolError
}
import io.iohk.metronome.hotstuff.consensus.basic.QuorumCertificate

sealed trait ConsensusEvent[A <: Agreement]
sealed trait ConsensusEvent[+A <: Agreement]

object ConsensusEvent {

/** The round ended without having reached decision. */
case class Timeout(viewNumber: ViewNumber) extends ConsensusEvent[Nothing]

/** The state advanced to a new view. */
case class NewView(viewNumber: ViewNumber) extends ConsensusEvent[Nothing]

/** Quorum over some block. */
case class Quorum[A <: Agreement](quorumCertificate: QuorumCertificate[A])
extends ConsensusEvent[A]

/** A formally valid message was received from an earlier view number. */
case class FromPast[A <: Agreement](message: Event.MessageReceived[A])
extends ConsensusEvent[A]

/** A formally valid message was received from a future view number. */
case class FromFuture[A <: Agreement](message: Event.MessageReceived[A])
extends ConsensusEvent[A]

/** An event that arrived too early but got stashed and will be redelivered. */
case class Stashed[A <: Agreement](
error: ProtocolError.TooEarly[A]
) extends ConsensusEvent[A]

/** A rejected event. */
case class Rejected[A <: Agreement](
error: ProtocolError[A]
) extends ConsensusEvent[A]

/** An unexpected error in one of the background tasks. */
case class Error(
error: Throwable
) extends ConsensusEvent[Nothing]
}
@@ -1,7 +1,40 @@
package io.iohk.metronome.hotstuff.service.tracing

import cats.implicits._
import io.iohk.metronome.tracer.Tracer
import io.iohk.metronome.hotstuff.consensus.basic.Agreement
import io.iohk.metronome.hotstuff.consensus.ViewNumber
import io.iohk.metronome.hotstuff.consensus.basic.{
Agreement,
Event,
ProtocolError,
QuorumCertificate
}

case class ConsensusTracers[F[_], A <: Agreement](
timeout: Tracer[F, ViewNumber],
newView: Tracer[F, ViewNumber],
quorum: Tracer[F, QuorumCertificate[A]],
fromPast: Tracer[F, Event.MessageReceived[A]],
fromFuture: Tracer[F, Event.MessageReceived[A]],
stashed: Tracer[F, ProtocolError.TooEarly[A]],
rejected: Tracer[F, ProtocolError[A]],
error: Tracer[F, Throwable]
)

object ConsensusTracers {
import ConsensusEvent._

def apply[F[_], A <: Agreement](
tracer: Tracer[F, ConsensusEvent[A]]
): ConsensusTracers[F, A] =
ConsensusTracers[F, A](
timeout = tracer.contramap[ViewNumber](Timeout(_)),
newView = tracer.contramap[ViewNumber](NewView(_)),
quorum = tracer.contramap[QuorumCertificate[A]](Quorum(_)),
fromPast = tracer.contramap[Event.MessageReceived[A]](FromPast(_)),
fromFuture = tracer.contramap[Event.MessageReceived[A]](FromFuture(_)),
stashed = tracer.contramap[ProtocolError.TooEarly[A]](Stashed(_)),
rejected = tracer.contramap[ProtocolError[A]](Rejected(_)),
error = tracer.contramap[Throwable](Error(_))
)
}

0 comments on commit dcfe2a8

Please sign in to comment.