Skip to content

Commit

Permalink
reduce cyclic errors seperate out Ledger & Api
Browse files Browse the repository at this point in the history
  • Loading branch information
Cmdv committed Jun 5, 2023
1 parent fb3d0d9 commit 3200bd8
Show file tree
Hide file tree
Showing 22 changed files with 296 additions and 229 deletions.
6 changes: 4 additions & 2 deletions cardano-db-sync/cardano-db-sync.cabal
Expand Up @@ -43,6 +43,7 @@ library
exposed-modules: Cardano.DbSync

Cardano.DbSync.Api
Cardano.DbSync.Api.Types
Cardano.DbSync.Config
Cardano.DbSync.Config.Alonzo
Cardano.DbSync.Config.Byron
Expand Down Expand Up @@ -95,8 +96,9 @@ library

Cardano.DbSync.Era.Util

Cardano.DbSync.LedgerEvent
Cardano.DbSync.LedgerState
Cardano.DbSync.Ledger.Event
Cardano.DbSync.Ledger.State
Cardano.DbSync.Ledger.Types

Cardano.DbSync.Metrics

Expand Down
7 changes: 4 additions & 3 deletions cardano-db-sync/src/Cardano/DbSync.hs
Expand Up @@ -30,6 +30,7 @@ import Cardano.BM.Trace (Trace, logError, logInfo, logWarning)
import Cardano.Db (textShow)
import qualified Cardano.Db as Db
import Cardano.DbSync.Api
import Cardano.DbSync.Api.Types (InsertOptions (..), SyncOptions (..))
import Cardano.DbSync.Config (configureLogging)
import Cardano.DbSync.Config.Types (
ConfigFile (..),
Expand Down Expand Up @@ -101,12 +102,12 @@ runDbSync metricsSetters knownMigrations iomgr trce params aop = do
logInfo trce msg
when (mode `elem` [Db.Indexes, Db.Full]) $ logWarning trce indexesMsg
Db.runMigrations pgConfig True dbMigrationDir (Just $ Db.LogFileDir "/tmp") mode
(ranAll, unofficial) <- if enpForceIndexes params then runMigration Db.Full else runMigration Db.Initial
(ranMigrations, unofficial) <- if enpForceIndexes params then runMigration Db.Full else runMigration Db.Initial
unless (null unofficial) $
logWarning trce $
"Unofficial migration scripts found: " <> textShow unofficial

if ranAll
if ranMigrations
then logInfo trce "All migrations were executed"
else logInfo trce "Some migrations were not executed. They need to run when syncing has started."

Expand All @@ -119,7 +120,7 @@ runDbSync metricsSetters knownMigrations iomgr trce params aop = do
-- For testing and debugging.
whenJust (enpMaybeRollback params) $ \slotNo ->
void $ unsafeRollback trce pgConfig slotNo
runSyncNode metricsSetters trce iomgr connectionString ranAll (void . runMigration) params syncOpts
runSyncNode metricsSetters trce iomgr connectionString ranMigrations (void . runMigration) params syncOpts
where
dbMigrationDir :: Db.MigrationDir
dbMigrationDir = enpMigrationDir params
Expand Down
122 changes: 20 additions & 102 deletions cardano-db-sync/src/Cardano/DbSync/Api.hs
Expand Up @@ -8,13 +8,6 @@
{-# LANGUAGE FlexibleContexts #-}

module Cardano.DbSync.Api (
SyncEnv (..),
LedgerEnv (..),
SyncOptions (..),
InsertOptions (..),
ConsistentLevel (..),
RunMigration,
FixesRan (..),
fullInsertOptions,
defaultInsertOptions,
turboInsertOptions,
Expand Down Expand Up @@ -58,12 +51,13 @@ import Cardano.BM.Trace (Trace, logInfo, logWarning)
import qualified Cardano.Chain.Genesis as Byron
import Cardano.Crypto.ProtocolMagic (ProtocolMagicId (..))
import qualified Cardano.Db as DB
import Cardano.DbSync.Api.Types
import Cardano.DbSync.Cache
import Cardano.DbSync.Config.Cardano
import Cardano.DbSync.Config.Shelley
import Cardano.DbSync.Config.Types
import Cardano.DbSync.Error
import Cardano.DbSync.LedgerState
import Cardano.DbSync.Ledger.State (HasLedgerEnv (..), LedgerEvent (..), LedgerStateFile (..), SnapshotPoint (..), getHeaderHash, hashToAnnotation, listKnownSnapshots, mkHasLedgerEnv)
import Cardano.DbSync.LocalStateQuery
import Cardano.DbSync.Types
import Cardano.DbSync.Util
Expand All @@ -72,17 +66,15 @@ import qualified Cardano.Ledger.Shelley.Genesis as Shelley
import Cardano.Prelude
import Cardano.Slotting.Slot (EpochNo (..), SlotNo (..), WithOrigin (..))
import Control.Concurrent.Class.MonadSTM.Strict (
StrictTVar,
newTBQueueIO,
newTVarIO,
readTVar,
readTVarIO,
writeTVar,
)
import Control.Concurrent.Class.MonadSTM.Strict.TBQueue (StrictTBQueue)
import Control.Monad.Trans.Maybe (MaybeT (..))
import qualified Data.Strict.Maybe as Strict
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Time.Clock (getCurrentTime)
import Database.Persist.Postgresql (ConnectionString)
import Database.Persist.Sql (SqlBackend)
import Ouroboros.Consensus.Block.Abstract (HeaderHash, Point (..), fromRawHash, BlockProtocol)
Expand All @@ -95,45 +87,6 @@ import Ouroboros.Network.Magic (NetworkMagic (..))
import qualified Ouroboros.Network.Point as Point
import Ouroboros.Consensus.Protocol.Abstract (ConsensusProtocol)

data SyncEnv = SyncEnv
{ envProtocol :: !SyncProtocol
, envNetworkMagic :: !NetworkMagic
, envSystemStart :: !SystemStart
, envConnString :: ConnectionString
, envRunDelayedMigration :: RunMigration
, envBackend :: !(StrictTVar IO (Strict.Maybe SqlBackend))
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
, envIsFixed :: !(StrictTVar IO FixesRan)
, envIndexes :: !(StrictTVar IO Bool)
, envOptions :: !SyncOptions
, envCache :: !Cache
, envExtraMigrations :: !(StrictTVar IO ExtraMigrations)
, envOfflineWorkQueue :: !(StrictTBQueue IO PoolFetchRetry)
, envOfflineResultQueue :: !(StrictTBQueue IO FetchResult)
, envEpochState :: !(StrictTVar IO EpochState)
, envEpochSyncTime :: !(StrictTVar IO UTCTime)
, envLedgerEnv :: !LedgerEnv
}

-- A representation of if we are using a ledger or not given CLI options
data LedgerEnv where
HasLedger :: HasLedgerEnv -> LedgerEnv
NoLedger :: NoLedgerEnv -> LedgerEnv

type RunMigration = DB.MigrationToRun -> IO ()

data FixesRan = NoneFixRan | DataFixRan | AllFixRan

data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked
deriving (Show, Eq)

data ExtraMigrations = ExtraMigrations
{ emRan :: Bool
, emConsume :: Bool
, emPrune :: Bool
}
deriving (Show)

setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO ()
setConsistentLevel env cst = do
logInfo (getTrace env) $ "Setting ConsistencyLevel to " <> textShow cst
Expand Down Expand Up @@ -230,24 +183,6 @@ getPrunes env = do
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
pure $ emPrune extraMigr

data SyncOptions = SyncOptions
{ soptExtended :: !Bool
, soptAbortOnInvalid :: !Bool
, soptCache :: !Bool
, soptSkipFix :: !Bool
, soptOnlyFix :: !Bool
, soptInsertOptions :: !InsertOptions
, snapshotEveryFollowing :: !Word64
, snapshotEveryLagging :: !Word64
}

data InsertOptions = InsertOptions
{ ioMultiAssets :: !Bool
, ioMetadata :: !Bool
, ioPlutusExtra :: !Bool
, ioOfflineData :: !Bool
}

fullInsertOptions :: InsertOptions
fullInsertOptions = InsertOptions True True True True

Expand All @@ -261,11 +196,6 @@ replaceConnection :: SyncEnv -> SqlBackend -> IO ()
replaceConnection env sqlBackend = do
atomically $ writeTVar (envBackend env) $ Strict.Just sqlBackend

data EpochState = EpochState
{ esInitialized :: !Bool
, esEpochNo :: !(Strict.Maybe EpochNo)
}

initEpochState :: EpochState
initEpochState =
EpochState
Expand Down Expand Up @@ -391,27 +321,23 @@ mkSyncEnv ::
Ledger.Network ->
NetworkMagic ->
SystemStart ->
Maybe LedgerStateDir ->
Bool -> -- shouldUseLedger
Bool ->
Bool ->
Bool ->
SyncNodeParams ->
Bool ->
RunMigration ->
IO SyncEnv
mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedgerDir shouldUseLedger ranAll forcedIndexes toConsume toPrune runMigration = do
mkSyncEnv trce connString syncOptions protoInfo nw nwMagic systemStart syncNodeParams ranMigrations runMigrationFnc = do
cache <- if soptCache syncOptions then newEmptyCache 250000 50000 else pure uninitiatedCache
backendVar <- newTVarIO Strict.Nothing
consistentLevelVar <- newTVarIO Unchecked
fixDataVar <- newTVarIO $ if ranAll then DataFixRan else NoneFixRan
indexesVar <- newTVarIO forcedIndexes
extraMigrVar <- newTVarIO $ initExtraMigrations toConsume toPrune
fixDataVar <- newTVarIO $ if ranMigrations then DataFixRan else NoneFixRan
indexesVar <- newTVarIO $ enpForceIndexes syncNodeParams
extraMigrVar <- newTVarIO $ initExtraMigrations (enpMigrateConsumed syncNodeParams) (enpPruneTxOut syncNodeParams)
owq <- newTBQueueIO 100
orq <- newTBQueueIO 100
epochVar <- newTVarIO initEpochState
epochSyncTime <- newTVarIO =<< getCurrentTime
ledgerEnvType <-
case (maybeLedgerDir, shouldUseLedger) of
case (enpMaybeLedgerStateDir syncNodeParams, enpShouldUseLedger syncNodeParams) of
(Just dir, True) ->
HasLedger
<$> mkHasLedgerEnv
Expand All @@ -420,9 +346,7 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedge
dir
nw
systemStart
(soptAbortOnInvalid syncOptions)
(snapshotEveryFollowing syncOptions)
(snapshotEveryLagging syncOptions)
syncOptions
(Nothing, False) -> NoLedger <$> mkNoLedgerEnv trce protoInfo nw systemStart
(Just _, False) -> do
logWarning trce $
Expand All @@ -437,8 +361,8 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedge
{ envProtocol = SyncProtocolCardano
, envNetworkMagic = nwMagic
, envSystemStart = systemStart
, envConnString = connSring
, envRunDelayedMigration = runMigration
, envConnString = connString
, envRunDelayedMigration = runMigrationFnc
, envBackend = backendVar
, envOptions = syncOptions
, envConsistentLevel = consistentLevelVar
Expand All @@ -457,16 +381,14 @@ mkSyncEnvFromConfig ::
Trace IO Text ->
ConnectionString ->
SyncOptions ->
Maybe LedgerStateDir ->
Bool -> -- shouldUseLedger
GenesisConfig ->
SyncNodeParams ->
-- | migrations were ran on startup
Bool ->
Bool ->
Bool ->
Bool ->
-- | run migration function
RunMigration ->
IO (Either SyncNodeError SyncEnv)
mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir shouldUseLedger genCfg ranAll forcedIndexes toConsume toPrune runMigration =
mkSyncEnvFromConfig trce connString syncOptions genCfg syncNodeParams ranMigration runMigrationFnc =
case genCfg of
GenesisCardano _ bCfg sCfg _
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
Expand All @@ -489,19 +411,15 @@ mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir shouldUseLedger ge
Right
<$> mkSyncEnv
trce
connSring
connString
syncOptions
(mkProtocolInfoCardano genCfg [])
(Shelley.sgNetworkId $ scConfig sCfg)
(NetworkMagic . unProtocolMagicId $ Byron.configProtocolMagicId bCfg)
(SystemStart . Byron.gdStartTime $ Byron.configGenesisData bCfg)
maybeLedgerDir
shouldUseLedger
ranAll
forcedIndexes
toConsume
toPrune
runMigration
syncNodeParams
ranMigration
runMigrationFnc

-- | 'True' is for in memory points and 'False' for on disk
getLatestPoints :: SyncEnv -> IO [(CardanoPoint, Bool)]
Expand Down
96 changes: 96 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Api/Types.hs
@@ -0,0 +1,96 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NoImplicitPrelude #-}

module Cardano.DbSync.Api.Types (
SyncEnv (..),
SyncOptions (..),
InsertOptions (..),
LedgerEnv (..),
RunMigration,
FixesRan (..),
ConsistentLevel (..),
ExtraMigrations (..),
EpochState (..),
) where

import qualified Cardano.Db as DB
import Cardano.DbSync.Cache (Cache)
import Cardano.DbSync.Config.Types (SyncProtocol)
import Cardano.DbSync.Ledger.Types (HasLedgerEnv)
import Cardano.DbSync.LocalStateQuery (NoLedgerEnv)
import Cardano.DbSync.Types (FetchResult, PoolFetchRetry)
import Cardano.Prelude (Bool, Eq, IO, Show, Word64)
import Cardano.Slotting.Slot (EpochNo (..))
import Control.Concurrent.Class.MonadSTM.Strict (
StrictTVar,
)
import Control.Concurrent.Class.MonadSTM.Strict.TBQueue (StrictTBQueue)
import qualified Data.Strict.Maybe as Strict
import Data.Time.Clock (UTCTime)
import Database.Persist.Postgresql (ConnectionString)
import Database.Persist.Sql (SqlBackend)
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..))
import Ouroboros.Network.Magic (NetworkMagic (..))

data SyncEnv = SyncEnv
{ envProtocol :: !SyncProtocol
, envNetworkMagic :: !NetworkMagic
, envSystemStart :: !SystemStart
, envConnString :: ConnectionString
, envRunDelayedMigration :: RunMigration
, envBackend :: !(StrictTVar IO (Strict.Maybe SqlBackend))
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
, envIsFixed :: !(StrictTVar IO FixesRan)
, envIndexes :: !(StrictTVar IO Bool)
, envOptions :: !SyncOptions
, envCache :: !Cache
, envExtraMigrations :: !(StrictTVar IO ExtraMigrations)
, envOfflineWorkQueue :: !(StrictTBQueue IO PoolFetchRetry)
, envOfflineResultQueue :: !(StrictTBQueue IO FetchResult)
, envEpochState :: !(StrictTVar IO EpochState)
, envEpochSyncTime :: !(StrictTVar IO UTCTime)
, envLedgerEnv :: !LedgerEnv
}

data SyncOptions = SyncOptions
{ soptExtended :: !Bool
, soptAbortOnInvalid :: !Bool
, soptCache :: !Bool
, soptSkipFix :: !Bool
, soptOnlyFix :: !Bool
, soptInsertOptions :: !InsertOptions
, snapshotEveryFollowing :: !Word64
, snapshotEveryLagging :: !Word64
}

data InsertOptions = InsertOptions
{ ioMultiAssets :: !Bool
, ioMetadata :: !Bool
, ioPlutusExtra :: !Bool
, ioOfflineData :: !Bool
}

-- A representation of if we are using a ledger or not given CLI options
data LedgerEnv where
HasLedger :: HasLedgerEnv -> LedgerEnv
NoLedger :: NoLedgerEnv -> LedgerEnv

type RunMigration = DB.MigrationToRun -> IO ()

data FixesRan = NoneFixRan | DataFixRan | AllFixRan

data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked
deriving (Show, Eq)

data ExtraMigrations = ExtraMigrations
{ emRan :: Bool
, emConsume :: Bool
, emPrune :: Bool
}
deriving (Show)

data EpochState = EpochState
{ esInitialized :: !Bool
, esEpochNo :: !(Strict.Maybe EpochNo)
}
3 changes: 2 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Database.hs
Expand Up @@ -18,7 +18,7 @@ import Cardano.DbSync.Api
import Cardano.DbSync.DbAction
import Cardano.DbSync.Default
import Cardano.DbSync.Error
import Cardano.DbSync.LedgerState
import Cardano.DbSync.Ledger.State
import Cardano.DbSync.Metrics
import Cardano.DbSync.Rollback
import Cardano.DbSync.Types
Expand All @@ -32,6 +32,7 @@ import Ouroboros.Consensus.HeaderValidation hiding (TipInfo)
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Network.Block (Point (..))
import Ouroboros.Network.Point (blockPointHash, blockPointSlot)
import Cardano.DbSync.Api.Types (SyncEnv (..), LedgerEnv (..), ConsistentLevel (..))

data NextState
= Continue
Expand Down

0 comments on commit 3200bd8

Please sign in to comment.