Skip to content

Commit

Permalink
Port chainseek to async-components
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Nov 24, 2022
1 parent 9f3cd48 commit 844c24a
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 147 deletions.
41 changes: 36 additions & 5 deletions async-components/src/Control/Concurrent/Component.hs
Expand Up @@ -7,12 +7,14 @@ module Control.Concurrent.Component
import Control.Applicative (liftA2)
import Control.Arrow
import Control.Category
import Control.Concurrent.Async.Lifted (Async, Concurrently(..), waitCatchSTM, withAsync)
import Control.Concurrent.Async.Lifted (Async, Concurrently(..), wait, waitCatchSTM, waitEitherCatch, withAsync)
import Control.Concurrent.STM
import Control.Exception (SomeException, throwIO)
import Control.Monad (join)
import Control.Monad.Base (MonadBase(liftBase))
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Control (MonadBaseControl(StM))
import qualified Data.Bifunctor as B
import Prelude hiding ((.))

newtype Component m a b = Component { unComponent :: a -> STM (Concurrently m (), b) }
Expand Down Expand Up @@ -101,12 +103,41 @@ runComponent c a = do
(run, b) <- unComponent c a
pure (runConcurrently run, b)

runComponent_ :: MonadBaseControl IO m => Component m a () -> a -> m ()
runComponent_ c a = fst =<< liftBase (atomically $ runComponent c a)

withComponent :: MonadBaseControl IO m => Component m a b -> a -> (b -> Async (StM m ()) -> m c) -> m c
withComponent c a f = do
(Concurrently run, b) <- liftBase $ atomically $ unComponent c a
withAsync run $ f b

component :: (a -> STM b) -> (a -> b -> m ()) -> Component m a b
component initialize run = Component \a -> do
b <- initialize a
pure (Concurrently $ run a b, b)
withComponent_ :: MonadBaseControl IO m => Component m a () -> a -> (Async (StM m ()) -> m c) -> m c
withComponent_ c a = withComponent c a . const

component :: (a -> STM (m (), b)) -> Component m a b
component run = Component $ (fmap . B.first) Concurrently . run

