Skip to content

Commit

Permalink
Port marlowe-history to async-components
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Nov 24, 2022
1 parent 844c24a commit edf38e7
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 136 deletions.
7 changes: 5 additions & 2 deletions async-components/src/Control/Concurrent/Component.hs
Expand Up @@ -117,6 +117,9 @@ withComponent_ c a = withComponent c a . const
component :: (a -> STM (m (), b)) -> Component m a b
component run = Component $ (fmap . B.first) Concurrently . run

component_ :: (a -> m ()) -> Component m a ()
component_ = component . fmap (pure . (,()))

serverComponent
:: forall m a b
. MonadBaseControl IO m
Expand All @@ -125,7 +128,7 @@ serverComponent
-> m ()
-> (a -> m b)
-> Component m a ()
serverComponent worker onWorkerError onWorkerTerminated accept = component \a ->
serverComponent worker onWorkerError onWorkerTerminated accept = component_ \a ->
let
run :: m ()
run = do
Expand All @@ -140,4 +143,4 @@ serverComponent worker onWorkerError onWorkerTerminated accept = component \a ->
Left (Right ()) -> onWorkerTerminated
wait aserver
in
pure (run, ())
run
Expand Up @@ -46,7 +46,7 @@ data WorkerDependencies = WorkerDependencies
}

worker :: Component IO WorkerDependencies ()
worker = component \WorkerDependencies{..} ->
worker = component_ \WorkerDependencies{..} -> do
let
RunServer run = runJobServer

Expand All @@ -61,5 +61,4 @@ worker = component \WorkerDependencies{..} ->
pure case result of
SubmitFail err -> Left $ show err
SubmitSuccess -> Right ()
in
pure (run server, ())
run server
Expand Up @@ -67,7 +67,7 @@ data WorkerDependencies = WorkerDependencies
}

worker :: Component IO WorkerDependencies ()
worker = component \WorkerDependencies{..} ->
worker = component_ \WorkerDependencies{..} -> do
let
RunServer run = runQueryServer

Expand Down Expand Up @@ -122,5 +122,4 @@ worker = component \WorkerDependencies{..} ->
withExceptT (const ()) $ except result

extractSlotConfig GenesisParameters{..} = SlotConfig protocolParamSystemStart protocolParamSlotLength
in
pure (run server, ())
run server
Expand Up @@ -55,7 +55,7 @@ data WorkerDependencies = WorkerDependencies
}

worker :: Component IO WorkerDependencies ()
worker = component \WorkerDependencies{..} -> do
worker = component_ \WorkerDependencies{..} -> do
let
RunServer runServer = runChainSeekServer
runWorker = void $ runServer server
Expand Down Expand Up @@ -95,4 +95,4 @@ worker = component \WorkerDependencies{..} -> do
, recvMsgDone = pure ()
}

pure (runWorker, ())
runWorker
9 changes: 4 additions & 5 deletions marlowe-runtime/marlowe-history/Main.hs
Expand Up @@ -3,6 +3,7 @@
module Main
where

