Skip to content

Commit

Permalink
Add healthcheck to chain-indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Mar 16, 2023
1 parent e2a0f07 commit c293054
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 8 deletions.
Expand Up @@ -13,6 +13,8 @@ module Language.Marlowe.Runtime.ChainIndexer

import Cardano.Api (CardanoMode, ChainPoint(..), ChainTip(..), LocalNodeClientProtocolsInMode)
import Control.Concurrent.Component
import Control.Concurrent.Component.Probes
import Control.Concurrent.STM (atomically)
import Data.Aeson (Value(..), object, (.=))
import Data.Time (NominalDiffTime)
import Language.Marlowe.Runtime.Cardano.Api
Expand Down Expand Up @@ -54,6 +56,7 @@ data ChainIndexerDependencies r = ChainIndexerDependencies
, persistRateLimit :: !NominalDiffTime
, genesisBlock :: !GenesisBlock
, eventBackend :: !(EventBackend IO r ChainIndexerSelector)
, httpPort :: Int
}

chainIndexer :: Component IO (ChainIndexerDependencies r) ()
Expand All @@ -67,7 +70,7 @@ chainIndexer = proc ChainIndexerDependencies{..} -> do
, eventBackend = narrowEventBackend (injectSelector NodeClientEvent) eventBackend
}
let rateLimit = persistRateLimit
chainStore -< ChainStoreDependencies
ready <- chainStore -< ChainStoreDependencies
{ commitRollback
, commitBlocks
, rateLimit
Expand All @@ -77,6 +80,14 @@ chainIndexer = proc ChainIndexerDependencies{..} -> do
, commitGenesisBlock
, eventBackend = narrowEventBackend (injectSelector ChainStoreEvent) eventBackend
}
probeServer -< ProbeServerDependencies
{ probes = Probes
{ liveness = atomically connected
, startup = pure True
, readiness = atomically ready
}
, port = httpPort
}

getChainIndexerSelectorConfig :: GetSelectorConfig ChainIndexerSelector
getChainIndexerSelectorConfig = \case
Expand Down
Expand Up @@ -110,8 +110,9 @@ data NodeClientDependencies r = NodeClientDependencies
}

-- | The public API of the NodeClient component.
newtype NodeClient = NodeClient
data NodeClient = NodeClient
{ getChanges :: STM Changes -- ^ An STM action that atomically reads and clears the current change set.
, connected :: STM Bool
}

data NodeClientSelector f where
Expand All @@ -136,6 +137,7 @@ data RollBackwardField
nodeClient :: Component IO (NodeClientDependencies r) NodeClient
nodeClient = component \NodeClientDependencies{..} -> do
changesVar <- newTVar emptyChanges
connectedVar <- newTVar False

let
getChanges :: STM Changes
Expand All @@ -156,16 +158,20 @@ nodeClient = component \NodeClientDependencies{..} -> do
let ChainSyncClientPipelined client = pipelinedClient'
in LocalChainSyncClientPipelined $ ChainSyncClientPipelined do
finalize ev Nothing
atomically $ writeTVar connectedVar True
client
, localTxSubmissionClient = Nothing
, localTxMonitoringClient = Nothing
, localStateQueryClient = Nothing
}
atomically $ writeTVar connectedVar False
case result of
Left ex -> finalize ev (Just ex) *> throw ex
Right _ -> pure ()

pure (runNodeClient, NodeClient { getChanges })
connected = readTVar connectedVar

pure (runNodeClient, NodeClient { getChanges, connected })

blockHeaderToBlockNo :: BlockHeader -> BlockNo
blockHeaderToBlockNo (BlockHeader _ _ blockNo) = blockNo
Expand Down
Expand Up @@ -10,8 +10,9 @@ module Language.Marlowe.Runtime.ChainIndexer.Store