serverComponent
:: forall m a b
. MonadBaseControl IO m
=> Component m b ()
-> (SomeException -> m ())
-> m ()
-> (a -> m b)
-> Component m a ()
serverComponent worker onWorkerError onWorkerTerminated accept = component \a ->
let
run :: m ()
run = do
b <- accept a
withComponent_ worker b \aworker ->
withAsync run \aserver -> do
result <- waitEitherCatch aworker aserver
case result of
Right (Left ex) -> liftBase $ throwIO ex
Right (Right x) -> pure x
Left (Left ex) -> onWorkerError ex
Left (Right ()) -> onWorkerTerminated
wait aserver
in
pure (run, ())
10 changes: 5 additions & 5 deletions marlowe-chain-sync/app/Main.hs
Expand Up @@ -16,8 +16,9 @@ import qualified Cardano.Api as Cardano
import Cardano.Api.Byron (toByronRequiresNetworkMagic)
import qualified Cardano.Chain.Genesis as Byron
import Cardano.Crypto (abstractHashToBytes, decodeAbstractHash)
import Control.Concurrent.STM (atomically)
import Control.Exception (bracket, bracketOnError, throwIO)
import Control.Concurrent.Component
import Control.Concurrent.STM (atomically, modifyTVar, newTVarIO, readTVar)
import Control.Exception (bracket, bracketOnError, finally, throwIO)
import Control.Monad ((<=<))
import Control.Monad.Trans.Except (ExceptT(ExceptT), runExceptT, withExceptT)
import Data.String (IsString(fromString))
Expand All @@ -26,7 +27,7 @@ import Data.Time (secondsToNominalDiffTime)
import Hasql.Pool (UsageError(..))
import qualified Hasql.Pool as Pool
import qualified Hasql.Session as Session
import Language.Marlowe.Runtime.ChainSync (ChainSync(..), ChainSyncDependencies(..), mkChainSync)
import Language.Marlowe.Runtime.ChainSync (ChainSyncDependencies(..), chainSync)
import Language.Marlowe.Runtime.ChainSync.Api (WithGenesis(..), codecChainSeek)
import Language.Marlowe.Runtime.ChainSync.Database (hoistDatabaseQueries)
import qualified Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL as PostgreSQL
Expand Down Expand Up @@ -74,7 +75,7 @@ run Options{..} = withSocketsDo do
(Byron.mkConfigFromFile (toByronRequiresNetworkMagic networkId) genesisConfigFile hash)
(hash, genesisConfig) <- either (fail . unpack) pure genesisConfigResult
let genesisBlock = computeByronGenesisBlock (abstractHashToBytes hash) genesisConfig
chainSync <- atomically $ mkChainSync ChainSyncDependencies
runComponent_ chainSync ChainSyncDependencies
{ connectToLocalNode = Cardano.connectToLocalNode localNodeConnectInfo
, databaseQueries = hoistDatabaseQueries
(either throwUsageError pure <=< Pool.use pool)
Expand All @@ -95,7 +96,6 @@ run Options{..} = withSocketsDo do
, maxCost
, costModel
}
runChainSync chainSync
where
throwUsageError (ConnectionError err) = error $ show err
throwUsageError (SessionError (Session.QueryError _ _ err)) = error $ show err
Expand Down
3 changes: 2 additions & 1 deletion marlowe-chain-sync/marlowe-chain-sync.cabal
Expand Up @@ -73,7 +73,7 @@ library
build-depends:
base >= 4.9 && < 5
, aeson
, async
, async-components
, base16
, binary
, bytestring
Expand Down Expand Up @@ -121,6 +121,7 @@ executable chainseekd
build-depends:
base >= 4.9 && < 5
, aeson
, async-components
, base16
, bytestring
, cardano-api
Expand Down
51 changes: 17 additions & 34 deletions marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync.hs
@@ -1,31 +1,28 @@
{-# LANGUAGE Arrows #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}

module Language.Marlowe.Runtime.ChainSync
( ChainSync(..)
, ChainSyncDependencies(..)
, mkChainSync
( ChainSyncDependencies(..)
, chainSync
) where

import Cardano.Api (CardanoEra, CardanoMode, LocalNodeClientProtocolsInMode, Tx, TxValidationErrorInMode)
import qualified Cardano.Api as Cardano
import Control.Concurrent.Async (concurrently_)
import Control.Concurrent.STM (STM)
import Control.Monad (unless)
import Control.Concurrent.Component
import Data.Time (NominalDiffTime)
import Language.Marlowe.Runtime.ChainSync.Database (CommitGenesisBlock(..), DatabaseQueries(..), GetGenesisBlock(..))
import Language.Marlowe.Runtime.ChainSync.Database (DatabaseQueries(..))
import Language.Marlowe.Runtime.ChainSync.Genesis (GenesisBlock)
import Language.Marlowe.Runtime.ChainSync.JobServer
(ChainSyncJobServer(..), ChainSyncJobServerDependencies(..), RunJobServer, mkChainSyncJobServer)
import Language.Marlowe.Runtime.ChainSync.NodeClient
(CostModel, NodeClient(..), NodeClientDependencies(..), mkNodeClient)
(ChainSyncJobServerDependencies(..), RunJobServer, chainSyncJobServer)
import Language.Marlowe.Runtime.ChainSync.NodeClient (CostModel, NodeClient(..), NodeClientDependencies(..), nodeClient)
import Language.Marlowe.Runtime.ChainSync.QueryServer
(ChainSyncQueryServer(..), ChainSyncQueryServerDependencies(..), RunQueryServer, mkChainSyncQueryServer)
(ChainSyncQueryServerDependencies(..), RunQueryServer, chainSyncQueryServer)
import Language.Marlowe.Runtime.ChainSync.Server
(ChainSyncServer(..), ChainSyncServerDependencies(..), RunChainSeekServer, mkChainSyncServer)
import Language.Marlowe.Runtime.ChainSync.Store (ChainStore(..), ChainStoreDependencies(..), mkChainStore)
(ChainSyncServerDependencies(..), RunChainSeekServer, chainSyncServer)
import Language.Marlowe.Runtime.ChainSync.Store (ChainStore(..), ChainStoreDependencies(..), chainStore)
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure)
import Ouroboros.Network.Protocol.LocalTxSubmission.Client (SubmitResult)

Expand All @@ -51,26 +48,12 @@ data ChainSyncDependencies = ChainSyncDependencies
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode))
}

newtype ChainSync = ChainSync { runChainSync :: IO () }

