Skip to content

Commit

Permalink
Check for rollback blocks in local changes before going to database
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Dec 2, 2022
1 parent 5cdbd6f commit 384b534
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 78 deletions.
7 changes: 4 additions & 3 deletions marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync.hs
Expand Up @@ -49,10 +49,11 @@ data ChainSyncDependencies = ChainSyncDependencies

chainSync :: Component IO ChainSyncDependencies ()
chainSync = proc ChainSyncDependencies{..} -> do
let DatabaseQueries{..} = databaseQueries
NodeClient{..} <- nodeClient -< NodeClientDependencies{..}
let rateLimit = persistRateLimit
ChainStore{..} <- chainStore -< ChainStoreDependencies{..}
rec
let DatabaseQueries{..} = chainStoreDatabaseQueries
NodeClient{..} <- nodeClient -< NodeClientDependencies{..}
ChainStore{..} <- chainStore -< ChainStoreDependencies{..}
chainSyncServer -< ChainSyncServerDependencies{..}
chainSyncQueryServer -< ChainSyncQueryServerDependencies{..}
chainSyncJobServer -< ChainSyncJobServerDependencies{..}
@@ -1,4 +1,5 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}

module Language.Marlowe.Runtime.ChainSync.Database
where
Expand Down Expand Up @@ -39,7 +40,7 @@ hoistCommitGenesisBlock transformation = CommitGenesisBlock . fmap transformatio
-- Queries

newtype GetHeaderAtPoint m = GetHeaderAtPoint
{ runGetHeaderAtPoint :: ChainPoint -> m (WithOrigin BlockHeader) }
{ runGetHeaderAtPoint :: ChainPoint -> m (Maybe (WithOrigin BlockHeader)) }

newtype GetIntersectionPoints m = GetIntersectionPoints
{ runGetIntersectionPoints :: m [ChainPoint] }
Expand Down Expand Up @@ -79,14 +80,15 @@ hoistMoveClient transformation (MoveClient runMoveClient) =
-- Bundles

