Skip to content

Commit

Permalink
Merge pull request #584 from input-output-hk/kderme/fix-rollback
Browse files Browse the repository at this point in the history
fix rollbacks
  • Loading branch information
erikd committed May 4, 2021
2 parents 5486110 + c85dfae commit 6805d61
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 49 deletions.
94 changes: 64 additions & 30 deletions cardano-sync/src/Cardano/Sync.hs
Expand Up @@ -44,8 +44,8 @@ import Cardano.Slotting.Slot (SlotNo (..), WithOrigin (..))

import Cardano.Sync.Api
import Cardano.Sync.Config
import Cardano.Sync.Database (DbAction (..), DbActionQueue, lengthDbActionQueue,
mkDbApply, mkDbRollback, newDbActionQueue, runDbStartup, writeDbActionQueue)
import Cardano.Sync.Database (runDbStartup)
import Cardano.Sync.DbAction
import Cardano.Sync.Error
import Cardano.Sync.Metrics
import Cardano.Sync.Plugin (SyncNodePlugin (..))
Expand All @@ -66,7 +66,7 @@ import qualified Data.Text as Text
import Network.Mux (MuxTrace, WithMuxBearer)
import Network.Mux.Types (MuxMode (..))

import Network.TypedProtocol.Pipelined (Nat (Succ, Zero))
import Network.TypedProtocol.Pipelined (N (..), Nat (Succ, Zero))
import Ouroboros.Network.Driver.Simple (runPipelinedPeer)

import Ouroboros.Consensus.Block.Abstract (CodecConfig)
Expand Down Expand Up @@ -258,7 +258,7 @@ dbSyncProtocols trce env metricsSetters plugin queryVar runDBThreadFunction vers
(cChainSyncCodec codecs)
channel
(chainSyncClientPeerPipelined
$ chainSyncClient (envDataLayer env) metricsSetters trce env queryVar latestPoints currentTip actionQueue)
$ chainSyncClient plugin metricsSetters trce env queryVar latestPoints currentTip actionQueue)
)

