Skip to content

Commit

Permalink
Address Duncan's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mrBliss committed Jul 11, 2019
1 parent 9b3d791 commit 3d9a78b
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 73 deletions.
26 changes: 26 additions & 0 deletions ouroboros-consensus/ChainDB.md
Expand Up @@ -375,6 +375,32 @@ special case.
have to be careful to use `preferCandidate` rather than `compareCandidates` as
appropriate.

### Concurrency

Multiple blocks might be added concurrently, and since this operation is not
atomic, as it involves writing a block to disk and reading headers from disk,
we explore the possible interleavings.

The three main steps are:

1. Add a block
2. Compute candidates and perform chain selection which might result in a
candidate that is preferred over the current chain.
3. Try to install the candidate as the new chain.

We want that all possible interleavings will result in installing the most
preferable candidate as the new chain. We will reason that this is the case
(for two concurrent threads).

If either of the two computations (step 2) is done with
knowledge of both blocks (after step 1), then the computation with knowledge
of only one block can't possibly construct a candidate that is preferred over
the candidate produced by the other computation.

For this not to be true, both computations would have to be done with only
knowledge of their own block (step 1). This is impossible, as the execution of
step 1 is serialised, so at least one thread must see both blocks.

## Short volatile fragment

Nothing above relies in any way on the length of the current fragment, but
Expand Down
20 changes: 16 additions & 4 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs
Expand Up @@ -40,9 +40,11 @@ import Control.Monad.Class.MonadTimer
import Control.Tracer

import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (castPoint)
import Ouroboros.Network.Block (blockNo, castPoint)
import Ouroboros.Network.Chain (genesisBlockNo)

import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Protocol.Abstract

import Ouroboros.Storage.ChainDB.API

Expand Down Expand Up @@ -110,11 +112,20 @@ openDBInternal args launchBgTasks = do
(Args.cdbNodeConfig args)
varInvalid

let chain = ChainSel.clChain chainAndLedger
ledger = ChainSel.clLedger chainAndLedger
-- Get the actual BlockNo of the tip of the ImmutableDB. Note that this
-- might not end up being the \"immutable\" block(no), because the current
-- chain computed from the VolatileDB could be longer than @k@.
immDbBlockNo <- maybe genesisBlockNo blockNo <$> ImmDB.getBlockAtTip immDB

let chain = ChainSel.clChain chainAndLedger
ledger = ChainSel.clLedger chainAndLedger
cfg = Args.cdbNodeConfig args
secParam = protocolSecurityParam cfg
immBlockNo = ChainSel.getImmBlockNo secParam chain immDbBlockNo

atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- atomically $ newTVar chain
varImmBlockNo <- atomically $ newTVar immBlockNo
varIterators <- atomically $ newTVar Map.empty
varReaders <- atomically $ newTVar Map.empty
varNextIteratorId <- atomically $ newTVar $ IteratorId 0
Expand All @@ -126,9 +137,10 @@ openDBInternal args launchBgTasks = do
, cdbVolDB = volDB
, cdbLgrDB = lgrDB
, cdbChain = varChain
, cdbImmBlockNo = varImmBlockNo
, cdbIterators = varIterators
, cdbReaders = varReaders
, cdbNodeConfig = Args.cdbNodeConfig args
, cdbNodeConfig = cfg
, cdbInvalid = varInvalid
, cdbNextIteratorId = varNextIteratorId
, cdbNextReaderId = varNextReaderId
Expand Down
Expand Up @@ -26,8 +26,12 @@ module Ouroboros.Storage.ChainDB.Impl.Background
, updateLedgerSnapshotsRunner
) where