import Control.Concurrent.Component
import Control.Concurrent.STM (atomically)
import Control.Exception (bracket, bracketOnError, throwIO)
import Data.Either (fromRight)
Expand All @@ -11,7 +12,7 @@ import Language.Marlowe.Protocol.Sync.Codec (codecMarloweSync)
import Language.Marlowe.Protocol.Sync.Server (marloweSyncServerPeer)
import Language.Marlowe.Runtime.ChainSync.Api
(ChainSyncQuery(..), RuntimeChainSeekClient, WithGenesis(..), runtimeChainSeekCodec)
import Language.Marlowe.Runtime.History (History(..), HistoryDependencies(..), mkHistory)
import Language.Marlowe.Runtime.History (HistoryDependencies(..), history)
import Language.Marlowe.Runtime.History.Api (historyJobCodec, historyQueryCodec)
import Language.Marlowe.Runtime.History.Store (hoistHistoryQueries)
import Language.Marlowe.Runtime.History.Store.Memory (mkHistoryQueriesInMemory)
Expand Down Expand Up @@ -84,10 +85,8 @@ run Options{..} = withSocketsDo do
acceptRunQueryServer = acceptRunServerPeerOverSocket throwIO querySocket historyQueryCodec queryServerPeer
acceptRunSyncServer = acceptRunServerPeerOverSocket throwIO syncSocket codecMarloweSync marloweSyncServerPeer
let followerPageSize = 1024 -- TODO move to config with a default
History{..} <- atomically do
historyQueries <- hoistHistoryQueries atomically <$> mkHistoryQueriesInMemory
mkHistory HistoryDependencies{..}
runHistory
historyQueries <- atomically $ hoistHistoryQueries atomically <$> mkHistoryQueriesInMemory
runComponent_ history HistoryDependencies{..}
where
openServer addr = bracketOnError (openSocket addr) close \socket -> do
setSocketOption socket ReuseAddr 1
Expand Down
6 changes: 2 additions & 4 deletions marlowe-runtime/marlowe-runtime.cabal
Expand Up @@ -281,7 +281,6 @@ executable marlowe
aeson
, base >= 4.9 && < 5
, ansi-terminal
, async
, base16
, bytestring
, cardano-api
Expand Down Expand Up @@ -333,7 +332,7 @@ executable marlowe-history
build-depends:
base >= 4.9 && < 5
, ansi-terminal
, async
, async-components
, base16
, containers
, marlowe
Expand All @@ -359,7 +358,7 @@ executable marlowe-discovery
build-depends:
base >= 4.9 && < 5
, ansi-terminal
, async
, async-components
, base16
, containers
, marlowe
Expand All @@ -384,7 +383,6 @@ executable marlowe-tx
Paths_marlowe_runtime
build-depends:
ansi-terminal
, async
, base >= 4.9 && < 5
, base16
, bytestring
Expand Down
37 changes: 11 additions & 26 deletions marlowe-runtime/src/Language/Marlowe/Runtime/History.hs
@@ -1,3 +1,4 @@
{-# LANGUAGE Arrows #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecursiveDo #-}
Expand All @@ -6,17 +7,14 @@
module Language.Marlowe.Runtime.History
where

import Control.Concurrent.Async (Concurrently(..))
import Control.Concurrent.STM (STM)
import Data.Foldable (asum)
import Control.Concurrent.Component
import Language.Marlowe.Runtime.ChainSync.Api (RuntimeChainSeekClient, SlotConfig)
import Language.Marlowe.Runtime.History.FollowerSupervisor
import Language.Marlowe.Runtime.History.JobServer
import Language.Marlowe.Runtime.History.QueryServer
import Language.Marlowe.Runtime.History.Store
(HistoryQueries, HistoryStore(..), HistoryStoreDependencies(..), mkHistoryStore)
import Language.Marlowe.Runtime.History.SyncServer
(HistorySyncServer(..), HistorySyncServerDependencies(..), RunSyncServer, mkHistorySyncServer)
(HistoryQueries, HistoryStore(..), HistoryStoreDependencies(..), historyStore)
import Language.Marlowe.Runtime.History.SyncServer (HistorySyncServerDependencies(..), RunSyncServer, historySyncServer)
import Numeric.Natural (Natural)

data HistoryDependencies = HistoryDependencies
Expand All @@ -30,23 +28,10 @@ data HistoryDependencies = HistoryDependencies
, historyQueries :: HistoryQueries IO
}

newtype History = History
{ runHistory :: IO ()
}

