Skip to content

Commit

Permalink
Improve DBSync configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
kderme committed Jan 9, 2022
1 parent 45733de commit e0fab82
Show file tree
Hide file tree
Showing 63 changed files with 577 additions and 365 deletions.
2 changes: 2 additions & 0 deletions cardano-chain-gen/cardano-chain-gen.cabal
Expand Up @@ -167,6 +167,8 @@ test-suite cardano-chain-gen
, io-classes
, optparse-applicative
, ouroboros-network
, silently
, stm
, strict-containers
, tasty
, tasty-quickcheck
Expand Down
7 changes: 2 additions & 5 deletions cardano-chain-gen/src/Cardano/Mock/Forging/Interpreter.hs
Expand Up @@ -107,7 +107,6 @@ newtype Fingerprint = Fingerprint [Word64]

mkFingerprint :: FilePath -> IO (FingerprintMode, Fingerprint)
mkFingerprint path = do
print path
thereIsFile <- doesPathExist path
if thereIsFile then do
mfingerPrint <- eitherDecodeFileStrict path
Expand Down Expand Up @@ -151,7 +150,6 @@ finalizeFingerprint inter = do
interState <- readMVar $ iState inter
case iFingerMode inter of
SearchSlots fp -> do
print $ "Dumping slots to " <> fp
encodeFile fp $ reverseFingerprint $ isFingerprint interState
ValidateSlots -> pure ()