import Control.Exception (assert)
import Control.Monad (forM_, forever)
import Data.Maybe (fromMaybe)
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import GHC.Stack (HasCallStack)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
Expand Down Expand Up @@ -111,6 +115,7 @@ copyToImmDB
, OuroborosTag (BlockProtocol blk)
, HasHeader blk
, HasHeader (Header blk)
, HasCallStack
)
=> ChainDbEnv m blk
-> m SlotNo
Expand All @@ -120,11 +125,15 @@ copyToImmDB CDB{..} = withCopyLock $ do
let nbToCopy = max 0 (AF.length curChain - fromIntegral k)
toCopy :: [Point blk]
toCopy = map headerPoint
$ take nbToCopy
$ AF.toOldestFirst curChain
$ AF.toOldestFirst
$ AF.takeOldest nbToCopy curChain
return toCopy

if null toCopy
-- This can't happen in practice, as we're only called when the fragment
-- is longer than @k@. However, in the tests, we will be calling this
-- function manually, which means it might be called when there are no
-- blocks to copy.
then trace NoBlocksToCopyToImmDB
else forM_ toCopy $ \pt -> do
let hash = case pointHash pt of
Expand All @@ -133,18 +142,16 @@ copyToImmDB CDB{..} = withCopyLock $ do
GenesisHash -> error "genesis block on current chain"
-- This call is cheap
slotNoAtImmDBTip <- ImmDB.getSlotNoAtTip cdbImmDB
if pointSlot pt > slotNoAtImmDBTip
then do
blk <- VolDB.getKnownBlock cdbVolDB hash
-- We're the only one modifying the ImmutableDB, so the tip cannot
-- have changed since we last checked it.
ImmDB.appendBlock cdbImmDB blk
-- TODO the invariant of 'cdbChain' is shortly violated between
-- these two lines: the tip was updated on the line above, but the
-- anchor point is only updated on the line below.
atomically $ removeFromChain pt
trace $ CopiedBlockToImmDB pt
else error "block to copy was older than the ImmutableDB's tip"
assert (pointSlot pt > slotNoAtImmDBTip) $ return ()
blk <- VolDB.getKnownBlock cdbVolDB hash
-- We're the only one modifying the ImmutableDB, so the tip cannot
-- have changed since we last checked it.
ImmDB.appendBlock cdbImmDB blk
-- TODO the invariant of 'cdbChain' is shortly violated between
-- these two lines: the tip was updated on the line above, but the
-- anchor point is only updated on the line below.
atomically $ removeFromChain pt
trace $ CopiedBlockToImmDB pt

-- Get the /possibly/ updated tip of the ImmDB
ImmDB.getSlotNoAtTip cdbImmDB
Expand All @@ -168,11 +175,15 @@ copyToImmDB CDB{..} = withCopyLock $ do
-- happen if the precondition was satisfied.
_ -> error "header to remove not on the current chain"

withCopyLock :: forall a. m a -> m a
withCopyLock :: forall a. HasCallStack => m a -> m a
withCopyLock = bracket_
(atomically $ takeTMVar cdbCopyLock)
(fmap mustBeUnlocked $ atomically $ tryTakeTMVar cdbCopyLock)
(atomically $ putTMVar cdbCopyLock ())

mustBeUnlocked :: forall b. HasCallStack => Maybe b -> b
mustBeUnlocked = fromMaybe
$ error "copyToImmDB running concurrently with itself"

-- | Watches the current chain for changes. Whenever the chain is longer than
-- @k@, then the headers older than @k@ are copied from the VolatileDB to the
-- ImmutableDB (with 'copyToImmDB'). Afterwards, a garbage collection of the
Expand Down Expand Up @@ -225,7 +236,9 @@ garbageCollect
-> m ()
garbageCollect CDB{..} slotNo = do
VolDB.garbageCollect cdbVolDB slotNo
atomically $ LgrDB.garbageCollectPrevApplied cdbLgrDB slotNo
atomically $ do
LgrDB.garbageCollectPrevApplied cdbLgrDB slotNo
modifyTVar' cdbInvalid $ Set.filter ((<= slotNo) . pointSlot)
traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo

{-------------------------------------------------------------------------------
Expand Down

0 comments on commit 3d9a78b

Please sign in to comment.