Skip to content

Commit

Permalink
Refactor changes into discovery store
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Dec 2, 2022
1 parent 53a8c8b commit 8746305
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 50 deletions.
2 changes: 1 addition & 1 deletion marlowe-runtime/src/Language/Marlowe/Runtime/Discovery.hs
Expand Up @@ -44,7 +44,7 @@ discovery = proc DiscoveryDependencies
, pageSize
, eventBackend = rootBackend
} -> do
changes <- discoveryChainClient -<
chainEvents <- discoveryChainClient -<
let eventBackend = narrowEventBackend ChainClient rootBackend in DiscoveryChainClientDependencies{..}
DiscoveryStore{..} <- discoveryStore -<
let eventBackend = narrowEventBackend Store rootBackend in DiscoveryStoreDependencies{..}
Expand Down
55 changes: 12 additions & 43 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Chain.hs
Expand Up @@ -6,15 +6,12 @@
module Language.Marlowe.Runtime.Discovery.Chain
where

import Control.Applicative ((<|>))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Concurrent.STM (STM, atomically, newTQueue, readTQueue, writeTQueue)
import Control.Monad (guard)
import Data.Coerce (coerce)
import Data.Crosswalk (crosswalk)
import Data.Foldable (asum, fold, for_)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Foldable (fold, for_)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Text (Text)
Expand Down Expand Up @@ -51,44 +48,18 @@ compile $ SelectorSpec ["discovery", "chain", "client"]
, ["roll", "backward"] ''Chain.ChainPoint
]

data Changes = Changes
{ headers :: !(Map Chain.BlockHeader (Set ContractHeader))
, rollbackTo :: !(Maybe Chain.ChainPoint)
} deriving (Show, Eq)

instance Semigroup Changes where
c1 <> Changes{..} = c1' { headers = Map.unionWith (<>) headers1 headers }
where
c1'@Changes{headers=headers1} = maybe c1 (flip applyRollback c1) rollbackTo

instance Monoid Changes where
mempty = Changes Map.empty Nothing

isEmptyChanges :: Changes -> Bool
isEmptyChanges (Changes headers Nothing) = null $ fold headers
isEmptyChanges _ = False

applyRollback :: Chain.ChainPoint -> Changes -> Changes
applyRollback Chain.Genesis _ = Changes mempty $ Just Chain.Genesis
applyRollback (Chain.At blockHeader@Chain.BlockHeader{slotNo}) Changes{..} = Changes
{ headers = headers'
, rollbackTo = asum @[]
[ guard (Map.null headers') *> (min (Just (Chain.At blockHeader)) rollbackTo <|> Just (Chain.At blockHeader))
, rollbackTo
]
}
where
headers' = Map.filterWithKey (const . isNotRolledBack) headers
isNotRolledBack = not . Chain.isAfter slotNo
data ChainEvent
= RolledForward Chain.BlockHeader (Set ContractHeader)
| RolledBackward Chain.ChainPoint

data DiscoveryChainClientDependencies r = DiscoveryChainClientDependencies
{ connectToChainSeek :: RunClient IO Chain.RuntimeChainSeekClient
, eventBackend :: EventBackend IO r DiscoveryChainClientSelector
}

discoveryChainClient :: Component IO (DiscoveryChainClientDependencies r) (STM Changes)
discoveryChainClient :: Component IO (DiscoveryChainClientDependencies r) (STM ChainEvent)
discoveryChainClient = component \DiscoveryChainClientDependencies{..} -> do
changesVar <- newTVar mempty
queue <- newTQueue
let
clientInit = Chain.SendMsgRequestHandshake Chain.moveSchema clientHandshake
clientHandshake = Chain.ClientStHandshake
Expand Down Expand Up @@ -118,12 +89,13 @@ discoveryChainClient = component \DiscoveryChainClientDependencies{..} -> do
addField ev $ Block block
addField ev $ Tip tip
addField ev $ Results txs
atomically $ for_ (fmap fold $ crosswalk (extractHeaders block) $ Set.toList txs) \headers ->
modifyTVar changesVar (<> mempty { headers = Map.singleton block headers})
atomically
$ for_ (fmap fold $ crosswalk (extractHeaders block) $ Set.toList txs)
$ writeTQueue queue . RolledForward block
pure clientIdle
, recvMsgRollBackward = \point _ -> withEvent eventBackend RollBackward \ev -> do
addField ev point
atomically $ modifyTVar changesVar (<> mempty { rollbackTo = Just point })
atomically $ writeTQueue queue $ RolledBackward point
pure clientIdle
}

Expand All @@ -133,10 +105,7 @@ discoveryChainClient = component \DiscoveryChainClientDependencies{..} -> do
connectToChainSeek $ Chain.ChainSeekClient do
finalize ev
pure clientInit
, do
changes <- readTVar changesVar
writeTVar changesVar mempty
pure changes
, readTQueue queue
)

