Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add logging to query server
  • Loading branch information
jhbertra committed Nov 28, 2022
1 parent 625f98b commit 55253ad
Showing 1 changed file with 97 additions and 31 deletions.
128 changes: 97 additions & 31 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/QueryServer.hs
Expand Up @@ -11,21 +11,55 @@ module Language.Marlowe.Runtime.Discovery.QueryServer
where

import Control.Concurrent.Component
import Control.Exception (displayException)
import Data.Aeson (ToJSON, toJSON)
import Data.Void (Void, absurd)
import Language.Marlowe.Runtime.ChainSync.Api (PolicyId)
import Language.Marlowe.Runtime.Discovery.Api
import Network.Protocol.Driver (RunServer(..))
import Network.Protocol.Query.Server
import Network.Protocol.Query.Types
import Numeric.Natural (Natural)
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(..))
import Observe.Event (Event, EventBackend, addField, narrowEventBackend, reference, subEventBackend, withEvent)
import Observe.Event.BackendModification (modifyEventBackend, setInitialCause)
import Observe.Event.DSL (SelectorField(..), SelectorSpec(..))
import Observe.Event.Render.JSON (DefaultRenderFieldJSON(..), DefaultRenderSelectorJSON(..))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))
import System.IO (hPutStrLn, stderr)

data PageField a
= Params a
| Count Int

instance ToJSON a => DefaultRenderFieldJSON (PageField a) where
defaultRenderFieldJSON = \case
Params a -> ("params", toJSON a)
Count i -> ("count", toJSON i)

data QuerySelector a f where
Receive :: QuerySelector a (PageField a)
Next :: QuerySelector a (PageField a)
Done :: QuerySelector a Void

instance ToJSON a => DefaultRenderSelectorJSON (QuerySelector a) where
defaultRenderSelectorJSON = \case
Receive -> ("receive", defaultRenderFieldJSON)
Next -> ("next", defaultRenderFieldJSON)
Done -> ("done", defaultRenderFieldJSON)

type QuerySelectorVoid = QuerySelector Void
type QuerySelectorRoleToken = QuerySelector PolicyId

compile $ SelectorSpec "worker"
[ "get" Inject ''QuerySelectorVoid
, ["get", "by", "role"] Inject ''QuerySelectorRoleToken
]

compile $ SelectorSpec ["discovery", "query", "server"]
[ "todo" ''()
[ ["new", "client"] ''Void
, "worker" Inject ''WorkerSelector
, ["worker", "crashed"] ''String
, ["worker", "terminated"] ''Void
]

type RunQueryServer m = RunServer m (QueryServer DiscoveryQuery)
Expand All @@ -39,64 +73,96 @@ data DiscoveryQueryServerDependencies r = DiscoveryQueryServerDependencies
}

discoveryQueryServer :: Component IO (DiscoveryQueryServerDependencies r) ()
discoveryQueryServer = serverComponent
discoveryQueryServer = serverComponentWith
worker
(hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show)
(hPutStrLn stderr "Query client terminated normally")
(\eventBackend ex -> do
withEvent eventBackend WorkerCrashed \ev -> addField ev $ displayException ex
)
(\eventBackend -> do
withEvent eventBackend WorkerTerminated $ const $ pure ()
)
\DiscoveryQueryServerDependencies{..} -> do
runQueryServer <- acceptRunQueryServer
pure WorkerDependencies {..}
withEvent eventBackend NewClient \ev -> do
pure
( modifyEventBackend (setInitialCause $ reference ev) eventBackend
, WorkerDependencies
{ runQueryServer
, getHeaders
, getHeadersByRoleTokenCurrency
, pageSize
, eventBackend = subEventBackend Worker ev
}
)

data WorkerDependencies = WorkerDependencies
data WorkerDependencies r = WorkerDependencies
{ runQueryServer :: RunQueryServer IO
, getHeaders :: IO [ContractHeader]
, getHeadersByRoleTokenCurrency :: PolicyId -> IO [ContractHeader]
, pageSize :: Natural
, eventBackend :: EventBackend IO r WorkerSelector
}