import Cardano.Api (ChainPoint(..), ChainTip(..))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM (STM, atomically, newTVar, readTVar)
import Control.Concurrent.STM.Delay (Delay, newDelay, waitDelay)
import Control.Concurrent.STM.TVar (writeTVar)
import Control.Monad (guard, unless, when)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT(..))
Expand Down Expand Up @@ -49,8 +50,9 @@ data ChainStoreDependencies r = ChainStoreDependencies
}

-- | Create a ChainStore component.
chainStore :: Component IO (ChainStoreDependencies r) ()
chainStore = component_ \ChainStoreDependencies{..} -> do
chainStore :: Component IO (ChainStoreDependencies r) (STM Bool)
chainStore = component \ChainStoreDependencies{..} -> do
readyVar <- newTVar False
let
awaitChanges :: Maybe Delay -> STM Changes
awaitChanges delay = do
Expand All @@ -70,6 +72,7 @@ chainStore = component_ \ChainStoreDependencies{..} -> do
Just dbGenesisBlock -> unless (dbGenesisBlock == genesisBlock) do
fail "Existing genesis block does not match computed genesis block"
Nothing -> runCommitGenesisBlock commitGenesisBlock genesisBlock
atomically $ writeTVar readyVar True
go Nothing
where
go lastWrite = do
Expand All @@ -96,4 +99,4 @@ chainStore = component_ \ChainStoreDependencies{..} -> do
let delayMicroseconds = floor $ 1_000_000 * nominalDiffTimeToSeconds delay
lift $ newDelay delayMicroseconds

runChainStore
pure (runChainStore, readTVar readyVar)
3 changes: 2 additions & 1 deletion marlowe-chain-sync/marlowe-chain-indexer/Main.hs
Expand Up @@ -38,7 +38,7 @@ main = run =<< getOptions "0.0.0.0"

run :: Options -> IO ()
run Options{..} = do
pool <- Pool.acquire 100 (Just $ 5000000) (fromString databaseUri)
pool <- Pool.acquire 100 (Just 5000000) (fromString databaseUri)
genesisConfigResult <- runExceptT do
hash <- ExceptT $ pure $ decodeAbstractHash genesisConfigHash
(hash,) <$> withExceptT
Expand All @@ -58,6 +58,7 @@ run Options{..} = do
, maxCost
, costModel
, eventBackend = narrowEventBackend (injectSelector App) eventBackend
, httpPort
}
loggerDependencies = LoggerDependencies
{ configFilePath = logConfigFile
Expand Down
14 changes: 14 additions & 0 deletions marlowe-chain-sync/marlowe-chain-indexer/Options.hs
Expand Up @@ -21,6 +21,7 @@ data Options = Options
, costModel :: !CostModel
, maxCost :: !Int
, logConfigFile :: !(Maybe FilePath)
, httpPort :: !Int
} deriving (Show, Eq)

getOptions :: String -> IO Options
Expand Down Expand Up @@ -70,6 +71,7 @@ parseOptions defaultNetworkId defaultSocketPath defaultDatabaseUri version = O.i
<*> costModelParser
<*> maxCostParser
<*> logConfigFileParser
<*> httpPortParser
)
where
versionOption :: O.Parser (a -> a)
Expand Down Expand Up @@ -146,6 +148,18 @@ parseOptions defaultNetworkId defaultSocketPath defaultDatabaseUri version = O.i
, O.help "Testnet network ID magic. Defaults to the CARDANO_TESTNET_MAGIC environment variable."
]

httpPortParser :: O.Parser Int
httpPortParser = O.option O.auto options
where
options :: O.Mod O.OptionFields Int
options = mconcat
[ O.long "http-port"
, O.metavar "PORT_NUMBER"
, O.help "Port number to serve the http healthcheck API on"
, O.value 8080
, O.showDefault
]

costModelParser :: O.Parser CostModel
costModelParser = CostModel <$> blockCostParser <*> txCostParser

Expand Down

0 comments on commit c293054

Please sign in to comment.