Skip to content

Commit

Permalink
Introduce the MKs to the ChainDB
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Feb 8, 2023
1 parent b9704de commit b06b57f
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 83 deletions.
59 changes: 53 additions & 6 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs
Expand Up @@ -80,7 +80,7 @@ import Ouroboros.Consensus.HeaderStateHistory
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Util ((..:))
import Ouroboros.Consensus.Util (StaticEither, (..:))
import Ouroboros.Consensus.Util.CallStack
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
Expand All @@ -93,6 +93,10 @@ import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API.Types (FsError)
import Ouroboros.Consensus.Storage.LedgerDB (LedgerDB')
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import Ouroboros.Consensus.Storage.LedgerDB.HD.BackingStore
(LedgerBackingStoreValueHandle)
import Ouroboros.Consensus.Storage.LedgerDB.HD.ReadsKeySets
(PointNotFound)
import Ouroboros.Consensus.Storage.Serialisation

-- Support for tests
Expand Down Expand Up @@ -335,6 +339,45 @@ data ChainDB m blk = ChainDB {
-- invalid block is detected. These blocks are likely to be valid.
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))

-- | Get a 'LedgerDB' and a handle to a value of the backing store
-- corresponding to the anchor of the 'LedgerDB'
--
-- In the 'StaticRight' case, 'Left pt' out means the requested point is
-- not on current chain, so that 'LedgerDB' is unavailable.
--
-- The return type in the end contains a value handle that can be used to
-- perform reads on the backing store, a LedgerDB truncated at the
-- requested point and a function for releasing the value handle.
--
-- The value handle is allocated in the given registry.
, getLedgerBackingStoreValueHandle ::
forall b.
ResourceRegistry m
-> StaticEither b () (Point blk)
-> m (StaticEither
b
( LedgerBackingStoreValueHandle m (ExtLedgerState blk)
, LedgerDB' blk
, m ()
)
(Either
(Point blk)
( LedgerBackingStoreValueHandle m (ExtLedgerState blk)
, LedgerDB' blk
, m ()
)
)
)

-- | Read and forward the values up to the given point on the chain. Returns
-- Nothing if the anchor moved or if the state is not found on the ledger db.
, getLedgerTablesAtFor ::
Point blk
-> LedgerTables (ExtLedgerState blk) KeysMK
-> m (Either
(PointNotFound blk)
(LedgerTables (ExtLedgerState blk) ValuesMK))

-- | Close the ChainDB
--
-- Idempotent.
Expand All @@ -346,6 +389,10 @@ data ChainDB m blk = ChainDB {
--
-- 'False' when the database is closed.
, isOpen :: STM m Bool

-- | Perform a monadic operation holding the read lock on the DB
-- changelog.
, withLgrReadLock :: forall a. m a -> m a
}

getCurrentTip :: (Monad (STM m), HasHeader (Header blk))
Expand All @@ -359,13 +406,13 @@ getTipBlockNo = fmap Network.getTipBlockNo . getCurrentTip
-- | Get current ledger
getCurrentLedger ::
(Monad (STM m), IsLedger (LedgerState blk))
=> ChainDB m blk -> STM m (ExtLedgerState blk)
=> ChainDB m blk -> STM m (ExtLedgerState blk EmptyMK)
getCurrentLedger = fmap LedgerDB.ledgerDbCurrent . getLedgerDB

-- | Get the immutable ledger, i.e., typically @k@ blocks back.
getImmutableLedger ::
Monad (STM m)
=> ChainDB m blk -> STM m (ExtLedgerState blk)
=> ChainDB m blk -> STM m (ExtLedgerState blk EmptyMK)
getImmutableLedger = fmap LedgerDB.ledgerDbAnchor . getLedgerDB

-- | Get the ledger for the given point.
Expand All @@ -374,8 +421,8 @@ getImmutableLedger = fmap LedgerDB.ledgerDbAnchor . getLedgerDB
-- chain (i.e., older than @k@ or not on the current chain), 'Nothing' is
-- returned.
getPastLedger ::
(Monad (STM m), LedgerSupportsProtocol blk)
=> ChainDB m blk -> Point blk -> STM m (Maybe (ExtLedgerState blk))
(Monad (STM m), LedgerSupportsProtocol blk, StandardHash (ExtLedgerState blk))
=> ChainDB m blk -> Point blk -> STM m (Maybe (ExtLedgerState blk EmptyMK))
getPastLedger db pt = LedgerDB.ledgerDbPast pt <$> getLedgerDB db