mkHistory :: HistoryDependencies -> STM History
mkHistory HistoryDependencies{..} = do
FollowerSupervisor{..} <- mkFollowerSupervisor FollowerSupervisorDependencies{..}
HistoryJobServer{..} <- mkHistoryJobServer HistoryJobServerDependencies{..}
HistoryQueryServer{..} <- mkHistoryQueryServer HistoryQueryServerDependencies{..}
HistoryStore{..} <- mkHistoryStore HistoryStoreDependencies{..}
HistorySyncServer{..} <- mkHistorySyncServer HistorySyncServerDependencies{..}
pure History
{ runHistory = runConcurrently $ asum $ Concurrently <$>
[ runFollowerSupervisor
, runHistoryJobServer
, runHistoryQueryServer
, runHistoryStore
, runHistorySyncServer
]
}
history :: Component IO HistoryDependencies ()
history = proc HistoryDependencies{..} -> do
FollowerSupervisor{..} <- followerSupervisor -< FollowerSupervisorDependencies{..}
historyJobServer -< HistoryJobServerDependencies{..}
historyQueryServer -< HistoryQueryServerDependencies{..}
HistoryStore{..} <- historyStore -< HistoryStoreDependencies{..}
historySyncServer -< HistorySyncServerDependencies{..}
Expand Up @@ -3,7 +3,8 @@
module Language.Marlowe.Runtime.History.FollowerSupervisor
where

import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently))
import Control.Concurrent.Async (Concurrently(..))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Monad (guard, mfilter, when, (<=<))
import Data.Foldable (sequenceA_)
Expand Down Expand Up @@ -38,11 +39,10 @@ data FollowerSupervisor = FollowerSupervisor
, stopFollowingContract :: ContractId -> STM Bool
, followerStatuses :: STM (Map ContractId FollowerStatus)
, changes :: STM (Map ContractId UpdateContract)
, runFollowerSupervisor :: IO ()
}

mkFollowerSupervisor :: FollowerSupervisorDependencies -> STM FollowerSupervisor
mkFollowerSupervisor FollowerSupervisorDependencies{..} = do
followerSupervisor :: Component IO FollowerSupervisorDependencies FollowerSupervisor
followerSupervisor = component \FollowerSupervisorDependencies{..} -> do
followersVar <- newTVar Map.empty
followerActivationsVar <- newTVar Map.empty
seenVar <- newTVar Set.empty
Expand Down Expand Up @@ -153,4 +153,4 @@ mkFollowerSupervisor FollowerSupervisorDependencies{..} = do
$ sequenceA_
$ Concurrently <$> (runFollowerSupervisor : (uncurry runFollowerWithCleanup <$> newFollowers))

pure FollowerSupervisor {..}
pure (runFollowerSupervisor, FollowerSupervisor {..})
38 changes: 12 additions & 26 deletions marlowe-runtime/src/Language/Marlowe/Runtime/History/JobServer.hs
Expand Up @@ -8,9 +8,8 @@
module Language.Marlowe.Runtime.History.JobServer
where

import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Exception (SomeException, catch)
import Data.Map (Map)
import qualified Data.Map as Map
import Language.Marlowe.Runtime.Core.Api
Expand All @@ -28,22 +27,14 @@ data HistoryJobServerDependencies = HistoryJobServerDependencies
, followerStatuses :: STM (Map ContractId FollowerStatus)
}

newtype HistoryJobServer = HistoryJobServer
{ runHistoryJobServer :: IO ()
}

mkHistoryJobServer :: HistoryJobServerDependencies -> STM HistoryJobServer
mkHistoryJobServer HistoryJobServerDependencies{..} = do
let
runHistoryJobServer = do
historyJobServer :: Component IO HistoryJobServerDependencies ()
historyJobServer = serverComponent
worker
(hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show)
(hPutStrLn stderr "Job client terminated normally")
\HistoryJobServerDependencies{..} -> do
runJobServer <- acceptRunJobServer
Worker{..} <- atomically $ mkWorker WorkerDependencies {..}
runConcurrently $
Concurrently (runWorker `catch` catchWorker) *> Concurrently runHistoryJobServer
pure $ HistoryJobServer { runHistoryJobServer }

