Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster ledger state #287

Merged
merged 10 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Gen.Marconi.ChainIndex.Legacy.Types (
genChainPoints,
genChainPoint,
genChainPoint',
genExecutionUnits,
CGen.genExecutionUnits,
genSlotNo,
genTxBodyContentWithTxInsCollateral,
genTxBodyContentForPlutusScripts,
Expand Down Expand Up @@ -280,7 +280,7 @@ genWitnessAndHashInEra era = do
C.ScriptWitness C.ScriptWitnessForSpending <$> case script of
C.PlutusScript version plutusScript -> do
scriptData <- CGen.genHashableScriptData
executionUnits <- genExecutionUnits
executionUnits <- CGen.genExecutionUnits
pure $
C.PlutusScriptWitness
scriptLanguageInEra
Expand All @@ -293,15 +293,6 @@ genWitnessAndHashInEra era = do
pure $ C.SimpleScriptWitness scriptLanguageInEra (C.SScript simpleScript)
pure (witness, C.hashScript script)

{- | TODO Copy-paste from cardano-node: cardano-api/gen/Gen/Cardano/Api/Typed.hs
Copied from cardano-api. Delete when this function is reexported
-}
genExecutionUnits :: Gen C.ExecutionUnits
genExecutionUnits =
C.ExecutionUnits
<$> Gen.integral (Range.constant 0 1000)
<*> Gen.integral (Range.constant 0 1000)

genTxOutTxContext :: C.CardanoEra era -> Gen (C.TxOut C.CtxTx era)
genTxOutTxContext era =
C.TxOut
Expand Down Expand Up @@ -446,8 +437,8 @@ genProtocolParametersForPlutusScripts =
]
)
<*> (Just <$> genExecutionUnitPrices)
<*> (Just <$> genExecutionUnits)
<*> (Just <$> genExecutionUnits)
<*> (Just <$> CGen.genExecutionUnits)
<*> (Just <$> CGen.genExecutionUnits)
<*> (Just <$> genNat)
<*> (Just <$> genNat)
<*> (Just <$> genNat)
Expand Down
13 changes: 13 additions & 0 deletions marconi-cardano-chain-index/src/Marconi/Cardano/ChainIndex/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ data CommonOptions = CommonOptions
-- ^ The starting point of the indexers
, optionsRetryConfig :: !RetryConfig
-- ^ set up retry configuration when the node socket is unavailable
, batchSizeConfig :: !Word64
berewt marked this conversation as resolved.
Show resolved Hide resolved
-- ^ Size of the batches sent to the indexers
}
deriving stock (Show, Generic)
deriving anyclass (FromJSON, ToJSON)
Expand Down Expand Up @@ -236,6 +238,7 @@ commonOptionsParser =
<*> commonNetworkIdParser
<*> commonStartFromParser
<*> commonRetryConfigParser
<*> commonBatchSizeParser

optionsParser :: Opt.Parser Options
optionsParser =
Expand Down Expand Up @@ -357,6 +360,16 @@ commonPortParser =
<> Opt.help "JSON-RPC http port number"
<> Opt.showDefault

commonBatchSizeParser :: Opt.Parser Word64
commonBatchSizeParser =
Opt.option
Opt.auto
$ Opt.long "batch-size"
<> Opt.metavar "INT"
<> Opt.value 3000
<> Opt.help "Number of blocks sent as a batch to the indexers"
<> Opt.showDefault

