Skip to content

Commit

Permalink
Implement LedgerDB initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Jan 18, 2022
1 parent f0dbc34 commit ba2098b
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 176 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -79,3 +79,5 @@ haddocks/

# Ignore directory used by .github/bin/check-dependencies
/tmp

.dir-locals.el
5 changes: 0 additions & 5 deletions ouroboros-consensus/.dir-locals.el

This file was deleted.

Expand Up @@ -61,7 +61,7 @@ data ChainDbArgs f m blk = ChainDbArgs {
, cdbTopLevelConfig :: HKD f (TopLevelConfig blk)
, cdbChunkInfo :: HKD f ChunkInfo
, cdbCheckIntegrity :: HKD f (blk -> Bool)
, cdbGenesis :: HKD f (m (ExtLedgerState blk EmptyMK))
, cdbGenesis :: HKD f (m (ExtLedgerState blk ValuesMK))
, cdbCheckInFuture :: HKD f (CheckInFuture m blk)
, cdbImmutableDbCacheConfig :: ImmutableDB.CacheConfig

Expand Down
Expand Up @@ -124,7 +124,7 @@ data LgrDB m blk = LgrDB {
-- When a garbage-collection is performed on the VolatileDB, the points
-- of the blocks eligible for garbage-collection should be removed from
-- this set.
, lgrOnDiskLedgerStDb :: !(LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk))
, lgrOnDiskLedgerStDb :: !(LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk) blk)
-- ^
--
-- TODO: align the other fields.
Expand All @@ -149,6 +149,8 @@ type LgrDbSerialiseConstraints blk =
( Serialise (HeaderHash blk)
, EncodeDisk blk (LedgerState blk EmptyMK)
, DecodeDisk blk (LedgerState blk EmptyMK)
, EncodeDisk blk (LedgerState blk ValuesMK)
, DecodeDisk blk (LedgerState blk ValuesMK)
, EncodeDisk blk (AnnTip blk)
, DecodeDisk blk (AnnTip blk)
, EncodeDisk blk (ChainDepState (BlockProtocol blk))
Expand All @@ -161,7 +163,7 @@ type LgrDbSerialiseConstraints blk =

data LgrDbArgs f m blk = LgrDbArgs {
lgrDiskPolicy :: DiskPolicy
, lgrGenesis :: HKD f (m (ExtLedgerState blk EmptyMK))
, lgrGenesis :: HKD f (m (ExtLedgerState blk ValuesMK))
, lgrHasFS :: SomeHasFS m
, lgrHasFSLedgerSt :: SomeHasFS m
, lgrTopLevelConfig :: HKD f (TopLevelConfig blk)
Expand Down Expand Up @@ -273,7 +275,7 @@ initFromDisk
=> LgrDbArgs Identity m blk
-> Tracer m (ReplayGoal blk -> TraceReplayEvent blk)
-> ImmutableDB m blk
-> m (LedgerDB' blk, Word64, OnDiskLedgerStDb m (ExtLedgerState blk))
-> m (LedgerDB' blk, Word64, OnDiskLedgerStDb m (ExtLedgerState blk) blk)
initFromDisk args replayTracer immutableDB = wrapFailure (Proxy @blk) $ do
onDiskLedgerStDb <- LedgerDB.mkOnDiskLedgerStDb lgrHasFSLedgerSt
-- TODO: is it correct that we pick a instance of the 'OnDiskLedgerStDb' here?
Expand All @@ -282,28 +284,32 @@ initFromDisk args replayTracer immutableDB = wrapFailure (Proxy @blk) $ do
replayTracer
lgrTracer
hasFS
decodeExtLedgerState'
lgrHasFSLedgerSt
onDiskLedgerStDb
decodeExtLedgerState'
decode
(configLedgerDb lgrTopLevelConfig)
lgrGenesis
(streamAPI immutableDB)
True
return (db, replayed, onDiskLedgerStDb)
where
LgrDbArgs { lgrHasFS = hasFS, .. } = args

ccfg = configCodec lgrTopLevelConfig

decodeExtLedgerState' :: forall s. Decoder s (ExtLedgerState blk EmptyMK)
decodeExtLedgerState' :: forall s mk. DecodeDisk blk (LedgerState blk mk) => Decoder s (ExtLedgerState blk mk)
decodeExtLedgerState' = decodeExtLedgerState
(decodeDisk ccfg)
(decodeDisk ccfg)
(decodeDisk ccfg)


-- | For testing purposes
mkLgrDB :: StrictTVar m (LedgerDB' blk)
-> StrictTVar m (Set (RealPoint blk))
-> LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk)
-> LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk) blk
-> FlushLock
-> (RealPoint blk -> m blk)
-> LgrDbArgs Identity m blk
Expand Down Expand Up @@ -458,7 +464,7 @@ streamAPI ::
forall m blk.
(IOLike m, HasHeader blk)
=> ImmutableDB m blk -> StreamAPI m blk
streamAPI immutableDB = StreamAPI streamAfter
streamAPI immutableDB = StreamAPI streamAfter streamAfterBefore
where
streamAfter :: HasCallStack
=> Point blk
Expand All @@ -481,6 +487,22 @@ streamAPI immutableDB = StreamAPI streamAfter
ImmutableDB.IteratorExhausted -> return $ NoMoreBlocks
ImmutableDB.IteratorResult blk -> return $ NextBlock blk

streamAfterBefore :: HasCallStack
=> Point blk
-> Point blk
-> (Either (RealPoint blk) (m (NextBlock blk)) -> m a)
-> m a
streamAfterBefore from to k = withRegistry $ \registry -> do
eItr <- ImmutableDB.streamWithBounds
immutableDB
registry
GetBlock
from
to
case eItr of
Left err -> k $ Left $ ImmutableDB.missingBlockPoint err
Right itr -> k $ Right $ streamUsing itr

{-------------------------------------------------------------------------------
Previously applied blocks
-------------------------------------------------------------------------------}
Expand Down
Expand Up @@ -48,12 +48,12 @@ module Ouroboros.Consensus.Storage.ImmutableDB.API (
, streamAfterKnownPoint
, streamAfterPoint
, streamAll
, streamWithBounds
, withDB
) where

import qualified Codec.CBOR.Read as CBOR
import Control.Monad.Except (ExceptT (..), lift, runExceptT,
throwError)
import Control.Monad.Except (ExceptT (..), runExceptT, throwError)
import qualified Data.ByteString.Lazy as Lazy
import Data.Either (isRight)
import Data.Function (on)
Expand Down Expand Up @@ -491,10 +491,20 @@ streamAfterPoint ::
-> BlockComponent blk b
-> Point blk
-> m (Either (MissingBlock blk) (Iterator m blk b))
streamAfterPoint db registry blockComponent fromPt = runExceptT $ do
tipPt <- lift $ atomically $ getTipPoint db
streamAfterPoint db registry blockComponent fromPt = do
tipPt <- atomically $ getTipPoint db
streamWithBounds db registry blockComponent fromPt tipPt

streamWithBounds :: (MonadSTM m, HasHeader blk, HasCallStack)
=> ImmutableDB m blk
-> ResourceRegistry m
-> BlockComponent blk b
-> Point blk
-> Point blk
-> m (Either (MissingBlock blk) (Iterator m blk b))
streamWithBounds db registry blockComponent fromPt toPt = runExceptT $
case (pointToWithOriginRealPoint fromPt,
pointToWithOriginRealPoint tipPt) of
pointToWithOriginRealPoint toPt) of

(Origin, Origin) ->
-- Nothing to stream
Expand All @@ -504,22 +514,22 @@ streamAfterPoint db registry blockComponent fromPt = runExceptT $ do
-- Asked to stream after a block while the ImmutableDB is empty
throwError $ NewerThanTip fromPt' GenesisPoint

(NotOrigin fromPt', NotOrigin _) | pointSlot fromPt > pointSlot tipPt ->
(NotOrigin fromPt', NotOrigin _) | pointSlot fromPt > pointSlot toPt ->
-- Lower bound is newer than the tip, nothing to stream
throwError $ NewerThanTip fromPt' tipPt
throwError $ NewerThanTip fromPt' toPt

(NotOrigin fromPt', NotOrigin tipPt') | fromPt' == tipPt' ->
(NotOrigin fromPt', NotOrigin toPt') | fromPt' == toPt' ->
-- Nothing to stream after the tip
return emptyIterator

(_, NotOrigin tipPt') ->
(_, NotOrigin toPt') ->
-- Stream from the given point to the current tip (not genesis)
ExceptT $ stream
db
registry
blockComponent
(StreamFromExclusive fromPt)
(StreamToInclusive tipPt')
(StreamToInclusive toPt')

-- | Variant of 'streamAfterPoint' that throws a 'MissingBlockError' when the
-- point is not in the ImmutableDB (or genesis).
Expand Down
@@ -1,8 +1,10 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE EmptyDataDeriving #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
Expand All @@ -21,13 +23,14 @@
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}


module Ouroboros.Consensus.Storage.LedgerDB.InMemory (
-- * LedgerDB proper
LedgerDbCfg (..)
, ledgerDbWithAnchor
-- ** opaque
, LedgerDB
, LedgerDB (..)
, NewLedgerDB
, OldLedgerDB
-- * Ledger DB types (TODO: we might want to place this somewhere else)
, DbChangelog
, DbReader (..)
Expand Down Expand Up @@ -70,6 +73,14 @@ module Ouroboros.Consensus.Storage.LedgerDB.InMemory (
, ledgerDbPush'
, ledgerDbPushMany'
, ledgerDbSwitch'
-- ** Javier
, Checkpoint (..)
, HasSeqNo (..)
, dbChangelogStates
, extendDbChangelog
, initialDbChangelog
, seqLast
, withBlockReadSets
) where

import Codec.Serialise.Decoding (Decoder)
Expand Down Expand Up @@ -155,6 +166,9 @@ data LedgerDB (l :: LedgerStateKind) = LedgerDB {
}
deriving (Generic)

type OldLedgerDB l = AnchoredSeq (WithOrigin SlotNo) (Checkpoint (l ValuesMK)) (Checkpoint (l ValuesMK))
type NewLedgerDB l = DbChangelog l

deriving instance (Eq (l EmptyMK), Eq (l ValuesMK)) => Eq (LedgerDB l)
deriving instance (NoThunks (l EmptyMK), NoThunks (l ValuesMK)) => NoThunks (LedgerDB l)

Expand Down Expand Up @@ -350,61 +364,65 @@ applyBlock cfg ap db = case ap of

newApplyBlock :: Ap m l blk c -> m (l TrackingMK)
newApplyBlock = \case
ReapplyVal b -> withBlockReadSets b $ \lh ->
ReapplyVal b -> withBlockReadSets b (ledgerDbChangelog db) $ \lh ->
return $ tickThenReapply cfg b lh

ApplyVal b -> withBlockReadSets b $ \lh ->
ApplyVal b -> withBlockReadSets b (ledgerDbChangelog db) $ \lh ->
( either (throwLedgerError db (blockRealPoint b)) return
$ runExcept
$ tickThenApply cfg b lh)

ReapplyRef r -> do
b <- resolveBlock r -- TODO: ask: would it make sense to recursively call applyBlock using ReapplyVal?

withBlockReadSets b $ \lh ->
withBlockReadSets b (ledgerDbChangelog db) $ \lh ->
return $ tickThenReapply cfg b lh

ApplyRef r -> do
b <- resolveBlock r

withBlockReadSets b $ \lh ->
withBlockReadSets b (ledgerDbChangelog db) $ \lh ->
either (throwLedgerError db (blockRealPoint b)) return $ runExcept $
tickThenApply cfg b lh

-- A value of @Weaken@ will not make it to this point, as @applyBlock@ will recurse until it fully unwraps.
Weaken _ -> error "unreachable"

withBlockReadSets
:: ReadsKeySets m l
=> blk
-> (l ValuesMK -> m (l TrackingMK))
-> m (l TrackingMK)
withBlockReadSets b f = do
let ks = getBlockKeySets b :: TableKeySets l
let aks = rewindTableKeySets (ledgerDbChangelog db) ks :: RewoundTableKeySets l
urs <- readDb aks
case withHydratedLedgerState urs f of
Nothing ->
-- We performed the rewind;read;forward sequence in this function. So
-- the forward operation should not fail. If this is the case we're in
-- the presence of a problem that we cannot deal with at this level,
-- so we throw an error.
--
-- When we introduce pipelining, if the forward operation fails it
-- could be because the DB handle was modified by a DB flush that took
-- place when __after__ we read the unforwarded keys-set from disk.
-- However, performing rewind;read;forward with the same __locked__
-- changelog should always succeed.
error "Changelog rewind;read;forward sequence failed."
Just res -> res

withHydratedLedgerState
:: UnforwardedReadSets l
-> (l ValuesMK -> a)
-> Maybe a
withHydratedLedgerState urs f = do
rs <- forwardTableKeySets (ledgerDbChangelog db) urs
return $ f $ withLedgerTables (ledgerDbCurrent db) rs
withBlockReadSets
:: forall m blk l. (ApplyBlock l blk, Monad m, TableStuff l, ReadsKeySets m l)
=> blk
-> DbChangelog l
-> (l ValuesMK -> m (l TrackingMK))
-> m (l TrackingMK)
withBlockReadSets b db f = do
let ks = getBlockKeySets b :: TableKeySets l
let aks = rewindTableKeySets db ks :: RewoundTableKeySets l
urs <- readDb aks
case withHydratedLedgerState urs db f of
Nothing ->
-- We performed the rewind;read;forward sequence in this function. So
-- the forward operation should not fail. If this is the case we're in
-- the presence of a problem that we cannot deal with at this level,
-- so we throw an error.
--
-- When we introduce pipelining, if the forward operation fails it
-- could be because the DB handle was modified by a DB flush that took
-- place when __after__ we read the unforwarded keys-set from disk.
-- However, performing rewind;read;forward with the same __locked__
-- changelog should always succeed.
error "Changelog rewind;read;forward sequence failed."
Just res -> res

withHydratedLedgerState
:: forall l a. (TableStuff l)
=> UnforwardedReadSets l
-> DbChangelog l
-> (l ValuesMK -> a)
-> Maybe a
withHydratedLedgerState urs db f = do
rs <- forwardTableKeySets db urs
return $ f $ withLedgerTables (seqLast . dbChangelogStates $ db) rs


{-------------------------------------------------------------------------------
HD Interface that I need (Could be moved to Ouroboros.Consensus.Ledger.Basics )
Expand Down Expand Up @@ -610,7 +628,6 @@ ledgerDbPrune (SecurityParam k) db = db {
-- 'LedgerDB' and thus a space leak. Alternatively, we could disable the
-- @-fstrictness@ optimisation (enabled by default for -O1). See #2532.
{-# INLINE ledgerDbPrune #-}
{-# LANGUAGE DerivingStrategies #-}

{-------------------------------------------------------------------------------
Internal updates
Expand Down

0 comments on commit ba2098b

Please sign in to comment.