extractHeaders :: Chain.BlockHeader -> Chain.Transaction -> Maybe (Set ContractHeader)
Expand Down
76 changes: 70 additions & 6 deletions marlowe-runtime/src/Language/Marlowe/Runtime/Discovery/Store.hs
@@ -1,22 +1,28 @@
{-# LANGUAGE Arrows #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE TemplateHaskell #-}

module Language.Marlowe.Runtime.Discovery.Store
where

import Control.Applicative ((<|>))
import Control.Arrow ((&&&))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically, newTVar, readTVar, readTVarIO, writeTVar)
import Control.Monad (forever, mfilter, (<=<))
import Data.Foldable (fold)
import Control.Concurrent.STM (STM, atomically, modifyTVar, newTVar, readTVar, readTVarIO, writeTVar)
import Control.Monad (forever, guard, mfilter, (<=<))
import Data.Foldable (asum, fold)
import Data.Function (on)
import Data.List (groupBy, sortOn)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Set (Set)
import qualified Data.Set as Set
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, PolicyId, WithGenesis(..))
import qualified Language.Marlowe.Runtime.ChainSync.Api as Chain
import Language.Marlowe.Runtime.Discovery.Api (ContractHeader(..))
import Language.Marlowe.Runtime.Discovery.Chain (Changes(..), isEmptyChanges)
import Language.Marlowe.Runtime.Discovery.Chain (ChainEvent(..))
import Observe.Event (EventBackend)
import Observe.Event.DSL (SelectorSpec(SelectorSpec))
import Observe.Event.Render.JSON.DSL.Compile (compile)
Expand All @@ -27,7 +33,7 @@ compile $ SelectorSpec ["discovery", "store"]
]

data DiscoveryStoreDependencies r = DiscoveryStoreDependencies
{ changes :: STM Changes
{ chainEvents :: STM ChainEvent
, eventBackend :: EventBackend IO r DiscoveryStoreSelector
}

Expand All @@ -38,12 +44,70 @@ data DiscoveryStore = DiscoveryStore
, getIntersect :: [BlockHeader] -> IO (Maybe BlockHeader)
}

data Changes = Changes
{ headers :: !(Map Chain.BlockHeader (Set ContractHeader))
, rollbackTo :: !(Maybe Chain.ChainPoint)
} deriving (Show, Eq)

instance Semigroup Changes where
c1 <> Changes{..} = c1' { headers = Map.unionWith (<>) headers1 headers }
where
c1'@Changes{headers=headers1} = maybe c1 (flip applyRollback c1) rollbackTo

instance Monoid Changes where
mempty = Changes Map.empty Nothing

isEmptyChanges :: Changes -> Bool
isEmptyChanges (Changes headers Nothing) = null $ fold headers
isEmptyChanges _ = False

applyRollback :: Chain.ChainPoint -> Changes -> Changes
applyRollback Chain.Genesis _ = Changes mempty $ Just Chain.Genesis
applyRollback (Chain.At blockHeader@Chain.BlockHeader{slotNo}) Changes{..} = Changes
{ headers = headers'
, rollbackTo = asum @[]
[ guard (Map.null headers') *> (min (Just (Chain.At blockHeader)) rollbackTo <|> Just (Chain.At blockHeader))
, rollbackTo
]
}
where
headers' = Map.filterWithKey (const . isNotRolledBack) headers
isNotRolledBack = not . Chain.isAfter slotNo

data BlockData
= Rollback ChainPoint
| Block [ContractHeader]

discoveryStore :: Component IO (DiscoveryStoreDependencies r) DiscoveryStore
discoveryStore = component \DiscoveryStoreDependencies{..} -> do
discoveryStore = proc DiscoveryStoreDependencies{..} -> do
changes <- eventAggregator -< chainEvents
worker -< WorkerDependencies{..}

eventAggregator :: Component IO (STM ChainEvent) (STM Changes)
eventAggregator = component \chainEvents -> do
changesVar <- newTVar mempty
let
runAggregator = forever $ atomically do
chainEvent <- chainEvents
modifyTVar changesVar (<> eventToChanges chainEvent)
getChanges = do
changes <- readTVar changesVar
writeTVar changesVar mempty
pure changes
pure (runAggregator, getChanges)

eventToChanges :: ChainEvent -> Changes
eventToChanges = \case
RolledForward block headers -> mempty { headers = Map.singleton block headers }
RolledBackward point -> mempty { rollbackTo = Just point }

data WorkerDependencies r = WorkerDependencies
{ changes :: STM Changes
, eventBackend :: EventBackend IO r DiscoveryStoreSelector
}

worker :: Component IO (WorkerDependencies r) DiscoveryStore
worker = component \WorkerDependencies{..} -> do
blocksVar <- newTVar mempty
roleTokenIndex <- newTVar mempty
let
Expand Down

0 comments on commit 8746305

Please sign in to comment.