{- | Parse the addresses to index. Addresses should be given in Bech32 format
Several addresses can be given in a single string, if they are separated by a space
-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Control.Lens qualified as Lens
import Control.Monad.Cont (MonadIO)
import Control.Monad.Except (ExceptT, MonadError, MonadTrans (lift))
import Data.List.NonEmpty (NonEmpty)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Text qualified as Text
import Data.Traversable (for)
Expand Down Expand Up @@ -67,8 +66,6 @@ import Marconi.Cardano.Indexers.Spent qualified as Spent
import Marconi.Cardano.Indexers.Utxo qualified as Utxo
import Marconi.Cardano.Indexers.UtxoQuery qualified as UtxoQuery
import Marconi.Core qualified as Core
import Marconi.Core.Indexer.SQLiteIndexer qualified as Core
import Marconi.Core.Preprocessor qualified as Core
import System.FilePath ((</>))

-- Point instances used only in this module
Expand All @@ -91,6 +88,13 @@ type CurrentSyncPointIndexer =
type EpochNonceIndexer = Core.WithTrace IO Core.SQLiteIndexer Nonce.EpochNonce
type EpochSDDIndexer = Core.WithTrace IO Core.SQLiteIndexer (NonEmpty SDD.EpochSDD)

data EpochEvent = EpochEvent
{ epochNo :: C.EpochNo
, epochSDD :: Maybe (NonEmpty SDD.EpochSDD)
, epochNonce :: Maybe Nonce.EpochNonce
}
type instance Core.Point EpochEvent = C.ChainPoint

-- | Container for all the queryable indexers.
data MarconiCardanoQueryables = MarconiCardanoQueryables
{ _queryableEpochNonce :: !(MVar EpochNonceIndexer)
Expand All @@ -110,7 +114,7 @@ buildIndexers
-> Core.CatchupConfig
-> Utxo.UtxoIndexerConfig
-> MintTokenEvent.MintTokenEventConfig
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig IO (WithDistance BlockEvent)
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig EpochEvent (WithDistance BlockEvent)
-> BM.Trace IO Text
-> MarconiTrace IO
-> FilePath
Expand All @@ -126,7 +130,7 @@ buildIndexers
catchupConfig
utxoConfig
mintEventConfig
epochStateConfig
ledgerStateConfig
textLogger
prettyLogger
path = do
Expand All @@ -135,20 +139,19 @@ buildIndexers
blockEventTextLogger = BM.appendName "blockEvent" textLogger
blockEventLogger = BM.appendName "blockEvent" mainLogger
txBodyCoordinatorLogger = BM.appendName "txBody" blockEventTextLogger
epochStateTextLogger = BM.appendName "epochState" blockEventTextLogger
epochSDDTextLogger = BM.appendName "epochSDD" epochStateTextLogger
epochNonceTextLogger = BM.appendName "epochNonce" epochStateTextLogger
ledgerStateTextLogger = BM.appendName "ledgerState" blockEventTextLogger

StandardWorker blockInfoMVar blockInfoWorker <-
BlockInfo.blockInfoBuilder securityParam catchupConfig blockEventTextLogger path

Core.WorkerIndexer epochSDDMVar epochSDDWorker <-
epochSDDBuilder securityParam catchupConfig epochSDDTextLogger path
epochSDDBuilder securityParam catchupConfig ledgerStateTextLogger path
Core.WorkerIndexer epochNonceMVar epochNonceWorker <-
epochNonceBuilder securityParam catchupConfig epochNonceTextLogger path
Core.WorkerIndexer _epochStateMVar epochStateWorker <-
epochNonceBuilder securityParam catchupConfig ledgerStateTextLogger path
Core.WorkerIndexer _ledgerStateMVar ledgerStateWorker <-
ExtLedgerStateCoordinator.extLedgerStateWorker
epochStateConfig
ledgerStateConfig
ledgerStateTextLogger
[epochSDDWorker, epochNonceWorker]
path

Expand Down Expand Up @@ -189,7 +192,7 @@ buildIndexers
lift $
buildBlockEventCoordinator
blockEventLogger
[blockInfoWorker, epochStateWorker, coordinatorTxBodyWorkers]
[blockInfoWorker, ledgerStateWorker, coordinatorTxBodyWorkers]

Core.WorkerIndexer chainTipMVar chainTipWorker <-
ChainTip.chainTipBuilder mainLogger path
Expand Down Expand Up @@ -251,7 +254,7 @@ epochNonceBuilder
-> n
( Core.WorkerIndexer
m
(ExtLedgerStateCoordinator.ExtLedgerStateEvent, WithDistance BlockEvent)
EpochEvent
Nonce.EpochNonce
(Core.WithTrace m Core.SQLiteIndexer)
)
Expand All @@ -263,12 +266,11 @@ epochNonceBuilder securityParam catchupConfig textLogger path =
indexerName
securityParam
catchupConfig
(pure . Nonce.getEpochNonce . fst)
(pure . epochNonce)
(BM.appendName indexerName indexerEventLogger)
getEpochNo (ExtLedgerStateCoordinator.ExtLedgerStateEvent ledgerState _) = Nonce.getEpochNo ledgerState
in Nonce.epochNonceWorker
epochNonceWorkerConfig
(Nonce.EpochNonceWorkerConfig $ fromMaybe 0 . getEpochNo . fst)
(Nonce.EpochNonceWorkerConfig epochNo)
(Core.parseDBLocation (path </> "epochNonce.db"))

-- | Configure and start the @EpochSDD@ indexer
Expand All @@ -281,7 +283,7 @@ epochSDDBuilder
-> n
( Core.WorkerIndexer
m
(ExtLedgerStateCoordinator.ExtLedgerStateEvent, WithDistance BlockEvent)
EpochEvent
(NonEmpty SDD.EpochSDD)
(Core.WithTrace m Core.SQLiteIndexer)
)
Expand All @@ -293,12 +295,11 @@ epochSDDBuilder securityParam catchupConfig textLogger path =
indexerName
securityParam
catchupConfig
(pure . SDD.getEpochSDD . fst)
(pure . epochSDD)
(BM.appendName indexerName indexerEventLogger)
getEpochNo (ExtLedgerStateCoordinator.ExtLedgerStateEvent ledgerState _) = Nonce.getEpochNo ledgerState
in SDD.epochSDDWorker
epochSDDWorkerConfig
(SDD.EpochSDDWorkerConfig $ fromMaybe 0 . getEpochNo . fst)
(SDD.EpochSDDWorkerConfig epochNo)
(Core.parseDBLocation (path </> "epochSDD.db"))

-- | Configure and start the @SnapshotBlockEvent@ indexer
Expand Down Expand Up @@ -346,7 +347,9 @@ extractSnapshotBlockEvent =
buildIndexersForSnapshot
:: SecurityParam
-> Core.CatchupConfig
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig IO (WithDistance BlockEvent)
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig
(ExtLedgerStateEvent, WithDistance BlockEvent)
(WithDistance BlockEvent)
-> BM.Trace IO Text
-> MarconiTrace IO
-> FilePath
Expand All @@ -359,7 +362,7 @@ buildIndexersForSnapshot
buildIndexersForSnapshot
securityParam
catchupConfig
epochStateConfig
ledgerStateConfig
textLogger
prettyLogger
path
Expand All @@ -369,6 +372,7 @@ buildIndexersForSnapshot
mainLogger = BM.contramap (fmap (fmap $ Text.pack . show)) textLogger
blockEventTextLogger = BM.appendName "blockEvent" textLogger
blockEventLogger = BM.appendName "blockEvent" mainLogger
ledgerStateTextLogger = BM.appendName "ledgerState" blockEventTextLogger
snapshotBlockEventTextLogger = BM.appendName "snapshotBlockEvent" blockEventTextLogger
snapshotExtLedgerStateTextLogger = BM.appendName "snapshotBlockEvent" blockEventTextLogger

Expand All @@ -390,11 +394,12 @@ buildIndexersForSnapshot
(path </> show no)
blockRange'
nodeConfig
return [snapshotExtLedgerStateWorker, snapshotBlockEventWorker]
pure [snapshotExtLedgerStateWorker, snapshotBlockEventWorker]

Core.WorkerIndexer _epochStateMVar snapshotWorker <-
Core.WorkerIndexer _ledgerStateMVar snapshotWorker <-
ExtLedgerStateCoordinator.extLedgerStateWorker
epochStateConfig
ledgerStateConfig
ledgerStateTextLogger
(concat snapshotWorkers)
path

Expand Down Expand Up @@ -439,16 +444,14 @@ snapshotExtLedgerStateEventBuilder securityParam catchupConfig textLogger path b
(pure . Just . fst)
(BM.appendName indexerName indexerEventLogger)
in snapshotExtLedgerStateEventWorker
securityParam
standardWorkerConfig
(SnapshotWorkerConfig (ExtLedgerStateCoordinator.blockNo . fst) blockRange' nodeConfig)
path

snapshotExtLedgerStateEventWorker
:: forall input m n
. (MonadIO m, MonadError Core.IndexerError m, MonadIO n)
=> SecurityParam
-> StandardWorkerConfig n input ExtLedgerStateEvent
=> StandardWorkerConfig n input ExtLedgerStateEvent
-> SnapshotWorkerConfig input
-> FilePath
-> m
Expand All @@ -458,11 +461,11 @@ snapshotExtLedgerStateEventWorker
ExtLedgerStateEvent
(Core.WithTrace n (Core.FileIndexer EpochMetadata))
)
snapshotExtLedgerStateEventWorker securityParam standardWorkerConfig snapshotBlockEventWorkerConfig path = do
snapshotExtLedgerStateEventWorker standardWorkerConfig snapshotBlockEventWorkerConfig path = do
codecConfig <- getConfigCodec (SnapshotBlockEvent.nodeConfig snapshotBlockEventWorkerConfig)
indexer <-
Core.withTrace (logger standardWorkerConfig)
<$> buildExtLedgerStateEventIndexer codecConfig securityParam path
<$> buildExtLedgerStateEventIndexer codecConfig path
let preprocessor =
Core.traverseMaybeEvent (lift . eventExtractor standardWorkerConfig)
<<< justBeforeBlockRangePreprocessor
Expand Down
26 changes: 21 additions & 5 deletions marconi-cardano-chain-index/src/Marconi/Cardano/ChainIndex/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Cardano.BM.Trace (logError, logInfo)
import Cardano.BM.Tracing qualified as BM
import Control.Concurrent.Async (race_)
import Control.Exception (finally)
import Control.Monad (unless)
import Control.Monad (guard, unless)
import Control.Monad.Except (runExceptT)
import Control.Monad.Reader (runReaderT)
import Data.Aeson (toJSON)
Expand Down Expand Up @@ -55,6 +55,10 @@ import System.Posix.Signals (
)
import qualified Marconi.Cardano.Indexers.ExtLedgerStateCoordinator as ExtLedgerState
import qualified Marconi.Cardano.Core.Extract.WithDistance as Distance
import qualified Marconi.Cardano.Indexers.EpochSDD as EpochSDD
import qualified Marconi.Cardano.Indexers.EpochNonce as EpochNonce
import qualified Marconi.Cardano.ChainIndex.Indexers as Indexers
import Marconi.Cardano.Indexers.ExtLedgerStateCoordinator (ExtLedgerStateEvent(extLedgerState))
#endif

run :: Text -> IO ()
Expand All @@ -74,8 +78,8 @@ run appName = withGracefulTermination_ $ do

createDirectoryIfMissing True (Cli.optionsDbPath o)

let batchSize = 5000
volatileEpochStateSnapshotInterval = 1000
let batchSize = Cli.batchSizeConfig $ Cli.commonOptions o
epochStateSnapshotInterval = 100000
filteredAddresses = shelleyAddressesToAddressAny $ Cli.optionsTargetAddresses o
filteredAssetIds = Cli.optionsTargetAssets o
includeScript = not $ Cli.optionsDisableScript o
Expand Down Expand Up @@ -117,6 +121,13 @@ run appName = withGracefulTermination_ $ do
Utils.querySecurityParam @Void networkId socketPath

let SecurityParam stopCatchupDistance = securityParam
extLedgerStateAsEvent previousLedgerStateEvent ledgerStateEvent _blockEvent = do
previousEpochNo <- ExtLedgerState.getEpochNo $ extLedgerState previousLedgerStateEvent
epochNo <- ExtLedgerState.getEpochNo $ extLedgerState ledgerStateEvent
guard $ epochNo /= previousEpochNo
let sdd = EpochSDD.getEpochSDD ledgerStateEvent
nonce = EpochNonce.getEpochNonce ledgerStateEvent
pure $ Indexers.EpochEvent epochNo sdd nonce

mindexers <-
runExceptT $
Expand All @@ -127,10 +138,11 @@ run appName = withGracefulTermination_ $ do
(MintTokenEvent.MintTokenEventConfig filteredAssetIds)
( ExtLedgerState.ExtLedgerStateWorkerConfig
Distance.getEvent
trace
Distance.chainDistance
nodeConfigPath
volatileEpochStateSnapshotInterval
epochStateSnapshotInterval
securityParam
extLedgerStateAsEvent
)
trace
marconiTrace
Expand Down Expand Up @@ -177,6 +189,10 @@ shelleyAddressesToAddressAny Nothing = []
shelleyAddressesToAddressAny (Just targetAddresses) =
fmap C.AddressShelley $ NEList.toList $ NESet.toList targetAddresses

getBlockNo :: C.BlockInMode C.CardanoMode -> C.BlockNo
getBlockNo (C.BlockInMode block _eraInMode) =
case C.getBlockHeader block of C.BlockHeader _ _ b -> b

getStartingPoint :: Cli.StartingPoint -> C.ChainPoint -> C.ChainPoint
getStartingPoint preferredStartingPoint indexerLastSyncPoint =
case preferredStartingPoint of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ run = do
let extLedgerStateConfig =
ExtLedgerStateWorkerConfig
Distance.getEvent
trace
Distance.chainDistance
nodeConfigPath
volatileEpochStateSnapshotInterval
securityParam
(const $ curry Just)
snapshotConfig =
RunIndexerConfig
marconiTrace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Control.Monad.Trans (lift)
import Data.List.NonEmpty (NonEmpty)
import Data.Text (Text)
import Data.Text qualified as Text
import Marconi.Cardano.ChainIndex.Indexers (EpochEvent)
import Marconi.Cardano.ChainIndex.Indexers qualified as Indexers
import Marconi.Cardano.Core.Extract.WithDistance (WithDistance)
import Marconi.Cardano.Core.Indexer.Worker (
Expand Down Expand Up @@ -144,7 +145,7 @@ buildIndexers
-> Core.CatchupConfig
-> Utxo.UtxoIndexerConfig
-> MintTokenEvent.MintTokenEventConfig
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig IO (WithDistance BlockEvent)
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig EpochEvent (WithDistance BlockEvent)
-> BM.Trace IO Text
-> MarconiTrace IO
-> FilePath
Expand Down Expand Up @@ -180,6 +181,7 @@ buildIndexers
Core.WorkerIndexer _epochStateMVar epochStateWorker <-
ExtLedgerStateCoordinator.extLedgerStateWorker
epochStateConfig
epochStateTextLogger
[epochSDDWorker, epochNonceWorker]
path

Expand Down
Loading