Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikd committed Apr 8, 2021
1 parent 49c73a2 commit da810b0
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 83 deletions.
33 changes: 24 additions & 9 deletions cardano-db-sync/src/Cardano/DbSync.hs
Expand Up @@ -33,9 +33,11 @@ import Cardano.BM.Trace (Trace)
import qualified Cardano.Db as DB

import Cardano.DbSync.Era (insertValidateGenesisDist)
import Cardano.DbSync.Era.Shelley.Insert.Epoch (runEpochUpdateThread)
import Cardano.DbSync.Plugin.Default (defDbSyncNodePlugin)
import Cardano.DbSync.Rollback (unsafeRollback)
import Cardano.Sync.Database (runDbThread)
import Cardano.Sync.LedgerState (EpochUpdateControl (..), mkEpochUpdateControl)

import Cardano.Sync (Block (..), MetricSetters, SyncDataLayer (..), SyncNodePlugin (..),
configureLogging, runSyncNode)
Expand All @@ -46,7 +48,7 @@ import Cardano.Sync.Tracing.ToObjectOrphans ()

import Control.Monad.Extra (whenJust)

import Database.Persist.Postgresql (withPostgresqlConn)
import Database.Persist.Postgresql (ConnectionString, withPostgresqlConn)

import Database.Persist.Sql (SqlBackend)

Expand All @@ -63,22 +65,35 @@ runDbSyncNode metricsSetters mkPlugin params = do

trce <- configureLogging params "db-sync-node"

let connectionString = DB.toConnectionString pgConfig
-- This is a convoluted mess because `cardano-db` and `cardano-sync` should
-- be in the same package. The split is only needed because SMASH uses
-- the `cardano-sync` part of `db-sync` as a library. Once that is no longer
-- the case, these two packages will be reunited and the code cleaned up.

DB.runIohkLogging trce $ withPostgresqlConn connectionString $ \backend ->
lift $ do
-- For testing and debugging.
whenJust (enpMaybeRollback params) $ \ slotNo ->
void $ unsafeRollback trce slotNo
let connectString = DB.toConnectionString pgConfig

runSyncNode (mkSyncDataLayer trce backend) metricsSetters trce (mkPlugin backend)
params (insertValidateGenesisDist backend) runDbThread
epochUpdate <- mkEpochUpdateControl

race_
(runDBInsertThread trce connectString epochUpdate)
(runEpochUpdateThread trce epochUpdate)
where
-- This is only necessary because `cardano-db` and `cardano-sync` both define
-- this newtype, but the later does not depend on the former.
dbMigrationDir :: DB.MigrationDir
dbMigrationDir = DB.MigrationDir $ unMigrationDir (enpMigrationDir params)

runDBInsertThread :: Trace IO Text -> ConnectionString -> EpochUpdateControl -> IO ()
runDBInsertThread trce connectString euc =
DB.runIohkLogging trce $ withPostgresqlConn connectString $ \backend ->
lift $ do
-- For testing and debugging.
whenJust (enpMaybeRollback params) $ \ slotNo ->
void $ unsafeRollback trce slotNo

runSyncNode trce (mkSyncDataLayer trce backend) euc metricsSetters
(mkPlugin backend) params (insertValidateGenesisDist backend) runDbThread

-- -------------------------------------------------------------------------------------------------

-- The base @DataLayer@.
Expand Down
97 changes: 52 additions & 45 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Epoch.hs
@@ -1,8 +1,9 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}

