Skip to content

Commit

Permalink
Try #537:+
Browse files Browse the repository at this point in the history
  • Loading branch information
iohk-bors[bot] committed Mar 4, 2021
2 parents 788cf0d + ed0e0ba commit 7259739
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 94 deletions.
8 changes: 7 additions & 1 deletion cardano-db-sync-extended/app/cardano-db-sync-extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Cardano.Prelude
import Cardano.Config.Git.Rev (gitRev)

import Cardano.DbSync (runDbSyncNode)
import Cardano.DbSync.Metrics (withMetricsLayer)
import Cardano.DbSync.Plugin.Extended (extendedDbSyncNodePlugin)

import Cardano.Sync.Config
Expand All @@ -29,7 +30,12 @@ main = do
cmd <- Opt.execParser opts
case cmd of
CmdVersion -> runVersionCommand
CmdRun params -> runDbSyncNode extendedDbSyncNodePlugin params
CmdRun params -> do
prometheusPort <- dncPrometheusPort <$> readSyncNodeConfig (enpConfigFile params)

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer prometheusPort $ \metricsLayer ->
runDbSyncNode metricsLayer extendedDbSyncNodePlugin params

-- -------------------------------------------------------------------------------------------------

Expand Down
8 changes: 7 additions & 1 deletion cardano-db-sync/app/cardano-db-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Cardano.Prelude
import Cardano.Config.Git.Rev (gitRev)

import Cardano.DbSync (defDbSyncNodePlugin, runDbSyncNode)
import Cardano.DbSync.Metrics (withMetricsLayer)

import Cardano.Sync.Config
import Cardano.Sync.Config.Types
Expand All @@ -29,7 +30,12 @@ main = do
cmd <- Opt.execParser opts
case cmd of
CmdVersion -> runVersionCommand
CmdRun params -> runDbSyncNode defDbSyncNodePlugin params
CmdRun params -> do
prometheusPort <- dncPrometheusPort <$> readSyncNodeConfig (enpConfigFile params)

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer prometheusPort $ \metricsLayer ->
runDbSyncNode metricsLayer defDbSyncNodePlugin params

-- -------------------------------------------------------------------------------------------------

Expand Down
8 changes: 4 additions & 4 deletions cardano-db-sync/src/Cardano/DbSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import Cardano.DbSync.Plugin.Default (defDbSyncNodePlugin)
import Cardano.DbSync.Rollback (unsafeRollback)
import Cardano.Sync.Database (runDbThread)