Expand All @@ -178,7 +176,6 @@ initInterpreter pinfo traceForge fingerprintFile = do
, isNextBlockNo = BlockNo 0
, isFingerprint = fingerprint
}
print $ initChainDB topLeverCfg initSt
stvar <- newMVar initState
pure $ Interpreter
{ iForging = Map.fromList $ zip [0..] forging
Expand Down Expand Up @@ -373,8 +370,8 @@ registerAllStakeCreds :: Interpreter -> NodeId -> IO CardanoBlock
registerAllStakeCreds inter nodeId = do
st <- getState inter
tx <- case ledgerState st of
LedgerStateShelley sts -> either throwIO (pure . TxShelley) $ Shelley.mkDCertTx sts
LedgerStateAlonzo sta -> either throwIO (pure . TxAlonzo) $ Alonzo.mkDCertTx sta
LedgerStateShelley sts -> either throwIO (pure . TxShelley) $ Shelley.mkDCertTxPools sts
LedgerStateAlonzo sta -> either throwIO (pure . TxAlonzo) $ Alonzo.mkDCertTxPools sta
_ -> throwIO UnexpectedEra
forgeNext inter $ MockBlock [tx] nodeId

Expand Down
9 changes: 7 additions & 2 deletions cardano-chain-gen/src/Cardano/Mock/Forging/Tx/Alonzo.hs
Expand Up @@ -72,9 +72,14 @@ mkPaymentTx inputIndex outputIndex amount fees sta = do
change = TxOut addr' (valueFromList (fromIntegral $ fromIntegral inputValue - amount - fees) []) Strict.SNothing
Right $ mkSimpleTx $ consPaymentTxBody input (StrictSeq.fromList [output, change]) (Coin fees)

mkDCertTx :: LedgerState (ShelleyBlock (AlonzoEra StandardCrypto))
mkDCertTx :: [DCert StandardCrypto] -> Wdrl StandardCrypto
-> LedgerState (ShelleyBlock (AlonzoEra StandardCrypto))
-> Either ForgingError (ValidatedTx (AlonzoEra StandardCrypto))
mkDCertTx sta = Right $ mkSimpleTx $ consCertTxBody (allPoolStakeCert sta) (Wdrl mempty)
mkDCertTx certs wdrl sta = Right $ mkSimpleTx $ consCertTxBody certs wdrl

mkDCertTxPools :: LedgerState (ShelleyBlock (AlonzoEra StandardCrypto))
-> Either ForgingError (ValidatedTx (AlonzoEra StandardCrypto))
mkDCertTxPools sta = Right $ mkSimpleTx $ consCertTxBody (allPoolStakeCert sta) (Wdrl mempty)

mkSimpleTx :: TxBody (AlonzoEra StandardCrypto) -> ValidatedTx (AlonzoEra StandardCrypto)
mkSimpleTx txBody = ValidatedTx
Expand Down
4 changes: 2 additions & 2 deletions cardano-chain-gen/src/Cardano/Mock/Forging/Tx/Generic.hs
Expand Up @@ -44,9 +44,9 @@ resolveUTxOIndex index st = toLeft $ case index of
utxoPairs = Map.toList $ unUTxO $ _utxo $ _utxoState $ esLState $
nesEs $ Consensus.shelleyLedgerState st

safeIndex :: Int -> [(TxIn (Crypto era), Core.TxOut era)] -> Maybe (TxIn (Crypto era), Core.TxOut era)
safeIndex :: Int -> [a] -> Maybe a
safeIndex n ls
| n < length ls = Just $ utxoPairs !! n
| n < length ls = Just $ ls !! n
| True = Nothing

eq addr (_, txOut) = addr == getField @"address" txOut
Expand Down
6 changes: 3 additions & 3 deletions cardano-chain-gen/src/Cardano/Mock/Forging/Tx/Shelley.hs
Expand Up @@ -42,9 +42,9 @@ mkPaymentTx inputIndex outputIndex amount fees st = do

Right $ mkSimpleTx $ consPaymentTxBody input (StrictSeq.fromList [output, change]) (Coin fees)

mkDCertTx :: LedgerState (ShelleyBlock (ShelleyEra StandardCrypto))
-> Either ForgingError (Tx (ShelleyEra StandardCrypto))
mkDCertTx sta = Right $ mkSimpleTx $ consCertTxBody (allPoolStakeCert sta) (Wdrl mempty)
mkDCertTxPools :: LedgerState (ShelleyBlock (ShelleyEra StandardCrypto))
-> Either ForgingError (Tx (ShelleyEra StandardCrypto))
mkDCertTxPools sta = Right $ mkSimpleTx $ consCertTxBody (allPoolStakeCert sta) (Wdrl mempty)

mkSimpleTx :: TxBody (ShelleyEra StandardCrypto) -> Tx (ShelleyEra StandardCrypto)
mkSimpleTx txBody = Tx
Expand Down
3 changes: 3 additions & 0 deletions cardano-chain-gen/src/Cardano/Mock/Forging/Types.hs
Expand Up @@ -45,3 +45,6 @@ data UTxOIndex = UTxOIndex Int | UTxOAddress (Addr StandardCrypto)
deriving (Show, Eq)

data StakeIndex = StakeIndex Int | StakeAddress (StakeCredential StandardCrypto)
| PoolAccount PoolIndex

data PoolIndex = PoolIndex Int | PoolId
152 changes: 120 additions & 32 deletions cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs
@@ -1,17 +1,26 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Test.Cardano.Db.Mock.Config where

import Cardano.Prelude (panic)
import Cardano.Prelude (ReaderT, stderr, panic)

import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception (SomeException)
import Control.Monad.Extra (eitherM)
import Control.Monad.Logger (NoLoggingT)
import Control.Monad.Trans.Except (runExceptT)
import Control.Tracer (nullTracer)
import Data.Text (Text)
import qualified Data.Text as Text
import System.Directory
import System.FilePath.Posix (takeFileName , (</>))
import System.FilePath.Posix ((</>))
import System.IO.Silently

import Database.Persist.Sql (SqlBackend)

import Ouroboros.Consensus.Config
import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus
Expand All @@ -24,6 +33,8 @@ import Cardano.CLI.Shelley.Run.Genesis as CLI
import Cardano.Node.Protocol.Shelley (readLeaderCredentials)
import Cardano.Node.Types (ProtocolFilepaths (..))

import qualified Cardano.Db as DB

import Cardano.DbSync
import Cardano.DbSync.Config
import Cardano.DbSync.Config.Cardano
Expand All @@ -34,13 +45,77 @@ import Cardano.DbSync.Types (MetricSetters (..))
import Cardano.Mock.ChainSync.Server
import Cardano.Mock.Forging.Interpreter hiding (CardanoBlock)

rootTestDir :: FilePath
rootTestDir = "test/testfiles"

mkMutableDir :: FilePath -> FilePath
mkMutableDir testLabel = rootTestDir </> "temp" </> testLabel

mkConfigDir :: FilePath -> FilePath
mkConfigDir config = rootTestDir </> config

fingerprintRoot :: FilePath
fingerprintRoot = rootTestDir </> "fingerprint"

mkFingerPrint :: FilePath -> FilePath
mkFingerPrint testLabel = fingerprintRoot </> testLabel

data Config = Config
{ topLevelConfig :: TopLevelConfig CardanoBlock
, protocolInfo :: Consensus.ProtocolInfo IO CardanoBlock
, protocolInfoForging :: Consensus.ProtocolInfo IO CardanoBlock
, syncNodeParams :: SyncNodeParams
}

data DBSyncEnv = DBSyncEnv
{ dbSyncParams :: SyncNodeParams
, dbSyncForkDB :: IO (Async ())
, dbSyncThreadVar :: TMVar (Async ())
}

mkDBSyncEnv :: SyncNodeParams -> IO () -> IO DBSyncEnv
mkDBSyncEnv params runDBSync = do
runningVar <- atomically newEmptyTMVar
pure $ DBSyncEnv
{ dbSyncParams = params
, dbSyncForkDB = async runDBSync
, dbSyncThreadVar = runningVar
}

stopDBSync :: DBSyncEnv -> IO ()
stopDBSync env = do
thr <- atomically $ tryReadTMVar (dbSyncThreadVar env)
case thr of
Nothing -> error "Could not cancel db-sync when it's not running"
Just a -> do
cancel a
-- make it empty
_ <- atomically $ takeTMVar (dbSyncThreadVar env)
pure ()

startDBSync :: DBSyncEnv -> IO ()
startDBSync env = do
thr <- atomically $ tryReadTMVar $ dbSyncThreadVar env
case thr of
Just _a -> error "db-sync already running"
Nothing -> do
a <- dbSyncForkDB env
_ <- atomically $ tryPutTMVar (dbSyncThreadVar env) a
pure ()

pollDBSync :: DBSyncEnv -> IO (Maybe (Either SomeException ()))
pollDBSync env = do
thr <- atomically $ tryReadTMVar (dbSyncThreadVar env)
case thr of
Nothing -> error "Could not poll db-sync when it's not running"
Just a -> poll a

getDBSyncPGPass :: DBSyncEnv -> DB.PGPassSource
getDBSyncPGPass = enpPGPassSource . dbSyncParams

queryDBSync :: DBSyncEnv -> ReaderT SqlBackend (NoLoggingT IO) a -> IO a
queryDBSync env q = DB.runWithConnectionNoLogging (getDBSyncPGPass env) q

setupTestsDir :: FilePath -> IO ()
setupTestsDir dir = do
eitherM (panic . textShow) pure $ runExceptT $
Expand All @@ -54,8 +129,8 @@ mkConfig staticDir mutableDir = do
let pInfoDbSync = mkProtocolInfoCardano genCfg []
creds <- mkShelleyCredentials $ staticDir </> "pools" </> "bulk1.creds"
let pInfoForger = mkProtocolInfoCardano genCfg creds
let syncParams = mkSyncNodeParams staticDir mutableDir
pure $ Config (Consensus.pInfoConfig pInfoDbSync) pInfoDbSync pInfoForger syncParams
syncPars <- mkSyncNodeParams staticDir mutableDir
pure $ Config (Consensus.pInfoConfig pInfoDbSync) pInfoDbSync pInfoForger syncPars

mkShelleyCredentials :: FilePath -> IO [TPraosLeaderCredentials StandardCrypto]
mkShelleyCredentials bulkFile = do
Expand All @@ -71,15 +146,22 @@ mkShelleyCredentials bulkFile = do
}