atomically $ writeDbActionQueue actionQueue DbFinish
Expand Down Expand Up @@ -326,8 +326,13 @@ getCurrentTipBlockNo dataLayer = do
-- * update its state when the client receives next block or is requested to
-- rollback, see 'clientStNext' below.
--
-- When an intersect with the node is found, we are sure that the next message
-- will trigger the 'recvMsgRollBackward', so this is where we actually handle
-- any necessary rollback. This means that at this point, the 'currentTip' may not
-- be correct. This is not an issue, because we only use it for performance reasons
-- in the pipeline policy.
chainSyncClient
:: SyncDataLayer
:: SyncNodePlugin
-> MetricSetters
-> Trace IO Text
-> SyncEnv
Expand All @@ -336,45 +341,57 @@ chainSyncClient
-> WithOrigin BlockNo
-> DbActionQueue
-> ChainSyncClientPipelined CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
chainSyncClient dataLayer metricsSetters trce env queryVar latestPoints currentTip actionQueue = do
ChainSyncClientPipelined $ pure $
chainSyncClient _plugin metricsSetters trce env queryVar latestPoints currentTip actionQueue = do
ChainSyncClientPipelined $ pure $ clientPipelinedStIdle currentTip latestPoints
where
clientPipelinedStIdle
:: WithOrigin BlockNo -> [CardanoPoint]
-> ClientPipelinedStIdle 'Z CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
clientPipelinedStIdle clintTip points =
-- Notify the core node about our latest points at which we are
-- synchronised. This client is not persistent and thus it just
-- synchronises from the genesis block. A real implementation should send
-- a list of points up to a point which is k blocks deep.
SendMsgFindIntersect
(if null latestPoints then [genesisPoint] else latestPoints)
(if null points then [genesisPoint] else points)
ClientPipelinedStIntersect
{ recvMsgIntersectFound = \ _hdr tip -> pure $ go policy Zero currentTip (getTipBlockNo tip)
, recvMsgIntersectNotFound = pure . go policy Zero currentTip . getTipBlockNo
{ recvMsgIntersectFound = \ _hdr tip -> pure $ go policy Zero clintTip (getTipBlockNo tip) Nothing
, recvMsgIntersectNotFound = \tip -> pure $ goTip policy Zero clintTip tip Nothing
}
where

policy :: MkPipelineDecision
policy = pipelineDecisionLowHighMark 1 50

go :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> WithOrigin BlockNo
goTip :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> Tip CardanoBlock -> Maybe [CardanoPoint]
-> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
goTip mkPipelineDecision n clientTip serverTip mPoint =
go mkPipelineDecision n clientTip (getTipBlockNo serverTip) mPoint

go :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> WithOrigin BlockNo -> Maybe [CardanoPoint]
-> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
go mkPipelineDecision n clientTip serverTip =
case (n, runPipelineDecision mkPipelineDecision n clientTip serverTip) of
(_Zero, (Request, mkPipelineDecision')) ->
go mkPipelineDecision n clientTip serverTip mPoint =
case (mPoint, n, runPipelineDecision mkPipelineDecision n clientTip serverTip) of
(Just points, _, _) -> drainThePipe n $ clientPipelinedStIdle clientTip points
(_, _Zero, (Request, mkPipelineDecision')) ->
SendMsgRequestNext clientStNext (pure clientStNext)
where
clientStNext = mkClientStNext $ \clientBlockNo newServerTip -> go mkPipelineDecision' n clientBlockNo (getTipBlockNo newServerTip)
(_, (Pipeline, mkPipelineDecision')) ->
clientStNext = mkClientStNext $ goTip mkPipelineDecision' n
(_, _, (Pipeline, mkPipelineDecision')) ->
SendMsgRequestNextPipelined
(go mkPipelineDecision' (Succ n) clientTip serverTip)
(Succ n', (CollectOrPipeline, mkPipelineDecision')) ->
(go mkPipelineDecision' (Succ n) clientTip serverTip Nothing)
(_, Succ n', (CollectOrPipeline, mkPipelineDecision')) ->
CollectResponse
(Just . pure $ SendMsgRequestNextPipelined $ go mkPipelineDecision' (Succ n) clientTip serverTip)
(mkClientStNext $ \clientBlockNo newServerTip -> go mkPipelineDecision' n' clientBlockNo (getTipBlockNo newServerTip))
(Succ n', (Collect, mkPipelineDecision')) ->
(Just . pure $ SendMsgRequestNextPipelined $ go mkPipelineDecision' (Succ n) clientTip serverTip Nothing)
(mkClientStNext $ goTip mkPipelineDecision' n')
(_, Succ n', (Collect, mkPipelineDecision')) ->
CollectResponse
Nothing
(mkClientStNext $ \clientBlockNo newServerTip -> go mkPipelineDecision' n' clientBlockNo (getTipBlockNo newServerTip))
(mkClientStNext $ goTip mkPipelineDecision' n')

mkClientStNext :: (WithOrigin BlockNo -> Tip CardanoBlock
-> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO a)
-> ClientStNext n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO a
mkClientStNext
:: (WithOrigin BlockNo -> Tip CardanoBlock -> Maybe [CardanoPoint]
-> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ())
-> ClientStNext n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
mkClientStNext finish =
ClientStNext
{ recvMsgRollForward = \blk tip ->
Expand All @@ -389,18 +406,35 @@ chainSyncClient dataLayer metricsSetters trce env queryVar latestPoints currentT

setDbQueueLength metricsSetters newSize

pure $ finish (At (blockNo blk)) tip
pure $ finish (At (blockNo blk)) tip Nothing
, recvMsgRollBackward = \point tip ->
logException trce "recvMsgRollBackward: " $ do
-- This will get the current tip rather than what we roll back to
-- but will only be incorrect for a short time span.
atomically $ writeDbActionQueue actionQueue (mkDbRollback point)
newTip <- getCurrentTipBlockNo dataLayer
pure $ finish newTip tip
mPoints <- waitRollback actionQueue point
newTip <- getCurrentTipBlockNo (envDataLayer env)
pure $ finish newTip tip mPoints
}

-- | Log the numeric protocol magic at info level, prefixed with
-- @NetworkMagic: @.
logProtocolMagicId :: Trace IO Text -> Crypto.ProtocolMagicId -> ExceptT SyncNodeError IO ()
logProtocolMagicId tracer pm =
    liftIO (logInfo tracer message)
  where
    message :: Text
    message = mconcat ["NetworkMagic: ", textShow (Crypto.unProtocolMagicId pm)]

-- | Collect and discard every outstanding pipelined response, then hand
-- control to the supplied idle-state client once nothing is left in flight.
--
-- The first argument is the number of responses still outstanding; each
-- collected message (roll forward or roll backward) is ignored and only
-- decrements that counter.
drainThePipe
  :: Nat n
  -> ClientPipelinedStIdle 'Z CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
  -> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
drainThePipe outstanding resume = drain outstanding
  where
    drain
      :: forall k. Nat k
      -> ClientPipelinedStIdle k CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
    -- Nothing left in flight: continue with the caller's client.
    drain Zero = resume
    -- One more response pending: collect it, drop it, and keep draining.
    drain (Succ rest) =
      let ignoreReply =
            ClientStNext
              { recvMsgRollForward = \_blk _tip -> pure (drain rest)
              , recvMsgRollBackward = \_pt _tip -> pure (drain rest)
              }
      in CollectResponse Nothing ignoreReply
11 changes: 8 additions & 3 deletions cardano-sync/src/Cardano/Sync/Api.hs
Expand Up @@ -6,6 +6,7 @@ module Cardano.Sync.Api
, LedgerEnv (..)
, SyncDataLayer (..)
, mkSyncEnvFromConfig
, verifyFilePoints
, getLatestPoints
) where

Expand Down Expand Up @@ -87,9 +88,8 @@ mkSyncEnvFromConfig dataLayer dir genCfg =
(SystemStart (Byron.gdStartTime $ Byron.configGenesisData bCfg))
dir

getLatestPoints :: SyncEnv -> IO [CardanoPoint]
getLatestPoints env = do
files <- listLedgerStateFilesOrdered $ leDir $ envLedger env
verifyFilePoints :: SyncEnv -> [LedgerStateFile] -> IO [CardanoPoint]
verifyFilePoints env files =
catMaybes <$> mapM validLedgerFileToPoint files
where
validLedgerFileToPoint :: LedgerStateFile -> IO (Maybe CardanoPoint)
Expand All @@ -106,3 +106,8 @@ getLatestPoints env = do

convertHashBlob :: ByteString -> Maybe (HeaderHash CardanoBlock)
convertHashBlob = Just . fromRawHash (Proxy @CardanoBlock)

-- | List the ledger state files (in the order given by
-- 'listLedgerStateFilesOrdered') from the ledger directory of the environment
-- and turn them into points via 'verifyFilePoints'.
getLatestPoints :: SyncEnv -> IO [CardanoPoint]
getLatestPoints env =
  listLedgerStateFilesOrdered (leDir (envLedger env)) >>= verifyFilePoints env
62 changes: 55 additions & 7 deletions cardano-sync/src/Cardano/Sync/Database.hs
@@ -1,23 +1,24 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

module Cardano.Sync.Database
( DbAction (..)
, DbActionQueue (..)
, lengthDbActionQueue
, mkDbApply
, mkDbRollback
, newDbActionQueue
, runDbStartup
, runDbThread
, writeDbActionQueue
) where

import Cardano.Prelude
import Cardano.Prelude hiding (atomically)

import Cardano.BM.Trace (Trace, logDebug, logError, logInfo)

import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Extra (whenJust)
import Control.Monad.Trans.Except.Extra (newExceptT)

Expand All @@ -30,6 +31,16 @@ import Cardano.Sync.Plugin
import Cardano.Sync.Types
import Cardano.Sync.Util hiding (whenJust)

import Cardano.Slotting.Slot (WithOrigin (..))

import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Extended

import Ouroboros.Network.Block (Point (..))
import Ouroboros.Network.Point (blockPointHash, blockPointSlot)

import System.IO.Error

data NextState
= Continue
| Done
Expand Down Expand Up @@ -80,20 +91,57 @@ runActions trce env plugin actions = do
case spanDbApply xs of
([], DbFinish:_) -> do
pure Done
([], DbRollBackToPoint pt:ys) -> do
runRollbacks trce plugin pt
liftIO $ loadLedgerStateAtPoint (envLedger env) pt
([], DbRollBackToPoint pt resultVar : ys) -> do
runRollbacksDB trce plugin pt
res <- lift $ rollbackLedger trce plugin env pt
lift $ atomically $ putTMVar resultVar res
dbAction Continue ys
(ys, zs) -> do
insertBlockList trce env plugin ys
if null zs
then pure Continue
else dbAction Continue zs

runRollbacks
-- | Try to load the ledger state at the given point and cross-check it against
-- the latest block reported by the data layer.
--
-- Returns 'Nothing' when the ledger state was loaded and matches the database
-- tip. When 'loadLedgerAtPoint' yields a 'Left' instead, the value it carries
-- is passed to 'verifyFilePoints' and the resulting points are returned in a
-- 'Just'.
--
-- Throws a 'userError' in 'IO' if the ledger point and the database tip do not
-- agree.
rollbackLedger :: Trace IO Text -> SyncNodePlugin -> SyncEnv -> CardanoPoint -> IO (Maybe [CardanoPoint])
rollbackLedger _trce _plugin env point = do
    loaded <- loadLedgerAtPoint (envLedger env) point True
    dbTip <- sdlGetLatestBlock (envDataLayer env)
    case loaded of
      Left stateFiles ->
        Just <$> verifyFilePoints env stateFiles
      Right st -> do
        checkDBWithState point dbTip
        -- This second check should always succeed, because 'loadLedgerAtPoint'
        -- returned successfully; it is kept purely as extra validation.
        checkDBWithState (headerStatePoint . headerState $ clsState st) dbTip
        pure Nothing
  where
    -- Fail loudly when the point and the database tip disagree.
    checkDBWithState :: CardanoPoint -> Maybe Block -> IO ()
    checkDBWithState pnt blk
      | compareTips pnt blk = pure ()
      | otherwise =
          throwIO . userError $
            mconcat
              [ "Ledger state point ", show pnt, " and db tip "
              , show blk, " don't match"
              ]

    -- A point matches the tip when both are at origin/absent, or when both
    -- the header hash and the slot number agree.
    compareTips :: CardanoPoint -> Maybe Block -> Bool
    compareTips (Point Origin) Nothing = True
    compareTips (Point (At blk)) (Just tip) =
      getHeaderHash (blockPointHash blk) == bHash tip
        && blockPointSlot blk == bSlotNo tip
    compareTips _ _ = False

runRollbacksDB
:: Trace IO Text -> SyncNodePlugin -> CardanoPoint
-> ExceptT SyncNodeError IO ()
runRollbacks trce plugin point =
runRollbacksDB trce plugin point =
newExceptT
. traverseMEither (\ f -> f trce point)
$ plugRollbackBlock plugin
Expand Down
15 changes: 11 additions & 4 deletions cardano-sync/src/Cardano/Sync/DbAction.hs
Expand Up @@ -8,9 +8,9 @@ module Cardano.Sync.DbAction
, blockingFlushDbActionQueue
, lengthDbActionQueue
, mkDbApply
, mkDbRollback
, newDbActionQueue
, writeDbActionQueue
, waitRollback
) where

import Cardano.Prelude
Expand All @@ -21,9 +21,11 @@ import qualified Control.Concurrent.STM as STM
import Control.Concurrent.STM.TBQueue (TBQueue)
import qualified Control.Concurrent.STM.TBQueue as TBQ

import Control.Monad.Class.MonadSTM.Strict (StrictTMVar, newEmptyTMVarIO, takeTMVar)

data DbAction
= DbApplyBlock !BlockDetails
| DbRollBackToPoint !CardanoPoint
| DbRollBackToPoint !CardanoPoint (StrictTMVar IO (Maybe [CardanoPoint]))
| DbFinish

newtype DbActionQueue = DbActionQueue
Expand All @@ -34,8 +36,13 @@ mkDbApply :: CardanoBlock -> SlotDetails -> DbAction
mkDbApply cblk details =
DbApplyBlock (BlockDetails cblk details)

mkDbRollback :: CardanoPoint -> DbAction
mkDbRollback = DbRollBackToPoint
-- | Simulate a synchronous rollback: enqueue a 'DbRollBackToPoint' action
-- carrying a fresh, empty result variable, then block the calling thread until
-- a value has been put into that variable, and return it.
waitRollback :: DbActionQueue -> CardanoPoint -> IO (Maybe [CardanoPoint])
waitRollback queue point = do
  reply <- newEmptyTMVarIO
  atomically (writeDbActionQueue queue (DbRollBackToPoint point reply))
  atomically (takeTMVar reply)

-- | Length of the underlying bounded queue, as reported by
-- 'STM.lengthTBQueue'.
lengthDbActionQueue :: DbActionQueue -> STM Natural
lengthDbActionQueue queue =
  let DbActionQueue tbq = queue
  in STM.lengthTBQueue tbq
Expand Down

0 comments on commit 6805d61

Please sign in to comment.