import Cardano.Sync (Block (..), SyncDataLayer (..), SyncNodePlugin (..),
import Cardano.Sync (Block (..), MetricsLayer, SyncDataLayer (..), SyncNodePlugin (..),
configureLogging, runSyncNode)
import Cardano.Sync.Config.Types (ConfigFile (..), GenesisFile (..), LedgerStateDir (..),
MigrationDir (..), NetworkName (..), SocketPath (..), SyncCommand (..),
Expand All @@ -49,8 +49,8 @@ import Database.Persist.Postgresql (withPostgresqlConn)
import Database.Persist.Sql (SqlBackend)


runDbSyncNode :: (SqlBackend -> SyncNodePlugin) -> SyncNodeParams -> IO ()
runDbSyncNode mkPlugin params = do
runDbSyncNode :: MetricsLayer -> (SqlBackend -> SyncNodePlugin) -> SyncNodeParams -> IO ()
runDbSyncNode metricsLayer mkPlugin params = do

-- Read the PG connection info
pgConfig <- DB.readPGPassFileEnv Nothing
Expand All @@ -69,7 +69,7 @@ runDbSyncNode mkPlugin params = do
Just slotNo -> void $ unsafeRollback trce slotNo
Nothing -> pure ()

runSyncNode (mkSyncDataLayer trce backend) trce (mkPlugin backend)
runSyncNode (mkSyncDataLayer trce backend) metricsLayer trce (mkPlugin backend)
params (insertValidateGenesisDist backend) runDbThread

-- -------------------------------------------------------------------------------------------------
Expand Down
57 changes: 44 additions & 13 deletions cardano-db-sync/src/Cardano/DbSync/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,52 @@
module Cardano.DbSync.Metrics
( Metrics (..)
, makeMetrics
, withMetricsLayer
, withMetricsServer
) where

import Cardano.Prelude

import Cardano.Sync.Types (MetricsLayer (..))

import System.Metrics.Prometheus.Concurrent.RegistryT (RegistryT (..), registerGauge,
runRegistryT, unRegistryT)
import System.Metrics.Prometheus.Http.Scrape (serveMetricsT)
import System.Metrics.Prometheus.Metric.Gauge (Gauge)
import qualified System.Metrics.Prometheus.Metric.Gauge as Gauge


-- |The metrics we use for Prometheus.
data Metrics = Metrics
{ mDbHeight :: !Gauge
, mNodeHeight :: !Gauge
, mQueuePre :: !Gauge
, mQueuePost :: !Gauge
, mQueuePostWrite :: !Gauge
{ mNodeBlockHeight :: !Gauge
-- ^ The block tip number of the remote node.
, mDbQueueLength :: !Gauge
-- ^ The number of @DbAction@ remaining for the database.
, mDbBlockHeight :: !Gauge
-- ^ The block tip number in the database.
, mDbSlotHeight :: !Gauge
-- ^ The slot tip number in the database.
}

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer :: Int -> (MetricsLayer -> IO a) -> IO a
withMetricsLayer prometheusPort action =
withMetricsServer prometheusPort $ \metrics -> do

-- Metrics layer.
let metricsLayer =
MetricsLayer
{ mlSetNodeBlockHeight = \nodeHeight ->
Gauge.set (fromIntegral nodeHeight) $ mNodeBlockHeight metrics
, mlSetDbQueueLength = \queuePostWrite ->
Gauge.set (fromIntegral queuePostWrite) $ mDbQueueLength metrics
, mlSetDbBlockHeight = \blockNo ->
Gauge.set (fromIntegral blockNo) $ mDbBlockHeight metrics
, mlSetDbSlotHeight = \slotNo ->
Gauge.set (fromIntegral slotNo) $ mDbSlotHeight metrics
}

action metricsLayer

withMetricsServer :: Int -> (Metrics -> IO a) -> IO a
withMetricsServer port action = do
(metrics, registry) <- runRegistryT $ (,) <$> makeMetrics <*> RegistryT ask
Expand All @@ -32,11 +59,15 @@ withMetricsServer port action = do
(const $ action metrics)

makeMetrics :: RegistryT IO Metrics
makeMetrics =
Metrics
<$> registerGauge "db_block_height" mempty
<*> registerGauge "remote_tip_height" mempty
<*> registerGauge "action_queue_length_pre" mempty
<*> registerGauge "action_queue_length_post" mempty
<*> registerGauge "action_queue_length_post_write" mempty
makeMetrics = do
nodeBlockHeight <- registerGauge "cardano_db_sync_node_block_height" mempty
dbQueueLength <- registerGauge "cardano_db_sync_db_queue_length" mempty
dbBlockHeight <- registerGauge "cardano_db_sync_db_block_height" mempty
dbSlotHeight <- registerGauge "cardano_db_sync_db_slot_height" mempty

return $ Metrics
{ mNodeBlockHeight = nodeBlockHeight
, mDbQueueLength = dbQueueLength
, mDbBlockHeight = dbBlockHeight
, mDbSlotHeight = dbSlotHeight
}
2 changes: 0 additions & 2 deletions cardano-sync/cardano-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ library
Cardano.Sync.Error

Cardano.Sync.LedgerState
Cardano.Sync.Metrics

Cardano.Sync.Era.Byron.Util
Cardano.Sync.Era.Cardano.Util
Expand Down Expand Up @@ -97,7 +96,6 @@ library
, ouroboros-consensus-shelley
, ouroboros-network
, ouroboros-network-framework
, prometheus
, shelley-spec-ledger
, stm
, text
Expand Down
62 changes: 37 additions & 25 deletions cardano-sync/src/Cardano/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ module Cardano.Sync
, SocketPath (..)

, SyncDataLayer (..)
, MetricsLayer (..)
, emptyMetricsLayer
, Block (..)
, Meta (..)
, SyncEnv (..)
Expand All @@ -44,7 +46,6 @@ import Cardano.Sync.Config
import Cardano.Sync.Database (DbAction (..), DbActionQueue, lengthDbActionQueue,
mkDbApply, mkDbRollback, newDbActionQueue, runDbStartup, writeDbActionQueue)
import Cardano.Sync.Error
import Cardano.Sync.Metrics
import Cardano.Sync.Plugin (SyncNodePlugin (..))
import Cardano.Sync.StateQuery (StateQueryTMVar, getSlotDetails, localStateQueryHandler,
newStateQueryTMVar)
Expand Down Expand Up @@ -103,7 +104,6 @@ import qualified Ouroboros.Network.Snocket as Snocket
import Ouroboros.Network.Subscription (SubscriptionTrace)

import System.Directory (createDirectoryIfMissing)
import qualified System.Metrics.Prometheus.Metric.Gauge as Gauge


type InsertValidateGenesisFunction
Expand All @@ -113,22 +113,23 @@ type InsertValidateGenesisFunction
-> ExceptT SyncNodeError IO ()

type RunDBThreadFunction
= Trace IO Text
= MetricsLayer
-> Trace IO Text
-> SyncEnv
-> SyncNodePlugin
-> Metrics
-> DbActionQueue
-> IO ()

runSyncNode
:: SyncDataLayer
-> MetricsLayer
-> Trace IO Text
-> SyncNodePlugin
-> SyncNodeParams
-> InsertValidateGenesisFunction
-> RunDBThreadFunction
-> IO ()
runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunction =
runSyncNode dataLayer metricsLayer trce plugin enp insertValidateGenesisDist runDBThreadFunction =
withIOManager $ \iomgr -> do

let configFile = enpConfigFile enp
Expand All @@ -152,7 +153,7 @@ runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunct
case genCfg of
GenesisCardano _ bCfg _sCfg -> do
syncEnv <- ExceptT $ mkSyncEnvFromConfig dataLayer (enpLedgerStateDir enp) genCfg
liftIO $ runSyncNodeNodeClient (dncPrometheusPort enc) syncEnv iomgr trce plugin
liftIO $ runSyncNodeNodeClient metricsLayer syncEnv iomgr trce plugin
runDBThreadFunction (cardanoCodecConfig bCfg) (enpSocketPath enp)
where
cardanoCodecConfig :: Byron.Config -> CodecConfig CardanoBlock
Expand All @@ -166,7 +167,7 @@ runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunct
-- -------------------------------------------------------------------------------------------------

runSyncNodeNodeClient
:: Int
:: MetricsLayer
-> SyncEnv
-> IOManager
-> Trace IO Text
Expand All @@ -175,17 +176,16 @@ runSyncNodeNodeClient
-> CodecConfig CardanoBlock
-> SocketPath
-> IO ()
runSyncNodeNodeClient port env iomgr trce plugin runDBThreadFunction codecConfig (SocketPath socketPath) = do
runSyncNodeNodeClient metricsLayer env iomgr trce plugin runDBThreadFunction codecConfig (SocketPath socketPath) = do
queryVar <- newStateQueryTMVar
logInfo trce $ "localInitiatorNetworkApplication: connecting to node via " <> textShow socketPath
withMetricsServer port $ \ metrics ->
void $ subscribe
(localSnocket iomgr socketPath)
codecConfig
(envNetworkMagic env)
networkSubscriptionTracers
clientSubscriptionParams
(dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction)
void $ subscribe
(localSnocket iomgr socketPath)
codecConfig
(envNetworkMagic env)
networkSubscriptionTracers
clientSubscriptionParams
(dbSyncProtocols metricsLayer trce env plugin queryVar runDBThreadFunction)
where
clientSubscriptionParams =
ClientSubscriptionParams
Expand Down Expand Up @@ -217,17 +217,17 @@ runSyncNodeNodeClient port env iomgr trce plugin runDBThreadFunction codecConfig
handshakeTracer = toLogObject $ appendName "Handshake" trce

dbSyncProtocols
:: Trace IO Text
:: MetricsLayer
-> Trace IO Text
-> SyncEnv
-> SyncNodePlugin
-> Metrics
-> StateQueryTMVar CardanoBlock (Interpreter (CardanoEras StandardCrypto))
-> RunDBThreadFunction
-> Network.NodeToClientVersion
-> ClientCodecs CardanoBlock IO
-> ConnectionId LocalAddress
-> NodeToClientProtocols 'InitiatorMode BSL.ByteString IO () Void
dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction version codecs _connectionId =
dbSyncProtocols metricsLayer trce env plugin queryVar runDBThreadFunction version codecs _connectionId =
NodeToClientProtocols
{ localChainSyncProtocol = localChainSyncPtcl
, localTxSubmissionProtocol = dummylocalTxSubmit
Expand All @@ -252,13 +252,13 @@ dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction version cod
actionQueue <- newDbActionQueue

race_
(runDBThreadFunction trce env plugin metrics actionQueue)
(runDBThreadFunction metricsLayer trce env plugin actionQueue)
(runPipelinedPeer
localChainSyncTracer
(cChainSyncCodec codecs)
channel
(chainSyncClientPeerPipelined
$ chainSyncClient (envDataLayer env) trce env queryVar metrics latestPoints currentTip actionQueue)
$ chainSyncClient (envDataLayer env) metricsLayer trce env queryVar latestPoints currentTip actionQueue)
)

atomically $ writeDbActionQueue actionQueue DbFinish
Expand Down Expand Up @@ -337,15 +337,15 @@ getCurrentTipBlockNo dataLayer = do
--
chainSyncClient
:: SyncDataLayer
-> MetricsLayer
-> Trace IO Text
-> SyncEnv
-> StateQueryTMVar CardanoBlock (Interpreter (CardanoEras StandardCrypto))
-> Metrics
-> [Point CardanoBlock]
-> WithOrigin BlockNo
-> DbActionQueue
-> ChainSyncClientPipelined CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
chainSyncClient dataLayer trce env queryVar metrics latestPoints currentTip actionQueue = do
chainSyncClient dataLayer metricsLayer trce env queryVar latestPoints currentTip actionQueue = do
ChainSyncClientPipelined $ pure $
-- Notify the core node about the our latest points at which we are
-- synchronised. This client is not persistent and thus it just
Expand All @@ -360,6 +360,12 @@ chainSyncClient dataLayer trce env queryVar metrics latestPoints currentTip acti
where
policy = pipelineDecisionLowHighMark 1000 10000

setNodeBlockHeight :: Word64 -> IO ()
setNodeBlockHeight = mlSetNodeBlockHeight metricsLayer

setDbQueueLength :: Natural -> IO ()
setDbQueueLength = mlSetDbQueueLength metricsLayer

go :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> WithOrigin BlockNo
-> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
go mkPipelineDecision n clientTip serverTip =
Expand Down Expand Up @@ -387,12 +393,18 @@ chainSyncClient dataLayer trce env queryVar metrics latestPoints currentTip acti
ClientStNext
{ recvMsgRollForward = \blk tip ->
logException trce "recvMsgRollForward: " $ do
Gauge.set (withOrigin 0 (fromIntegral . unBlockNo) (getTipBlockNo tip)) (mNodeHeight metrics)

-- Set the metrics for remote node block height.
setNodeBlockHeight (withOrigin 0 unBlockNo (getTipBlockNo tip))

details <- getSlotDetails trce env queryVar (cardanoBlockSlotNo blk)
newSize <- atomically $ do
writeDbActionQueue actionQueue $ mkDbApply blk details
lengthDbActionQueue actionQueue
Gauge.set (fromIntegral newSize) $ mQueuePostWrite metrics

-- Set the metrics for the size of the db queue, checking for congestion.
setDbQueueLength newSize

pure $ finish (At (blockNo blk)) tip
, recvMsgRollBackward = \point tip ->
logException trce "recvMsgRollBackward: " $ do
Expand Down

0 comments on commit 7259739

Please sign in to comment.