Skip to content

Commit

Permalink
Brendan's review
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt committed Jan 12, 2024
1 parent c575959 commit 9c213e6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
Expand Up @@ -175,6 +175,7 @@ data CommonOptions = CommonOptions
, optionsRetryConfig :: !RetryConfig
-- ^ set up retry configuration when the node socket is unavailable
, batchSizeConfig :: !Word64
-- ^ Size of the batches sent to the indexers
}
deriving stock (Show, Generic)
deriving anyclass (FromJSON, ToJSON)
Expand Down Expand Up @@ -366,7 +367,7 @@ commonBatchSizeParser =
$ Opt.long "batch-size"
<> Opt.metavar "INT"
<> Opt.value 3000
<> Opt.help "Number of blocks send as a batch to the indexers"
<> Opt.help "Number of blocks sent as a batch to the indexers"
<> Opt.showDefault

{- | Parse the addresses to index. Addresses should be given in Bech32 format
Expand Down
Expand Up @@ -96,8 +96,8 @@ mkEpochNonceIndexer path = do
( epochNo INT NOT NULL
, nonce BLOB NOT NULL
, blockNo INT NOT NULL
, slotNo INT
, blockHeaderHash BLOB
, slotNo INT NOT NULL
, blockHeaderHash BLOB NOT NULL
)|]
nonceInsertQuery =
[sql|INSERT INTO epoch_nonce
Expand Down
Expand Up @@ -5,6 +5,7 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE UndecidableInstances #-}
Expand All @@ -27,7 +28,6 @@ module Marconi.Cardano.Indexers.ExtLedgerStateCoordinator (
ExtLedgerStateEvent (..),
ExtLedgerConfig,
ExtLedgerStateCoordinator (ExtLedgerStateCoordinator),
ExtLedgerStateCoordinatorConfig (ExtLedgerStateCoordinatorConfig),
ExtLedgerStateWorkerConfig (..),
EpochMetadata,

Expand Down Expand Up @@ -108,11 +108,6 @@ data ExtLedgerStateEvent = ExtLedgerStateEvent

type instance Core.Point ExtLedgerStateEvent = C.ChainPoint

newtype ExtLedgerStateCoordinatorConfig = ExtLedgerStateCoordinatorConfig
{ _configGenesisConfig :: C.GenesisConfig
-- ^ used to bootstrap the ledger state
}

-- | Metadata used to create 'ExtLedgerStateEventIndexer' filenames
data EpochMetadata = EpochMetadata
{ metadataBlockNo :: Maybe C.BlockNo
Expand All @@ -131,9 +126,9 @@ Lens.makeLenses ''ExtLedgerStateCoordinator

data ExtLedgerStatePersistConfig = ExtLedgerStatePersistConfig
{ _blocksToNextSnapshot :: Word64
-- ^ Number of block before the next snapshot
-- ^ Number of block before the next snapshot (must be at least 1)
, _snapshotInterval :: Word64
-- ^ Track the previous snapshot, used for resuming
-- ^ Track the previous snapshot, used for resuming (can't start at 0)
, _ledgerStateIndexer :: ExtLedgerStateFileIndexer
}

Expand All @@ -142,19 +137,20 @@ Lens.makeLenses ''ExtLedgerStatePersistConfig
-- | The state maintained by the preprocessor of the indexer to decide how to handle incoming events
data PreprocessorState output input = PreprocessorState
{ _volatileLedgerStates :: Seq (Core.Timed C.ChainPoint ExtLedgerStateEvent)
-- ^ The last computed ledger state
-- ^ storage for volatile ledger states
, _lastLedgerState :: Core.Timed C.ChainPoint ExtLedgerStateEvent
-- ^ The last computed ledger state
, _currentLength :: Word64
-- ^ stored ledger state
, _maxLength :: Word64
-- ^ The number of volatile ledger states that are stored
, _securityParam :: Word64
-- ^ how many of them do we keep in memory
, _persistConfig :: ExtLedgerStatePersistConfig
-- ^ the indexer that saves snapshots
}

Lens.makeLenses ''PreprocessorState

{- | Used to maintain a list of `maxLength` ledger states in memory.
{- | Used to maintain a list of `securityParam` ledger states in memory.
When full, the oldest stored ledger state is pushed out of the queue
and we check if it should be saved (we save every `snapshotInterval + 1` time)
-}
Expand Down Expand Up @@ -190,7 +186,7 @@ updateLatestLedgerState isVolatile' event =
pure $ guard (timeToNextSnapshot == 0) $> eventToSave
-- If we're already indexing volatile event
xs Seq.:|> oldestLedgerState -> do
max' <- Lens.use maxLength
max' <- Lens.use securityParam
current <- Lens.use currentLength
let bufferIsFull = max' <= current
if bufferIsFull
Expand Down Expand Up @@ -312,7 +308,7 @@ saveLastStable = do
Seq.Empty ->
Just <$> Lens.use lastLedgerState
_xs Seq.:|> oldestLedgerState -> do
max' <- Lens.use maxLength
max' <- Lens.use securityParam
current <- Lens.use currentLength
let bufferIsFull = max' <= current
if bufferIsFull
Expand Down Expand Up @@ -366,7 +362,7 @@ extLedgerStateWorker config workers path = do
liftIO $ createDirectoryIfMissing True rootDir
ledgerStateIndexer' <- buildExtLedgerStateEventIndexer configCodec rootDir
let snapshotInterval' = workerSnapshotInterval config
persistConfig' = ExtLedgerStatePersistConfig snapshotInterval' snapshotInterval' ledgerStateIndexer'
persistConfig' = mkExtLedgerStatePersistConfig snapshotInterval' ledgerStateIndexer'
coordinator' <-
Core.withTrace indexerEventLogger
<$> mkExtLedgerStateCoordinator workers
Expand All @@ -383,6 +379,15 @@ extLedgerStateWorker config workers path = do
Core.WorkerIndexer workerState $
Core.Worker indexerName workerState eventPreprocessing id

{- | Smart constructor for ExtLedgerStatePersistConfig, ensure that the time to next snapshot is at
least one block (saving each ledger state)
-}
mkExtLedgerStatePersistConfig
:: Word64 -> Core.FileIndexer EpochMetadata ExtLedgerStateEvent -> ExtLedgerStatePersistConfig
mkExtLedgerStatePersistConfig snapshotInterval' =
let curatedSnapshotInterval = max 1 snapshotInterval'
in ExtLedgerStatePersistConfig curatedSnapshotInterval curatedSnapshotInterval

instance
( MonadIO m
, MonadError Core.IndexerError m
Expand Down
4 changes: 2 additions & 2 deletions marconi-core/src/Marconi/Core.hs
Expand Up @@ -612,9 +612,9 @@ import Marconi.Core.Indexer.SQLiteAggregateQuery (
import Marconi.Core.Indexer.SQLiteIndexer (
GetLastStablePointQuery (GetLastStablePointQuery, getLastStablePointQuery),
InsertPointQuery (InsertPointQuery),
SQLiteDBLocation (..),
SQLInsertPlan (..),
SQLRollbackPlan (..),
SQLiteDBLocation (..),
SQLiteIndexer (..),
SetLastStablePointQuery (SetLastStablePointQuery, getSetLastStablePointQuery),
ToRow (..),
Expand All @@ -626,7 +626,7 @@ import Marconi.Core.Indexer.SQLiteIndexer (
mkSqliteIndexer,
parseDBLocation,
querySQLiteIndexerWith,
querySyncedOnlySQLiteIndexerWith
querySyncedOnlySQLiteIndexerWith,
)
import Marconi.Core.Preprocessor (
Preprocessor,
Expand Down

0 comments on commit 9c213e6

Please sign in to comment.