diff --git a/README.md b/README.md index be11e89a..d9def8d7 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ To run tests, use the wild cards again and the `.test` postix: ```console mill __.test -mill --watch metronome[2.13.4].rocksdb.test +mill --watch metronome[2.13.4].rocksdb.props.test ``` To run a single test class, use the `.single` method with the full path to the spec. Note that `ScalaTest` tests are in the `specs` subdirectories while `ScalaCheck` ones are in `props`. @@ -123,3 +123,70 @@ $ tail -f ~/.metronome/examples/robot/logs/node-0.log ``` To clear out everything before a restart, just run `rm -rf ~/.metronome/examples/robot`. + + +## Running the Checkpointing Service + +First generate some ECDSA keys to be used by the federation, as well as one to be +used by the PoW interpreter (it has to be different from the key used by the service): + +```console +$ mill metronome[2.13.4].checkpointing.app.run keygen > service-keys.json +[424/424] metronome[2.13.4].checkpointing.app.run +$ cat service-keys.json +{ + "publicKey" : "ab5944b35a12f87133b5cf525b7a2ecc698a059b4d46898c4f58970e73069aeebeb55765ade41d781120c27ef8a88ae1cb2ff5c2e70345373b524dcfcb6636d5", + "privateKey" : "057b39a793c06683b4ebec95456f576be4c44e4404e126f0a46689d259209a75" +} +$ mill metronome[2.13.4].checkpointing.app.run keygen > interpreter-keys.json +[424/424] metronome[2.13.4].checkpointing.app.run +``` + +The results can be parsed for example with [jq](https://stedolan.github.io/jq/), as seen in the example below. + +Create a config file to provide the necessary settings which the default `application.conf` doesn't have. For example: + +```shell +cat <example.conf +include "/application.conf" + +metronome { + checkpointing { + federation { + self { + host = $(dig +short myip.opendns.com @resolver4.opendns.com) + port = 40000 + private-key = $(jq -r ".privateKey" service-keys.json) + } + + # Append here other the other nodes you create. + others = [ + ] + } + local { + interpreter { + public-key = $(jq -r ".publicKey" interpreter-keys.json) + } + } + } +} +EOF +``` + +Build the service into a fat JAR so we can pass system properties when we run it: + +```shell +SCALA_VER=2.13.4 +ASSEMBLY_JAR=${PWD}/out/metronome/${SCALA_VER}/checkpointing/app/assembly/dest/out.jar +mill metoronme[$SCALA_VER].checkpointing.app.assembly +``` + +Start the service by pointing it at the example configuration: + +```console +$ java -cp $ASSEMBLY_JAR -Dconfig.file=example.conf io.iohk.metronome.checkpointing.app.CheckpointingApp service +13:22:02.853 WARN i.i.m.h.s.tracing.ConsensusEvent Timeout {"viewNumber":7,"messageCounter":{"past":0,"present":0,"future":0}} +13:22:02.895 WARN i.i.m.c.s.tracing.CheckpointingEvent InterpreterUnavailable {"messageType":"CreateBlockBodyRequest"} +``` + +Detailed logs should appear in `~/.metronome/checkpointing/logs/service.log`. diff --git a/build.sc b/build.sc index fd175963..3aaa377d 100644 --- a/build.sc +++ b/build.sc @@ -389,8 +389,11 @@ class MetronomeModule(val crossScalaVersion: String) extends CrossScalaModule { override def ivyDeps = super.ivyDeps() ++ Agg( ivy"ch.qos.logback:logback-classic:${VersionOf.logback}", - ivy"io.iohk::scalanet-discovery:${VersionOf.scalanet}" + ivy"io.iohk::scalanet-discovery:${VersionOf.scalanet}", + ivy"com.github.scopt::scopt:${VersionOf.scopt}" ) + + object specs extends SpecsModule } } diff --git a/metronome/checkpointing/app/resources/application.conf b/metronome/checkpointing/app/resources/application.conf new file mode 100644 index 00000000..43c06e0a --- /dev/null +++ b/metronome/checkpointing/app/resources/application.conf @@ -0,0 +1,80 @@ +metronome { + checkpointing { + # A name for the node that we can use to distinguish + # if we run multiple instances on the same machine. + name = service + + federation { + # Public address of this federation member; required. + self { + host = null + port = 9080 + # Private ECDSA key of this federation member in hexadecimal format; required. + # It can either the the key itself, or a path to a file which contains the key. + # The public key will be derived from the private key. + private-key = null + } + + # List of other federation members; records of {host, port, public-key}. + others = [] + + # The maximum number of tolerated Byzantine nodes; optional. + # At most (n-1)/3, but can be lower to require smaller quorum. + maxFaulty = null + } + + consensus { + # Minimum time to allow for a HotStuff round. + min-timeout = 5s + # Maximum time to allow for a HotStuff round, after numerous timeouts. + max-timeout = 15s + # Increment factor to apply on the timeout after a failed round. + timeout-factor = 1.2 + } + + # Network configuration to accept connections from remote federation nodes. + remote { + # Bind address for the checkpointing service remote interface. + listen { + host = 0.0.0.0 + port = ${metronome.checkpointing.federation.self.port} + } + # Request roundtrip timeout. + timeout = 3s + } + + # Network configuration to accept connection from the local interpreter. + local { + # Bind address for the checkpointing service local interface. + listen { + host = 127.0.0.1 + port = 9081 + } + # Node of the PoW Interpreter. + interpreter { + host = 127.0.0.1 + port = 9082 + # ECDSA key used by the interpreter to secure the connection; required. + public-key = null + } + # Request roundtrip timeout. + timeout = 3s + # Whether we should expect the Interpreter to send us notifications about + # the arrival of a checkpoint height, or check in every time we have to + # create a block. Depends on how the Interpreter is implemented, it's an + # optimisation to save unnecessary round trips. + expect-checkpoint-candidate-notifications = false + } + + database { + # Storage location for RocksDB. + path = ${user.home}"/.metronome/checkpointing/db/"${metronome.checkpointing.name} + # Size of the ring buffer for the checkpointing ledger. + state-history-size = 100 + # Number of blocks to keep before pruning. + block-history-size = 100 + # Time to wait before pruning a block from history. + prune-interval = 60s + } + } +} diff --git a/metronome/checkpointing/app/resources/logback.xml b/metronome/checkpointing/app/resources/logback.xml new file mode 100644 index 00000000..0b6d0782 --- /dev/null +++ b/metronome/checkpointing/app/resources/logback.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + ${log.console.level} + + + ${encoder.pattern} + + + + + ${log.file.dir}/${log.file.name}.log + true + + ${log.file.dir}/${log.file.name}.%i.log.zip + 1 + 10 + + + 10MB + + + ${encoder.pattern} + + + + + + + + + + + + diff --git a/metronome/checkpointing/app/specs/resources/test.conf b/metronome/checkpointing/app/specs/resources/test.conf new file mode 100644 index 00000000..50d01581 --- /dev/null +++ b/metronome/checkpointing/app/specs/resources/test.conf @@ -0,0 +1,28 @@ +# Extend the defaults. +# The leading "/"" works when a file refers to the default module included in the resources. +# Here we could use "application" or "application.conf" without it, but when executed with +# `java -Dconfig.file=example.conf` only the one that begins with "/" seems to work. +include "/application" + +metronome { + checkpointing { + federation { + self { + host = localhost + port = 40000 + private-key = cd2a249a76d8e9fd0e538e651b9e97c3fc5efcceeb10fc98dd57fbdd156457e6 + } + + others = [ + {host = localhost, port = 40001, public-key = ff7849206b7faef9557cf53333739ecd947698d76ba11ffabf2587435322b9a8b4f063faf97e5aace2a75b8f6714e5bd3d483cad6e830ae3036afcc4ff1b5369, private-key = 15cc92810f61bc705f939432197fee100bcc1a99d6cc66c7c28fa158d4144f84} + {host = localhost, port = 40002, public-key = cb020251d396614a35038dd2ff88fd2f1a5fd74c8bcad4b353fa605405c8b1b8c80ee12d2a10b1fca59424b16890c8115fbc94a68026369acc3c2603595e6387, private-key = a4769d076bb7eefeb1aba8aa97520d8f7f8bcd65049a128c3040f9dd5d3eeae6} + {host = localhost, port = 40003, public-key = 23fcab42e8f1078880b27aab4849092489bfa8d3e3b0faa54c9db89e89223c783ec7a3b2f8e6461b27778f78cea261a2272abe31c5601173b2964ef14af897dc, private-key = 9441f3e96104a11405cb0e03ceb693f889770dd2c155dab7573023e00e878ace} + ] + } + local { + interpreter { + public-key = 65e2f6da1bb1e7f0b07f5b892c568acb5429833e30af3974eedd2137ebc9f1fb8b0c462d4ca558dda64c5da8cf10280a1f579556ac8a611bd2fa7199f5a2c69a + } + } + } +} diff --git a/metronome/checkpointing/app/specs/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParserSpec.scala b/metronome/checkpointing/app/specs/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParserSpec.scala new file mode 100644 index 00000000..c6d8ee1d --- /dev/null +++ b/metronome/checkpointing/app/specs/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParserSpec.scala @@ -0,0 +1,43 @@ +package io.iohk.metronome.checkpointing.app.config + +import com.typesafe.config.ConfigFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.Inside +import org.scalatest.matchers.should.Matchers +import java.nio.file.Path + +class CheckpointingConfigParserSpec + extends AnyFlatSpec + with Inside + with Matchers { + + behavior of "CheckpointingConfigParser" + + it should "not parse the default configuration due to missing data" in { + inside(CheckpointingConfigParser.parse(ConfigFactory.load())) { + case Left(_) => + succeed + } + } + + it should "parse the with the test overrides" in { + inside( + CheckpointingConfigParser.parse(ConfigFactory.load("test.conf")) + ) { case Right(config) => + config.remote.listen.port shouldBe config.federation.self.port + config.federation.self.privateKey.isLeft shouldBe true + } + } + + it should "parse when the private key is a path" in { + inside( + CheckpointingConfigParser.parse { + ConfigFactory.parseString( + "metronome.checkpointing.federation.self.private-key=./node.key" + ) withFallback ConfigFactory.load("test.conf") + } + ) { case Right(config) => + config.federation.self.privateKey shouldBe Right(Path.of("./node.key")) + } + } +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingApp.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingApp.scala new file mode 100644 index 00000000..f0dadcf9 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingApp.scala @@ -0,0 +1,54 @@ +package io.iohk.metronome.checkpointing.app + +import cats.effect.ExitCode +import com.typesafe.config.ConfigFactory +import monix.eval.{Task, TaskApp} +import io.iohk.metronome.checkpointing.app.config.{ + CheckpointingConfigParser, + CheckpointingOptions +} + +object CheckpointingApp extends TaskApp { + override def run(args: List[String]): Task[ExitCode] = { + CheckpointingOptions.parse(args) match { + case None => + Task.pure(ExitCode.Error) + + case Some(opts) => + run(opts) + } + } + + def run(opts: CheckpointingOptions): Task[ExitCode] = + opts.mode match { + case CheckpointingOptions.KeyGen => + setLogProperties(opts, "keygen") >> + // Not parsing the configuration for this as it may be incomplete without the keys. + CheckpointingKeyGen.generateAndPrint.as(ExitCode.Success) + + case CheckpointingOptions.Service => + CheckpointingConfigParser.parse(ConfigFactory.load()) match { + case Left(error) => + Task + .delay(println(s"Error parsing configuration: $error")) + .as(ExitCode.Error) + + case Right(config) => + setLogProperties(opts, config.name) >> + CheckpointingComposition + .compose(config) + .use(_ => Task.never.as(ExitCode.Success)) + } + } + + /** Set dynamic system properties expected by `logback.xml` before any logging module is loaded. */ + def setLogProperties( + opts: CheckpointingOptions, + name: String + ): Task[Unit] = Task { + // Separate log file for each node. + System.setProperty("log.file.name", name) + // Control how much logging goes on the console. + System.setProperty("log.console.level", opts.logLevel.toString) + }.void +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingComposition.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingComposition.scala new file mode 100644 index 00000000..4d8d592e --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingComposition.scala @@ -0,0 +1,595 @@ +package io.iohk.metronome.checkpointing.app + +import cats.implicits._ +import cats.effect.{Resource, Blocker, Concurrent} +import io.iohk.metronome.crypto.{ + ECKeyPair, + ECPublicKey, + ECPrivateKey, + GroupSignature +} +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.app.codecs.CheckpointingCodecs +import io.iohk.metronome.checkpointing.app.config.{ + CheckpointingConfig, + CheckpointingConfigParser +} +import io.iohk.metronome.checkpointing.app.tracing._ +import io.iohk.metronome.checkpointing.service.CheckpointingService +import io.iohk.metronome.checkpointing.service.messages.CheckpointingMessage +import io.iohk.metronome.checkpointing.service.tracing.CheckpointingEvent +import io.iohk.metronome.checkpointing.service.storage.LedgerStorage +import io.iohk.metronome.checkpointing.models.{ + Block, + Ledger, + CheckpointingSigning +} +import io.iohk.metronome.checkpointing.interpreter.InterpreterConnection +import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage +import io.iohk.metronome.checkpointing.interpreter.codecs.DefaultInterpreterCodecs +import io.iohk.metronome.hotstuff.consensus.{ + ViewNumber, + Federation, + LeaderSelection +} +import io.iohk.metronome.hotstuff.consensus.basic.{ + QuorumCertificate, + Phase, + ProtocolState +} +import io.iohk.metronome.hotstuff.service.{HotStuffService, ConsensusService} +import io.iohk.metronome.hotstuff.service.messages.{ + DuplexMessage, + HotStuffMessage +} +import io.iohk.metronome.hotstuff.service.tracing.{ + ConsensusTracers, + SyncTracers +} +import io.iohk.metronome.hotstuff.service.storage.{ + BlockStorage, + ViewStateStorage, + BlockPruning +} +import io.iohk.metronome.networking.{ + EncryptedConnectionProvider, + ScalanetConnectionProvider, + RemoteConnectionManager, + LocalConnectionManager, + NetworkTracers, + Network +} +import io.iohk.metronome.rocksdb.RocksDBStore +import io.iohk.metronome.storage.{ + KVStoreRunner, + KVStoreRead, + KVStore, + KVCollection, + KVTree +} +import io.iohk.scalanet.peergroup.dynamictls.DynamicTLSPeerGroup +import io.circe.Json +import java.security.SecureRandom +import java.nio.file.Files +import monix.eval.Task +import monix.execution.Scheduler +import scodec.Codec +import io.iohk.metronome.tracer.Tracer +import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage +import java.net.InetSocketAddress + +/** Object composition, allowing overrides in integration tests. */ +trait CheckpointingComposition { + import CheckpointingCodecs._ + import DefaultInterpreterCodecs.interpreterMessageCodec + + type RemoteNetworkMessage = + DuplexMessage[CheckpointingAgreement, CheckpointingMessage] + + type NS = RocksDBStore.Namespace + + type RNTS = NetworkTracers[Task, ECPublicKey, RemoteNetworkMessage] + type LNTS = NetworkTracers[Task, ECPublicKey, InterpreterMessage] + type CTS = ConsensusTracers[Task, CheckpointingAgreement] + type STS = SyncTracers[Task, CheckpointingAgreement] + + /** Wire together the Checkpointing Service. */ + def compose( + config: CheckpointingConfig + ): Resource[Task, Unit] = { + + implicit val remoteNetworkTracers = makeRemoteNetworkTracers + implicit val localNetworkTracers = makeLocalNetworkTracers + implicit val consesusTracers = makeConsensusTracers + implicit val syncTracers = makeSyncTracers + implicit val serviceTracer = makeServiceTracer + + for { + keyPair <- Resource.liftF(readPrivateKey(config)).map { privateKey => + ECKeyPair(privateKey.underlying) + } + + remoteConnectionProvider <- makeRemoteConnectionProvider( + config, + keyPair + ) + remoteConnectionManager <- makeRemoteConnectionManager( + config, + remoteConnectionProvider + ) + (hotstuffNetwork, applicationNetwork) <- makeNetworks( + remoteConnectionManager + ) + + localConnectionProvider <- makeLocalConnectionProvider( + config, + keyPair + ) + localConnectionManager <- makeLocalConnectionManager( + config, + localConnectionProvider + ) + + db <- makeRocksDBStore(config) + implicit0(storeRunner: KVStoreRunner[Task, NS]) = makeKVStoreRunner(db) + + blockStorage <- makeBlockStorage(Block.genesis) + viewStateStorage <- makeViewStateStorage(Block.genesis) + ledgerStorage <- makeLedgerStorage(config, Ledger.empty) + + _ <- makeBlockPruner(config, blockStorage, viewStateStorage) + + appService <- makeApplicationService( + config, + keyPair.pub, + applicationNetwork, + localConnectionManager, + blockStorage, + viewStateStorage, + ledgerStorage + ) + + _ <- makeHotstuffService( + config, + keyPair, + Block.genesis, + hotstuffNetwork, + appService, + blockStorage, + viewStateStorage + ) + + } yield () + } + + protected def makeRemoteNetworkTracers: RNTS = + CheckpointingRemoteNetworkTracers.networkHybridLogTracers + + protected def makeLocalNetworkTracers: LNTS = + CheckpointingLocalNetworkTracers.networkHybridLogTracers + + protected def makeConsensusTracers: CTS = + CheckpointingConsensusTracers.consensusHybridLogTracers + + protected def makeSyncTracers: STS = + CheckpointingSyncTracers.syncHybridLogTracers + + protected def makeServiceTracer: Tracer[Task, CheckpointingEvent] = + CheckpointingServiceTracers.serviceEventHybridLogTracer + + protected def makeConnectionProvider[M: Codec]( + bindAddress: InetSocketAddress, + keyPair: ECKeyPair, + name: String + ): Resource[Task, EncryptedConnectionProvider[Task, ECPublicKey, M]] = { + for { + implicit0(scheduler: Scheduler) <- Resource.make( + Task(Scheduler.io(s"scalanet-$name")) + )(scheduler => Task(scheduler.shutdown())) + + connectionProvider <- ScalanetConnectionProvider[ + Task, + ECPublicKey, + M + ]( + bindAddress = bindAddress, + nodeKeyPair = keyPair, + new SecureRandom(), + useNativeTlsImplementation = true, + framingConfig = DynamicTLSPeerGroup.FramingConfig + .buildStandardFrameConfig( + maxFrameLength = 1024 * 1024, + lengthFieldLength = 8 + ) + .fold(e => sys.error(e.description), identity), + maxIncomingQueueSizePerPeer = 100 + ) + } yield connectionProvider + } + + protected def makeRemoteConnectionProvider( + config: CheckpointingConfig, + keyPair: ECKeyPair + ): Resource[Task, EncryptedConnectionProvider[ + Task, + ECPublicKey, + RemoteNetworkMessage + ]] = { + makeConnectionProvider[RemoteNetworkMessage]( + bindAddress = config.remote.listen.address, + keyPair = keyPair, + name = "remote" + ) + } + + protected def makeLocalConnectionProvider( + config: CheckpointingConfig, + keyPair: ECKeyPair + ): Resource[Task, EncryptedConnectionProvider[ + Task, + ECPublicKey, + InterpreterMessage + ]] = { + makeConnectionProvider[InterpreterMessage]( + bindAddress = config.local.listen.address, + keyPair = keyPair, + name = "local" + ) + } + + protected def makeRemoteConnectionManager( + config: CheckpointingConfig, + connectionProvider: EncryptedConnectionProvider[ + Task, + ECPublicKey, + RemoteNetworkMessage + ] + )(implicit + networkTracers: RNTS + ): Resource[Task, RemoteConnectionManager[ + Task, + ECPublicKey, + RemoteNetworkMessage + ]] = { + val clusterConfig = RemoteConnectionManager.ClusterConfig( + clusterNodes = config.federation.others.map { node => + node.publicKey -> node.address + }.toSet + ) + val retryConfig = RemoteConnectionManager.RetryConfig.default + + RemoteConnectionManager[ + Task, + ECPublicKey, + RemoteNetworkMessage + ](connectionProvider, clusterConfig, retryConfig) + } + + protected def makeLocalConnectionManager( + config: CheckpointingConfig, + connectionProvider: EncryptedConnectionProvider[ + Task, + ECPublicKey, + InterpreterMessage + ] + )(implicit + networkTracers: LNTS + ): Resource[Task, LocalConnectionManager[ + Task, + ECPublicKey, + InterpreterMessage + ]] = { + LocalConnectionManager[ + Task, + ECPublicKey, + InterpreterMessage + ]( + connectionProvider, + targetKey = config.local.interpreter.publicKey, + targetAddress = config.local.interpreter.address, + retryConfig = RemoteConnectionManager.RetryConfig.default + ) + } + + protected def readPrivateKey( + config: CheckpointingConfig + ): Task[ECPrivateKey] = + config.federation.self.privateKey match { + case Left(privateKey) => + privateKey.pure[Task] + case Right(path) => + for { + content <- Task(Files.readString(path)) + json = Json.fromString(content) + privateKey <- Task.fromTry( + CheckpointingConfigParser.ecPrivateKeyDecoder.decodeJson(json).toTry + ) + } yield privateKey + } + + protected def makeNetworks( + connectionManager: RemoteConnectionManager[ + Task, + ECPublicKey, + RemoteNetworkMessage + ] + ): Resource[ + Task, + ( + Network[Task, ECPublicKey, HotStuffMessage[CheckpointingAgreement]], + Network[Task, ECPublicKey, CheckpointingMessage] + ) + ] = { + val network = Network + .fromRemoteConnnectionManager[ + Task, + CheckpointingAgreement.PKey, + RemoteNetworkMessage + ]( + connectionManager + ) + + for { + (hotstuffNetwork, applicationNetwork) <- Network.splitter[ + Task, + CheckpointingAgreement.PKey, + RemoteNetworkMessage, + HotStuffMessage[CheckpointingAgreement], + CheckpointingMessage + ](network)( + split = { + case DuplexMessage.AgreementMessage(m) => Left(m) + case DuplexMessage.ApplicationMessage(m) => Right(m) + }, + merge = { + case Left(m) => DuplexMessage.AgreementMessage(m) + case Right(m) => DuplexMessage.ApplicationMessage(m) + } + ) + } yield (hotstuffNetwork, applicationNetwork) + } + + protected def makeRocksDBStore( + config: CheckpointingConfig + ): Resource[Task, RocksDBStore[Task]] = { + val dbConfig = RocksDBStore.Config.default( + config.database.path.resolve(config.name) + ) + for { + dir <- Resource.liftF { + Task { + Files.createDirectories(dbConfig.path) + } + } + blocker <- makeDBBlocker + db <- RocksDBStore[Task](dbConfig, CheckpointingNamespaces.all, blocker) + } yield db + } + + protected def makeDBBlocker: Resource[Task, Blocker] = + Blocker[Task] + + protected def makeKVStoreRunner( + db: RocksDBStore[Task] + ): KVStoreRunner[Task, NS] = { + new KVStoreRunner[Task, NS] { + override def runReadOnly[A]( + query: KVStoreRead[NS, A] + ): Task[A] = db.runReadOnly(query) + + override def runReadWrite[A](query: KVStore[NS, A]): Task[A] = + db.runWithBatching(query) + } + } + + protected def makeBlockStorage(genesis: Block)(implicit + storeRunner: KVStoreRunner[Task, NS] + ): Resource[Task, BlockStorage[NS, CheckpointingAgreement]] = { + implicit def `Codec[Set[T]]`[T: Codec] = { + import scodec.codecs.implicits._ + Codec[List[T]].xmap[Set[T]](_.toSet, _.toList) + } + + val blockStorage = new BlockStorage[NS, CheckpointingAgreement]( + blockColl = + new KVCollection[NS, Block.Hash, Block](CheckpointingNamespaces.Block), + blockMetaColl = + new KVCollection[NS, Block.Hash, KVTree.NodeMeta[Block.Hash]]( + CheckpointingNamespaces.BlockMeta + ), + parentToChildrenColl = new KVCollection[NS, Block.Hash, Set[Block.Hash]]( + CheckpointingNamespaces.BlockToChildren + ) + ) + + // (Re)insert genesis. It's okay if it has been pruned before, + // but if the application is just starting it will need it. + Resource + .liftF { + storeRunner.runReadWrite { + blockStorage.put(genesis) + } + } + .as(blockStorage) + } + + protected def makeViewStateStorage(genesis: Block)(implicit + storeRunner: KVStoreRunner[Task, NS] + ): Resource[Task, ViewStateStorage[NS, CheckpointingAgreement]] = + Resource.liftF { + val genesisQC = QuorumCertificate[CheckpointingAgreement, Phase.Prepare]( + phase = Phase.Prepare, + viewNumber = ViewNumber(0), + blockHash = genesis.hash, + signature = GroupSignature(Nil) + ) + storeRunner.runReadWrite { + ViewStateStorage[NS, CheckpointingAgreement]( + CheckpointingNamespaces.ViewState, + genesis = ViewStateStorage.Bundle.fromGenesisQC(genesisQC) + ) + } + } + + protected def makeLedgerStorage( + config: CheckpointingConfig, + genesisState: Ledger + )(implicit + storeRunner: KVStoreRunner[Task, NS] + ): Resource[Task, LedgerStorage[NS]] = Resource.liftF { + for { + coll <- Task.pure { + new KVCollection[NS, Ledger.Hash, Ledger]( + CheckpointingNamespaces.Ledger + ) + } + // Insert the genesis state straight into the underlying collection, + // not the ringbuffer, so it doesn't get evicted if we restart the + // app a few times. + _ <- storeRunner.runReadWrite { + coll.put(genesisState.hash, genesisState) + } + stateStorage = + new LedgerStorage[NS]( + coll, + ledgerMetaNamespace = CheckpointingNamespaces.LedgerMeta, + maxHistorySize = config.database.stateHistorySize + ) + } yield stateStorage + } + + protected def makeBlockPruner( + config: CheckpointingConfig, + blockStorage: BlockStorage[NS, CheckpointingAgreement], + viewStateStorage: ViewStateStorage[NS, CheckpointingAgreement] + )(implicit storeRunner: KVStoreRunner[Task, NS]): Resource[Task, Unit] = + Concurrent[Task].background { + storeRunner + .runReadWrite { + BlockPruning.prune( + blockStorage, + viewStateStorage, + config.database.blockHistorySize + ) + } + .delayResult(config.database.pruneInterval) + .foreverM + }.void + + protected def makeApplicationService( + config: CheckpointingConfig, + publicKey: ECPublicKey, + applicationNetwork: Network[ + Task, + CheckpointingAgreement.PKey, + CheckpointingMessage + ], + interpreterConnection: InterpreterConnection[Task], + blockStorage: BlockStorage[NS, CheckpointingAgreement], + viewStateStorage: ViewStateStorage[NS, CheckpointingAgreement], + ledgerStorage: LedgerStorage[NS] + )(implicit + storeRunner: KVStoreRunner[Task, NS], + serviceTracers: Tracer[Task, CheckpointingEvent] + ): Resource[Task, CheckpointingService[Task, NS]] = { + CheckpointingService[Task, NS]( + publicKey = publicKey, + network = applicationNetwork, + ledgerStorage = ledgerStorage, + blockStorage = blockStorage, + viewStateStorage = viewStateStorage, + interpreterConnection = interpreterConnection, + config = CheckpointingService.Config( + expectCheckpointCandidateNotifications = + config.local.expectCheckpointCandidateNotifications, + interpreterTimeout = config.local.timeout, + networkTimeout = config.remote.timeout + ) + ) + } + + protected def makeHotstuffService( + config: CheckpointingConfig, + keyPair: ECKeyPair, + genesis: Block, + hotstuffNetwork: Network[ + Task, + CheckpointingAgreement.PKey, + HotStuffMessage[CheckpointingAgreement] + ], + appService: CheckpointingService[Task, NS], + blockStorage: BlockStorage[NS, CheckpointingAgreement], + viewStateStorage: ViewStateStorage[NS, CheckpointingAgreement] + )(implicit + storeRunner: KVStoreRunner[Task, NS], + consensusTracers: CTS, + syncTracers: STS + ): Resource[Task, Unit] = { + implicit val leaderSelection = LeaderSelection.Hashing + implicit val signing = new CheckpointingSigning(genesis.hash) + + for { + federation <- Resource.liftF { + Task.fromEither((e: String) => new IllegalArgumentException(e)) { + val orderedPublicKeys = + (keyPair.pub +: config.federation.others.map(_.publicKey)).distinct + .sortBy(_.bytes.toHex) + .toVector + + config.federation.maxFaulty match { + case None => Federation(orderedPublicKeys) + case Some(maxFaulty) => Federation(orderedPublicKeys, maxFaulty) + } + } + } + + (viewState, preparedBlock) <- Resource.liftF { + storeRunner.runReadOnly { + for { + bundle <- viewStateStorage.getBundle + maybePrepared <- blockStorage.get(bundle.prepareQC.blockHash) + prepared = maybePrepared.getOrElse { + throw new IllegalStateException( + s"Cannot get the last prepared block from storage." + ) + } + } yield (bundle, prepared) + } + } + + // Start from the next view number, so we aren't in Prepare state when it was, say, PreCommit before. + protocolState = ProtocolState[CheckpointingAgreement]( + viewNumber = viewState.viewNumber.next, + phase = Phase.Prepare, + publicKey = keyPair.pub, + signingKey = keyPair.prv, + federation = federation, + prepareQC = viewState.prepareQC, + lockedQC = viewState.lockedQC, + commitQC = viewState.commitQC, + preparedBlock = preparedBlock, + timeout = config.consensus.minTimeout, + votes = Set.empty, + newViews = Map.empty + ) + + _ <- HotStuffService[Task, NS, CheckpointingAgreement]( + hotstuffNetwork, + appService, + blockStorage, + viewStateStorage, + protocolState, + consensusConfig = ConsensusService.Config( + timeoutPolicy = ConsensusService.TimeoutPolicy.exponential( + factor = config.consensus.timeoutFactor, + minTimeout = config.consensus.minTimeout, + maxTimeout = config.consensus.maxTimeout + ) + ) + ) + } yield () + } + +} + +object CheckpointingComposition extends CheckpointingComposition diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingKeyGen.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingKeyGen.scala new file mode 100644 index 00000000..a07a694c --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingKeyGen.scala @@ -0,0 +1,30 @@ +package io.iohk.metronome.checkpointing.app + +import monix.eval.Task +import io.iohk.metronome.crypto.ECKeyPair +import io.circe.Json + +/** Generate an ECDSA key pair and print it on the console. + * + * Using JSON format so that it's obvious which part is what. + * We can use `jq` to parse on the command line if necessary. + */ +object CheckpointingKeyGen { + def format(pair: ECKeyPair): String = { + val json = Json.obj( + "publicKey" -> Json.fromString(pair.pub.bytes.toHex), + "privateKey" -> Json.fromString(pair.prv.bytes.toHex) + ) + json.spaces2 + } + + def print(pair: ECKeyPair): Task[Unit] = + Task(println(format(pair))) + + def generateAndPrint: Task[Unit] = + for { + rng <- Task(new java.security.SecureRandom()) + keys = ECKeyPair.generate(rng) + _ <- print(keys) + } yield () +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingNamespaces.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingNamespaces.scala new file mode 100644 index 00000000..3c7a67c2 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/CheckpointingNamespaces.scala @@ -0,0 +1,12 @@ +package io.iohk.metronome.checkpointing.app + +import io.iohk.metronome.rocksdb.NamespaceRegistry + +object CheckpointingNamespaces extends NamespaceRegistry { + val Block = register("block") + val BlockMeta = register("block-meta") + val BlockToChildren = register("block-to-children") + val ViewState = register("view-state") + val Ledger = register("ledger") + val LedgerMeta = register("ledger-meta") +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/codecs/CheckpointingCodecs.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/codecs/CheckpointingCodecs.scala new file mode 100644 index 00000000..6b2a2172 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/codecs/CheckpointingCodecs.scala @@ -0,0 +1,59 @@ +package io.iohk.metronome.checkpointing.app.codecs + +import io.iohk.ethereum.rlp, rlp.RLPCodec +import io.iohk.metronome.hotstuff.service.codecs._ +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.service.messages.CheckpointingMessage +import io.iohk.metronome.checkpointing.models.{Block, RLPCodecs, Ledger} +import scodec.{Codec, Attempt} +import scodec.codecs._ +import scodec.bits.ByteVector +import scala.util.Try + +object CheckpointingCodecs + extends DefaultConsensusCodecs[CheckpointingAgreement] + with DefaultProtocolCodecs[CheckpointingAgreement] + with DefaultSecp256k1Codecs[CheckpointingAgreement] + with DefaultMessageCodecs[CheckpointingAgreement] + with DefaultDuplexMessageCodecs[ + CheckpointingAgreement, + CheckpointingMessage + ] { + + import scodec.codecs.implicits._ + import RLPCodecs.{rlpBlock, rlpLedger} + + private implicit def codecFromRLPCodec[T: RLPCodec]: Codec[T] = + Codec[ByteVector].exmap( + bytes => Attempt.fromTry(Try(rlp.decode[T](bytes.toArray))), + value => Attempt.successful(ByteVector(rlp.encode(value))) + ) + + override implicit lazy val hashCodec: Codec[Block.Header.Hash] = + Codec[ByteVector].xmap(Block.Header.Hash(_), identity) + + implicit lazy val ledgerHashCodec: Codec[Ledger.Hash] = + Codec[ByteVector].xmap(Ledger.Hash(_), identity) + + override implicit lazy val blockCodec: Codec[Block] = + codecFromRLPCodec[Block] + + implicit lazy val ledgerCodec: Codec[Ledger] = + codecFromRLPCodec[Ledger] + + implicit lazy val getStateRequestCodec + : Codec[CheckpointingMessage.GetStateRequest] = + Codec.deriveLabelledGeneric + + implicit lazy val getStateResponseCodec + : Codec[CheckpointingMessage.GetStateResponse] = + Codec.deriveLabelledGeneric + + override implicit lazy val applicationMessageCodec + : Codec[CheckpointingMessage] = + discriminated[CheckpointingMessage] + .by(uint2) + .typecase(0, getStateRequestCodec) + .typecase(1, getStateResponseCodec) + +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfig.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfig.scala new file mode 100644 index 00000000..c59530d0 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfig.scala @@ -0,0 +1,71 @@ +package io.iohk.metronome.checkpointing.app.config + +import io.iohk.metronome.crypto.{ECPublicKey, ECPrivateKey} +import java.nio.file.Path +import java.net.InetSocketAddress +import scala.concurrent.duration.FiniteDuration + +case class CheckpointingConfig( + name: String, + federation: CheckpointingConfig.Federation, + consensus: CheckpointingConfig.Consensus, + remote: CheckpointingConfig.RemoteNetwork, + local: CheckpointingConfig.LocalNetwork, + database: CheckpointingConfig.Database +) + +object CheckpointingConfig { + trait HasAddress { + def host: String + def port: Int + lazy val address = new InetSocketAddress(host, port) + } + + case class Federation( + self: LocalNode, + others: List[RemoteNode], + maxFaulty: Option[Int] + ) + + case class Consensus( + minTimeout: FiniteDuration, + maxTimeout: FiniteDuration, + timeoutFactor: Double + ) + + case class RemoteNode( + val host: String, + val port: Int, + publicKey: ECPublicKey + ) extends HasAddress + + case class LocalNode( + val host: String, + val port: Int, + privateKey: Either[ECPrivateKey, Path] + ) extends HasAddress + + case class Socket( + val host: String, + val port: Int + ) extends HasAddress + + case class RemoteNetwork( + listen: Socket, + timeout: FiniteDuration + ) + + case class LocalNetwork( + listen: Socket, + interpreter: RemoteNode, + timeout: FiniteDuration, + expectCheckpointCandidateNotifications: Boolean + ) + + case class Database( + path: Path, + stateHistorySize: Int, + blockHistorySize: Int, + pruneInterval: FiniteDuration + ) +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParser.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParser.scala new file mode 100644 index 00000000..f47a8f45 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingConfigParser.scala @@ -0,0 +1,68 @@ +package io.iohk.metronome.checkpointing.app.config + +import com.typesafe.config.Config +import io.iohk.metronome.config.{ConfigParser, ConfigDecoders} +import io.iohk.metronome.crypto.{ECPublicKey, ECPrivateKey} +import io.circe._, io.circe.generic.semiauto._ +import java.nio.file.Path +import scodec.bits.ByteVector +import scala.util.Try + +object CheckpointingConfigParser { + def parse(root: Config): ConfigParser.Result[CheckpointingConfig] = { + ConfigParser.parse[CheckpointingConfig]( + root.getConfig("metronome.checkpointing").root(), + prefix = "METRONOME_CHECKPOINTING" + ) + } + + import ConfigDecoders._ + + def hexDecoder[T](f: ByteVector => T): Decoder[T] = + Decoder[String].emap { str => + ByteVector.fromHex(str) match { + case None => + Left("$str is not a valid hexadecimal value") + case Some(bytes) => + Try(f(bytes)).toEither.left.map(_.getMessage) + } + } + + implicit val ecPublicKeyDecoder: Decoder[ECPublicKey] = + hexDecoder(ECPublicKey(_)) + + implicit val ecPrivateKeyDecoder: Decoder[ECPrivateKey] = + hexDecoder(ECPrivateKey(_)) + + implicit val pathDecoder: Decoder[Path] = + Decoder[String].map(Path.of(_)) + + implicit val ecPrivateKeyOrPathDecoder: Decoder[Either[ECPrivateKey, Path]] = + ecPrivateKeyDecoder.map(Left(_)) or pathDecoder.map(Right(_)) + + implicit val localNodeDecoder: Decoder[CheckpointingConfig.LocalNode] = + deriveDecoder + + implicit val remoteNodeDecoder: Decoder[CheckpointingConfig.RemoteNode] = + deriveDecoder + + implicit val federationDecoder: Decoder[CheckpointingConfig.Federation] = + deriveDecoder + + implicit val consensusDecoder: Decoder[CheckpointingConfig.Consensus] = + deriveDecoder + + implicit val socketDecoder: Decoder[CheckpointingConfig.Socket] = + deriveDecoder + + implicit val remoteDecoder: Decoder[CheckpointingConfig.RemoteNetwork] = + deriveDecoder + + implicit val localDecoder: Decoder[CheckpointingConfig.LocalNetwork] = + deriveDecoder + + implicit val dbDecoder: Decoder[CheckpointingConfig.Database] = + deriveDecoder + + implicit val configDecoder: Decoder[CheckpointingConfig] = deriveDecoder +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingOptions.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingOptions.scala new file mode 100644 index 00000000..c782a42e --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/config/CheckpointingOptions.scala @@ -0,0 +1,70 @@ +package io.iohk.metronome.checkpointing.app.config + +import scopt.OParser +import ch.qos.logback.classic.Level + +case class CheckpointingOptions( + mode: CheckpointingOptions.Mode, + logLevel: Level +) + +object CheckpointingOptions { + + sealed trait Mode + case object Service extends Mode + case object KeyGen extends Mode + + private val LogLevels = List( + Level.OFF, + Level.ERROR, + Level.WARN, + Level.INFO, + Level.DEBUG, + Level.TRACE + ) + + val default = CheckpointingOptions( + mode = Service, + logLevel = Level.INFO + ) + + /** Parse the options. Return `None` if there was an error, + * which has already been printed to the console. + */ + def parse( + args: List[String] + ): Option[CheckpointingOptions] = + OParser.parse( + CheckpointingOptions.oparser, + args, + CheckpointingOptions.default + ) + + private val oparser = { + val builder = OParser.builder[CheckpointingOptions] + import builder._ + + OParser.sequence( + programName("checkpointing"), + opt[String]('l', "log-level") + .action((x, opts) => opts.copy(logLevel = Level.toLevel(x))) + .text( + s"log level; one of [${LogLevels.map(_.toString).mkString("|")}]" + ) + .optional() + .validate(x => + Either.cond( + LogLevels.map(_.toString).contains(x.toUpperCase), + (), + s"Must be between one of ${LogLevels.map(_.toString)}" + ) + ), + cmd("service") + .text("run the checkpointing service") + .action((_, opts) => opts.copy(mode = Service)), + cmd("keygen") + .text("generate an ECDSA key pair") + .action((_, opts) => opts.copy(mode = KeyGen)) + ) + } +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingConsensusTracers.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingConsensusTracers.scala new file mode 100644 index 00000000..a48a5c31 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingConsensusTracers.scala @@ -0,0 +1,128 @@ +package io.iohk.metronome.checkpointing.app.tracing + +import monix.eval.Task +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.models.Block.Header.Hash +import io.iohk.metronome.hotstuff.service.tracing.{ + ConsensusEvent, + ConsensusTracers +} +import io.iohk.metronome.logging.{HybridLog, HybridLogObject, LogTracer} +import io.circe.{Encoder, JsonObject, Json} +import io.iohk.metronome.crypto.ECPublicKey +import io.iohk.metronome.hotstuff.consensus.ViewNumber +import io.iohk.metronome.hotstuff.consensus.basic.VotingPhase + +object CheckpointingConsensusTracers { + + type CheckpointingConsensusEvent = ConsensusEvent[CheckpointingAgreement] + + implicit val consensusEventHybridLog + : HybridLog[Task, ConsensusEvent[CheckpointingAgreement]] = { + import ConsensusEvent._ + import io.circe.syntax._ + + implicit val viewNumberEncoder: Encoder[ViewNumber] = + Encoder[Long].contramap[ViewNumber](identity) + + implicit val hashEncoder: Encoder[Hash] = + Encoder[String].contramap[Hash](_.toHex) + + implicit val phaseEncoder: Encoder[VotingPhase] = + Encoder[String].contramap[VotingPhase](_.toString) + + implicit val publicKeyEncoder: Encoder[ECPublicKey] = + Encoder[String].contramap[ECPublicKey](_.bytes.toHex) + + HybridLog.instance[Task, CheckpointingConsensusEvent]( + level = { + case _: Error => HybridLogObject.Level.Error + case _: Timeout => HybridLogObject.Level.Warn + case _: Rejected[_] => HybridLogObject.Level.Warn + case _: ViewSync => HybridLogObject.Level.Info + case _: AdoptView[_] => HybridLogObject.Level.Info + case _ => HybridLogObject.Level.Debug + }, + message = _.getClass.getSimpleName, + event = { + case e: Timeout => + JsonObject( + "viewNumber" -> e.viewNumber.asJson, + "messageCounter" -> Json.obj( + "past" -> e.messageCounter.past.asJson, + "present" -> e.messageCounter.present.asJson, + "future" -> e.messageCounter.future.asJson + ) + ) + + case e: ViewSync => + JsonObject("viewNumber" -> e.viewNumber.asJson) + + case e: AdoptView[_] => + JsonObject( + "viewNumber" -> e.status.viewNumber.asJson, + "blockHash" -> e.status.commitQC.blockHash.asJson + ) + + case e: NewView => + JsonObject("viewNumber" -> e.viewNumber.asJson) + + case e: Quorum[_] => + JsonObject( + "viewNumber" -> e.quorumCertificate.viewNumber.asJson, + "phase" -> e.quorumCertificate.phase.asJson, + "blockHash" -> e.quorumCertificate.blockHash.asJson + ) + + case e: FromPast[_] => + JsonObject( + "viewNumber" -> e.message.message.viewNumber.asJson, + "messageType" -> e.message.message.getClass.getSimpleName.asJson, + "sender" -> e.message.sender.asJson + ) + + case e: FromFuture[_] => + JsonObject( + "viewNumber" -> e.message.message.viewNumber.asJson, + "messageType" -> e.message.message.getClass.getSimpleName.asJson, + "sender" -> e.message.sender.asJson + ) + + case e: Stashed[_] => + JsonObject( + "viewNumber" -> e.error.event.message.viewNumber.asJson, + "messageType" -> e.error.event.message.getClass.getSimpleName.asJson, + "sender" -> e.error.event.sender.asJson + ) + + case e: Rejected[_] => + JsonObject( + "errorType" -> e.error.getClass.getSimpleName.asJson, + "error" -> e.error.toString.asJson + ) + + case e: ExecutionSkipped[_] => + JsonObject( + "blockHash" -> e.blockHash.asJson + ) + + case e: BlockExecuted[_] => + JsonObject( + "blockHash" -> e.blockHash.asJson + ) + + case e: Error => + JsonObject( + "message" -> e.message.asJson, + "error" -> e.error.getMessage.asJson + ) + } + ) + } + + implicit val consensusEventHybridLogTracer = + LogTracer.hybrid[Task, CheckpointingConsensusEvent] + + implicit val consensusHybridLogTracers = + ConsensusTracers(consensusEventHybridLogTracer) +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingNetworkTracers.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingNetworkTracers.scala new file mode 100644 index 00000000..094ea540 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingNetworkTracers.scala @@ -0,0 +1,73 @@ +package io.iohk.metronome.checkpointing.app.tracing + +import monix.eval.Task +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.service.messages.CheckpointingMessage +import io.iohk.metronome.hotstuff.service.messages.DuplexMessage +import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage +import io.iohk.metronome.networking.{NetworkTracers, NetworkEvent} +import io.iohk.metronome.logging.{HybridLog, HybridLogObject, LogTracer} +import io.circe.{Encoder, JsonObject} + +trait CheckpointingNetworkTracers[M] { + type CheckpointingNetworkEvent = + NetworkEvent[CheckpointingAgreement.PKey, M] + + implicit val networkEventHybridLog + : HybridLog[Task, CheckpointingNetworkEvent] = { + import NetworkEvent._ + import io.circe.syntax._ + + implicit val keyEncoder: Encoder[CheckpointingAgreement.PKey] = + Encoder[String].contramap[CheckpointingAgreement.PKey](_.bytes.toHex) + + implicit val peerEncoder + : Encoder.AsObject[Peer[CheckpointingAgreement.PKey]] = + Encoder.AsObject.instance { case Peer(key, address) => + JsonObject( + "publicKey" -> key.asJson, + "address" -> address.toString.asJson + ) + } + + HybridLog.instance[Task, CheckpointingNetworkEvent]( + level = { + case _: ConnectionRegistered[_] => HybridLogObject.Level.Info + case _: ConnectionDeregistered[_] => HybridLogObject.Level.Info + case _ => HybridLogObject.Level.Debug + }, + message = _.getClass.getSimpleName, + event = { + case e: ConnectionUnknown[_] => e.peer.asJsonObject + case e: ConnectionRegistered[_] => e.peer.asJsonObject + case e: ConnectionDeregistered[_] => e.peer.asJsonObject + case e: ConnectionDiscarded[_] => e.peer.asJsonObject + case e: ConnectionSendError[_] => e.peer.asJsonObject + case e: ConnectionFailed[_] => + e.peer.asJsonObject.add("error", e.error.toString.asJson) + case e: ConnectionReceiveError[_] => + e.peer.asJsonObject.add("error", e.error.toString.asJson) + case e: NetworkEvent.MessageReceived[_, _] => + e.peer.asJsonObject + .add("message", e.message.toString.asJson) + case e: NetworkEvent.MessageSent[_, _] => + e.peer.asJsonObject + .add("message", e.message.toString.asJson) + } + ) + } + + implicit val networkEventHybridLogTracer = + LogTracer.hybrid[Task, CheckpointingNetworkEvent] + + implicit val networkHybridLogTracers = + NetworkTracers(networkEventHybridLogTracer) +} + +object CheckpointingLocalNetworkTracers + extends CheckpointingNetworkTracers[InterpreterMessage] + +object CheckpointingRemoteNetworkTracers + extends CheckpointingNetworkTracers[ + DuplexMessage[CheckpointingAgreement, CheckpointingMessage] + ] diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingServiceTracers.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingServiceTracers.scala new file mode 100644 index 00000000..a712cc14 --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingServiceTracers.scala @@ -0,0 +1,102 @@ +package io.iohk.metronome.checkpointing.app.tracing + +import monix.eval.Task +import io.iohk.metronome.crypto.ECPublicKey +import io.iohk.metronome.hotstuff.consensus.ViewNumber +import io.iohk.metronome.logging.{HybridLog, HybridLogObject, LogTracer} +import io.iohk.metronome.checkpointing.models.{Block, Ledger} +import io.circe.{Encoder, JsonObject} +import io.iohk.metronome.checkpointing.service.tracing.CheckpointingEvent +import io.iohk.metronome.checkpointing.models.Transaction.CheckpointCandidate + +object CheckpointingServiceTracers { + import CheckpointingEvent._ + import io.circe.syntax._ + + implicit val serviceEventHybridLog: HybridLog[Task, CheckpointingEvent] = { + implicit val blockHashEncoder: Encoder[Block.Hash] = + Encoder[String].contramap[Block.Hash](_.toHex) + + implicit val ledgerHashEncoder: Encoder[Ledger.Hash] = + Encoder[String].contramap[Ledger.Hash](_.toHex) + + implicit val publicKeyEncoder: Encoder[ECPublicKey] = + Encoder[String].contramap[ECPublicKey](_.bytes.toHex) + + implicit val maybeErrorEncoder: Encoder[Throwable] = + Encoder[String].contramap[Throwable](_.getMessage) + + implicit val viewNumberEncoder: Encoder[ViewNumber] = + Encoder[Long].contramap[ViewNumber](identity) + + HybridLog.instance[Task, CheckpointingEvent]( + level = { + case _: Error => HybridLogObject.Level.Error + case _: StateUnavailable => HybridLogObject.Level.Error + case _: InterpreterValidationFailed => HybridLogObject.Level.Warn + case _: InterpreterUnavailable => HybridLogObject.Level.Warn + case _: InterpreterTimeout => HybridLogObject.Level.Warn + case _: InterpreterResponseIgnored => HybridLogObject.Level.Warn + case _: NewCheckpointCertificate => HybridLogObject.Level.Info + case _ => HybridLogObject.Level.Debug + }, + message = _.getClass.getSimpleName, + event = { + case e: InterpreterTimeout => + JsonObject("messageType" -> e.message.getClass.getSimpleName.asJson) + case e: InterpreterUnavailable => + JsonObject("messageType" -> e.message.getClass.getSimpleName.asJson) + case e: InterpreterResponseIgnored => + JsonObject( + "messageType" -> e.message.getClass.getSimpleName.asJson, + "error" -> e.maybeError.asJson + ) + case e: NetworkTimeout => + JsonObject( + "messageType" -> e.message.getClass.getSimpleName.asJson, + "recipient" -> e.recipient.asJson + ) + case e: NetworkResponseIgnored => + JsonObject( + "messageType" -> e.message.getClass.getSimpleName.asJson, + "from" -> e.from.asJson, + "error" -> e.maybeError.asJson + ) + case e: Proposing => + JsonObject( + "blockHash" -> e.block.hash.toHex.asJson, + "isEmpty" -> e.block.body.transactions.isEmpty.asJson + ) + case e: NewState => + JsonObject( + "ledgerHash" -> e.state.hash.asJson + ) + case e: NewCheckpointCertificate => + JsonObject( + "viewNumber" -> e.certificate.commitQC.viewNumber.asJson + ) + case e: StateUnavailable => + JsonObject( + "blockHash" -> e.block.hash.asJson, + "parentHash" -> e.block.header.parentHash.asJson + ) + case e: InterpreterValidationFailed => + JsonObject( + "blockHash" -> e.block.hash.asJson, + "parentHash" -> e.block.header.parentHash.asJson, + "hasCheckpoint" -> e.block.body.transactions + .collectFirst { case CheckpointCandidate(_) => } + .isDefined + .asJson + ) + case e: Error => + JsonObject( + "error" -> e.error.getMessage.asJson + ) + } + ) + } + + implicit val serviceEventHybridLogTracer = + LogTracer.hybrid[Task, CheckpointingEvent] +} diff --git a/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingSyncTracers.scala b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingSyncTracers.scala new file mode 100644 index 00000000..2256769e --- /dev/null +++ b/metronome/checkpointing/app/src/io/iohk/metronome/checkpointing/app/tracing/CheckpointingSyncTracers.scala @@ -0,0 +1,92 @@ +package io.iohk.metronome.checkpointing.app.tracing + +import monix.eval.Task +import io.iohk.metronome.crypto.ECPublicKey +import io.iohk.metronome.hotstuff.consensus.ViewNumber +import io.iohk.metronome.hotstuff.service.tracing.{SyncEvent, SyncTracers} +import io.iohk.metronome.logging.{HybridLog, HybridLogObject, LogTracer} +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.models.Block.Header.Hash +import io.circe.{Encoder, JsonObject, Json} + +object CheckpointingSyncTracers { + + type CheckpointingSyncEvent = SyncEvent[CheckpointingAgreement] + + implicit val syncEventHybridLog + : HybridLog[Task, SyncEvent[CheckpointingAgreement]] = { + import SyncEvent._ + import io.circe.syntax._ + + implicit val viewNumberEncoder: Encoder[ViewNumber] = + Encoder[Long].contramap[ViewNumber](identity) + + implicit val hashEncoder: Encoder[Hash] = + Encoder[String].contramap[Hash](_.toHex) + + implicit val publicKeyEncoder: Encoder[ECPublicKey] = + Encoder[String].contramap[ECPublicKey](_.bytes.toHex) + + HybridLog.instance[Task, CheckpointingSyncEvent]( + level = { + case _: Error => HybridLogObject.Level.Error + case _: InvalidStatus[_] => HybridLogObject.Level.Warn + case _: RequestTimeout[_] => HybridLogObject.Level.Warn + case _: ResponseIgnored[_] => HybridLogObject.Level.Warn + case _: QueueFull[_] => HybridLogObject.Level.Warn + case _: StatusPoll[_] => HybridLogObject.Level.Info + case _ => HybridLogObject.Level.Debug + }, + message = _.getClass.getSimpleName, + event = { + case e: QueueFull[_] => + JsonObject("sender" -> e.sender.asJson) + + case e: RequestTimeout[_] => + JsonObject( + "recipient" -> e.recipient.asJson, + "requestType" -> e.request.getClass.getSimpleName.asJson + ) + + case e: ResponseIgnored[_] => + JsonObject( + "sender" -> e.sender.asJson, + "responseType" -> e.response.getClass.getSimpleName.asJson + ) + + case e: StatusPoll[_] => + JsonObject( + "statuses" -> Json.arr( + e.statuses.toSeq.map { case (publicKey, status) => + Json.obj( + "publicKey" -> publicKey.asJson, + "viewNumber" -> status.viewNumber.asJson, + "commitQC" -> Json.obj( + "viewNumber" -> status.commitQC.viewNumber.asJson, + "blockHash" -> status.commitQC.blockHash.asJson + ) + ) + }: _* + ) + ) + + case e: InvalidStatus[_] => + JsonObject( + "sender" -> e.error.sender.asJson, + "hint" -> e.hint.asJson + ) + + case e: Error => + JsonObject( + "error" -> e.error.getMessage.asJson + ) + } + ) + } + + implicit val syncEventHybridLogTracer = + LogTracer.hybrid[Task, CheckpointingSyncEvent] + + implicit val syncHybridLogTracers = + SyncTracers(syncEventHybridLogTracer) +} diff --git a/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/InterpreterService.scala b/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/InterpreterService.scala index d75764e4..923faba4 100644 --- a/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/InterpreterService.scala +++ b/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/InterpreterService.scala @@ -2,11 +2,9 @@ package io.iohk.metronome.checkpointing.interpreter import cats.implicits._ import cats.effect.{Concurrent, Timer, Resource} -import io.iohk.metronome.checkpointing.CheckpointingAgreement import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage import io.iohk.metronome.checkpointing.interpreter.tracing.InterpreterEvent import io.iohk.metronome.checkpointing.models.Transaction -import io.iohk.metronome.networking.LocalConnectionManager import io.iohk.metronome.tracer.Tracer import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal @@ -17,13 +15,6 @@ import io.iohk.metronome.core.messages.RPCPair */ object InterpreterService { - type InterpreterConnection[F[_]] = - LocalConnectionManager[ - F, - CheckpointingAgreement.PKey, - InterpreterMessage - ] - private class ServiceImpl[F[_]: Concurrent: Timer]( localConnectionManager: InterpreterConnection[F], interpreterRpc: InterpreterRPC[F], diff --git a/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/package.scala b/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/package.scala new file mode 100644 index 00000000..29b13a34 --- /dev/null +++ b/metronome/checkpointing/interpreter/src/io/iohk/metronome/checkpointing/interpreter/package.scala @@ -0,0 +1,13 @@ +package io.iohk.metronome.checkpointing + +import io.iohk.metronome.networking.LocalConnectionManager +import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage + +package object interpreter { + type InterpreterConnection[F[_]] = + LocalConnectionManager[ + F, + CheckpointingAgreement.PKey, + InterpreterMessage + ] +} diff --git a/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/CheckpointSigningSpec.scala b/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/CheckpointSigningSpec.scala deleted file mode 100644 index 0dd90099..00000000 --- a/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/CheckpointSigningSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -package io.iohk.metronome.checkpointing - -import io.iohk.metronome.crypto.ECKeyPair -import io.iohk.metronome.hotstuff.consensus.basic.{Signing, VotingPhase} -import io.iohk.metronome.hotstuff.consensus.{ - Federation, - LeaderSelection, - ViewNumber -} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import java.security.SecureRandom - -/** A single positive case spec to test type interoperability. - * See [[io.iohk.metronome.hotstuff.consensus.basic.Secp256k1SigningProps]] for a more in-depth test - */ -class CheckpointSigningSpec extends AnyFlatSpec with Matchers { - import models.ArbitraryInstances._ - - "Checkpoint signing" should "work :)" in { - val keyPairs = IndexedSeq.fill(2)(ECKeyPair.generate(new SecureRandom)) - val federation = Federation(keyPairs.map(_.pub))(LeaderSelection.RoundRobin) - .getOrElse(throw new Exception("Could not build federation")) - - val signing = implicitly[Signing[CheckpointingAgreement]] - - val phase = sample[VotingPhase] - val viewNumber = sample[ViewNumber] - val hash = sample[CheckpointingAgreement.Hash] - - val partialSigs = - keyPairs.map(kp => signing.sign(kp.prv, phase, viewNumber, hash)) - val groupSig = signing.combine(partialSigs) - - signing.validate( - federation, - groupSig, - phase, - viewNumber, - hash - ) shouldBe true - } -} diff --git a/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/models/CheckpointingSigningSpec.scala b/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/models/CheckpointingSigningSpec.scala new file mode 100644 index 00000000..3b62457f --- /dev/null +++ b/metronome/checkpointing/models/specs/src/io/iohk/metronome/checkpointing/models/CheckpointingSigningSpec.scala @@ -0,0 +1,86 @@ +package io.iohk.metronome.checkpointing.models + +import io.iohk.metronome.crypto.ECKeyPair +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.hotstuff.consensus.basic.VotingPhase +import io.iohk.metronome.hotstuff.consensus.{ + Federation, + LeaderSelection, + ViewNumber +} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.security.SecureRandom +import scodec.bits.ByteVector + +/** Simple test cases to verify type interoperability. + * + * See [[io.iohk.metronome.hotstuff.consensus.basic.Secp256k1SigningProps]] for a more in-depth test + */ +class CheckpointingSigningSpec extends AnyFlatSpec with Matchers { + import ArbitraryInstances._ + + val keyPairs = IndexedSeq.fill(2)(ECKeyPair.generate(new SecureRandom)) + val federation = Federation(keyPairs.map(_.pub))(LeaderSelection.RoundRobin) + .getOrElse(throw new Exception("Could not build federation")) + + "Checkpoint signing" should "work :)" in { + val signing = new CheckpointingSigning(Block.Header.Hash(ByteVector.empty)) + + val phase = sample[VotingPhase] + val viewNumber = sample[ViewNumber] + val hash = sample[CheckpointingAgreement.Hash] + + val partialSigs = + keyPairs.map(kp => signing.sign(kp.prv, phase, viewNumber, hash)) + + val groupSig = signing.combine(partialSigs) + + signing.validate( + federation, + groupSig, + phase, + viewNumber, + hash + ) shouldBe true + } + + it should "accept the genesis with no signatures" in { + val genesisHash = sample[CheckpointingAgreement.Hash] + val signing = new CheckpointingSigning(genesisHash) + + val phase = sample[VotingPhase] + val viewNumber = sample[ViewNumber] + val groupSig = signing.combine(Nil) + + signing.validate( + federation, + groupSig, + phase, + viewNumber, + genesisHash + ) shouldBe true + } + + it should "not accept the genesis with signatures" in { + val genesisHash = sample[CheckpointingAgreement.Hash] + val signing = new CheckpointingSigning(genesisHash) + + val phase = sample[VotingPhase] + val viewNumber = sample[ViewNumber] + + val partialSigs = + keyPairs.map(kp => signing.sign(kp.prv, phase, viewNumber, genesisHash)) + + val groupSig = signing.combine(partialSigs) + + signing.validate( + federation, + groupSig, + phase, + viewNumber, + genesisHash + ) shouldBe false + } +} diff --git a/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/CheckpointingAgreement.scala b/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/CheckpointingAgreement.scala index 6c77bf31..86ad2618 100644 --- a/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/CheckpointingAgreement.scala +++ b/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/CheckpointingAgreement.scala @@ -3,16 +3,12 @@ package io.iohk.metronome.checkpointing import io.iohk.metronome.hotstuff.consensus import io.iohk.metronome.hotstuff.consensus.basic.{ Agreement, - Secp256k1Agreement, - Signing + Secp256k1Agreement } -import scodec.bits.ByteVector -import io.iohk.ethereum.rlp -import io.iohk.metronome.checkpointing.models.RLPCodecs._ object CheckpointingAgreement extends Secp256k1Agreement { override type Block = models.Block - override type Hash = models.Block.Header.Hash + override type Hash = models.Block.Hash implicit val block: consensus.basic.Block[CheckpointingAgreement] = new consensus.basic.Block[CheckpointingAgreement] { @@ -26,13 +22,5 @@ object CheckpointingAgreement extends Secp256k1Agreement { models.Block.isValid(b) } - // TODO: Deal with genesis validation. - implicit val signing: Signing[CheckpointingAgreement] = - Signing.secp256k1((phase, viewNumber, hash) => - ByteVector( - rlp.encode(phase) ++ rlp.encode(viewNumber) ++ rlp.encode(hash) - ) - ) - type GroupSignature = Agreement.GroupSignature[CheckpointingAgreement] } diff --git a/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/models/CheckpointingSigning.scala b/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/models/CheckpointingSigning.scala new file mode 100644 index 00000000..52c264f5 --- /dev/null +++ b/metronome/checkpointing/models/src/io/iohk/metronome/checkpointing/models/CheckpointingSigning.scala @@ -0,0 +1,43 @@ +package io.iohk.metronome.checkpointing.models + +import io.iohk.ethereum.rlp +import io.iohk.metronome.hotstuff.consensus.{Federation, ViewNumber} +import io.iohk.metronome.hotstuff.consensus.basic.{ + Secp256k1Signing, + VotingPhase, + Signing +} +import io.iohk.metronome.checkpointing.CheckpointingAgreement +import io.iohk.metronome.checkpointing.models.RLPCodecs._ +import scodec.bits.ByteVector + +class CheckpointingSigning( + genesisHash: Block.Hash +) extends Secp256k1Signing[CheckpointingAgreement]((phase, viewNumber, hash) => + ByteVector( + rlp.encode(phase) ++ rlp.encode(viewNumber) ++ rlp.encode(hash) + ) + ) { + + /** Override quorum certificate validation rule so we accept the quorum + * certificate we can determinsiticially fabricate without a group signature. + */ + override def validate( + federation: Federation[CheckpointingAgreement.PKey], + signature: Signing.GroupSig[CheckpointingAgreement], + phase: VotingPhase, + viewNumber: ViewNumber, + blockHash: CheckpointingAgreement.Hash + ): Boolean = + if (blockHash == genesisHash) { + signature.sig.isEmpty + } else { + super.validate( + federation, + signature, + phase, + viewNumber, + blockHash + ) + } +} diff --git a/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/CheckpointingService.scala b/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/CheckpointingService.scala index 4cedaf50..4af15285 100644 --- a/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/CheckpointingService.scala +++ b/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/CheckpointingService.scala @@ -7,8 +7,11 @@ import cats.implicits._ import io.iohk.metronome.core.messages.{RPCSupport, RPCTracker} import io.iohk.metronome.crypto.ECPublicKey import io.iohk.metronome.checkpointing.CheckpointingAgreement -import io.iohk.metronome.checkpointing.interpreter.InterpreterService.InterpreterConnection -import io.iohk.metronome.checkpointing.interpreter.{InterpreterRPC, ServiceRPC} +import io.iohk.metronome.checkpointing.interpreter.{ + InterpreterRPC, + ServiceRPC, + InterpreterConnection +} import io.iohk.metronome.checkpointing.models.Transaction.CheckpointCandidate import io.iohk.metronome.checkpointing.models.{ Block, diff --git a/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/InterpreterClient.scala b/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/InterpreterClient.scala index b99af091..94865712 100644 --- a/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/InterpreterClient.scala +++ b/metronome/checkpointing/service/src/io/iohk/metronome/checkpointing/service/InterpreterClient.scala @@ -4,8 +4,11 @@ import cats.implicits._ import cats.effect.{Concurrent, Timer, Resource, Sync} import io.iohk.metronome.core.messages.{RPCTracker, RPCSupport} import io.iohk.metronome.checkpointing.interpreter.messages.InterpreterMessage -import io.iohk.metronome.checkpointing.interpreter.{ServiceRPC, InterpreterRPC} -import io.iohk.metronome.checkpointing.interpreter.InterpreterService.InterpreterConnection +import io.iohk.metronome.checkpointing.interpreter.{ + ServiceRPC, + InterpreterRPC, + InterpreterConnection +} import io.iohk.metronome.checkpointing.models.{ Block, Ledger, diff --git a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotApp.scala b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotApp.scala index b1d1d931..63ec2619 100644 --- a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotApp.scala +++ b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotApp.scala @@ -36,7 +36,7 @@ object RobotApp extends TaskApp { def setLogProperties(opts: RobotOptions): Task[Unit] = Task { // Separate log file for each node. System.setProperty("log.file.name", s"robot/logs/node-${opts.nodeIndex}") - // Not logging to the console so we can display robot position. + // Less logging to the console so we can display robot position. System.setProperty("log.console.level", s"INFO") }.void } diff --git a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotComposition.scala b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotComposition.scala index 89d22629..ee06ee25 100644 --- a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotComposition.scala +++ b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotComposition.scala @@ -27,7 +27,8 @@ import io.iohk.metronome.hotstuff.service.messages.{ } import io.iohk.metronome.hotstuff.service.storage.{ BlockStorage, - ViewStateStorage + ViewStateStorage, + BlockPruning } import io.iohk.metronome.networking.{ EncryptedConnectionProvider, @@ -132,6 +133,8 @@ trait RobotComposition { viewStateStorage <- makeViewStateStorage(genesis) stateStorage <- makeStateStorage(config, genesisState) + _ <- makeBlockPruner(config, blockStorage, viewStateStorage) + appService <- makeApplicationService( config, opts, @@ -151,8 +154,6 @@ trait RobotComposition { viewStateStorage ) - _ <- makeBlockPruner(config, blockStorage, viewStateStorage) - } yield Storages(blockStorage, viewStateStorage, stateStorage) } @@ -485,29 +486,14 @@ trait RobotComposition { viewStateStorage: ViewStateStorage[NS, RobotAgreement] )(implicit storeRunner: KVStoreRunner[Task, NS]) = Concurrent[Task].background { - val query: KVStore[NS, Unit] = for { - // Always keep the last executed block. - lastExecutedBlock <- viewStateStorage.getLastExecutedBlockHash.lift - pathFromRoot <- blockStorage.getPathFromRoot(lastExecutedBlock).lift - - // Keep the last N blocks. - pruneable = pathFromRoot.reverse - .drop(config.db.blockHistorySize) - .reverse - - // Make the last pruneable block the new root. - _ <- pruneable.lastOption match { - case Some(newRoot) => - blockStorage.pruneNonDescendants(newRoot) >> - viewStateStorage.setRootBlockHash(newRoot) - - case None => - KVStore.instance[NS].unit - } - } yield () - storeRunner - .runReadWrite(query) + .runReadWrite { + BlockPruning.prune( + blockStorage, + viewStateStorage, + config.db.blockHistorySize + ) + } .delayResult(config.db.pruneInterval) .foreverM } diff --git a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotNamespaces.scala b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotNamespaces.scala index 37a68d13..37e5c9f9 100644 --- a/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotNamespaces.scala +++ b/metronome/examples/src/io/iohk/metronome/examples/robot/app/RobotNamespaces.scala @@ -1,24 +1,12 @@ package io.iohk.metronome.examples.robot.app -import io.iohk.metronome.rocksdb.RocksDBStore - -object RobotNamespaces { - private var registry: Vector[RocksDBStore.Namespace] = Vector.empty - - private def register(key: String): RocksDBStore.Namespace = { - val ns = key.map(_.toByte) - require(!registry.contains(ns)) - registry = registry :+ ns - ns - } - - def all: Seq[RocksDBStore.Namespace] = registry +import io.iohk.metronome.rocksdb.NamespaceRegistry +object RobotNamespaces extends NamespaceRegistry { val Block = register("block") val BlockMeta = register("block-meta") val BlockToChildren = register("block-to-children") val ViewState = register("view-state") val State = register("state") val StateMeta = register("state-meta") - } diff --git a/metronome/examples/src/io/iohk/metronome/examples/robot/models/RobotSigning.scala b/metronome/examples/src/io/iohk/metronome/examples/robot/models/RobotSigning.scala index a503ed98..1170a9ef 100644 --- a/metronome/examples/src/io/iohk/metronome/examples/robot/models/RobotSigning.scala +++ b/metronome/examples/src/io/iohk/metronome/examples/robot/models/RobotSigning.scala @@ -5,9 +5,9 @@ import io.iohk.metronome.examples.robot.codecs.RobotCodecs import io.iohk.metronome.hotstuff.consensus.Federation import io.iohk.metronome.hotstuff.consensus.basic.{ QuorumCertificate, - Secp256k1Signing + Secp256k1Signing, + VotingPhase } -import io.iohk.metronome.hotstuff.consensus.basic.VotingPhase class RobotSigning( genesisHash: RobotAgreement.Hash @@ -19,7 +19,7 @@ class RobotSigning( ) { /** Override quorum certificate validation rule so we accept the quorum - * certificate we can determinsitcially fabricate without a group signature. + * certificate we can determinsiticially fabricate without a group signature. */ override def validate( federation: Federation[RobotAgreement.PKey], diff --git a/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/storage/BlockPruning.scala b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/storage/BlockPruning.scala new file mode 100644 index 00000000..afb2e58a --- /dev/null +++ b/metronome/hotstuff/service/src/io/iohk/metronome/hotstuff/service/storage/BlockPruning.scala @@ -0,0 +1,36 @@ +package io.iohk.metronome.hotstuff.service.storage + +import io.iohk.metronome.hotstuff.consensus.basic.Agreement +import io.iohk.metronome.storage.KVStore + +import cats.implicits._ + +object BlockPruning { + + /** Prune blocks which are not descendants of the N-th ancestor of the last executed block. */ + def prune[N, A <: Agreement]( + blockStorage: BlockStorage[N, A], + viewStateStorage: ViewStateStorage[N, A], + blockHistorySize: Int + ): KVStore[N, Unit] = { + for { + // Always keep the last executed block. + lastExecutedBlock <- viewStateStorage.getLastExecutedBlockHash.lift + pathFromRoot <- blockStorage.getPathFromRoot(lastExecutedBlock).lift + + // Everything but the last N blocks in the chain leading up to the + // last executed block can be pruned. We do so by making the Nth + // ancestor of the last executed block the new root of the tree. + maybeNewRoot = pathFromRoot.reverse.lift(blockHistorySize - 1) + + _ <- maybeNewRoot match { + case Some(newRoot) => + blockStorage.pruneNonDescendants(newRoot) >> + viewStateStorage.setRootBlockHash(newRoot) + + case None => + KVStore.instance[N].unit + } + } yield () + } +} diff --git a/metronome/networking/src/io/iohk/metronome/networking/RemoteConnectionManager.scala b/metronome/networking/src/io/iohk/metronome/networking/RemoteConnectionManager.scala index b5a35e46..fb2cd806 100644 --- a/metronome/networking/src/io/iohk/metronome/networking/RemoteConnectionManager.scala +++ b/metronome/networking/src/io/iohk/metronome/networking/RemoteConnectionManager.scala @@ -22,6 +22,14 @@ trait RemoteConnectionManager[F[_], K, M] { def getLocalPeerInfo: (K, InetSocketAddress) def getAcquiredConnections: F[Set[K]] def incomingMessages: Iterant[F, MessageReceived[K, M]] + + /** Send message to a peer, if they are connected. + * + * If the recipient is the same as the local peer, the message is immediately + * delivered to self, so it's important that each recipient has a separate + * identifier, e.g. we don't try to use the same public key locally on two + * different ports which are supposed to talk to each other. + */ def sendMessage( recipient: K, message: M diff --git a/metronome/rocksdb/src/io/iohk/metronome/rocksdb/NamespaceRegistry.scala b/metronome/rocksdb/src/io/iohk/metronome/rocksdb/NamespaceRegistry.scala new file mode 100644 index 00000000..067c5a94 --- /dev/null +++ b/metronome/rocksdb/src/io/iohk/metronome/rocksdb/NamespaceRegistry.scala @@ -0,0 +1,14 @@ +package io.iohk.metronome.rocksdb + +trait NamespaceRegistry { + private var registry: Vector[RocksDBStore.Namespace] = Vector.empty + + protected def register(key: String): RocksDBStore.Namespace = { + val ns = key.map(_.toByte) + require(!registry.contains(ns)) + registry = registry :+ ns + ns + } + + def all: Seq[RocksDBStore.Namespace] = registry +}