worker :: Component IO WorkerDependencies ()
worker :: Component IO (WorkerDependencies r) ()
worker = component_ \WorkerDependencies{..} -> do
let
RunServer run = runQueryServer

server :: QueryServer DiscoveryQuery IO ()
server = QueryServer $ pure $ ServerStInit \case
GetContractHeaders ->
getContractHeadersServer pageSize getHeaders
getContractHeadersServer (narrowEventBackend Get eventBackend) pageSize getHeaders
GetContractHeadersByRoleTokenCurrency policyId ->
getContractHeadersByRoleTokenCurrencyServer policyId getHeadersByRoleTokenCurrency
getContractHeadersByRoleTokenCurrencyServer (narrowEventBackend GetByRole eventBackend) policyId getHeadersByRoleTokenCurrency
run server

getContractHeadersServer
:: Natural
:: forall r
. EventBackend IO r (QuerySelector Void)
-> Natural
-> IO [ContractHeader]
-> IO (ServerStNext DiscoveryQuery 'CanReject () Void [ContractHeader] IO ())
getContractHeadersServer pageSize getHeaders = next <$> getHeaders
getContractHeadersServer eventBackend pageSize getHeaders = do
withEvent' Receive \ev -> next ev =<< getHeaders
where
withEvent' :: QuerySelector Void f -> (Event IO r (QuerySelector Void) f -> IO a) -> IO a
withEvent' = withEvent eventBackend
next
:: [ContractHeader]
-> ServerStNext DiscoveryQuery k () Void [ContractHeader] IO ()
next headers = case splitAt (fromIntegral pageSize) headers of
(page, []) -> lastPage page
(page, headers') -> SendMsgNextPage page (Just ()) ServerStPage
{ recvMsgDone = pure ()
, recvMsgRequestNext = const $ pure $ next headers'
}
:: Event IO r (QuerySelector Void) (PageField Void)
-> [ContractHeader]
-> IO (ServerStNext DiscoveryQuery k () Void [ContractHeader] IO ())
next ev headers = do
let (page, page') = splitAt (fromIntegral pageSize) headers
addField ev $ Count $ length page
pure case page' of
[] -> lastPage page
headers' -> SendMsgNextPage page (Just ()) ServerStPage
{ recvMsgDone = withEvent' Done mempty
, recvMsgRequestNext = const $ withEvent' Next \ev' -> next ev' headers'
}
lastPage
:: [ContractHeader]
-> ServerStNext DiscoveryQuery k () Void [ContractHeader] IO ()
lastPage page = SendMsgNextPage page Nothing ServerStPage
{ recvMsgDone = pure ()
, recvMsgRequestNext = const $ pure $ lastPage []
{ recvMsgDone = withEvent' Done mempty
, recvMsgRequestNext = const $ withEvent' Next $ const $ pure $ lastPage []
}

getContractHeadersByRoleTokenCurrencyServer
:: PolicyId
:: forall r
. EventBackend IO r (QuerySelector PolicyId)
-> PolicyId
-> (PolicyId -> IO [ContractHeader])
-> IO (ServerStNext DiscoveryQuery 'CanReject Void Void [ContractHeader] IO ())
getContractHeadersByRoleTokenCurrencyServer policyId getHeadersByRoleTokenCurrency = do
headers <- getHeadersByRoleTokenCurrency policyId
pure $ SendMsgNextPage headers Nothing ServerStPage
{ recvMsgDone = pure ()
, recvMsgRequestNext = absurd
}
getContractHeadersByRoleTokenCurrencyServer eventBackend policyId getHeadersByRoleTokenCurrency = do
let
withEvent' :: QuerySelector PolicyId f -> (Event IO r (QuerySelector PolicyId) f -> IO a) -> IO a
withEvent' = withEvent eventBackend
withEvent' Receive \ev -> do
addField ev $ Params policyId
headers <- getHeadersByRoleTokenCurrency policyId
addField ev $ Count $ length headers
pure $ SendMsgNextPage headers Nothing ServerStPage
{ recvMsgDone = pure ()
, recvMsgRequestNext = absurd
}

0 comments on commit 55253ad

Please sign in to comment.