Skip to content

Commit

Permalink
Add eventuo11y to marlowe-discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Dec 2, 2022
1 parent e59e364 commit 47ba16e
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 17 deletions.
3 changes: 3 additions & 0 deletions marlowe-runtime/marlowe-discovery/Main.hs
Expand Up @@ -29,6 +29,8 @@ import Network.Socket
, withFdSocket
, withSocketsDo
)
import Observe.Event.Render.JSON (DefaultRenderSelectorJSON(defaultRenderSelectorJSON))
import Observe.Event.Render.JSON.Handle (simpleJsonStderrBackend)
import Options.Applicative
( auto
, execParser
Expand Down Expand Up @@ -67,6 +69,7 @@ run Options{..} = withSocketsDo do
acceptRunQueryServer = acceptRunServerPeerOverSocket throwIO querySocket codecQuery queryServerPeer
acceptRunSyncServer = acceptRunServerPeerOverSocket throwIO syncSocket codecMarloweHeaderSync marloweHeaderSyncServerPeer
let pageSize = 1024 -- TODO move to config with a default
eventBackend <- simpleJsonStderrBackend defaultRenderSelectorJSON
runComponent_ discovery DiscoveryDependencies{..}
where
openServer addr = bracketOnError (openSocket addr) close \socket -> do
Expand Down
4 changes: 4 additions & 0 deletions marlowe-runtime/marlowe-runtime.cabal
Expand Up @@ -116,6 +116,9 @@ library
, bytestring
, containers
, errors
, eventuo11y
, eventuo11y-dsl
, eventuo11y-json
, marlowe
, marlowe-chain-sync
, marlowe-protocols
Expand Down Expand Up @@ -366,6 +369,7 @@ executable marlowe-discovery
, async-components
, base16
, containers
, eventuo11y-json
, marlowe
, marlowe-protocols
, marlowe-runtime
Expand Down
40 changes: 33 additions & 7 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery.hs
@@ -1,5 +1,8 @@
{-# LANGUAGE Arrows #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery
where
Expand All @@ -12,17 +15,40 @@ import Language.Marlowe.Runtime.Discovery.Store
import Language.Marlowe.Runtime.Discovery.SyncServer
import Network.Protocol.Driver (RunClient)
import Numeric.Natural (Natural)
import Observe.Event (EventBackend)
import Observe.Event.Backend (narrowEventBackend)
import Observe.Event.DSL (SelectorField(..), SelectorSpec(..))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))

data DiscoveryDependencies = DiscoveryDependencies
compile $ SelectorSpec "discovery"
[ ["chain", "client"] Inject ''DiscoveryChainClientSelector
, "store" Inject ''DiscoveryStoreSelector
, ["sync", "server"] Inject ''DiscoverySyncServerSelector
, ["query", "server"] Inject ''DiscoveryQueryServerSelector
]

data DiscoveryDependencies r = DiscoveryDependencies
{ acceptRunSyncServer :: IO (RunSyncServer IO)
, acceptRunQueryServer :: IO (RunQueryServer IO)
, connectToChainSeek :: RunClient IO Chain.RuntimeChainSeekClient
, pageSize :: Natural
, eventBackend :: EventBackend IO r DiscoverySelector
}

discovery :: Component IO DiscoveryDependencies ()
discovery = proc DiscoveryDependencies{..} -> do
changes <- discoveryChainClient -< DiscoveryChainClientDependencies{..}
DiscoveryStore{..} <- discoveryStore -< DiscoveryStoreDependencies{..}
discoverySyncServer -< DiscoverySyncServerDependencies{..}
discoveryQueryServer -< DiscoveryQueryServerDependencies{..}
discovery :: Component IO (DiscoveryDependencies r) ()
discovery = proc DiscoveryDependencies
{ acceptRunSyncServer
, acceptRunQueryServer
, connectToChainSeek
, pageSize
, eventBackend = rootBackend
} -> do
changes <- discoveryChainClient -<
let eventBackend = narrowEventBackend ChainClient rootBackend in DiscoveryChainClientDependencies{..}
DiscoveryStore{..} <- discoveryStore -<
let eventBackend = narrowEventBackend Store rootBackend in DiscoveryStoreDependencies{..}
discoverySyncServer -<
let eventBackend = narrowEventBackend SyncServer rootBackend in DiscoverySyncServerDependencies{..}
discoveryQueryServer -<
let eventBackend = narrowEventBackend QueryServer rootBackend in DiscoveryQueryServerDependencies{..}
19 changes: 15 additions & 4 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Chain.hs
@@ -1,5 +1,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery.Chain
where
Expand All @@ -21,8 +23,16 @@ import Language.Marlowe.Runtime.Core.Api
import Language.Marlowe.Runtime.Core.ScriptRegistry (MarloweScripts(..), getMarloweVersion, getScripts)
import Language.Marlowe.Runtime.Discovery.Api (ContractHeader(..))
import Network.Protocol.Driver (RunClient)
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(..))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))
import qualified Plutus.V1.Ledger.Api as P

compile $ SelectorSpec ["discovery", "chain", "client"]
[ "todo" ''()
]

