Commit
Introduce consumed-tx-out and prune-tx-out flags which control extra migrations
kderme committed May 30, 2023
1 parent f498e35 commit 6b07707
Showing 16 changed files with 247 additions and 68 deletions.
2 changes: 2 additions & 0 deletions cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs
@@ -234,6 +234,8 @@ mkSyncNodeParams staticDir mutableDir = do
, enpHasOfflineData = True
, enpTurboMode = False
, enpFullMode = True
, enpMigrateConsumed = False
, enpPruneTxOut = False
, enpSnEveryFollowing = 35
, enpSnEveryLagging = 35
, enpMaybeRollback = Nothing
24 changes: 24 additions & 0 deletions cardano-db-sync/app/cardano-db-sync.hs
@@ -81,6 +81,8 @@ pRunDbSyncNode =
<*> pHasOfflineData
<*> pTurboMode
<*> pFullMode
<*> pMigrateConsumed
<*> pPruneTxOut
<*> pure 500
<*> pure 10000
<*> optional pSlotNo
@@ -256,6 +258,28 @@ pFullMode =
<> Opt.help "Makes db-sync run with all possible functionalities."
)

pMigrateConsumed :: Parser Bool
pMigrateConsumed =
Opt.flag
False
True
( Opt.long "consumed-tx-out"
<> Opt.help "Runs the tx_out migration, which adds a new field.If this is set once,\
\ then it must be always be set on following executions of db-sync, unless prune-tx-out\
\ is used instead."
)

pPruneTxOut :: Parser Bool
pPruneTxOut =
Opt.flag
False
True
( Opt.long "prune-tx-out"
<> Opt.help "Prunes the consumed tx_out periodically. This assumes \
\ consumed-tx-out is also set, even if it's not. If this is set once,\
\ then it must be always be set on following executions of db-sync."
)

pVersionCommand :: Parser SyncCommand
pVersionCommand =
asum
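
A minimal sketch (not part of the commit) of how the two new flags parse, assuming pMigrateConsumed and pPruneTxOut are in scope; execParserPure is optparse-applicative's pure entry point:

import Options.Applicative

demoFlags :: IO ()
demoFlags = do
  let opts = info ((,) <$> pMigrateConsumed <*> pPruneTxOut) fullDesc
  -- Opt.flag defaults each field to False when its flag is omitted:
  print $ getParseResult (execParserPure defaultPrefs opts [])
  -- Just (False,False)
  print $ getParseResult (execParserPure defaultPrefs opts ["--consumed-tx-out"])
  -- Just (True,False)
  print $ getParseResult (execParserPure defaultPrefs opts ["--consumed-tx-out", "--prune-tx-out"])
  -- Just (True,True)
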
74 changes: 72 additions & 2 deletions cardano-db-sync/src/Cardano/DbSync/Api.hs
@@ -27,6 +27,13 @@ module Cardano.DbSync.Api (
setIsFixedAndMigrate,
getRanIndexes,
runIndexMigrations,
runExtraMigrationsMaybe,
getSafeBlockNoDiff,
getPruneInterval,
whenConsumeTxOut,
whenPruneTxOut,
getHasConsumed,
getPrunes,
mkSyncEnvFromConfig,
replaceConnection,
verifySnapshotPoint,
@@ -98,6 +105,7 @@ data SyncEnv = SyncEnv
, envIndexes :: !(StrictTVar IO Bool)
, envOptions :: !SyncOptions
, envCache :: !Cache
, envExtraMigrations :: !(StrictTVar IO ExtraMigrations)
, envOfflineWorkQueue :: !(StrictTBQueue IO PoolFetchRetry)
, envOfflineResultQueue :: !(StrictTBQueue IO FetchResult)
, envEpochState :: !(StrictTVar IO EpochState)
@@ -117,6 +125,12 @@ data FixesRan = NoneFixRan | DataFixRan | AllFixRan
data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked
deriving (Show, Eq)

data ExtraMigrations = ExtraMigrations
{ emRan :: Bool
, emConsume :: Bool
, emPrune :: Bool
} deriving Show

setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO ()
setConsistentLevel env cst = do
logInfo (getTrace env) $ "Setting ConsistencyLevel to " <> textShow cst
@@ -165,6 +179,54 @@ runIndexMigrations env = do
logInfo (getTrace env) "Indexes were created"
atomically $ writeTVar (envIndexes env) True

initExtraMigrations :: Bool -> Bool -> ExtraMigrations
initExtraMigrations cons prne =
ExtraMigrations
{ emRan = False
, emConsume = cons || prne
, emPrune = prne
}

runExtraMigrationsMaybe :: SyncEnv -> IO ()
runExtraMigrationsMaybe env = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
logInfo (getTrace env) $ "Extra migrations: " <> textShow extraMigr
unless (emRan extraMigr) $ do
backend <- getBackend env
DB.runDbIohkNoLogging backend $
DB.runExtraMigrations
(getTrace env)
(getSafeBlockNoDiff env)
(emConsume extraMigr)
(emPrune extraMigr)
liftIO $ atomically $ writeTVar (envExtraMigrations env) (extraMigr {emRan = True})

getSafeBlockNoDiff :: SyncEnv -> Word64
getSafeBlockNoDiff _ = 2 * 2160

getPruneInterval :: SyncEnv -> Word64
getPruneInterval _ = 10 * 2160

whenConsumeTxOut :: MonadIO m => SyncEnv -> m () -> m ()
whenConsumeTxOut env action = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
when (emConsume extraMigr) action

whenPruneTxOut :: MonadIO m => SyncEnv -> m () -> m ()
whenPruneTxOut env action = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
when (emPrune extraMigr) action

getHasConsumed :: SyncEnv -> IO Bool
getHasConsumed env = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
pure $ emConsume extraMigr

getPrunes :: SyncEnv -> IO Bool
getPrunes env = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
pure $ emPrune extraMigr

data SyncOptions = SyncOptions
{ soptExtended :: !Bool
, soptAbortOnInvalid :: !Bool
@@ -330,14 +392,17 @@ mkSyncEnv ::
Bool -> -- shouldUseLedger
Bool ->
Bool ->
Bool ->
Bool ->
RunMigration ->
IO SyncEnv
mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedgerDir shouldUseLedger ranAll forcedIndexes runMigration = do
mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedgerDir shouldUseLedger ranAll forcedIndexes toConsume toPrune runMigration = do
cache <- if soptCache syncOptions then newEmptyCache 250000 50000 else pure uninitiatedCache
backendVar <- newTVarIO Strict.Nothing
consistentLevelVar <- newTVarIO Unchecked
fixDataVar <- newTVarIO $ if ranAll then DataFixRan else NoneFixRan
indexesVar <- newTVarIO forcedIndexes
extraMigrVar <- newTVarIO $ initExtraMigrations toConsume toPrune
owq <- newTBQueueIO 100
orq <- newTBQueueIO 100
epochVar <- newTVarIO initEpochState
@@ -377,6 +442,7 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedge
, envIsFixed = fixDataVar
, envIndexes = indexesVar
, envCache = cache
, envExtraMigrations = extraMigrVar
, envOfflineWorkQueue = owq
, envOfflineResultQueue = orq
, envEpochState = epochVar
@@ -393,9 +459,11 @@ mkSyncEnvFromConfig ::
GenesisConfig ->
Bool ->
Bool ->
Bool ->
Bool ->
RunMigration ->
IO (Either SyncNodeError SyncEnv)
mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir shouldUseLedger genCfg ranAll forcedIndexes runMigration =
mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir shouldUseLedger genCfg ranAll forcedIndexes toConsume toPrune runMigration =
case genCfg of
GenesisCardano _ bCfg sCfg _
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
@@ -428,6 +496,8 @@ mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir shouldUseLedger ge
shouldUseLedger
ranAll
forcedIndexes
toConsume
toPrune
runMigration

-- | 'True' is for in memory points and 'False' for on disk
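
initExtraMigrations above encodes the implication between the two flags: emConsume is cons || prne, so pruning always switches the consumed migration on, while emRan lets runExtraMigrationsMaybe perform the database work at most once per process. A hypothetical GHCi session, using the derived Show instance:

ghci> initExtraMigrations False True   -- prune-tx-out alone: prune implies consume
ExtraMigrations {emRan = False, emConsume = True, emPrune = True}
ghci> initExtraMigrations True False   -- consumed-tx-out alone
ExtraMigrations {emRan = False, emConsume = True, emPrune = False}
ghci> initExtraMigrations False False  -- neither flag given
ExtraMigrations {emRan = False, emConsume = False, emPrune = False}
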
2 changes: 2 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Config/Types.hs
@@ -71,6 +71,8 @@ data SyncNodeParams = SyncNodeParams
, enpHasOfflineData :: !Bool
, enpTurboMode :: !Bool
, enpFullMode :: !Bool
, enpMigrateConsumed :: !Bool
, enpPruneTxOut :: !Bool
, enpSnEveryFollowing :: !Word64
, enpSnEveryLagging :: !Word64
, enpMaybeRollback :: !(Maybe SlotNo)
8 changes: 7 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Default.hs
@@ -46,7 +46,7 @@ import Database.Persist.SqlBackend.Internal
import Database.Persist.SqlBackend.Internal.StatementCache
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))
import qualified Ouroboros.Consensus.HardFork.Combinator as Consensus
import Ouroboros.Network.Block (blockHash, blockNo, getHeaderFields)
import Ouroboros.Network.Block (blockHash, blockNo, getHeaderFields, headerFieldBlockNo, unBlockNo)

insertListBlocks ::
SyncEnv ->
@@ -82,6 +82,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
, ". Time to restore consistency."
]
rollbackFromBlockNo syncEnv (blockNo cblk)
insertBlock syncEnv cblk applyRes True tookSnapshot
liftIO $ setConsistentLevel syncEnv Consistent
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
@@ -167,6 +168,9 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
Generic.fromBabbageBlock (ioPlutusExtra iopts) (getPrices applyResult) blk
BlockConway _blk -> panic "TODO: Conway 1"
insertEpoch details
whenPruneTxOut syncEnv $
when (unBlockNo blkNo `mod` getPruneInterval syncEnv == 0) $ do
lift $ DB.deleteConsumedTxOut tracer (getSafeBlockNoDiff syncEnv)
lift $ commitOrIndexes withinTwoMin withinHalfHour
where
tracer = getTrace syncEnv
@@ -203,6 +207,8 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
isWithinHalfHour :: SlotDetails -> Bool
isWithinHalfHour sd = isSyncedWithinSeconds sd 1800 == SyncFollowing

blkNo = headerFieldBlockNo $ getHeaderFields cblk

-- -------------------------------------------------------------------------------------------------

insertLedgerEvents ::
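
The pruning cadence wired into insertBlock follows from the two constants in Api.hs: deletions run every 10 * 2160 = 21600 blocks, and only rows consumed more than 2 * 2160 = 4320 blocks below the tip are removed (2160 presumably mirrors the mainnet security parameter k, keeping the window safely beyond any possible rollback). A standalone sketch of the guard, with the constants inlined:

import Data.Word (Word64)

pruneInterval, safeBlockNoDiff :: Word64
pruneInterval = 10 * 2160   -- getPruneInterval: 21600 blocks between passes
safeBlockNoDiff = 2 * 2160  -- getSafeBlockNoDiff: 4320-block safety margin

-- Mirrors the guard in insertBlock: prune only on interval boundaries.
shouldPruneAt :: Word64 -> Bool
shouldPruneAt blkNo = blkNo `mod` pruneInterval == 0

-- e.g. shouldPruneAt 21600 == True, shouldPruneAt 21601 == False; when the
-- guard fires, DB.deleteConsumedTxOut drops tx_out rows consumed at least
-- safeBlockNoDiff blocks before the current tip.
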
12 changes: 5 additions & 7 deletions cardano-db-sync/src/Cardano/DbSync/Era.hs
@@ -8,24 +8,22 @@ module Cardano.DbSync.Era (
insertValidateGenesisDist,
) where

import Cardano.BM.Data.Trace (Trace)
import Cardano.DbSync.Config
import qualified Cardano.DbSync.Era.Byron.Genesis as Byron
import qualified Cardano.DbSync.Era.Shelley.Genesis as Shelley
import Cardano.DbSync.Era.Shelley.Offline as X
import Cardano.DbSync.Error
import Cardano.Prelude
import Database.Persist.Sql (SqlBackend)
import Cardano.DbSync.Api

insertValidateGenesisDist ::
Trace IO Text ->
SqlBackend ->
SyncEnv ->
NetworkName ->
GenesisConfig ->
Bool ->
ExceptT SyncNodeError IO ()
insertValidateGenesisDist trce backend nname genCfg shelleyInitiation =
insertValidateGenesisDist syncEnv nname genCfg shelleyInitiation =
case genCfg of
GenesisCardano _ bCfg sCfg _aCfg -> do
Byron.insertValidateGenesisDist backend trce nname bCfg
Shelley.insertValidateGenesisDist backend trce (unNetworkName nname) (scConfig sCfg) shelleyInitiation
Byron.insertValidateGenesisDist syncEnv nname bCfg
Shelley.insertValidateGenesisDist syncEnv (unNetworkName nname) (scConfig sCfg) shelleyInitiation
34 changes: 20 additions & 14 deletions cardano-db-sync/src/Cardano/DbSync/Era/Byron/Genesis.hs
@@ -31,27 +31,32 @@ import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Database.Persist.Sql (SqlBackend)
import Paths_cardano_db_sync (version)
import Cardano.DbSync.Api

-- | Idempotently insert the initial Genesis distribution transactions into the DB.
-- If these transactions are already in the DB, they are validated.
insertValidateGenesisDist ::
SqlBackend ->
Trace IO Text ->
SyncEnv ->
NetworkName ->
Byron.Config ->
ExceptT SyncNodeError IO ()
insertValidateGenesisDist backend tracer (NetworkName networkName) cfg = do
insertValidateGenesisDist syncEnv (NetworkName networkName) cfg = do
-- Setting this to True will log all 'Persistent' operations which is great
-- for debugging, but otherwise *way* too chatty.
backend <- liftIO $ getBackend syncEnv
hasConsumed <- liftIO $ getHasConsumed syncEnv
prunes <- liftIO $ getPrunes syncEnv
if False
then newExceptT $ DB.runDbIohkLogging backend tracer insertAction
else newExceptT $ DB.runDbIohkNoLogging backend insertAction
then newExceptT $ DB.runDbIohkLogging backend tracer (insertAction hasConsumed prunes)
else newExceptT $ DB.runDbIohkNoLogging backend (insertAction hasConsumed prunes)
where
insertAction :: (MonadBaseControl IO m, MonadIO m) => ReaderT SqlBackend m (Either SyncNodeError ())
insertAction = do
tracer = getTrace syncEnv

insertAction :: (MonadBaseControl IO m, MonadIO m) => Bool -> Bool -> ReaderT SqlBackend m (Either SyncNodeError ())
insertAction hasConsumed prunes = do
ebid <- DB.queryBlockId (configGenesisHash cfg)
case ebid of
Right bid -> validateGenesisDistribution tracer networkName cfg bid
Right bid -> validateGenesisDistribution prunes tracer networkName cfg bid
Left _ ->
runExceptT $ do
liftIO $ logInfo tracer "Inserting Byron Genesis distribution"
Expand Down Expand Up @@ -99,7 +104,7 @@ insertValidateGenesisDist backend tracer (NetworkName networkName) cfg = do
, DB.blockOpCert = Nothing
, DB.blockOpCertCounter = Nothing
}
lift $ mapM_ (insertTxOuts bid) $ genesisTxos cfg
lift $ mapM_ (insertTxOuts hasConsumed bid) $ genesisTxos cfg
liftIO . logInfo tracer $
"Initial genesis distribution populated. Hash "
<> renderByteArray (configGenesisHash cfg)
@@ -110,12 +115,13 @@ insertValidateGenesisDist backend tracer (NetworkName networkName) cfg = do
-- | Validate that the initial Genesis distribution in the DB matches the Genesis data.
validateGenesisDistribution ::
(MonadBaseControl IO m, MonadIO m) =>
Bool ->
Trace IO Text ->
Text ->
Byron.Config ->
DB.BlockId ->
ReaderT SqlBackend m (Either SyncNodeError ())
validateGenesisDistribution tracer networkName cfg bid =
validateGenesisDistribution prunes tracer networkName cfg bid =
runExceptT $ do
meta <- liftLookupFail "validateGenesisDistribution" DB.queryMeta

@@ -151,7 +157,7 @@ validateGenesisDistribution tracer networkName cfg bid =
case DB.word64ToAda <$> configGenesisSupply cfg of
Left err -> dbSyncNodeError $ "validateGenesisDistribution: " <> textShow err
Right expectedSupply ->
when (expectedSupply /= totalSupply) $
when (expectedSupply /= totalSupply && not prunes) $
dbSyncNodeError $
Text.concat
[ "validateGenesisDistribution: Expected total supply to be "
@@ -165,8 +171,8 @@ validateGenesisDistribution tracer networkName cfg bid =

-- -----------------------------------------------------------------------------

insertTxOuts :: (MonadBaseControl IO m, MonadIO m) => DB.BlockId -> (Byron.Address, Byron.Lovelace) -> ReaderT SqlBackend m ()
insertTxOuts blkId (address, value) = do
insertTxOuts :: (MonadBaseControl IO m, MonadIO m) => Bool -> DB.BlockId -> (Byron.Address, Byron.Lovelace) -> ReaderT SqlBackend m ()
insertTxOuts hasConsumed blkId (address, value) = do
-- Each address/value pair of the initial coin distribution comes from an artificial transaction
-- with a hash generated by hashing the address.
txId <-
@@ -184,7 +190,7 @@ insertValidateGenesisDist backend tracer (NetworkName networkName) cfg = do
, DB.txValidContract = True
, DB.txScriptSize = 0
}
void . DB.insertTxOutPlex False $
void . DB.insertTxOutPlex hasConsumed $
DB.TxOut
{ DB.txOutTxId = txId
, DB.txOutIndex = 0
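
One consequence of pruning is made explicit in validateGenesisDistribution above: once consumed tx_out rows have been deleted, summing the table can no longer reproduce the configured genesis supply, so the mismatch error is raised only when pruning is off. A minimal sketch of that predicate (the name supplyMismatch is illustrative, not from the diff):

-- Mirrors the `expectedSupply /= totalSupply && not prunes` guard.
supplyMismatch :: Eq ada => ada -> ada -> Bool -> Bool
supplyMismatch expected totalOnDisk prunes =
  expected /= totalOnDisk && not prunes
-- With prunes = True the check never fires: pruned outputs make the
-- on-disk total an undercount by design.
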