-- | Get a 'HeaderStateHistory' populated with the 'HeaderState's of the
Expand All @@ -390,7 +437,7 @@ getHeaderStateHistory = fmap toHeaderStateHistory . getLedgerDB
-> HeaderStateHistory blk
toHeaderStateHistory =
HeaderStateHistory
. LedgerDB.ledgerDbBimap headerState headerState
. LedgerDB.volatileStatesBimap headerState headerState

{-------------------------------------------------------------------------------
Adding a block
Expand Down
Expand Up @@ -42,6 +42,7 @@ import Data.Functor ((<&>))
import Data.Functor.Identity (Identity)
import qualified Data.Map.Strict as Map
import Data.Maybe.Strict (StrictMaybe (..))
import Data.SOP (K (..))
import GHC.Stack (HasCallStack)

import qualified Ouroboros.Network.AnchoredFragment as AF
Expand Down Expand Up @@ -163,7 +164,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
traceWith initChainSelTracer InitalChainSelected

let chain = VF.validatedFragment chainAndLedger
ledger = VF.validatedLedger chainAndLedger
K ledger = VF.validatedLedger chainAndLedger
cfg = Args.cdbTopLevelConfig args

atomically $ LgrDB.setCurrent lgrDB ledger
Expand Down Expand Up @@ -220,6 +221,11 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, isOpen = isOpen h
, getLedgerBackingStoreValueHandle = \rreg p -> getEnv h $ \env' -> do
Query.getLedgerBackingStoreValueHandle env' rreg p
, getLedgerTablesAtFor = \pt keys -> getEnv h (LgrDB.getLedgerTablesAtFor pt keys . cdbLgrDB)
, withLgrReadLock = \m -> getEnv h (flip LgrDB.withReadLock m . cdbLgrDB)

}
testing = Internal
{ intCopyToImmutableDB = getEnv h Background.copyToImmutableDB
Expand Down
Expand Up @@ -20,6 +20,7 @@ import Control.Tracer (Tracer, contramap, nullTracer)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.Fragment.InFuture (CheckInFuture)
import Ouroboros.Consensus.Ledger.Basics
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
Expand Down Expand Up @@ -63,7 +64,7 @@ data ChainDbArgs f m blk = ChainDbArgs {
-- ^ Predicate to check for integrity of
-- 'Ouroboros.Consensus.Storage.Common.GetVerifiedBlock' components when
-- extracting them from both the VolatileDB and the ImmutableDB.
, cdbGenesis :: HKD f (m (ExtLedgerState blk))
, cdbGenesis :: HKD f (m (ExtLedgerState blk ValuesMK))
, cdbCheckInFuture :: HKD f (CheckInFuture m blk)
, cdbImmutableDbCacheConfig :: ImmutableDB.CacheConfig

Expand Down
Expand Up @@ -57,7 +57,6 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
Expand Down Expand Up @@ -236,10 +235,7 @@ copyToImmutableDB CDB{..} = withCopyLock $ do
copyAndSnapshotRunner
:: forall m blk.
( IOLike m
, ConsensusProtocol (BlockProtocol blk)
, HasHeader blk
, GetHeader blk
, IsLedger (LedgerState blk)
, LedgerSupportsProtocol blk
, LgrDbSerialiseConstraints blk
)
=> ChainDbEnv m blk
Expand Down Expand Up @@ -296,10 +292,9 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed =
-- | Write a snapshot of the LedgerDB to disk and remove old snapshots
-- (typically one) so that only 'onDiskNumSnapshots' snapshots are on disk.
updateLedgerSnapshots ::
( IOLike m
( IOLike m
, LgrDbSerialiseConstraints blk
, HasHeader blk
, IsLedger (LedgerState blk)
, LedgerSupportsProtocol blk
)
=> ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{..} = do
Expand Down
Expand Up @@ -36,6 +36,7 @@ import Data.Maybe.Strict (StrictMaybe (..), isSNothing,
strictMaybeToMaybe)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.SOP (K (..), unK)
import GHC.Stack (HasCallStack)

import Ouroboros.Network.AnchoredFragment (Anchor, AnchoredFragment,
Expand Down Expand Up @@ -116,7 +117,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
<$> ImmutableDB.getTipAnchor immutableDB
<*> (ignoreInvalidSuc volatileDB invalid <$>
VolatileDB.filterByPredecessor volatileDB)
<*> LgrDB.getCurrent lgrDB
<*> (K <$> LgrDB.getCurrent lgrDB)

chains <- constructChains i succsOf

Expand All @@ -141,7 +142,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
-- This is guaranteed by the fact that all constructed candidates start
-- from this tip.
toChainAndLedger
:: ValidatedChainDiff (Header blk) (LedgerDB' blk)
:: ValidatedChainDiff (Header blk) (K (LedgerDB' blk))
-> ChainAndLedger blk
toChainAndLedger (ValidatedChainDiff chainDiff ledger) =
case chainDiff of
Expand Down Expand Up @@ -185,7 +186,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
-- @i@.
-> NonEmpty (AnchoredFragment (Header blk))
-- ^ Candidates anchored at @i@
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk)))
-> m (Maybe (ValidatedChainDiff (Header blk) (K (LedgerDB' blk))))
chainSelection' curChainAndLedger candidates =
assert (all ((LgrDB.currentPoint ledger ==) .
castPoint . AF.anchorPoint)
Expand All @@ -195,7 +196,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
chainSelection cse (Diff.extend <$> candidates)
where
curChain = VF.validatedFragment curChainAndLedger
ledger = VF.validatedLedger curChainAndLedger
K ledger = VF.validatedLedger curChainAndLedger
chainSelEnv = do
varTentativeState <- newTVarIO NoLastInvalidTentative
varTentativeHeader <- newTVarIO SNothing
Expand Down Expand Up @@ -460,7 +461,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
<*> VolatileDB.getBlockInfo cdbVolatileDB
<*> Query.getCurrentChain cdb
<*> Query.getTipPoint cdb
<*> LgrDB.getCurrent cdbLgrDB
<*> (K <$> LgrDB.getCurrent cdbLgrDB)
let curChainAndLedger :: ChainAndLedger blk
curChainAndLedger =
-- The current chain we're working with here is not longer than @k@
Expand Down Expand Up @@ -686,7 +687,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
cfg :: TopLevelConfig blk
cfg = cdbTopLevelConfig

ledger :: LedgerState blk
ledger :: LedgerState blk EmptyMK
ledger = ledgerState (LgrDB.ledgerDbCurrent newLedgerDB)

summary :: History.Summary (HardForkIndices blk)
Expand Down Expand Up @@ -715,7 +716,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
-- us, as we cannot roll back more than @k@ headers anyway.
switchTo
:: HasCallStack
=> ValidatedChainDiff (Header blk) (LedgerDB' blk)
=> ValidatedChainDiff (Header blk) (K (LedgerDB' blk))
-- ^ Chain and ledger to switch to
-> StrictTVar m (StrictMaybe (Header blk))
-- ^ Tentative header
Expand Down Expand Up @@ -779,7 +780,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do

return $ castPoint $ AF.headPoint newChain
where
ValidatedChainDiff chainDiff newLedger = vChainDiff
ValidatedChainDiff chainDiff (K newLedger) = vChainDiff

-- | We have a new block @b@ that doesn't fit onto the current chain, but
-- we have found a 'ChainDiff' connecting it to the current chain via
Expand Down Expand Up @@ -871,7 +872,7 @@ chainSelection
)
=> ChainSelEnv m blk
-> NonEmpty (ChainDiff (Header blk))
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk)))
-> m (Maybe (ValidatedChainDiff (Header blk) (K (LedgerDB' blk))))
-- ^ The (valid) chain diff and corresponding LedgerDB that was selected,
-- or 'Nothing' if there is no valid chain diff preferred over the current
-- chain.
Expand Down Expand Up @@ -900,7 +901,7 @@ chainSelection chainSelEnv chainDiffs =
-- [Ouroboros] below.
go ::
[ChainDiff (Header blk)]
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk)))
-> m (Maybe (ValidatedChainDiff (Header blk) (K (LedgerDB' blk))))
go [] = return Nothing
go (candidate:candidates0) = do
mTentativeHeader <- setTentativeHeader
Expand Down Expand Up @@ -1016,7 +1017,7 @@ chainSelection chainSelEnv chainDiffs =
data ValidationResult blk =
-- | The entire candidate fragment was valid. No blocks were from the
-- future.
FullyValid (ValidatedChainDiff (Header blk) (LedgerDB' blk))
FullyValid (ValidatedChainDiff (Header blk) (K (LedgerDB' blk)))

-- | The candidate fragment contained invalid blocks and/or blocks from
-- the future that had to be truncated from the fragment.
Expand Down Expand Up @@ -1049,7 +1050,7 @@ ledgerValidateCandidate
)
=> ChainSelEnv m blk
-> ChainDiff (Header blk)
-> m (ValidatedChainDiff (Header blk) (LedgerDB' blk))
-> m (ValidatedChainDiff (Header blk) (K (LedgerDB' blk)))
ledgerValidateCandidate chainSelEnv chainDiff@(ChainDiff rollback suffix) =
LgrDB.validate lgrDB curLedger blockCache rollback traceUpdate newBlocks >>= \case
LgrDB.ValidateExceededRollBack {} ->
Expand Down Expand Up @@ -1089,11 +1090,11 @@ ledgerValidateCandidate chainSelEnv chainDiff@(ChainDiff rollback suffix) =
-- we should punish. (Tacit assumption made here: it's impossible
-- three blocks in a row have the same slot.)

return $ ValidatedDiff.new chainDiff' ledger'
return $ ValidatedDiff.new chainDiff' (K ledger')

LgrDB.ValidateSuccessful ledger' -> do
traceWith validationTracer (ValidCandidate suffix)
return $ ValidatedDiff.new chainDiff ledger'
return $ ValidatedDiff.new chainDiff (K ledger')
where
ChainSelEnv {
lgrDB
Expand All @@ -1107,7 +1108,7 @@ ledgerValidateCandidate chainSelEnv chainDiff@(ChainDiff rollback suffix) =
traceUpdate = traceWith $ UpdateLedgerDbTraceEvent >$< validationTracer

curLedger :: LedgerDB' blk
curLedger = VF.validatedLedger curChainAndLedger
K curLedger = VF.validatedLedger curChainAndLedger

newBlocks :: [Header blk]
newBlocks = AF.toOldestFirst suffix
Expand All @@ -1132,9 +1133,9 @@ ledgerValidateCandidate chainSelEnv chainDiff@(ChainDiff rollback suffix) =
futureCheckCandidate
:: forall m blk. (IOLike m, LedgerSupportsProtocol blk)
=> ChainSelEnv m blk
-> ValidatedChainDiff (Header blk) (LedgerDB' blk)
-> ValidatedChainDiff (Header blk) (K (LedgerDB' blk))
-> m (Either (ChainDiff (Header blk))
(ValidatedChainDiff (Header blk) (LedgerDB' blk)))
(ValidatedChainDiff (Header blk) (K (LedgerDB' blk))))
futureCheckCandidate chainSelEnv validatedChainDiff =
checkInFuture futureCheck validatedSuffix >>= \case

Expand Down Expand Up @@ -1198,8 +1199,17 @@ futureCheckCandidate chainSelEnv validatedChainDiff =

validatedSuffix :: ValidatedFragment (Header blk) (LedgerState blk)
validatedSuffix =
ledgerState . LgrDB.ledgerDbCurrent <$>
ValidatedDiff.toValidatedFragment validatedChainDiff
let
validatedChainDiff' = ValidatedDiff.toValidatedFragment validatedChainDiff
in
validatedChainDiff' {
VF.validatedLedger = ledgerState
. LgrDB.ledgerDbCurrent
. unK
. VF.validatedLedger
$ validatedChainDiff'
}


-- | Validate a candidate chain using 'ledgerValidateCandidate' and
-- 'futureCheck'.
Expand Down Expand Up @@ -1244,7 +1254,7 @@ validateCandidate chainSelEnv chainDiff =
-------------------------------------------------------------------------------}

-- | Instantiate 'ValidatedFragment' in the way that chain selection requires.
type ChainAndLedger blk = ValidatedFragment (Header blk) (LedgerDB' blk)
type ChainAndLedger blk = ValidatedFragment (Header blk) (K (LedgerDB' blk))

{-------------------------------------------------------------------------------
Diffusion pipelining
Expand Down

0 comments on commit b06b57f

Please sign in to comment.