Skip to content

Commit

Permalink
PM-3241: Syncing state.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed May 14, 2021
1 parent 1cacb10 commit 3a85b37
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
Expand Up @@ -3,6 +3,7 @@ package io.iohk.metronome.examples.robot.service
import cats.implicits._
import cats.effect.{Sync, Timer}
import cats.data.{NonEmptyList, NonEmptyVector}
import io.iohk.metronome.core.messages.RPCTracker
import io.iohk.metronome.crypto.ECPublicKey
import io.iohk.metronome.crypto.hash.Hash
import io.iohk.metronome.hotstuff.service.ApplicationService
Expand All @@ -19,6 +20,7 @@ import io.iohk.metronome.storage.{KVStoreRunner, KVRingBuffer}
import io.iohk.metronome.storage.KVStoreRead
import scala.util.Random
import scala.concurrent.duration._
import cats.effect.Concurrent

class RobotService[F[_]: Sync: Timer, N](
maxRow: Int,
Expand All @@ -27,7 +29,8 @@ class RobotService[F[_]: Sync: Timer, N](
blockStorage: BlockStorage[N, RobotAgreement],
viewStateStorage: ViewStateStorage[N, RobotAgreement],
stateStorage: KVRingBuffer[N, Hash, Robot.State],
simulatedDecisionTime: FiniteDuration = 1.second
rpcTracker: RPCTracker[F, RobotMessage],
simulatedDecisionTime: FiniteDuration
)(implicit storeRunner: KVStoreRunner[F, N])
extends ApplicationService[F, RobotAgreement] {

Expand Down Expand Up @@ -145,6 +148,68 @@ class RobotService[F[_]: Sync: Timer, N](
def syncState(
sources: NonEmptyVector[ECPublicKey],
block: RobotBlock
): F[Unit] = ???
): F[Unit] = {
def loop(sources: List[ECPublicKey]): F[Unit] = {
sources match {
case source :: sources =>
getState(source, block.postStateHash).flatMap {
case None =>
loop(sources)

case Some(state) =>
storeRunner.runReadWrite {
stateStorage.put(state.hash, state)
}
}
case Nil =>
Sync[F].raiseError(
new IllegalStateException(
"Could not get the state from any of the sources."
)
)
}
}

loop(sources.toList)
}

private def getState(
from: RobotAgreement.PKey,
stateHash: Hash
): F[Option[Robot.State]] = {
for {
requestId <- RobotMessage.RequestId[F]
request = RobotMessage.GetStateRequest(requestId, stateHash)
join <- rpcTracker.register(request)
_ <- network.sendMessage(from, request)
maybeResponse <- join
} yield maybeResponse.map(_.state).filter(_.hash == stateHash)
}

}

object RobotService {
def apply[F[_]: Concurrent: Timer, N](
maxRow: Int,
maxCol: Int,
network: Network[F, RobotAgreement, RobotMessage],
blockStorage: BlockStorage[N, RobotAgreement],
viewStateStorage: ViewStateStorage[N, RobotAgreement],
stateStorage: KVRingBuffer[N, Hash, Robot.State],
simulatedDecisionTime: FiniteDuration = 1.second,
timeout: FiniteDuration = 5.seconds
)(implicit storeRunner: KVStoreRunner[F, N]): F[RobotService[F, N]] =
for {
rpcTracker <- RPCTracker[F, RobotMessage](timeout)
service = new RobotService[F, N](
maxRow,
maxCol,
network,
blockStorage,
viewStateStorage,
stateStorage,
rpcTracker,
simulatedDecisionTime
)
} yield service
}
Expand Up @@ -64,13 +64,12 @@ class SyncService[F[_]: Concurrent: ContextShift, N, A <: Agreement: Block](
}

/** Request a block from a peer. */
private def getBlock(from: A#PKey, blockHash: A#Hash): F[Option[A#Block]] = {
private def getBlock(from: A#PKey, blockHash: A#Hash): F[Option[A#Block]] =
for {
requestId <- RequestId[F]
request = GetBlockRequest(requestId, blockHash)
maybeResponse <- sendRequest(from, request)
} yield maybeResponse.map(_.block)
}

/** Request the status of a peer. */
private def getStatus(from: A#PKey): F[Option[Status[A]]] =
Expand Down

0 comments on commit 3a85b37

Please sign in to comment.