module Cardano.DbSync.Era.Shelley.Insert.Epoch
( epochUpdateThread
( runEpochUpdateThread
) where

import Cardano.Prelude
Expand All @@ -17,7 +18,6 @@ import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
-- import Cardano.DbSync.Era.Shelley.Generic.ParamProposal
-- import Cardano.DbSync.Era.Shelley.Query

import Cardano.Sync.Api
import Cardano.Sync.LedgerState
-- import Cardano.Sync.Types
import Cardano.Sync.Util
Expand All @@ -27,6 +27,8 @@ import Cardano.Slotting.Slot (EpochNo (..))

import Control.Concurrent.STM.TMVar (putTMVar, takeTMVar)
import Control.Concurrent.STM.TVar (writeTVar)
-- import Control.Monad.Trans.Control (MonadBaseControl)

-- import Control.Monad.Extra (whenJust)
-- import Control.Monad.Logger (LoggingT)
-- import Control.Monad.Trans.Control (MonadBaseControl)
Expand Down Expand Up @@ -59,49 +61,54 @@ import Database.Persist.Sql (SqlBackend)
-- be inserted is passed via a `TMVar` and another `TMVar` is used to signal the
-- main insert thread of completion.

epochUpdateThread :: Trace IO Text -> LedgerEnv -> SqlBackend -> IO ()
epochUpdateThread tracer env backend =
loop
runEpochUpdateThread :: Trace IO Text -> EpochUpdateControl -> IO ()
runEpochUpdateThread tracer euc = do
logInfo tracer "runEpochUpdateThread"
forever $ do
-- Will block until data arrives.
epochUpdate <- atomically $ takeTMVar (ruInsertDone euc)

liftIO . logInfo tracer $
mconcat
[ "Asynchonously inserting epoch updates for epoch "
, textShow (unEpochNo $ Generic.euEpoch epochUpdate)
]
-- This starts a new database connection and runs the following in a
-- transaction.
DB.runDbNoLogging $ do
-- DB.runIohkLogging tracer $
-- withPostgresqlConn connectString $ \ backend ->
-- DB.runDbIohkLogging backend tracer $ do
insertEpochUpdate tracer epochUpdate
liftIO $ waitOnTMVar (Generic.euEpoch epochUpdate)

where
loop :: IO a
loop = do
-- Will block until data arrives.
epochUpdate <- atomically $ takeTMVar (ruInsertDone $ leEpochUpdate env)

liftIO . logInfo tracer $
mconcat
[ "Asynchonously inserting epoch updates for epoch "
, textShow (unEpochNo $ Generic.euEpoch epochUpdate)
]
-- This starts a new database connection and runs the following in a
-- transaction.
DB.runDbAction backend (Just tracer) $ do
-- Insert the data.
insertEpochUpdate tracer epochUpdate

liftIO $ do
-- Signal the main thread that insertion is complete.
atomically $ do
writeTVar (ruState $ leEpochUpdate env) WaitingForEpoch
putTMVar (ruUpdateReady $ leEpochUpdate env) ()

logInfo tracer $
mconcat
[ "Asynchonous insert for epoch "
, textShow (unEpochNo $ Generic.euEpoch epochUpdate)
, " done, waiting for epoch boundary"
]

void . atomically $ takeTMVar (ruCommit $ leEpochUpdate env)
logInfo tracer $
mconcat
[ "Committing insert for epoch "
, textShow (unEpochNo $ Generic.euEpoch epochUpdate)
, " done"
]

loop

insertEpochUpdate :: MonadIO m => Trace IO Text -> Generic.EpochUpdate -> ReaderT SqlBackend m ()
waitOnTMVar :: EpochNo -> IO ()
waitOnTMVar epochNo = do
-- Signal the main thread that insertion is complete.
atomically $ do
writeTVar (ruState euc) WaitingForEpoch
putTMVar (ruUpdateReady euc) ()

logInfo tracer $
mconcat
[ "Asynchonous insert for epoch "
, textShow (unEpochNo epochNo)
, " done, waiting for epoch boundary"
]

-- Wait for the main thread to notify us that its time to commit the transaction.
void . atomically $ takeTMVar (ruCommit euc)
logInfo tracer $
mconcat
[ "Committing insert for epoch "
, textShow (unEpochNo epochNo)
, " done"
]

insertEpochUpdate
:: MonadIO m -- (MonadBaseControl IO m, MonadIO m)
=> Trace IO Text -> Generic.EpochUpdate
-> ReaderT SqlBackend m ()
insertEpochUpdate tracer _eu =
liftIO $ logInfo tracer "insertEpochUpdate"
13 changes: 5 additions & 8 deletions cardano-sync/src/Cardano/Sync.hs
Expand Up @@ -47,6 +47,7 @@ import Cardano.Sync.Config
import Cardano.Sync.Database (DbAction (..), DbActionQueue, lengthDbActionQueue,
mkDbApply, mkDbRollback, newDbActionQueue, runDbStartup, writeDbActionQueue)
import Cardano.Sync.Error
import Cardano.Sync.LedgerState
import Cardano.Sync.Metrics
import Cardano.Sync.Plugin (SyncNodePlugin (..))
import Cardano.Sync.StateQuery (StateQueryTMVar, getSlotDetails, localStateQueryHandler,
Expand Down Expand Up @@ -121,15 +122,11 @@ type RunDBThreadFunction
-> IO ()

runSyncNode
:: SyncDataLayer
-> MetricSetters
-> Trace IO Text
-> SyncNodePlugin
-> SyncNodeParams
-> InsertValidateGenesisFunction
:: Trace IO Text -> SyncDataLayer -> EpochUpdateControl -> MetricSetters
-> SyncNodePlugin -> SyncNodeParams -> InsertValidateGenesisFunction
-> RunDBThreadFunction
-> IO ()
runSyncNode dataLayer metricsSetters trce plugin enp insertValidateGenesisDist runDBThreadFunction =
runSyncNode trce dataLayer euc metricsSetters plugin enp insertValidateGenesisDist runDBThreadFunction =
withIOManager $ \iomgr -> do

let configFile = enpConfigFile enp
Expand All @@ -152,7 +149,7 @@ runSyncNode dataLayer metricsSetters trce plugin enp insertValidateGenesisDist r
liftIO $ runDbStartup trce plugin
case genCfg of
GenesisCardano _ bCfg _sCfg -> do
syncEnv <- ExceptT $ mkSyncEnvFromConfig dataLayer (enpLedgerStateDir enp) genCfg
syncEnv <- ExceptT $ mkSyncEnvFromConfig dataLayer euc (enpLedgerStateDir enp) genCfg
liftIO $ runSyncNodeNodeClient metricsSetters syncEnv iomgr trce plugin
runDBThreadFunction (cardanoCodecConfig bCfg) (enpSocketPath enp)
where
Expand Down
26 changes: 14 additions & 12 deletions cardano-sync/src/Cardano/Sync/Api.hs
Expand Up @@ -51,10 +51,13 @@ data SyncDataLayer = SyncDataLayer
, sdlGetLatestSlotNo :: IO SlotNo
}

mkSyncEnv :: SyncDataLayer -> ProtocolInfo IO CardanoBlock -> Shelley.Network -> NetworkMagic -> SystemStart -> LedgerStateDir -> IO SyncEnv
mkSyncEnv dataLayer protocolInfo network networkMagic systemStart dir = do
mkSyncEnv
:: SyncDataLayer -> ProtocolInfo IO CardanoBlock -> EpochUpdateControl
-> Shelley.Network -> NetworkMagic -> SystemStart -> LedgerStateDir
-> IO SyncEnv
mkSyncEnv dataLayer protocolInfo euc network networkMagic systemStart dir = do
latestSlot <- sdlGetLatestSlotNo dataLayer
ledgerEnv <- mkLedgerEnv protocolInfo dir network latestSlot True
ledgerEnv <- mkLedgerEnv protocolInfo euc dir network latestSlot True
pure $ SyncEnv
{ envProtocol = SyncProtocolCardano
, envNetworkMagic = networkMagic
Expand All @@ -63,8 +66,8 @@ mkSyncEnv dataLayer protocolInfo network networkMagic systemStart dir = do
, envLedger = ledgerEnv
}

mkSyncEnvFromConfig :: SyncDataLayer -> LedgerStateDir -> GenesisConfig -> IO (Either SyncNodeError SyncEnv)
mkSyncEnvFromConfig dataLayer dir genCfg =
mkSyncEnvFromConfig :: SyncDataLayer -> EpochUpdateControl -> LedgerStateDir -> GenesisConfig -> IO (Either SyncNodeError SyncEnv)
mkSyncEnvFromConfig dataLayer euc dir genCfg =
case genCfg of
GenesisCardano _ bCfg sCfg
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
Expand All @@ -79,13 +82,12 @@ mkSyncEnvFromConfig dataLayer dir genCfg =
[ "SystemStart ", textShow (Byron.gdStartTime $ Byron.configGenesisData bCfg)
, " /= ", textShow (Shelley.sgSystemStart $ scConfig sCfg)
]
| otherwise -> Right <$> mkSyncEnv
dataLayer
(mkProtocolInfoCardano genCfg)
(Shelley.sgNetworkId (scConfig sCfg))
(NetworkMagic (unProtocolMagicId $ Byron.configProtocolMagicId bCfg))
(SystemStart (Byron.gdStartTime $ Byron.configGenesisData bCfg))
dir
| otherwise ->
Right <$> mkSyncEnv dataLayer (mkProtocolInfoCardano genCfg) euc
(Shelley.sgNetworkId $ scConfig sCfg)
(NetworkMagic (unProtocolMagicId $ Byron.configProtocolMagicId bCfg))
(SystemStart (Byron.gdStartTime $ Byron.configGenesisData bCfg))
dir

getLatestPoints :: SyncEnv -> IO [CardanoPoint]
getLatestPoints env = do
Expand Down
16 changes: 7 additions & 9 deletions cardano-sync/src/Cardano/Sync/LedgerState.hs
Expand Up @@ -19,6 +19,7 @@ module Cardano.Sync.LedgerState
, listLedgerStateFilesOrdered
, hashToAnnotation
, loadLedgerStateFromFile
, mkEpochUpdateControl
, mkLedgerEnv
) where

