diff --git a/async-components/src/Control/Concurrent/Component.hs b/async-components/src/Control/Concurrent/Component.hs index 52c8935147..1024cd008a 100644 --- a/async-components/src/Control/Concurrent/Component.hs +++ b/async-components/src/Control/Concurrent/Component.hs @@ -117,6 +117,9 @@ withComponent_ c a = withComponent c a . const component :: (a -> STM (m (), b)) -> Component m a b component run = Component $ (fmap . B.first) Concurrently . run +component_ :: (a -> m ()) -> Component m a () +component_ = component . fmap (pure . (,())) + serverComponent :: forall m a b . MonadBaseControl IO m @@ -125,7 +128,7 @@ serverComponent -> m () -> (a -> m b) -> Component m a () -serverComponent worker onWorkerError onWorkerTerminated accept = component \a -> +serverComponent worker onWorkerError onWorkerTerminated accept = component_ \a -> let run :: m () run = do @@ -140,4 +143,4 @@ serverComponent worker onWorkerError onWorkerTerminated accept = component \a -> Left (Right ()) -> onWorkerTerminated wait aserver in - pure (run, ()) + run diff --git a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/JobServer.hs b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/JobServer.hs index 2eed490071..f9226094b1 100644 --- a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/JobServer.hs +++ b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/JobServer.hs @@ -46,7 +46,7 @@ data WorkerDependencies = WorkerDependencies } worker :: Component IO WorkerDependencies () -worker = component \WorkerDependencies{..} -> +worker = component_ \WorkerDependencies{..} -> do let RunServer run = runJobServer @@ -61,5 +61,4 @@ worker = component \WorkerDependencies{..} -> pure case result of SubmitFail err -> Left $ show err SubmitSuccess -> Right () - in - pure (run server, ()) + run server diff --git a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/QueryServer.hs b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/QueryServer.hs index a3903ce6b4..f5b104acff 100644 --- a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/QueryServer.hs +++ b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/QueryServer.hs @@ -67,7 +67,7 @@ data WorkerDependencies = WorkerDependencies } worker :: Component IO WorkerDependencies () -worker = component \WorkerDependencies{..} -> +worker = component_ \WorkerDependencies{..} -> do let RunServer run = runQueryServer @@ -122,5 +122,4 @@ worker = component \WorkerDependencies{..} -> withExceptT (const ()) $ except result extractSlotConfig GenesisParameters{..} = SlotConfig protocolParamSystemStart protocolParamSlotLength - in - pure (run server, ()) + run server diff --git a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/Server.hs b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/Server.hs index dcd14b4215..e5e0723f47 100644 --- a/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/Server.hs +++ b/marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/Server.hs @@ -55,7 +55,7 @@ data WorkerDependencies = WorkerDependencies } worker :: Component IO WorkerDependencies () -worker = component \WorkerDependencies{..} -> do +worker = component_ \WorkerDependencies{..} -> do let RunServer runServer = runChainSeekServer runWorker = void $ runServer server @@ -95,4 +95,4 @@ worker = component \WorkerDependencies{..} -> do , recvMsgDone = pure () } - pure (runWorker, ()) + runWorker diff --git a/marlowe-runtime/marlowe-history/Main.hs b/marlowe-runtime/marlowe-history/Main.hs index 7a470d7177..46f64214aa 100644 --- a/marlowe-runtime/marlowe-history/Main.hs +++ b/marlowe-runtime/marlowe-history/Main.hs @@ -3,6 +3,7 @@ module Main where +import Control.Concurrent.Component import Control.Concurrent.STM (atomically) import Control.Exception (bracket, bracketOnError, throwIO) import Data.Either (fromRight) @@ -11,7 +12,7 @@ import Language.Marlowe.Protocol.Sync.Codec (codecMarloweSync) import Language.Marlowe.Protocol.Sync.Server (marloweSyncServerPeer) import Language.Marlowe.Runtime.ChainSync.Api (ChainSyncQuery(..), RuntimeChainSeekClient, WithGenesis(..), runtimeChainSeekCodec) -import Language.Marlowe.Runtime.History (History(..), HistoryDependencies(..), mkHistory) +import Language.Marlowe.Runtime.History (HistoryDependencies(..), history) import Language.Marlowe.Runtime.History.Api (historyJobCodec, historyQueryCodec) import Language.Marlowe.Runtime.History.Store (hoistHistoryQueries) import Language.Marlowe.Runtime.History.Store.Memory (mkHistoryQueriesInMemory) @@ -84,10 +85,8 @@ run Options{..} = withSocketsDo do acceptRunQueryServer = acceptRunServerPeerOverSocket throwIO querySocket historyQueryCodec queryServerPeer acceptRunSyncServer = acceptRunServerPeerOverSocket throwIO syncSocket codecMarloweSync marloweSyncServerPeer let followerPageSize = 1024 -- TODO move to config with a default - History{..} <- atomically do - historyQueries <- hoistHistoryQueries atomically <$> mkHistoryQueriesInMemory - mkHistory HistoryDependencies{..} - runHistory + historyQueries <- atomically $ hoistHistoryQueries atomically <$> mkHistoryQueriesInMemory + runComponent_ history HistoryDependencies{..} where openServer addr = bracketOnError (openSocket addr) close \socket -> do setSocketOption socket ReuseAddr 1 diff --git a/marlowe-runtime/marlowe-runtime.cabal b/marlowe-runtime/marlowe-runtime.cabal index ddfc373485..d0a4e1c914 100644 --- a/marlowe-runtime/marlowe-runtime.cabal +++ b/marlowe-runtime/marlowe-runtime.cabal @@ -281,7 +281,6 @@ executable marlowe aeson , base >= 4.9 && < 5 , ansi-terminal - , async , base16 , bytestring , cardano-api @@ -333,7 +332,7 @@ executable marlowe-history build-depends: base >= 4.9 && < 5 , ansi-terminal - , async + , async-components , base16 , containers , marlowe @@ -359,7 +358,7 @@ executable marlowe-discovery build-depends: base >= 4.9 && < 5 , ansi-terminal - , async + , async-components , base16 , containers , marlowe @@ -384,7 +383,6 @@ executable marlowe-tx Paths_marlowe_runtime build-depends: ansi-terminal - , async , base >= 4.9 && < 5 , base16 , bytestring diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History.hs index f2b0a084ce..053afa1545 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE Arrows #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecursiveDo #-} @@ -6,17 +7,14 @@ module Language.Marlowe.Runtime.History where -import Control.Concurrent.Async (Concurrently(..)) -import Control.Concurrent.STM (STM) -import Data.Foldable (asum) +import Control.Concurrent.Component import Language.Marlowe.Runtime.ChainSync.Api (RuntimeChainSeekClient, SlotConfig) import Language.Marlowe.Runtime.History.FollowerSupervisor import Language.Marlowe.Runtime.History.JobServer import Language.Marlowe.Runtime.History.QueryServer import Language.Marlowe.Runtime.History.Store - (HistoryQueries, HistoryStore(..), HistoryStoreDependencies(..), mkHistoryStore) -import Language.Marlowe.Runtime.History.SyncServer - (HistorySyncServer(..), HistorySyncServerDependencies(..), RunSyncServer, mkHistorySyncServer) + (HistoryQueries, HistoryStore(..), HistoryStoreDependencies(..), historyStore) +import Language.Marlowe.Runtime.History.SyncServer (HistorySyncServerDependencies(..), RunSyncServer, historySyncServer) import Numeric.Natural (Natural) data HistoryDependencies = HistoryDependencies @@ -30,23 +28,10 @@ data HistoryDependencies = HistoryDependencies , historyQueries :: HistoryQueries IO } -newtype History = History - { runHistory :: IO () - } - -mkHistory :: HistoryDependencies -> STM History -mkHistory HistoryDependencies{..} = do - FollowerSupervisor{..} <- mkFollowerSupervisor FollowerSupervisorDependencies{..} - HistoryJobServer{..} <- mkHistoryJobServer HistoryJobServerDependencies{..} - HistoryQueryServer{..} <- mkHistoryQueryServer HistoryQueryServerDependencies{..} - HistoryStore{..} <- mkHistoryStore HistoryStoreDependencies{..} - HistorySyncServer{..} <- mkHistorySyncServer HistorySyncServerDependencies{..} - pure History - { runHistory = runConcurrently $ asum $ Concurrently <$> - [ runFollowerSupervisor - , runHistoryJobServer - , runHistoryQueryServer - , runHistoryStore - , runHistorySyncServer - ] - } +history :: Component IO HistoryDependencies () +history = proc HistoryDependencies{..} -> do + FollowerSupervisor{..} <- followerSupervisor -< FollowerSupervisorDependencies{..} + historyJobServer -< HistoryJobServerDependencies{..} + historyQueryServer -< HistoryQueryServerDependencies{..} + HistoryStore{..} <- historyStore -< HistoryStoreDependencies{..} + historySyncServer -< HistorySyncServerDependencies{..} diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History/FollowerSupervisor.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History/FollowerSupervisor.hs index 1b5e08fc6a..8803b60276 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History/FollowerSupervisor.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History/FollowerSupervisor.hs @@ -3,7 +3,8 @@ module Language.Marlowe.Runtime.History.FollowerSupervisor where -import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently)) +import Control.Concurrent.Async (Concurrently(..)) +import Control.Concurrent.Component import Control.Concurrent.STM (STM, atomically, modifyTVar, newTVar, readTVar, writeTVar) import Control.Monad (guard, mfilter, when, (<=<)) import Data.Foldable (sequenceA_) @@ -38,11 +39,10 @@ data FollowerSupervisor = FollowerSupervisor , stopFollowingContract :: ContractId -> STM Bool , followerStatuses :: STM (Map ContractId FollowerStatus) , changes :: STM (Map ContractId UpdateContract) - , runFollowerSupervisor :: IO () } -mkFollowerSupervisor :: FollowerSupervisorDependencies -> STM FollowerSupervisor -mkFollowerSupervisor FollowerSupervisorDependencies{..} = do +followerSupervisor :: Component IO FollowerSupervisorDependencies FollowerSupervisor +followerSupervisor = component \FollowerSupervisorDependencies{..} -> do followersVar <- newTVar Map.empty followerActivationsVar <- newTVar Map.empty seenVar <- newTVar Set.empty @@ -153,4 +153,4 @@ mkFollowerSupervisor FollowerSupervisorDependencies{..} = do $ sequenceA_ $ Concurrently <$> (runFollowerSupervisor : (uncurry runFollowerWithCleanup <$> newFollowers)) - pure FollowerSupervisor {..} + pure (runFollowerSupervisor, FollowerSupervisor {..}) diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History/JobServer.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History/JobServer.hs index 89427b7ede..af0e7542bd 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History/JobServer.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History/JobServer.hs @@ -8,9 +8,8 @@ module Language.Marlowe.Runtime.History.JobServer where -import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently)) +import Control.Concurrent.Component import Control.Concurrent.STM (STM, atomically, retry) -import Control.Exception (SomeException, catch) import Data.Map (Map) import qualified Data.Map as Map import Language.Marlowe.Runtime.Core.Api @@ -28,22 +27,14 @@ data HistoryJobServerDependencies = HistoryJobServerDependencies , followerStatuses :: STM (Map ContractId FollowerStatus) } -newtype HistoryJobServer = HistoryJobServer - { runHistoryJobServer :: IO () - } - -mkHistoryJobServer :: HistoryJobServerDependencies -> STM HistoryJobServer -mkHistoryJobServer HistoryJobServerDependencies{..} = do - let - runHistoryJobServer = do +historyJobServer :: Component IO HistoryJobServerDependencies () +historyJobServer = serverComponent + worker + (hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show) + (hPutStrLn stderr "Job client terminated normally") + \HistoryJobServerDependencies{..} -> do runJobServer <- acceptRunJobServer - Worker{..} <- atomically $ mkWorker WorkerDependencies {..} - runConcurrently $ - Concurrently (runWorker `catch` catchWorker) *> Concurrently runHistoryJobServer - pure $ HistoryJobServer { runHistoryJobServer } - -catchWorker :: SomeException -> IO () -catchWorker = hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show + pure WorkerDependencies {..} data WorkerDependencies = WorkerDependencies { runJobServer :: RunJobServer IO @@ -52,18 +43,11 @@ data WorkerDependencies = WorkerDependencies , followerStatuses :: STM (Map ContractId FollowerStatus) } -newtype Worker = Worker - { runWorker :: IO () - } - -mkWorker :: WorkerDependencies -> STM Worker -mkWorker WorkerDependencies{..} = +worker :: Component IO WorkerDependencies () +worker = component_ \WorkerDependencies{..} -> let RunServer run = runJobServer - in - pure Worker { runWorker = run server } - where server :: RuntimeHistoryJobServer IO () server = liftCommandHandler $ fmap ((),) . \case Left (FollowContract contractId) -> do @@ -79,3 +63,5 @@ mkWorker WorkerDependencies{..} = else pure $ Right False Left (StopFollowingContract contractId) -> atomically $ Right <$> stopFollowingContract contractId Right jobId -> case jobId of + in + run server diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History/QueryServer.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History/QueryServer.hs index 1f10551b20..493c5fa96f 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History/QueryServer.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History/QueryServer.hs @@ -8,9 +8,8 @@ module Language.Marlowe.Runtime.History.QueryServer where -import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently)) +import Control.Concurrent.Component import Control.Concurrent.STM (STM, atomically) -import Control.Exception (SomeException, catch) import Data.Bifunctor (bimap) import Data.Map (Map) import qualified Data.Map as Map @@ -33,22 +32,14 @@ data HistoryQueryServerDependencies = HistoryQueryServerDependencies , followerPageSize :: Natural } -newtype HistoryQueryServer = HistoryQueryServer - { runHistoryQueryServer :: IO () - } - -mkHistoryQueryServer :: HistoryQueryServerDependencies -> STM HistoryQueryServer -mkHistoryQueryServer HistoryQueryServerDependencies{..} = do - let - runHistoryQueryServer = do +historyQueryServer :: Component IO HistoryQueryServerDependencies () +historyQueryServer = serverComponent + worker + (hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show) + (hPutStrLn stderr "Query client terminated normally") + \HistoryQueryServerDependencies{..} -> do runQueryServer <- acceptRunQueryServer - Worker{..} <- atomically $ mkWorker WorkerDependencies {..} - runConcurrently $ - Concurrently (runWorker `catch` catchWorker) *> Concurrently runHistoryQueryServer - pure $ HistoryQueryServer { runHistoryQueryServer } - -catchWorker :: SomeException -> IO () -catchWorker = hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show + pure WorkerDependencies {..} data WorkerDependencies = WorkerDependencies { runQueryServer :: RunQueryServer IO @@ -56,22 +47,16 @@ data WorkerDependencies = WorkerDependencies , followerPageSize :: Natural } -newtype Worker = Worker - { runWorker :: IO () - } - -mkWorker :: WorkerDependencies -> STM Worker -mkWorker WorkerDependencies{..} = +worker :: Component IO WorkerDependencies () +worker = component_ \WorkerDependencies{..} -> do let RunServer run = runQueryServer - in - pure Worker { runWorker = run server } - where server :: RuntimeHistoryQueryServer IO () server = QueryServer $ pure $ ServerStInit \case GetFollowedContracts -> getFollowedContractsServer followerPageSize followerStatuses GetStatuses contractIds -> getStatusesServer contractIds followerStatuses + run server getFollowedContractsServer :: Natural diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History/Store.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History/Store.hs index ba19810817..47d917a14b 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History/Store.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History/Store.hs @@ -6,6 +6,7 @@ module Language.Marlowe.Runtime.History.Store where import Control.Applicative (empty) +import Control.Concurrent.Component import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar, newTVar, readTVarIO, writeTVar) import Control.Concurrent.STM.TVar (readTVar) import Control.Monad (forever, mfilter) @@ -50,9 +51,7 @@ data HistoryStoreDependencies = HistoryStoreDependencies -- | API of the history store. data HistoryStore = HistoryStore - { runHistoryStore :: IO () - -- ^ Run the history store process. - , findContract :: ContractId -> IO (Maybe (BlockHeader, SomeCreateStep)) + { findContract :: ContractId -> IO (Maybe (BlockHeader, SomeCreateStep)) -- ^ Lookup a contract's creation context by its ID. , intersectContract :: forall v. ContractId -> MarloweVersion v -> [BlockHeader] -> IO (Maybe BlockHeader) -- ^ Find the latest common block header from the provided list in a contract's history @@ -120,11 +119,13 @@ instance Show SomeContractSteps where ) -- | Create a new history store from a set of dependencies. -mkHistoryStore :: HistoryStoreDependencies -> STM HistoryStore -mkHistoryStore HistoryStoreDependencies{..} = do +historyStore :: Component IO HistoryStoreDependencies HistoryStore +historyStore = component \HistoryStoreDependencies{..} -> do -- A transactional cache of the latest block in each contract's history. latestBlocksPerContractVar <- newTVar (Map.empty :: Map ContractId (TVar (Maybe BlockHeader))) let + HistoryQueries{..} = historyQueries + runHistoryStore :: IO () runHistoryStore = forever do newChanges <- atomically awaitChanges @@ -209,6 +210,4 @@ mkHistoryStore HistoryStoreDependencies{..} = do FindNext blockHeader (SomeContractSteps version' steps) -> case assertVersionsEqual version' version of Refl -> pure $ Next blockHeader steps - pure HistoryStore{..} - where - HistoryQueries{..} = historyQueries + pure (runHistoryStore, HistoryStore{..}) diff --git a/marlowe-runtime/src/Language/Marlowe/Runtime/History/SyncServer.hs b/marlowe-runtime/src/Language/Marlowe/Runtime/History/SyncServer.hs index 1245b5e6ff..435d7f0998 100644 --- a/marlowe-runtime/src/Language/Marlowe/Runtime/History/SyncServer.hs +++ b/marlowe-runtime/src/Language/Marlowe/Runtime/History/SyncServer.hs @@ -8,9 +8,8 @@ module Language.Marlowe.Runtime.History.SyncServer where -import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently)) +import Control.Concurrent.Component import Control.Concurrent.STM (STM, atomically, retry) -import Control.Exception (SomeException, catch) import Data.Map (Map) import qualified Data.Map as Map import Language.Marlowe.Protocol.Sync.Server @@ -32,22 +31,14 @@ data HistorySyncServerDependencies = HistorySyncServerDependencies , followerStatuses :: STM (Map ContractId FollowerStatus) } -newtype HistorySyncServer = HistorySyncServer - { runHistorySyncServer :: IO () - } - -mkHistorySyncServer :: HistorySyncServerDependencies -> STM HistorySyncServer -mkHistorySyncServer HistorySyncServerDependencies{..} = do - let - runHistorySyncServer = do +historySyncServer :: Component IO HistorySyncServerDependencies () +historySyncServer = serverComponent + worker + (hPutStrLn stderr . ("Sync worker crashed with exception: " <>) . show) + (hPutStrLn stderr "Sync client terminated normally") + \HistorySyncServerDependencies{..} -> do runSyncServer <- acceptRunSyncServer - Worker{..} <- atomically $ mkWorker WorkerDependencies {..} - runConcurrently $ - Concurrently (runWorker `catch` catchWorker) *> Concurrently runHistorySyncServer - pure $ HistorySyncServer { runHistorySyncServer } - -catchWorker :: SomeException -> IO () -catchWorker = hPutStrLn stderr . ("Sync worker crashed with exception: " <>) . show + pure WorkerDependencies {..} data WorkerDependencies = WorkerDependencies { runSyncServer :: RunSyncServer IO @@ -58,18 +49,11 @@ data WorkerDependencies = WorkerDependencies , followerStatuses :: STM (Map ContractId FollowerStatus) } -newtype Worker = Worker - { runWorker :: IO () - } - -mkWorker :: WorkerDependencies -> STM Worker -mkWorker WorkerDependencies{..} = +worker :: Component IO WorkerDependencies () +worker = component_ \WorkerDependencies{..} -> do let RunServer run = runSyncServer - in - pure Worker { runWorker = run server } - where server :: MarloweSyncServer IO () server = MarloweSyncServer $ pure $ ServerStInit { recvMsgFollowContract = followServer @@ -126,3 +110,5 @@ mkWorker WorkerDependencies{..} = At False -> pure $ SendMsgWait $ waitServer contractId version blockHeader requestedAt lastUpdated , recvMsgCancel = pure $ idleServer contractId version blockHeader } + + run server