data DatabaseQueries m = DatabaseQueries
{ commitRollback :: !(CommitRollback m)
, commitBlocks :: !(CommitBlocks m)
, commitGenesisBlock :: !(CommitGenesisBlock m)
, getHeaderAtPoint :: !(GetHeaderAtPoint m)
, getIntersectionPoints :: !(GetIntersectionPoints m)
, getGenesisBlock :: !(GetGenesisBlock m)
, getUTxOs :: !(GetUTxOs m)
, moveClient :: !(MoveClient m)
-- Explicitly Lazy to support value recursion
{ commitRollback :: ~(CommitRollback m)
, commitBlocks :: ~(CommitBlocks m)
, commitGenesisBlock :: ~(CommitGenesisBlock m)
, getHeaderAtPoint :: ~(GetHeaderAtPoint m)
, getIntersectionPoints :: ~(GetIntersectionPoints m)
, getGenesisBlock :: ~(GetGenesisBlock m)
, getUTxOs :: ~(GetUTxOs m)
, moveClient :: ~(MoveClient m)
}

hoistDatabaseQueries :: (forall a. m a -> n a) -> DatabaseQueries m -> DatabaseQueries n
Expand Down
Expand Up @@ -96,7 +96,6 @@ import Data.Profunctor (rmap)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.These (These(..))
import Data.Vector (Vector)
import qualified Data.Vector as V
Expand Down Expand Up @@ -179,19 +178,18 @@ getGenesisBlock = GetGenesisBlock $ HT.statement () $ refineResult (decodeResult

getHeaderAtPoint :: GetHeaderAtPoint Transaction
getHeaderAtPoint = GetHeaderAtPoint \case
ChainPointAtGenesis -> pure Origin
point@(ChainPoint slotNo hash) ->
HT.statement (slotNoToParam slotNo, headerHashToParam hash) $ refineResult decodeResults
ChainPointAtGenesis -> pure $ Just Origin
ChainPoint slotNo hash ->
HT.statement (slotNoToParam slotNo, headerHashToParam hash) $ fmap decodeResults <$>
[maybeStatement|
SELECT block.blockNo :: bigint
FROM chain.block AS block
WHERE block.slotNo = $1 :: bigint
AND block.id = $2 :: bytea
|]
where
decodeResults :: Maybe Int64 -> Either Text (WithOrigin BlockHeader)
decodeResults Nothing = Left $ "No block found at " <> T.pack (show point)
decodeResults (Just blockNo) = Right $ At $ BlockHeader slotNo hash $ BlockNo $ fromIntegral blockNo
decodeResults :: Int64 -> WithOrigin BlockHeader
decodeResults blockNo = At $ BlockHeader slotNo hash $ BlockNo $ fromIntegral blockNo

decodeTxId :: ByteString -> Either Text TxId
decodeTxId txId = case deserialiseFromRawBytes AsTxId txId of
Expand Down
@@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}

module Language.Marlowe.Runtime.ChainSync.NodeClient
Expand All @@ -8,6 +9,7 @@ module Language.Marlowe.Runtime.ChainSync.NodeClient
, NodeClient(..)
, NodeClientDependencies(..)
, isEmptyChanges
, lookupPointInChanges
, nodeClient
, toEmptyChanges
) where
Expand Down Expand Up @@ -43,22 +45,24 @@ import Control.Arrow ((&&&))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Monad (guard)
import Data.List (sortOn)
import Data.Functor ((<&>))
import Data.List (find, sortOn)
import Data.Ord (Down(..))
import Language.Marlowe.Runtime.ChainSync.Database (CardanoBlock, GetHeaderAtPoint(..), GetIntersectionPoints(..))
import Ouroboros.Network.Point (WithOrigin(..))
import System.IO (hPutStrLn, stderr)

type NumberedCardanoBlock = (BlockNo, CardanoBlock)
type NumberedChainTip = (WithOrigin BlockNo, ChainTip)

