Skip to content

Commit

Permalink
PM-3146: Rename processing methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 11, 2021
1 parent 5dcb6b8 commit 9d7eea5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Expand Up @@ -47,7 +47,7 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
stateRef.get

/** Process incoming network messages. */
private def processMessages: F[Unit] =
private def processNetworkMessages: F[Unit] =
network.incomingMessages
.mapEval[Unit] { case ConnectionHandler.MessageReceived(from, message) =>
validateMessage(Event.MessageReceived(from, message)).flatMap {
Expand Down Expand Up @@ -150,8 +150,8 @@ class ConsensusService[F[_]: Timer: Concurrent, N, A <: Agreement: Block](
SyncPipe.Request(sender, prepare)
)

/** Process the validation result queue. */
private def processSyncAndValidateResponses: F[Unit] =
/** Process the synchronization. result queue. */
private def processSyncPipe: F[Unit] =
syncPipe.receive
.mapEval[Unit] { case SyncPipe.Response(request, isValid) =>
if (isValid) {
Expand Down Expand Up @@ -454,8 +454,8 @@ object ConsensusService {
)(
_.cancelEffects
)
_ <- Concurrent[F].background(service.processMessages)
_ <- Concurrent[F].background(service.processSyncAndValidateResponses)
_ <- Concurrent[F].background(service.processNetworkMessages)
_ <- Concurrent[F].background(service.processSyncPipe)
_ <- Concurrent[F].background(service.processEvents)
_ <- Concurrent[F].background(service.executeBlocks)
initEffects = ProtocolState.init(initState)
Expand Down
Expand Up @@ -43,7 +43,7 @@ class SyncService[F[_]: Sync, N, A <: Agreement](
def getStatus(from: A#PKey): F[Option[Status[A]]] = ???

/** Process incoming network messages */
private def processMessages: F[Unit] = {
private def processNetworkMessages: F[Unit] = {
import SyncMessage._
// TODO (PM-3186): Rate limiting per source.
network.incomingMessages
Expand Down Expand Up @@ -99,7 +99,8 @@ class SyncService[F[_]: Sync, N, A <: Agreement](
.completedL
}

def processSyncAndValidateRequests: F[Unit] = {
/** Read Requests from the SyncPipe and send Responses. */
def processSyncPipe: F[Unit] = {
syncPipe.receive
.mapEval[Unit] { case request @ SyncPipe.Request(sender, prepare) =>
// TODO (PM-3134): Block sync.
Expand Down Expand Up @@ -148,7 +149,7 @@ object SyncService {
consensusService,
fiberPool
)
_ <- Concurrent[F].background(service.processMessages)
_ <- Concurrent[F].background(service.processSyncAndValidateRequests)
_ <- Concurrent[F].background(service.processNetworkMessages)
_ <- Concurrent[F].background(service.processSyncPipe)
} yield service
}

0 comments on commit 9d7eea5

Please sign in to comment.