Expand Down Expand Up @@ -138,24 +139,21 @@ data LedgerStateSnapshot = LedgerStateSnapshot
, lssNewEpoch :: !(Maybe Generic.NewEpoch) -- Only Just for a single block at the epoch boundary
}

mkLedgerEnv :: Consensus.ProtocolInfo IO CardanoBlock
-> LedgerStateDir
-> Shelley.Network
-> SlotNo
-> Bool
-> IO LedgerEnv
mkLedgerEnv protocolInfo dir network slot deleteFiles = do
mkLedgerEnv
:: Consensus.ProtocolInfo IO CardanoBlock -> EpochUpdateControl
-> LedgerStateDir -> Shelley.Network -> SlotNo -> Bool
-> IO LedgerEnv
mkLedgerEnv protocolInfo eu dir network slot deleteFiles = do
when deleteFiles $
deleteNewerLedgerStateFiles dir slot
st <- findLatestLedgerState protocolInfo dir deleteFiles
var <- newTVarIO st
ru <- mkEpochUpdateControl
pure LedgerEnv
{ leProtocolInfo = protocolInfo
, leDir = dir
, leNetwork = network
, leStateVar = var
, leEpochUpdate = ru
, leEpochUpdate = eu
}


Expand Down

0 comments on commit da810b0

Please sign in to comment.