-- | Describes a batch of chain data changes to write.
data Changes = Changes
{ changesRollback :: !(Maybe ChainPoint) -- ^ Point to rollback to before writing any blocks.
, changesBlocks :: ![CardanoBlock] -- ^ New blocks to write.
, changesTip :: !ChainTip -- ^ Most recently observed tip of the local node.
, changesLocalTip :: !ChainTip -- ^ Chain tip the changes will advance the local state to.
, changesBlockCount :: !Int -- ^ Number of blocks in the change set.
, changesTxCount :: !Int -- ^ Number of transactions in the change set.
{ changesRollback :: Maybe ChainPoint -- ^ Point to rollback to before writing any blocks.
, changesBlocks :: [CardanoBlock] -- ^ New blocks to write.
, changesTip :: ChainTip -- ^ Most recently observed tip of the local node.
, changesLocalTip :: ChainTip -- ^ Chain tip the changes will advance the local state to.
, changesBlockCount :: Int -- ^ Number of blocks in the change set.
, changesTxCount :: Int -- ^ Number of transactions in the change set.
}

-- | An empty Changes collection.
Expand Down Expand Up @@ -92,13 +96,14 @@ cost CostModel{..} Changes{..} = changesBlockCount * blockCost + changesTxCount

-- | The set of dependencies needed by the NodeClient component.
data NodeClientDependencies = NodeClientDependencies
{ connectToLocalNode :: !(LocalNodeClientProtocolsInMode CardanoMode -> IO ()) -- ^ Connect to the local node.
, getHeaderAtPoint :: !(GetHeaderAtPoint IO) -- ^ How to load a block header at a given point.
, getIntersectionPoints :: !(GetIntersectionPoints IO) -- ^ How to load the set of initial intersection points for the chain sync client.
-- Explicitly lazy to support value recursion
{ connectToLocalNode :: ~(LocalNodeClientProtocolsInMode CardanoMode -> IO ()) -- ^ Connect to the local node.
, getHeaderAtPoint :: ~(GetHeaderAtPoint IO) -- ^ How to load a block header at a given point.
, getIntersectionPoints :: ~(GetIntersectionPoints IO) -- ^ How to load the set of initial intersection points for the chain sync client.
-- | The maximum cost a set of changes is allowed to incur before the
-- NodeClient blocks.
, maxCost :: Int
, costModel :: CostModel
, maxCost :: ~Int
, costModel :: ~CostModel
}

-- | The public API of the NodeClient component.
Expand Down Expand Up @@ -166,7 +171,10 @@ pipelinedClient costModel maxCost changesVar getHeaderAtPoint getIntersectionPoi
-> NumberedChainTip
-> IO (ClientPipelinedStIdle 'Z NumberedCardanoBlock ChainPoint NumberedChainTip IO ())
clientStIdle point nodeTip = do
clientTip <- fmap blockHeaderToBlockNo <$> runGetHeaderAtPoint getHeaderAtPoint point
mClientTip <- (fmap . fmap) blockHeaderToBlockNo <$> runGetHeaderAtPoint getHeaderAtPoint point
clientTip <- case mClientTip of
Nothing -> error $ "Failed to start chain sync client. Failed to load block number for chain point " <> show point
Just tip -> pure tip
pure $ mkClientStIdle costModel maxCost changesVar getHeaderAtPoint pipelinePolicy Zero clientTip nodeTip

-- How to pipeline. If we have fewer than 50 requests in flight, send
Expand Down Expand Up @@ -244,7 +252,16 @@ mkClientStNext costModel maxCost changesVar getHeaderAtPoint pipelineDecision n
let clientTip = At blockNo
pure $ mkClientStIdle costModel maxCost changesVar getHeaderAtPoint pipelineDecision n clientTip tip
, recvMsgRollBackward = \point tip -> do
clientTip <- fmap blockHeaderToBlockNo <$> runGetHeaderAtPoint getHeaderAtPoint point
tipFromChanges <- atomically $ lookupPointInChanges point <$> readTVar changesVar
clientTip <- case tipFromChanges of
Nothing -> do
mClientTip <- (fmap . fmap) blockHeaderToBlockNo <$> runGetHeaderAtPoint getHeaderAtPoint point
case mClientTip of
Nothing -> error $ "Failed to process rollback. Failed to load block number for rollback point " <> show point
Just clintTip -> pure clintTip
Just clintTip -> do
hPutStrLn stderr "Found rollback point in node client changes"
pure clintTip
atomically $ modifyTVar changesVar \Changes{..} ->
let
changesBlocks' = case point of
Expand Down Expand Up @@ -273,6 +290,16 @@ mkClientStNext costModel maxCost changesVar getHeaderAtPoint pipelineDecision n
pure $ mkClientStIdle costModel maxCost changesVar getHeaderAtPoint pipelineDecision n clientTip tip
}

lookupPointInChanges :: ChainPoint -> Changes -> Maybe (WithOrigin BlockNo)
lookupPointInChanges = \case
ChainPointAtGenesis -> const $ Just Origin
ChainPoint slotNo hash -> \Changes{..} -> do
let
isMatch (BlockInMode (Block (BlockHeader slotNo' hash' _) _) _) =
slotNo == slotNo' && hash == hash'
find isMatch changesBlocks <&> \(BlockInMode (Block (BlockHeader _ _ blockNo) _) _) ->
At blockNo

minPoint :: ChainPoint -> ChainPoint -> ChainPoint
minPoint ChainPointAtGenesis _ = ChainPointAtGenesis
minPoint _ ChainPointAtGenesis = ChainPointAtGenesis
Expand Down
117 changes: 72 additions & 45 deletions marlowe-chain-sync/src/Language/Marlowe/Runtime/ChainSync/Store.hs
@@ -1,15 +1,18 @@
{-# LANGUAGE StrictData #-}

module Language.Marlowe.Runtime.ChainSync.Store
( ChainStore(..)
, ChainStoreDependencies(..)
, Changes(..)
, chainStore
) where

import Cardano.Api (ChainPoint(..), ChainTip(..), SlotNo(..))
import Cardano.Api (BlockHeader(BlockHeader), ChainPoint(..), ChainTip(..), SlotNo(..))
import Cardano.Api.Shelley (Hash(..))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, atomically, newTVar, readTVar, writeTVar)
import Control.Concurrent.STM
(STM, atomically, newEmptyTMVar, newTVar, putTMVar, readTVar, takeTMVar, tryReadTMVar, writeTVar)
import Control.Concurrent.STM.Delay (Delay, newDelay, waitDelay)
import Control.Exception (bracket)
import Control.Monad (guard, unless, when)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT(..))
Expand All @@ -21,39 +24,45 @@ import qualified Data.Text.IO as T
import Data.Time (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
import Language.Marlowe.Runtime.ChainSync.Database
import Language.Marlowe.Runtime.ChainSync.Genesis (GenesisBlock)
import Language.Marlowe.Runtime.ChainSync.NodeClient (Changes(..), isEmptyChanges)
import Language.Marlowe.Runtime.ChainSync.NodeClient (Changes(..), isEmptyChanges, lookupPointInChanges)
import Ouroboros.Network.Point (WithOrigin(..))
import Prelude hiding (filter)
import System.IO (stderr)
import System.IO (hPutStrLn, stderr)
import Witherable (Witherable(..))

-- | Set of dependencies required by the ChainStore
data ChainStoreDependencies = ChainStoreDependencies
{ commitRollback :: !(CommitRollback IO) -- ^ How to persist rollbacks in the database backend
, commitBlocks :: !(CommitBlocks IO) -- ^ How to commit blocks in bulk in the database backend
, rateLimit :: !NominalDiffTime -- ^ The minimum time between database writes
, getChanges :: !(STM Changes) -- ^ A source of changes to commit
, getGenesisBlock :: !(GetGenesisBlock IO)
, genesisBlock :: !GenesisBlock
, commitGenesisBlock :: !(CommitGenesisBlock IO)
{ databaseQueries :: DatabaseQueries IO -- ^ How to persist rollbacks in the database backend
, rateLimit :: NominalDiffTime -- ^ The minimum time between database writes
, getChanges :: STM Changes -- ^ A source of changes to commit
, genesisBlock :: GenesisBlock
}

-- | Public API of the ChainStore component
newtype ChainStore = ChainStore
{ localTip :: STM ChainTip -- ^ Action to read the current (local) chain tip
data ChainStore = ChainStore
-- Explicitly lazy to support value recursion
{ localTip :: ~(STM ChainTip) -- ^ Action to read the current (local) chain tip
, chainStoreDatabaseQueries :: ~(DatabaseQueries IO)
}

-- | Create a ChainStore component.
chainStore :: Component IO ChainStoreDependencies ChainStore
chainStore = component \ChainStoreDependencies{..} -> do
localTipVar <- newTVar ChainTipAtGenesis
currentChanges <- newEmptyTMVar
let
DatabaseQueries{..} = databaseQueries

awaitChanges :: Maybe Delay -> STM Changes
awaitChanges delay = do
-- Wait until allowed to write again (determined by rateLimit).
traverse_ waitDelay delay
-- Reading this STM action clears the source of changes.
changes <- getChanges
guard $ not $ isEmptyChanges changes
-- Put the current changes in a local variable to service queries while
-- the database is being updated.
putTMVar currentChanges changes
pure changes

runChainStore :: IO ()
Expand All @@ -67,40 +76,43 @@ chainStore = component \ChainStoreDependencies{..} -> do
where
go lastWrite = do
delay <- wither computeDelay lastWrite
Changes{..} <- atomically $ awaitChanges delay
for_ changesRollback \point -> do
case point of
ChainPointAtGenesis -> T.hPutStrLn stderr "Rolling back to Genesis"
ChainPoint (SlotNo slot) (HeaderHash hash) -> T.hPutStrLn stderr $ T.intercalate " "
[ "Rolling back to block"
, encodeBase16 $ fromShort hash
, "at slot"
, T.pack $ show slot
]
runCommitRollback commitRollback point
when (changesBlockCount > 0) do
T.hPutStrLn stderr $ mconcat
[ "Saving "
, T.pack $ show changesBlockCount
, " blocks, "
, T.pack $ show changesTxCount
, " transactions. New tip: "
, case changesLocalTip of
ChainTipAtGenesis -> "Genesis"
ChainTip (SlotNo slot) (HeaderHash hash) _ -> T.intercalate " "
[ "block"
bracket
(atomically $ awaitChanges delay)
(const $ atomically $ takeTMVar currentChanges)
\Changes{..} -> do
for_ changesRollback \point -> do
case point of
ChainPointAtGenesis -> T.hPutStrLn stderr "Rolling back to Genesis"
ChainPoint (SlotNo slot) (HeaderHash hash) -> T.hPutStrLn stderr $ T.intercalate " "
[ "Rolling back to block"
, encodeBase16 $ fromShort hash
, "at slot"
, T.pack $ show slot
]
, " (node tip: "
, case changesTip of
ChainTipAtGenesis -> "Genesis"
ChainTip (SlotNo slot) _ _ -> T.pack $ show slot
, ")"
]
runCommitBlocks commitBlocks changesBlocks
atomically $ writeTVar localTipVar changesLocalTip
runCommitRollback commitRollback point
when (changesBlockCount > 0) do
T.hPutStrLn stderr $ mconcat
[ "Saving "
, T.pack $ show changesBlockCount
, " blocks, "
, T.pack $ show changesTxCount
, " transactions. New tip: "
, case changesLocalTip of
ChainTipAtGenesis -> "Genesis"
ChainTip (SlotNo slot) (HeaderHash hash) _ -> T.intercalate " "
[ "block"
, encodeBase16 $ fromShort hash
, "at slot"
, T.pack $ show slot
]
, " (node tip: "
, case changesTip of
ChainTipAtGenesis -> "Genesis"
ChainTip (SlotNo slot) _ _ -> T.pack $ show slot
, ")"
]
runCommitBlocks commitBlocks changesBlocks
atomically $ writeTVar localTipVar changesLocalTip
go . Just =<< getCurrentTime

computeDelay :: UTCTime -> IO (Maybe Delay)
Expand All @@ -114,4 +126,19 @@ chainStore = component \ChainStoreDependencies{..} -> do

localTip = readTVar localTipVar

pure (runChainStore, ChainStore { localTip })
getHeaderAtPoint' = GetHeaderAtPoint \case
ChainPointAtGenesis -> pure $ Just Origin
point@(ChainPoint slotNo hash) -> atomically (tryReadTMVar currentChanges) >>= \case
-- If nothing is being saved, just query the database
Nothing -> runGetHeaderAtPoint getHeaderAtPoint point
-- Otherwise, check for the block in the current batch, as it might not
-- be in the database yet.
Just changes -> case lookupPointInChanges point changes of
Nothing -> runGetHeaderAtPoint getHeaderAtPoint point
Just block -> do
hPutStrLn stderr $ "Found point in changes currently being persisted " <> show point
pure $ Just $ BlockHeader slotNo hash <$> block

chainStoreDatabaseQueries = databaseQueries { getHeaderAtPoint = getHeaderAtPoint' }

pure (runChainStore, ChainStore { localTip, chainStoreDatabaseQueries })

0 comments on commit 384b534

Please sign in to comment.