catchWorker :: SomeException -> IO ()
catchWorker = hPutStrLn stderr . ("Job worker crashed with exception: " <>) . show
pure WorkerDependencies {..}

data WorkerDependencies = WorkerDependencies
{ runJobServer :: RunJobServer IO
Expand All @@ -52,18 +43,11 @@ data WorkerDependencies = WorkerDependencies
, followerStatuses :: STM (Map ContractId FollowerStatus)
}

newtype Worker = Worker
{ runWorker :: IO ()
}

mkWorker :: WorkerDependencies -> STM Worker
mkWorker WorkerDependencies{..} =
worker :: Component IO WorkerDependencies ()
worker = component_ \WorkerDependencies{..} ->
let
RunServer run = runJobServer
in
pure Worker { runWorker = run server }

where
server :: RuntimeHistoryJobServer IO ()
server = liftCommandHandler $ fmap ((),) . \case
Left (FollowContract contractId) -> do
Expand All @@ -79,3 +63,5 @@ mkWorker WorkerDependencies{..} =
else pure $ Right False
Left (StopFollowingContract contractId) -> atomically $ Right <$> stopFollowingContract contractId
Right jobId -> case jobId of
in
run server
37 changes: 11 additions & 26 deletions marlowe-runtime/src/Language/Marlowe/Runtime/History/QueryServer.hs
Expand Up @@ -8,9 +8,8 @@
module Language.Marlowe.Runtime.History.QueryServer
where

import Control.Concurrent.Async (Concurrently(Concurrently, runConcurrently))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically)
import Control.Exception (SomeException, catch)
import Data.Bifunctor (bimap)
import Data.Map (Map)
import qualified Data.Map as Map
Expand All @@ -33,45 +32,31 @@ data HistoryQueryServerDependencies = HistoryQueryServerDependencies
, followerPageSize :: Natural
}

newtype HistoryQueryServer = HistoryQueryServer
{ runHistoryQueryServer :: IO ()
}

mkHistoryQueryServer :: HistoryQueryServerDependencies -> STM HistoryQueryServer
mkHistoryQueryServer HistoryQueryServerDependencies{..} = do
let
runHistoryQueryServer = do
historyQueryServer :: Component IO HistoryQueryServerDependencies ()
historyQueryServer = serverComponent
worker
(hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show)
(hPutStrLn stderr "Query client terminated normally")
\HistoryQueryServerDependencies{..} -> do
runQueryServer <- acceptRunQueryServer
Worker{..} <- atomically $ mkWorker WorkerDependencies {..}
runConcurrently $
Concurrently (runWorker `catch` catchWorker) *> Concurrently runHistoryQueryServer
pure $ HistoryQueryServer { runHistoryQueryServer }

catchWorker :: SomeException -> IO ()
catchWorker = hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show
pure WorkerDependencies {..}

data WorkerDependencies = WorkerDependencies
{ runQueryServer :: RunQueryServer IO
, followerStatuses :: STM (Map ContractId FollowerStatus)
, followerPageSize :: Natural
}

newtype Worker = Worker
{ runWorker :: IO ()
}

mkWorker :: WorkerDependencies -> STM Worker
mkWorker WorkerDependencies{..} =
worker :: Component IO WorkerDependencies ()
worker = component_ \WorkerDependencies{..} -> do
let
RunServer run = runQueryServer
in
pure Worker { runWorker = run server }

where
server :: RuntimeHistoryQueryServer IO ()
server = QueryServer $ pure $ ServerStInit \case
GetFollowedContracts -> getFollowedContractsServer followerPageSize followerStatuses
GetStatuses contractIds -> getStatusesServer contractIds followerStatuses
run server

getFollowedContractsServer
:: Natural
Expand Down

0 comments on commit edf38e7

Please sign in to comment.