mkChainSync :: ChainSyncDependencies -> STM ChainSync
mkChainSync ChainSyncDependencies{..} = do
chainSync :: Component IO ChainSyncDependencies ()
chainSync = proc ChainSyncDependencies{..} -> do
let DatabaseQueries{..} = databaseQueries
NodeClient{..} <- mkNodeClient NodeClientDependencies{..}
NodeClient{..} <- nodeClient -< NodeClientDependencies{..}
let rateLimit = persistRateLimit
ChainStore{..} <- mkChainStore ChainStoreDependencies{..}
ChainSyncServer{..} <- mkChainSyncServer ChainSyncServerDependencies{..}
ChainSyncQueryServer{..} <- mkChainSyncQueryServer ChainSyncQueryServerDependencies{..}
ChainSyncJobServer{..} <- mkChainSyncJobServer ChainSyncJobServerDependencies{..}
pure $ ChainSync do
mDbGenesisBlock <- runGetGenesisBlock getGenesisBlock
case mDbGenesisBlock of
Just dbGenesisBlock -> unless (dbGenesisBlock == genesisBlock) do
fail "Existing genesis block does not match computed genesis block"
Nothing -> runCommitGenesisBlock commitGenesisBlock genesisBlock

runNodeClient
`concurrently_` runChainStore
`concurrently_` runChainSyncServer
`concurrently_` runChainSyncQueryServer
`concurrently_` runChainSyncJobServer
ChainStore{..} <- chainStore -< ChainStoreDependencies{..}
chainSyncServer -< ChainSyncServerDependencies{..}
chainSyncQueryServer -< ChainSyncQueryServerDependencies{..}
chainSyncJobServer -< ChainSyncJobServerDependencies{..}
Expand Up @@ -9,10 +9,7 @@ module Language.Marlowe.Runtime.ChainSync.JobServer
where

import Cardano.Api (CardanoEra(..), CardanoMode, ScriptDataSupportedInEra(..), Tx, TxValidationErrorInMode)
import Control.Concurrent.Async (Concurrently(..))
import Control.Concurrent.STM (STM, atomically)
import Control.Exception (SomeException, catch)
import Data.Void (Void)
import Control.Concurrent.Component
import Language.Marlowe.Runtime.ChainSync.Api (ChainSyncCommand(..))
import Network.Protocol.Driver (RunServer(..))
import Network.Protocol.Job.Server
Expand All @@ -30,22 +27,14 @@ data ChainSyncJobServerDependencies = ChainSyncJobServerDependencies
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode))
}

newtype ChainSyncJobServer = ChainSyncJobServer
{ runChainSyncJobServer :: IO Void
}

mkChainSyncJobServer :: ChainSyncJobServerDependencies -> STM ChainSyncJobServer
mkChainSyncJobServer ChainSyncJobServerDependencies{..} = do
let
runChainSyncJobServer = do
chainSyncJobServer :: Component IO ChainSyncJobServerDependencies ()
chainSyncJobServer = serverComponent
worker
(hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show)
(hPutStrLn stderr "Job client terminated normally")
\ChainSyncJobServerDependencies{..} -> do
runJobServer <- acceptRunJobServer
Worker{..} <- atomically $ mkWorker WorkerDependencies {..}
runConcurrently $
Concurrently (runWorker `catch` catchWorker) *> Concurrently runChainSyncJobServer
pure $ ChainSyncJobServer { runChainSyncJobServer }

catchWorker :: SomeException -> IO ()
catchWorker = hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show
pure WorkerDependencies {..}

data WorkerDependencies = WorkerDependencies
{ runJobServer :: RunJobServer IO
Expand All @@ -56,17 +45,11 @@ data WorkerDependencies = WorkerDependencies
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode))
}

newtype Worker = Worker
{ runWorker :: IO ()
}

mkWorker :: WorkerDependencies -> STM Worker
mkWorker WorkerDependencies{..} =
worker :: Component IO WorkerDependencies ()
worker = component \WorkerDependencies{..} ->
let
RunServer run = runJobServer
in
pure Worker { runWorker = run server }
where

server :: JobServer ChainSyncCommand IO ()
server = liftCommandHandler $ flip either (\case) \case
SubmitTx era tx -> ((),) <$> do
Expand All @@ -78,3 +61,5 @@ mkWorker WorkerDependencies{..} =
pure case result of
SubmitFail err -> Left $ show err
SubmitSuccess -> Right ()
in
pure (run server, ())
Expand Up @@ -8,7 +8,7 @@ module Language.Marlowe.Runtime.ChainSync.NodeClient
, NodeClient(..)
, NodeClientDependencies(..)
, isEmptyChanges
, mkNodeClient
, nodeClient
, toEmptyChanges
) where