-- | staticDir can be shared by tests running in parallel. mutableDir not.
mkSyncNodeParams :: FilePath -> FilePath -> SyncNodeParams
mkSyncNodeParams staticDir mutableDir = SyncNodeParams
{ enpConfigFile = ConfigFile $ staticDir </> "test-db-sync-config.json"
, enpSocketPath = SocketPath $ mutableDir </> ".socket"
, enpLedgerStateDir = LedgerStateDir $ mutableDir </> "ledger-states"
, enpMigrationDir = MigrationDir "../schema"
, enpExtended = True
, enpMaybeRollback = Nothing
}
mkSyncNodeParams :: FilePath -> FilePath -> IO SyncNodeParams
mkSyncNodeParams staticDir mutableDir = do
Just pgconfig <- DB.parsePGConfig $ "/var/run/postgresql:5432:" <> dbname <> ":*:*"
pure $ SyncNodeParams
{ enpConfigFile = ConfigFile $ staticDir </> "test-db-sync-config.json"
, enpSocketPath = SocketPath $ mutableDir </> ".socket"
, enpLedgerStateDir = LedgerStateDir $ mutableDir </> "ledger-states"
, enpMigrationDir = MigrationDir "../schema"
, enpPGPassSource = DB.PGPassCached pgconfig
, enpExtended = True
, enpMaybeRollback = Nothing
}
where
-- TODO: Use this to have parallem tests
-- <> (Text.encodeUtf8 $ Text.pack testLabel)
dbname = "testing"