data Changes = Changes
{ headers :: !(Map Chain.BlockHeader (Set ContractHeader))
, rollbackTo :: !(Maybe Chain.ChainPoint)
Expand All @@ -44,7 +54,7 @@ applyRollback :: Chain.ChainPoint -> Changes -> Changes
applyRollback Chain.Genesis _ = Changes mempty $ Just Chain.Genesis
applyRollback (Chain.At blockHeader@Chain.BlockHeader{slotNo}) Changes{..} = Changes
{ headers = headers'
, rollbackTo = asum
, rollbackTo = asum @[]
[ guard (Map.null headers') *> (min (Just (Chain.At blockHeader)) rollbackTo <|> Just (Chain.At blockHeader))
, rollbackTo
]
Expand All @@ -53,11 +63,12 @@ applyRollback (Chain.At blockHeader@Chain.BlockHeader{slotNo}) Changes{..} = Cha
headers' = Map.filterWithKey (const . isNotRolledBack) headers
isNotRolledBack = not . Chain.isAfter slotNo

newtype DiscoveryChainClientDependencies = DiscoveryChainClientDependencies
data DiscoveryChainClientDependencies r = DiscoveryChainClientDependencies
{ connectToChainSeek :: RunClient IO Chain.RuntimeChainSeekClient
, eventBackend :: EventBackend IO r DiscoveryChainClientSelector
}

discoveryChainClient :: Component IO DiscoveryChainClientDependencies (STM Changes)
discoveryChainClient :: Component IO (DiscoveryChainClientDependencies r) (STM Changes)
discoveryChainClient = component \DiscoveryChainClientDependencies{..} -> do
changesVar <- newTVar mempty
let
Expand Down Expand Up @@ -134,4 +145,4 @@ extractHeaders blockHeader Chain.Transaction{..} =
}

marloweScriptHashes :: Set Chain.ScriptHash
marloweScriptHashes = Set.map marloweScript $ foldMap (withSomeMarloweVersion getScripts) [minBound..maxBound]
marloweScriptHashes = Set.map marloweScript $ foldMap @[] (withSomeMarloweVersion getScripts) [minBound..maxBound]
Expand Up @@ -2,8 +2,10 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE EmptyCase #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery.QueryServer
where
Expand All @@ -16,18 +18,27 @@ import Network.Protocol.Driver (RunServer(..))
import Network.Protocol.Query.Server
import Network.Protocol.Query.Types
import Numeric.Natural (Natural)
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(..))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))
import System.IO (hPutStrLn, stderr)

compile $ SelectorSpec ["discovery", "query", "server"]
[ "todo" ''()
]

type RunQueryServer m = RunServer m (QueryServer DiscoveryQuery)

data DiscoveryQueryServerDependencies = DiscoveryQueryServerDependencies
data DiscoveryQueryServerDependencies r = DiscoveryQueryServerDependencies
{ acceptRunQueryServer :: IO (RunQueryServer IO)
, getHeaders :: IO [ContractHeader]
, getHeadersByRoleTokenCurrency :: PolicyId -> IO [ContractHeader]
, pageSize :: Natural
, eventBackend :: EventBackend IO r DiscoveryQueryServerSelector
}

discoveryQueryServer :: Component IO DiscoveryQueryServerDependencies ()
discoveryQueryServer :: Component IO (DiscoveryQueryServerDependencies r) ()
discoveryQueryServer = serverComponent
worker
(hPutStrLn stderr . ("Query worker crashed with exception: " <>) . show)
Expand Down
17 changes: 15 additions & 2 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Store.hs
@@ -1,3 +1,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery.Store
where

Expand All @@ -13,9 +17,18 @@ import qualified Data.Set as Set
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, PolicyId, WithGenesis(..))
import Language.Marlowe.Runtime.Discovery.Api (ContractHeader(..))
import Language.Marlowe.Runtime.Discovery.Chain (Changes(..), isEmptyChanges)
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(SelectorSpec))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))

compile $ SelectorSpec ["discovery", "store"]
[ "todo" ''()
]

newtype DiscoveryStoreDependencies = DiscoveryStoreDependencies
data DiscoveryStoreDependencies r = DiscoveryStoreDependencies
{ changes :: STM Changes
, eventBackend :: EventBackend IO r DiscoveryStoreSelector
}

data DiscoveryStore = DiscoveryStore
Expand All @@ -29,7 +42,7 @@ data BlockData
= Rollback ChainPoint
| Block [ContractHeader]

discoveryStore :: Component IO DiscoveryStoreDependencies DiscoveryStore
discoveryStore :: Component IO (DiscoveryStoreDependencies r) DiscoveryStore
discoveryStore = component \DiscoveryStoreDependencies{..} -> do
blocksVar <- newTVar mempty
roleTokenIndex <- newTVar mempty
Expand Down
Expand Up @@ -2,8 +2,10 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE EmptyCase #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery.SyncServer
where
Expand All @@ -13,17 +15,26 @@ import Language.Marlowe.Protocol.HeaderSync.Server
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, WithGenesis(..))
import Language.Marlowe.Runtime.Discovery.Api
import Network.Protocol.Driver (RunServer(..))
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(..))
import Observe.Event.Render.JSON.DSL.Compile (compile)
import Observe.Event.Syntax ((≔))
import System.IO (hPutStrLn, stderr)

compile $ SelectorSpec ["discovery", "sync", "server"]
[ "todo" ''()
]

type RunSyncServer m = RunServer m MarloweHeaderSyncServer

data DiscoverySyncServerDependencies = DiscoverySyncServerDependencies
data DiscoverySyncServerDependencies r = DiscoverySyncServerDependencies
{ acceptRunSyncServer :: IO (RunSyncServer IO)
, getNextHeaders :: ChainPoint -> IO (Maybe (Either ChainPoint (BlockHeader, [ContractHeader])))
, getIntersect :: [BlockHeader] -> IO (Maybe BlockHeader)
, eventBackend :: EventBackend IO r DiscoverySyncServerSelector
}

discoverySyncServer :: Component IO DiscoverySyncServerDependencies ()
discoverySyncServer :: Component IO (DiscoverySyncServerDependencies r) ()
discoverySyncServer = serverComponent
worker
(hPutStrLn stderr . ("Sync worker crashed with exception: " <>) . show)
Expand Down

0 comments on commit 47ba16e

Please sign in to comment.