Expand Down Expand Up @@ -40,6 +40,7 @@ import Cardano.Api.ChainSync.ClientPipelined
, runPipelineDecision
)
import Control.Arrow ((&&&))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Monad (guard)
import Data.List (sortOn)
Expand Down Expand Up @@ -101,14 +102,13 @@ data NodeClientDependencies = NodeClientDependencies
}

-- | The public API of the NodeClient component.
data NodeClient = NodeClient
{ runNodeClient :: !(IO ()) -- ^ Run the component in IO.
, getChanges :: !(STM Changes) -- ^ An STM action that atomically reads and clears the current change set.
newtype NodeClient = NodeClient
{ getChanges :: STM Changes -- ^ An STM action that atomically reads and clears the current change set.
}

-- | Create a new NodeClient component.
mkNodeClient :: NodeClientDependencies -> STM NodeClient
mkNodeClient NodeClientDependencies{..} = do
nodeClient :: Component IO NodeClientDependencies NodeClient
nodeClient = component \NodeClientDependencies{..} -> do
changesVar <- newTVar emptyChanges

let
Expand All @@ -130,7 +130,7 @@ mkNodeClient NodeClientDependencies{..} = do
, localStateQueryClient = Nothing
}

pure NodeClient { runNodeClient, getChanges }
pure (runNodeClient, NodeClient { getChanges })

blockHeaderToBlockNo :: BlockHeader -> BlockNo
blockHeaderToBlockNo (BlockHeader _ _ blockNo) = blockNo
Expand Down
Expand Up @@ -22,9 +22,7 @@ import Cardano.Api
, toEraInMode
)
import qualified Cardano.Api as Cardano
import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently))
import Control.Concurrent.STM (STM, atomically)
import Control.Exception (SomeException, catch)
import Control.Concurrent.Component
import Control.Monad.Trans.Except (ExceptT(ExceptT), except, runExceptT, throwE, withExceptT)
import Data.Bifunctor (bimap, first)
import Data.Void (Void, absurd)
Expand All @@ -49,22 +47,14 @@ data ChainSyncQueryServerDependencies = ChainSyncQueryServerDependencies
, getUTxOs :: !(Database.GetUTxOs IO)
}

newtype ChainSyncQueryServer = ChainSyncQueryServer
{ runChainSyncQueryServer :: IO Void
}

mkChainSyncQueryServer :: ChainSyncQueryServerDependencies -> STM ChainSyncQueryServer
mkChainSyncQueryServer ChainSyncQueryServerDependencies{..} = do
let
runChainSyncQueryServer = do
chainSyncQueryServer :: Component IO ChainSyncQueryServerDependencies ()
chainSyncQueryServer = serverComponent
worker
(hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show)
(hPutStrLn stderr "Query client terminated normally")
\ChainSyncQueryServerDependencies{..} -> do
runQueryServer <- acceptRunQueryServer
Worker{..} <- atomically $ mkWorker WorkerDependencies {..}
runConcurrently $
Concurrently (runWorker `catch` catchWorker) *> Concurrently runChainSyncQueryServer
pure $ ChainSyncQueryServer { runChainSyncQueryServer }

catchWorker :: SomeException -> IO ()
catchWorker = hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show
pure WorkerDependencies {..}

data WorkerDependencies = WorkerDependencies
{ runQueryServer :: RunQueryServer IO
Expand All @@ -76,18 +66,11 @@ data WorkerDependencies = WorkerDependencies
, getUTxOs :: !(Database.GetUTxOs IO)
}

newtype Worker = Worker
{ runWorker :: IO ()
}

mkWorker :: WorkerDependencies -> STM Worker
mkWorker WorkerDependencies{..} =
worker :: Component IO WorkerDependencies ()
worker = component \WorkerDependencies{..} ->
let
RunServer run = runQueryServer
in
pure Worker { runWorker = run server }

where
server :: QueryServer ChainSyncQuery IO ()
server = QueryServer $ pure $ ServerStInit \case
GetSlotConfig -> queryGenesisParameters extractSlotConfig
Expand Down Expand Up @@ -139,3 +122,5 @@ mkWorker WorkerDependencies{..} =
withExceptT (const ()) $ except result

extractSlotConfig GenesisParameters{..} = SlotConfig protocolParamSystemStart protocolParamSlotLength
in
pure (run server, ())

0 comments on commit 844c24a

Please sign in to comment.