emptyMetricsSetters :: MetricSetters
emptyMetricsSetters = MetricSetters
Expand All @@ -90,26 +172,32 @@ emptyMetricsSetters = MetricSetters
}

withFullConfig :: FilePath -> FilePath
-> (Interpreter -> ServerHandle IO CardanoBlock -> IO (Async ()) -> IO ())
-> (Interpreter -> ServerHandle IO CardanoBlock -> DBSyncEnv -> IO ())
-> IOManager -> [(Text, Text)] -> IO ()
withFullConfig staticDir mutableDir action iom migr = do
recreateDir mutableDir
cfg <- mkConfig staticDir mutableDir
fingerFile <- prepareFingerprintFile staticDir mutableDir
withInterpreter (protocolInfoForging cfg) nullTracer fingerFile $ \interpreter -> do
let initSt = Consensus.pInfoInitLedger $ protocolInfo cfg
-- TODO: get 42 from config
mockServer <- forkServerThread @CardanoBlock iom (topLevelConfig cfg) initSt (NetworkMagic 42) $ unSocketPath (enpSocketPath $ syncNodeParams cfg)
-- we dont fork dbsync here. Just prepare it as an action
let dbSync = async $ runDbSyncNode emptyMetricsSetters True migr (syncNodeParams cfg)
action interpreter mockServer dbSync

prepareFingerprintFile :: FilePath -> FilePath -> IO FilePath
prepareFingerprintFile staticDir mutableDir = do
let testLabel = takeFileName mutableDir
let dir = staticDir </> "fingerprints"
createDirectoryIfMissing True dir
pure $ dir </> testLabel
withFullConfig config testLabel action iom migr = do
recreateDir mutableDir
cfg <- mkConfig configDir mutableDir
fingerFile <- prepareFingerprintFile testLabel
withInterpreter (protocolInfoForging cfg) nullTracer fingerFile $ \interpreter -> do
let initSt = Consensus.pInfoInitLedger $ protocolInfo cfg
-- TODO: get 42 from config
mockServer <- forkServerThread @CardanoBlock iom (topLevelConfig cfg) initSt (NetworkMagic 42) $ unSocketPath (enpSocketPath $ syncNodeParams cfg)
-- we dont fork dbsync here. Just prepare it as an action
let dbsyncParams = syncNodeParams cfg
dbsyncRun = runDbSync emptyMetricsSetters migr iom nullTracer dbsyncParams True 100 100
dbSync <- mkDBSyncEnv dbsyncParams dbsyncRun
_ <- hSilence [stderr] $ DB.recreateDB (getDBSyncPGPass dbSync)
action interpreter mockServer dbSync
where
configDir = mkConfigDir config
mutableDir = mkMutableDir testLabel

prepareFingerprintFile :: FilePath -> IO FilePath
prepareFingerprintFile testLabel = do
createDirectoryIfMissing True fingerprintRoot
pure fingerprintFile
where
fingerprintFile = mkFingerPrint testLabel

recreateDir :: FilePath -> IO ()
recreateDir path = do
Expand Down

0 comments on commit e0fab82

Please sign in to comment.