Skip to content

Commit

Permalink
Refactor type of getNextHeaders
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Dec 2, 2022
1 parent 9e50ce9 commit 0992e6c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 24 deletions.
3 changes: 3 additions & 0 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Core/Api.hs
Expand Up @@ -57,6 +57,9 @@ parseContractId = fmap ContractId . parseTxOutRef . T.pack
renderContractId :: ContractId -> Text
renderContractId = renderTxOutRef . unContractId

instance ToJSON ContractId where
toJSON = String . renderContractId

data MarloweVersionTag
= V1

Expand Down
13 changes: 6 additions & 7 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Chain.hs
Expand Up @@ -11,7 +11,7 @@ import Control.Concurrent.STM (STM, atomically, newTQueue, readTQueue, writeTQue
import Control.Monad (guard)
import Data.Coerce (coerce)
import Data.Crosswalk (crosswalk)
import Data.Foldable (fold, for_)
import Data.Foldable (fold, traverse_)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Text (Text)
Expand All @@ -31,7 +31,7 @@ import Observe.Event.Syntax ((≔))
import qualified Plutus.V1.Ledger.Api as P

type Versions = [Text]
type TransactionSet = Set Chain.Transaction
type ContractIds = Set ContractId

compile $ SelectorSpec ["discovery", "chain", "client"]
[ "connect" ''Void
Expand All @@ -43,7 +43,7 @@ compile $ SelectorSpec ["discovery", "chain", "client"]
, ["roll", "forward"] FieldSpec ["roll", "forward"]
[ "block" ''Chain.BlockHeader
, "tip" ''Chain.ChainPoint
, "results" ''TransactionSet
, ["new", "headers"] ''ContractIds
]
, ["roll", "backward"] ''Chain.ChainPoint
]
Expand Down Expand Up @@ -88,10 +88,9 @@ discoveryChainClient = component \DiscoveryChainClientDependencies{..} -> do
Chain.At block -> \tip -> withEvent eventBackend RollForward \ev -> do
addField ev $ Block block
addField ev $ Tip tip
addField ev $ Results txs
atomically
$ for_ (fmap fold $ crosswalk (extractHeaders block) $ Set.toList txs)
$ writeTQueue queue . RolledForward block
let mNewHeaders = fmap fold $ crosswalk (extractHeaders block) $ Set.toList txs
addField ev $ NewHeaders $ Set.map contractId $ fold mNewHeaders
atomically $ traverse_ (writeTQueue queue . RolledForward block) mNewHeaders
pure clientIdle
, recvMsgRollBackward = \point _ -> withEvent eventBackend RollBackward \ev -> do
addField ev point
Expand Down
31 changes: 20 additions & 11 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Store.hs
Expand Up @@ -46,10 +46,16 @@ data DiscoveryStoreDependencies r = DiscoveryStoreDependencies
, eventBackend :: EventBackend IO r DiscoveryStoreSelector
}

data NextHeaders
= Wait
| RollBackward ChainPoint
| RollForward BlockHeader [ContractHeader]


data DiscoveryStore = DiscoveryStore
{ getHeaders :: IO [ContractHeader]
, getHeadersByRoleTokenCurrency :: PolicyId -> IO [ContractHeader]
, getNextHeaders :: ChainPoint -> IO (Maybe (Either ChainPoint (BlockHeader, [ContractHeader])))
, getNextHeaders :: ChainPoint -> IO NextHeaders
, getIntersect :: [BlockHeader] -> IO (Maybe BlockHeader)
}

Expand Down Expand Up @@ -168,23 +174,26 @@ worker = component \WorkerDependencies{..} -> do
, getNextHeaders = \point -> do
blocks <- readTVarIO blocksVar
pure case point of
Genesis -> do
(blockHeader, blockData) <- fst <$> Map.minViewWithKey blocks
pure case blockData of
Rollback point' -> Left point'
Block headers -> Right (blockHeader, headers)
Genesis -> case Map.minViewWithKey blocks of
Nothing -> Wait
Just ((blockHeader, blockData), _) -> case blockData of
Rollback point' -> RollBackward point'
Block headers -> RollForward blockHeader headers
At blockHeader -> case Map.lookup blockHeader blocks of
Nothing -> Just $ Left Genesis
Nothing -> RollBackward Genesis
Just blockData ->
case blockData of
Rollback point' -> pure $ Left point'
_ -> Right <$> do
let (_, blocks') = Map.split blockHeader blocks
Rollback point' -> RollBackward point'
_ ->
let
(_, blocks') = Map.split blockHeader blocks
filtered = flip Map.mapMaybe blocks' \case
Block hs -> Just hs
_ -> Nothing
fst <$> Map.minViewWithKey filtered
in
case Map.minViewWithKey filtered of
Nothing -> Wait
Just ((blockHeader', headers), _) -> RollForward blockHeader' headers
, getIntersect = \headers -> fmap fst
. Set.maxView
. Set.intersection (Set.fromList headers)
Expand Down
Expand Up @@ -13,7 +13,7 @@ module Language.Marlowe.Runtime.Discovery.SyncServer
import Control.Concurrent.Component
import Language.Marlowe.Protocol.HeaderSync.Server
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, WithGenesis(..))
import Language.Marlowe.Runtime.Discovery.Api
import Language.Marlowe.Runtime.Discovery.Store (NextHeaders(..))
import Network.Protocol.Driver (RunServer(..))
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(..))
Expand All @@ -29,7 +29,7 @@ type RunSyncServer m = RunServer m MarloweHeaderSyncServer

data DiscoverySyncServerDependencies r = DiscoverySyncServerDependencies
{ acceptRunSyncServer :: IO (RunSyncServer IO)
, getNextHeaders :: ChainPoint -> IO (Maybe (Either ChainPoint (BlockHeader, [ContractHeader])))
, getNextHeaders :: ChainPoint -> IO NextHeaders
, getIntersect :: [BlockHeader] -> IO (Maybe BlockHeader)
, eventBackend :: EventBackend IO r DiscoverySyncServerSelector
}
Expand All @@ -45,7 +45,7 @@ discoverySyncServer = serverComponent

data WorkerDependencies = WorkerDependencies
{ runSyncServer :: RunSyncServer IO
, getNextHeaders :: ChainPoint -> IO (Maybe (Either ChainPoint (BlockHeader, [ContractHeader])))
, getNextHeaders :: ChainPoint -> IO NextHeaders
, getIntersect :: [BlockHeader] -> IO (Maybe BlockHeader)
}

Expand Down Expand Up @@ -79,9 +79,9 @@ worker = component_ \WorkerDependencies{..} -> do
nextServer point = do
result <- getNextHeaders point
pure case result of
Nothing -> SendMsgWait $ waitServer point
Just (Left point') -> SendMsgRollBackward point' $ idleServer point'
Just (Right (block, headers)) -> SendMsgNewHeaders block headers $ idleServer $ At block
Wait -> SendMsgWait $ waitServer point
RollBackward point' -> SendMsgRollBackward point' $ idleServer point'
RollForward block headers -> SendMsgNewHeaders block headers $ idleServer $ At block

waitServer :: ChainPoint -> ServerStWait IO ()
waitServer point = ServerStWait
Expand Down

0 comments on commit 0992e6c

Please sign in to comment.