diff --git a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer.hs b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer.hs index cd70ad8d7e..b8df460cef 100644 --- a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer.hs +++ b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer.hs @@ -13,6 +13,8 @@ module Language.Marlowe.Runtime.ChainIndexer import Cardano.Api (CardanoMode, ChainPoint(..), ChainTip(..), LocalNodeClientProtocolsInMode) import Control.Concurrent.Component +import Control.Concurrent.Component.Probes +import Control.Concurrent.STM (atomically) import Data.Aeson (Value(..), object, (.=)) import Data.Time (NominalDiffTime) import Language.Marlowe.Runtime.Cardano.Api @@ -54,6 +56,7 @@ data ChainIndexerDependencies r = ChainIndexerDependencies , persistRateLimit :: !NominalDiffTime , genesisBlock :: !GenesisBlock , eventBackend :: !(EventBackend IO r ChainIndexerSelector) + , httpPort :: Int } chainIndexer :: Component IO (ChainIndexerDependencies r) () @@ -67,7 +70,7 @@ chainIndexer = proc ChainIndexerDependencies{..} -> do , eventBackend = narrowEventBackend (injectSelector NodeClientEvent) eventBackend } let rateLimit = persistRateLimit - chainStore -< ChainStoreDependencies + ready <- chainStore -< ChainStoreDependencies { commitRollback , commitBlocks , rateLimit @@ -77,6 +80,14 @@ chainIndexer = proc ChainIndexerDependencies{..} -> do , commitGenesisBlock , eventBackend = narrowEventBackend (injectSelector ChainStoreEvent) eventBackend } + probeServer -< ProbeServerDependencies + { probes = Probes + { liveness = atomically connected + , startup = pure True + , readiness = atomically ready + } + , port = httpPort + } getChainIndexerSelectorConfig :: GetSelectorConfig ChainIndexerSelector getChainIndexerSelectorConfig = \case diff --git a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/NodeClient.hs b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/NodeClient.hs index 9bf9bcd091..4552a26bb7 100644 --- a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/NodeClient.hs +++ b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/NodeClient.hs @@ -110,8 +110,9 @@ data NodeClientDependencies r = NodeClientDependencies } -- | The public API of the NodeClient component. -newtype NodeClient = NodeClient +data NodeClient = NodeClient { getChanges :: STM Changes -- ^ An STM action that atomically reads and clears the current change set. + , connected :: STM Bool } data NodeClientSelector f where @@ -136,6 +137,7 @@ data RollBackwardField nodeClient :: Component IO (NodeClientDependencies r) NodeClient nodeClient = component \NodeClientDependencies{..} -> do changesVar <- newTVar emptyChanges + connectedVar <- newTVar False let getChanges :: STM Changes @@ -156,16 +158,20 @@ nodeClient = component \NodeClientDependencies{..} -> do let ChainSyncClientPipelined client = pipelinedClient' in LocalChainSyncClientPipelined $ ChainSyncClientPipelined do finalize ev Nothing + atomically $ writeTVar connectedVar True client , localTxSubmissionClient = Nothing , localTxMonitoringClient = Nothing , localStateQueryClient = Nothing } + atomically $ writeTVar connectedVar False case result of Left ex -> finalize ev (Just ex) *> throw ex Right _ -> pure () - pure (runNodeClient, NodeClient { getChanges }) + connected = readTVar connectedVar + + pure (runNodeClient, NodeClient { getChanges, connected }) blockHeaderToBlockNo :: BlockHeader -> BlockNo blockHeaderToBlockNo (BlockHeader _ _ blockNo) = blockNo diff --git a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/Store.hs b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/Store.hs index 3a3825d293..35d4087f7d 100644 --- a/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/Store.hs +++ b/marlowe-chain-sync/chain-indexer/Language/Marlowe/Runtime/ChainIndexer/Store.hs @@ -10,8 +10,9 @@ module Language.Marlowe.Runtime.ChainIndexer.Store import Cardano.Api (ChainPoint(..), ChainTip(..)) import Control.Concurrent.Component -import Control.Concurrent.STM (STM, atomically) +import Control.Concurrent.STM (STM, atomically, newTVar, readTVar) import Control.Concurrent.STM.Delay (Delay, newDelay, waitDelay) +import Control.Concurrent.STM.TVar (writeTVar) import Control.Monad (guard, unless, when) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Maybe (MaybeT(..)) @@ -49,8 +50,9 @@ data ChainStoreDependencies r = ChainStoreDependencies } -- | Create a ChainStore component. -chainStore :: Component IO (ChainStoreDependencies r) () -chainStore = component_ \ChainStoreDependencies{..} -> do +chainStore :: Component IO (ChainStoreDependencies r) (STM Bool) +chainStore = component \ChainStoreDependencies{..} -> do + readyVar <- newTVar False let awaitChanges :: Maybe Delay -> STM Changes awaitChanges delay = do @@ -70,6 +72,7 @@ chainStore = component_ \ChainStoreDependencies{..} -> do Just dbGenesisBlock -> unless (dbGenesisBlock == genesisBlock) do fail "Existing genesis block does not match computed genesis block" Nothing -> runCommitGenesisBlock commitGenesisBlock genesisBlock + atomically $ writeTVar readyVar True go Nothing where go lastWrite = do @@ -96,4 +99,4 @@ chainStore = component_ \ChainStoreDependencies{..} -> do let delayMicroseconds = floor $ 1_000_000 * nominalDiffTimeToSeconds delay lift $ newDelay delayMicroseconds - runChainStore + pure (runChainStore, readTVar readyVar) diff --git a/marlowe-chain-sync/marlowe-chain-indexer/Main.hs b/marlowe-chain-sync/marlowe-chain-indexer/Main.hs index 2aaad255d3..05f913b83c 100644 --- a/marlowe-chain-sync/marlowe-chain-indexer/Main.hs +++ b/marlowe-chain-sync/marlowe-chain-indexer/Main.hs @@ -38,7 +38,7 @@ main = run =<< getOptions "0.0.0.0" run :: Options -> IO () run Options{..} = do - pool <- Pool.acquire 100 (Just $ 5000000) (fromString databaseUri) + pool <- Pool.acquire 100 (Just 5000000) (fromString databaseUri) genesisConfigResult <- runExceptT do hash <- ExceptT $ pure $ decodeAbstractHash genesisConfigHash (hash,) <$> withExceptT @@ -58,6 +58,7 @@ run Options{..} = do , maxCost , costModel , eventBackend = narrowEventBackend (injectSelector App) eventBackend + , httpPort } loggerDependencies = LoggerDependencies { configFilePath = logConfigFile diff --git a/marlowe-chain-sync/marlowe-chain-indexer/Options.hs b/marlowe-chain-sync/marlowe-chain-indexer/Options.hs index ae09913d83..2a722e5d9c 100644 --- a/marlowe-chain-sync/marlowe-chain-indexer/Options.hs +++ b/marlowe-chain-sync/marlowe-chain-indexer/Options.hs @@ -21,6 +21,7 @@ data Options = Options , costModel :: !CostModel , maxCost :: !Int , logConfigFile :: !(Maybe FilePath) + , httpPort :: !Int } deriving (Show, Eq) getOptions :: String -> IO Options @@ -70,6 +71,7 @@ parseOptions defaultNetworkId defaultSocketPath defaultDatabaseUri version = O.i <*> costModelParser <*> maxCostParser <*> logConfigFileParser + <*> httpPortParser ) where versionOption :: O.Parser (a -> a) @@ -146,6 +148,18 @@ parseOptions defaultNetworkId defaultSocketPath defaultDatabaseUri version = O.i , O.help "Testnet network ID magic. Defaults to the CARDANO_TESTNET_MAGIC environment variable." ] + httpPortParser :: O.Parser Int + httpPortParser = O.option O.auto options + where + options :: O.Mod O.OptionFields Int + options = mconcat + [ O.long "http-port" + , O.metavar "PORT_NUMBER" + , O.help "Port number to serve the http healthcheck API on" + , O.value 8080 + , O.showDefault + ] + costModelParser :: O.Parser CostModel costModelParser = CostModel <$> blockCostParser <*> txCostParser