Skip to content

Commit

Permalink
Reorganize and prune some functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed May 26, 2023
1 parent a053819 commit 4f91f0f
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 191 deletions.
Expand Up @@ -81,7 +81,7 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.DbChangelog.Update as Ledg
import Ouroboros.Consensus.Storage.LedgerDB.ReadsKeySets
(PointNotFound)
import Ouroboros.Consensus.Storage.Serialisation
import Ouroboros.Consensus.Util (StaticEither, (..:))
import Ouroboros.Consensus.Util ((..:))
import Ouroboros.Consensus.Util.CallStack
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
Expand Down Expand Up @@ -168,12 +168,12 @@ data ChainDB m blk = ChainDB {
-- | Return the LedgerDB containing the last @k@ ledger states.
, getLedgerDB :: STM m (LedgerDB.DbChangelog' blk)

-- | Acquire a value handle and ledger DB, both anchored at the same slot
-- and truncated to the specified point if the provided point exists on
-- the db.
--
-- Note that the ValueHandle should be closed by the caller of this
-- function.
-- -- | Acquire a value handle and ledger DB, both anchored at the same slot
-- -- and truncated to the specified point if the provided point exists on
-- -- the db.
-- --
-- -- Note that the ValueHandle should be closed by the caller of this
-- -- function.
, getLedgerDBViewAtPoint ::
Maybe (Point blk)
-> m ( Either
Expand Down Expand Up @@ -346,45 +346,12 @@ data ChainDB m blk = ChainDB {
-- invalid block is detected. These blocks are likely to be valid.
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))

-- | Get a 'LedgerDB' and a handle to a value of the backing store
-- corresponding to the anchor of the 'LedgerDB'
--
-- In the 'StaticRight' case, 'Left pt' out means the requested point is
-- not on current chain, so that 'LedgerDB' is unavailable.
--
-- The return type in the end contains a value handle that can be used to
-- perform reads on the backing store, a LedgerDB truncated at the
-- requested point and a function for releasing the value handle.
--
-- The value handle is allocated in the given registry.
--
-- This is intended to be used on queries, to get an ephemeral stable view
-- of the backing store.
, getLedgerBackingStoreValueHandle ::
forall b.
ResourceRegistry m
-> StaticEither b () (Point blk)
-> m (StaticEither
b
( LedgerBackingStoreValueHandle m (ExtLedgerState blk)
, LedgerDB.DbChangelog' blk
, m ()
)
(Either
(Point blk)
( LedgerBackingStoreValueHandle m (ExtLedgerState blk)
, LedgerDB.DbChangelog' blk
, m ()
)
)
)

-- | Read and forward the values up to the given point on the chain.
-- Returns Nothing if the anchor moved or if the state is not found on the
-- ledger db.
--
-- This is intended to be used by the mempool to hydrate a ledger state at
-- a specific point
-- -- | Read and forward the values up to the given point on the chain.
-- -- Returns Nothing if the anchor moved or if the state is not found on the
-- -- ledger db.
-- --
-- -- This is intended to be used by the mempool to hydrate a ledger state at
-- -- a specific point
, getLedgerTablesAtFor ::
Point blk
-> LedgerTables (ExtLedgerState blk) KeysMK
Expand Down
Expand Up @@ -33,7 +33,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl (
, openDBInternal
) where

import Control.Monad (when)
import Control.Monad (void, when)
import Control.Monad.Trans.Class (lift)
import Control.Tracer
import Data.Functor ((<&>))
Expand Down Expand Up @@ -204,33 +204,31 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
{ addBlockAsync = getEnv2 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getLedgerDB = getEnvSTM h Query.getLedgerDB
, getTipBlock = getEnv h Query.getTipBlock
, getTipHeader = getEnv h Query.getTipHeader
, getTipPoint = getEnvSTM h Query.getTipPoint
, getBlockComponent = getEnv2 h Query.getBlockComponent
, getIsFetched = getEnvSTM h Query.getIsFetched
, getIsValid = getEnvSTM h Query.getIsValid
, getMaxSlotNo = getEnvSTM h Query.getMaxSlotNo
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, isOpen = isOpen h
, getLedgerBackingStoreValueHandle = \rreg p -> getEnv h $ \env' ->
Query.getLedgerBackingStoreValueHandle env' rreg p
, getLedgerTablesAtFor = \pt keys -> getEnv h (\st -> LedgerDB.getLedgerTablesAtFor (cdbLedgerDB st) pt keys)
, getLedgerDBViewAtPoint = \pt ->
getEnv h $ \env' -> Query.getLedgerDBViewAtPoint env' pt
{ addBlockAsync = getEnv2 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getLedgerDB = getEnvSTM h Query.getLedgerDB
, getTipBlock = getEnv h Query.getTipBlock
, getTipHeader = getEnv h Query.getTipHeader
, getTipPoint = getEnvSTM h Query.getTipPoint
, getBlockComponent = getEnv2 h Query.getBlockComponent
, getIsFetched = getEnvSTM h Query.getIsFetched
, getIsValid = getEnvSTM h Query.getIsValid
, getMaxSlotNo = getEnvSTM h Query.getMaxSlotNo
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, isOpen = isOpen h
, getLedgerTablesAtFor = getEnv2 h $ LedgerDB.getLedgerTablesAtFor . cdbLedgerDB
, getLedgerDBViewAtPoint = getEnv1 h Query.getLedgerDBViewAtPoint
}
testing = Internal
{ intCopyToImmutableDB = getEnv h Background.copyToImmutableDB
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intKillBgThreads = varKillBgThreads
{ intCopyToImmutableDB = getEnv h Background.copyToImmutableDB
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intTryTakeSnapshot = getEnv h $ \env' ->
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intKillBgThreads = varKillBgThreads
}

traceWith tracer $ TraceOpenEvent $ OpenedDB
Expand Down
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- | Background tasks:
--
Expand All @@ -19,7 +20,6 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
-- * Copying blocks from the VolatileDB to the ImmutableDB
, copyAndSnapshotRunner
, copyToImmutableDB
, updateLedgerSnapshots
-- * Executing garbage collection
, garbageCollect
-- * Scheduling garbage collections
Expand All @@ -37,7 +37,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background (
) where

import Control.Exception (assert)
import Control.Monad (forM_, forever, void, when)
import Control.Monad (forM_, forever)
import Control.Tracer
import Data.Foldable (toList)
import qualified Data.Map.Strict as Map
Expand All @@ -60,12 +60,8 @@ import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
(addBlockSync)
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Storage.LedgerDB.API
import qualified Ouroboros.Consensus.Storage.LedgerDB.API as LedgerDB
import Ouroboros.Consensus.Storage.LedgerDB.Config
import Ouroboros.Consensus.Storage.LedgerDB.DbChangelog.Query
(flushableLength)
import Ouroboros.Consensus.Storage.LedgerDB.DbChangelog.Update
(splitForFlushing)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util ((.:))
import Ouroboros.Consensus.Util.Condense
Expand Down Expand Up @@ -207,13 +203,6 @@ copyToImmutableDB CDB{..} = withCopyLock $ do
Snapshotting
-------------------------------------------------------------------------------}

data SnapCounters = SnapCounters {
-- | When was the last time we made a snapshot
prevSnapshotTime :: !(TimeSinceLast Time)
-- | How many blocks have we processed since the last snapshot
, ntBlocksSinceLastSnap :: !Word64
}

-- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
-- LgrDB
--
Expand Down Expand Up @@ -249,16 +238,11 @@ copyAndSnapshotRunner
-> GcSchedule m
-> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
-> m Void
copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed =
if onDiskShouldTakeSnapshot NoSnapshotTakenYet replayed then do
updateLedgerSnapshots cdb
now <- getMonotonicTime
loop $ SnapCounters (TimeSinceLast now) 0
else
loop $ SnapCounters NoSnapshotTakenYet replayed
copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed = do
LedgerDB.tryFlush cdbLedgerDB
loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB Nothing replayed
where
SecurityParam k = configSecurityParam cdbTopLevelConfig
DiskPolicy{..} = diskPolicy cdbLedgerDB

loop :: SnapCounters -> m Void
loop counters = do
Expand All @@ -279,27 +263,12 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed =
-- copied to disk (though not flushed, necessarily).
copyToImmutableDB cdb >>= scheduleGC'

ldb <- atomically $ getCurrent cdbLedgerDB

when (onDiskShouldFlush $ flushableLength ldb)
(withWriteLock cdbLedgerDB $ do
diffs <- atomically $ do
ldb' <- getCurrent cdbLedgerDB
let (toFlush, toKeep) = splitForFlushing FlushAllImmutable ldb'
setCurrent cdbLedgerDB toKeep
pure toFlush
mapM_ (flushIntoBackingStore cdbLedgerDB) diffs
)
LedgerDB.tryFlush cdbLedgerDB

now <- getMonotonicTime
let ntBlocksSinceLastSnap' = ntBlocksSinceLastSnap + numToWrite
elapsed = (\prev -> now `diffTime` prev) <$> prevSnapshotTime

if onDiskShouldTakeSnapshot elapsed ntBlocksSinceLastSnap' then do
updateLedgerSnapshots cdb
loop $ SnapCounters (TimeSinceLast now) 0
else
loop $ counters { ntBlocksSinceLastSnap = ntBlocksSinceLastSnap' }
loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'

scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' Origin = return ()
Expand All @@ -313,15 +282,6 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed =
}
gcSchedule

-- | Write a snapshot of the LedgerDB to disk and remove old snapshots
-- (typically one) so that only 'onDiskNumSnapshots' snapshots are on disk.
updateLedgerSnapshots ::
Monad m
=> ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{cdbLedgerDB} = do
void $ takeSnapshot cdbLedgerDB
void $ trimSnapshots cdbLedgerDB

{-------------------------------------------------------------------------------
Executing garbage collection
-------------------------------------------------------------------------------}
Expand All @@ -345,7 +305,7 @@ garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{..} slotNo = do
VolatileDB.garbageCollect cdbVolatileDB slotNo
atomically $ do
gcPrevApplied cdbLedgerDB slotNo
LedgerDB.garbageCollect cdbLedgerDB slotNo
modifyTVar cdbInvalid $ fmap $ Map.filter ((>= slotNo) . invalidBlockSlotNo)
traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo

Expand Down
Expand Up @@ -111,7 +111,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
varFutureBlocks futureCheck = do
-- We follow the steps from section "## Initialization" in ChainDB.md

((i :: Anchor blk, succsOf), seLdb) <- LedgerDB.acquireLDBReadView' lgrDB (StaticLeft ()) (do
((i :: Anchor blk, succsOf), seLdb) <- LedgerDB.acquireLDBReadView lgrDB (StaticLeft ()) (do
invalid <- forgetFingerprint <$> readTVar varInvalid
(,) <$> ImmutableDB.getTipAnchor immutableDB
<*> (ignoreInvalidSuc volatileDB invalid <$>
Expand Down Expand Up @@ -458,7 +458,7 @@ chainSelectionForBlock
-> m (Point blk)
chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
((invalid, succsOf, lookupBlockInfo, curChain, tipPoint), seLedger) <-
LedgerDB.acquireLDBReadView' cdbLedgerDB (StaticLeft ())
LedgerDB.acquireLDBReadView cdbLedgerDB (StaticLeft ())
( (,,,,) <$> (forgetFingerprint <$> readTVar cdbInvalid)
<*> VolatileDB.filterByPredecessor cdbVolatileDB
<*> VolatileDB.getBlockInfo cdbVolatileDB
Expand Down
Expand Up @@ -42,7 +42,8 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.BackingStore as BackingSto
import Ouroboros.Consensus.Storage.LedgerDB.DbChangelog
import Ouroboros.Consensus.Storage.VolatileDB (VolatileDB)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util (StaticEither (..), eitherToMaybe)
import Ouroboros.Consensus.Util (StaticEither (..), eitherToMaybe,
fromStaticLeft, fromStaticRight)
import Ouroboros.Consensus.Util.IOLike
import qualified Ouroboros.Consensus.Util.ResourceRegistry as RR
import Ouroboros.Consensus.Util.STM (WithFingerprint (..))
Expand Down Expand Up @@ -206,7 +207,7 @@ getLedgerBackingStoreValueHandle :: forall b m blk.
)
)
getLedgerBackingStoreValueHandle CDB{..} rreg seP = do
view <- LedgerDB.acquireLDBReadView cdbLedgerDB seP
((), view) <- LedgerDB.acquireLDBReadView cdbLedgerDB seP (pure ())
case view of
StaticRight (Left p) -> pure (StaticRight (Left p))
StaticLeft (vh, ldb) -> StaticLeft <$> allocateInReg vh ldb
Expand Down Expand Up @@ -241,13 +242,11 @@ getLedgerDBViewAtPoint ::
)
)
getLedgerDBViewAtPoint CDB{..} Nothing = do
s <- LedgerDB.acquireLDBReadView cdbLedgerDB (StaticLeft ())
pure $ case s of
StaticLeft v -> Right v
((), s) <- LedgerDB.acquireLDBReadView cdbLedgerDB (StaticLeft ()) (pure ())
pure $ Right $ fromStaticLeft s
getLedgerDBViewAtPoint CDB{..} (Just p) = do
s <- LedgerDB.acquireLDBReadView cdbLedgerDB (StaticRight p)
pure $ case s of
StaticRight v -> v
((), s) <- LedgerDB.acquireLDBReadView cdbLedgerDB (StaticRight p) (pure ())
pure $ fromStaticRight s

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Expand Up @@ -282,21 +282,21 @@ instance (IOLike m, LedgerSupportsProtocol blk)
-------------------------------------------------------------------------------}

data Internal m blk = Internal
{ intCopyToImmutableDB :: m (WithOrigin SlotNo)
{ intCopyToImmutableDB :: m (WithOrigin SlotNo)
-- ^ Copy the blocks older than @k@ from to the VolatileDB to the
-- ImmutableDB and update the in-memory chain fragment correspondingly.
--
-- The 'SlotNo' of the tip of the ImmutableDB after copying the blocks is
-- returned. This can be used for a garbage collection on the VolatileDB.
, intGarbageCollect :: SlotNo -> m ()
, intGarbageCollect :: SlotNo -> m ()
-- ^ Perform garbage collection for blocks <= the given 'SlotNo'.
, intUpdateLedgerSnapshots :: m ()
, intTryTakeSnapshot :: m ()
-- ^ Write a new LedgerDB snapshot to disk and remove the oldest one(s).
, intAddBlockRunner :: m Void
, intAddBlockRunner :: m Void
-- ^ Start the loop that adds blocks to the ChainDB retrieved from the
-- queue populated by 'ChainDB.addBlock'. Execute this loop in a separate
-- thread.
, intKillBgThreads :: StrictTVar m (m ())
, intKillBgThreads :: StrictTVar m (m ())
-- ^ A handle to kill the background threads.
}

Expand Down

0 comments on commit 4f91f0f

Please sign in to comment.