Skip to content

Commit

Permalink
Simplify mempool interface with the ledger
Browse files Browse the repository at this point in the history
Arising from the discussion on the mempool tests, it followed that the
mempool could probably become less aware of the LedgerDB, the Changelog
and the BackingStore, and indeed the mempool can just ask the LedgerDB
for UTxO values. This PR implements that and some side changes. In
particular this is a comprehensive list of the changes:

- Some general cleanup on comments

- Remove the `Proxy` from `ledgerTipPoint`

- Remove the `idx` type parameter from the `Mempool` and
`MempoolSnapshot`

- Superflous indirection datatypes in `Mempool.Impl.Pure` are
gone (`SyncWithLedger`, `RemoveTxs`,...)

- Pure functions that revalidate got the revalidation factored out into
`revalidateTxsFor` and the arguments re-sorted

- Mempool now asks the ledgerdb to forward values. In particular

    - `LedgerInterface.getCurrentLedgerAndChangelog` is renamed to
      `LedgerInterface.getCurrentLedger`

    - `MempoolChangelog` is gone

    - new function `LedgerInterface.getLedgerTablesAtFor` which calls
      `LgrDB.getLedgerTablesAtFor`

    - `LedgerInterface.getBackingStore` is gone

    - Depending on the value returned from `getLedgerTablesAtFor` (of type
      `TablesForKeys`), new actions are performed. In particular if the
      anchor moved we read again, and if the ledger state is not found we
      recurse the whole action

    - The read lock is gone on the mempool side.

- `Mempool.API.getSnapshotFor` now might fail. Acquiring the lock
prevents it from failing.

- `isTip` is now a `Point` which still contains the `ChainHash` it held
before, but now it can be used to query for a point on the ledger
database when forwarding
  • Loading branch information
jasagredo committed Dec 2, 2022
1 parent 87bede7 commit 321a24f
Show file tree
Hide file tree
Showing 23 changed files with 478 additions and 575 deletions.
Expand Up @@ -223,7 +223,7 @@ protocolInfoDualByron abstractGenesis@ByronSpecGenesis{..} params credss =
instance NodeInitStorage DualByronBlock where
-- Just like Byron, we need to start with an EBB
nodeInitChainDB cfg InitChainDB { getCurrentLedger, addBlock } = do
tip <- ledgerTipPoint (Proxy @DualByronBlock) <$> getCurrentLedger
tip <- ledgerTipPoint <$> getCurrentLedger
case tip of
BlockPoint {} -> return ()
GenesisPoint -> addBlock genesisEBB
Expand Down
Expand Up @@ -273,7 +273,7 @@ instance NodeInitStorage ByronBlock where
-- If the current chain is empty, produce a genesis EBB and add it to the
-- ChainDB. Only an EBB can have Genesis (= empty chain) as its predecessor.
nodeInitChainDB cfg InitChainDB { getCurrentLedger, addBlock } = do
tip <- ledgerTipPoint (Proxy @ByronBlock) <$> getCurrentLedger
tip <- ledgerTipPoint <$> getCurrentLedger
case tip of
BlockPoint {} -> return ()
GenesisPoint -> addBlock genesisEBB
Expand Down
Expand Up @@ -120,8 +120,8 @@ runForge epochSize_ nextSlot opts chainDB blockForging cfg = do
unticked <- do
mExtLedger <- lift $ atomically $ ChainDB.getPastLedger chainDB bcPrevPoint
case mExtLedger of
Just (l, _) -> return l
Nothing -> exitEarly' "no ledger state"
Just l -> return l
Nothing -> exitEarly' "no ledger state"

ledgerView <-
case runExcept $ forecastFor
Expand Down
Expand Up @@ -737,7 +737,7 @@ translateLedgerStateByronToShelleyWrapper =
$ ShelleyLedgerState {
shelleyLedgerTip =
translatePointByronToShelley
(ledgerTipPoint (Proxy @ByronBlock) ledgerByron)
(ledgerTipPoint ledgerByron)
(byronLedgerTipBlockNo ledgerByron)
, shelleyLedgerState =
SL.translateToShelleyLedgerState
Expand Down
23 changes: 12 additions & 11 deletions ouroboros-consensus-diffusion/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -24,8 +24,6 @@ module Ouroboros.Consensus.NodeKernel (
, initNodeKernel
) where



import Control.DeepSeq (force)
import Control.Monad
import Control.Monad.Except
Expand Down Expand Up @@ -92,7 +90,7 @@ data NodeKernel m remotePeer localPeer blk = NodeKernel {
getChainDB :: ChainDB m blk

-- | The node's mempool
, getMempool :: Mempool m blk TicketNo
, getMempool :: Mempool m blk

-- | The node's top-level static configuration
, getTopLevelConfig :: TopLevelConfig blk
Expand Down Expand Up @@ -184,7 +182,7 @@ data InternalState m remotePeer localPeer blk = IS {
, blockFetchInterface :: BlockFetchConsensusInterface remotePeer (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry remotePeer (Header blk) blk m
, varCandidates :: StrictTVar m (Map remotePeer (StrictTVar m (AnchoredFragment (Header blk))))
, mempool :: Mempool m blk TicketNo
, mempool :: Mempool m blk
}

initInternalState
Expand Down Expand Up @@ -303,7 +301,7 @@ forkBlockForging IS{..} blockForging =
-- produce a block that fits onto the ledger we got above; if the
-- ledger in the meantime changes, the block we produce here may or
-- may not be adopted, but it won't be invalid.
(unticked, dbch) <- do
unticked <- do
mExtLedger <- lift $ atomically $ ChainDB.getPastLedger chainDB bcPrevPoint
case mExtLedger of
Just val -> return val
Expand Down Expand Up @@ -396,11 +394,14 @@ forkBlockForging IS{..} blockForging =

mempoolSnapshot <- lift $ getSnapshotFor
mempool
(castPoint $ getTip unticked)
currentSlot
tickedLedgerState
dbch

pure ( mempoolSnapshot
let e = error "Mempool snapshot failed in forging loop. RAWLock violation"

pure ( -- OK because we are holding the read lock
maybe e id mempoolSnapshot
, bcPrevPoint
, mempoolHash
, mempoolSlotNo
Expand Down Expand Up @@ -437,7 +438,7 @@ forkBlockForging IS{..} blockForging =

trace $ TraceForgedBlock
currentSlot
(ledgerTipPoint (Proxy @blk) (ledgerState unticked))
(ledgerTipPoint (ledgerState unticked))
newBlock
(snapshotMempoolSize mempoolSnapshot)

Expand Down Expand Up @@ -586,15 +587,15 @@ getMempoolReader
, IOLike m
, HasTxId (GenTx blk)
)
=> Mempool m blk TicketNo
=> Mempool m blk
-> TxSubmissionMempoolReader (GenTxId blk) (Validated (GenTx blk)) TicketNo m
getMempoolReader mempool = MempoolReader.TxSubmissionMempoolReader
{ mempoolZeroIdx = zeroTicketNo
, mempoolGetSnapshot = convertSnapshot <$> getSnapshot mempool
}
where
convertSnapshot
:: MempoolSnapshot blk TicketNo
:: MempoolSnapshot blk
-> MempoolReader.MempoolSnapshot (GenTxId blk) (Validated (GenTx blk)) TicketNo
convertSnapshot MempoolSnapshot { snapshotTxsAfter, snapshotLookupTx,
snapshotHasTx } =
Expand All @@ -612,7 +613,7 @@ getMempoolWriter
, IOLike m
, HasTxId (GenTx blk)
)
=> Mempool m blk TicketNo
=> Mempool m blk
-> TxSubmissionMempoolWriter (GenTxId blk) (GenTx blk) TicketNo m
getMempoolWriter mempool = Inbound.TxSubmissionMempoolWriter
{ Inbound.txId = txId
Expand Down
Expand Up @@ -546,7 +546,7 @@ instance SmallQuery (BlockQuery (SimpleBlock c ext)) where
instance MockProtocolSpecific c ext => QueryLedger (SimpleBlock c ext) where
answerBlockQuery _cfg QueryLedgerTip =
castPoint
. ledgerTipPoint (Proxy @(SimpleBlock c ext))
. ledgerTipPoint
. ledgerState

instance EqQuery (BlockQuery (SimpleBlock c ext)) where
Expand Down
10 changes: 3 additions & 7 deletions ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
Expand Up @@ -112,15 +112,12 @@ import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB
import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment
import Ouroboros.Consensus.Storage.ChainDB.Impl (ChainDbArgs (..))
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.ImmutableDB.Impl.Index as Index
import qualified Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy as LgrDB
import Ouroboros.Consensus.Storage.LedgerDB.HD.BackingStore
(RangeQuery (RangeQuery))
import Ouroboros.Consensus.Storage.LedgerDB.InMemory (LedgerDB)
import Ouroboros.Consensus.Storage.LedgerDB.OnDisk
(BackingStoreSelector (..), LedgerBackingStoreValueHandle,
LedgerDB', mkDiskLedgerView)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util.Enclose (pattern FallingEdge)

import Test.ThreadNet.TxGen
Expand Down Expand Up @@ -602,7 +599,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
-> (SlotNo -> STM m ())
-> LedgerConfig blk
-> STM m (LedgerState blk EmptyMK)
-> Mempool m blk TicketNo
-> Mempool m blk
-> [GenTx blk]
-- ^ valid transactions the node should immediately propagate
-> m ()
Expand Down Expand Up @@ -637,8 +634,7 @@ runThreadNetwork systemTime ThreadNetworkArgs

-- a new ledger state might render a crucial transaction valid
ldgrChanged = do
let prj = ledgerTipPoint (Proxy @blk)
(ledger', _) <- atomically $ blockUntilChanged prj (prj ledger) getLdgr
(ledger', _) <- atomically $ blockUntilChanged ledgerTipPoint (ledgerTipPoint ledger) getLdgr
pure (slot, ledger', mempFp)

-- wake up when any of those change
Expand Down Expand Up @@ -666,7 +662,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
-> Seed
-> DiskLedgerView m (ExtLedgerState blk)
-- ^ How to get the current ledger state
-> Mempool m blk TicketNo
-> Mempool m blk
-> m ()
forkTxProducer coreNodeId registry clock cfg nodeSeed dlv mempool =
void $ OracularClock.forkEachSlot registry clock "txProducer" $ \curSlotNo -> do
Expand Down
7 changes: 3 additions & 4 deletions ouroboros-consensus-test/src/Test/Util/Orphans/NFData.hs
Expand Up @@ -15,7 +15,6 @@ import Data.Foldable
import Data.Sequence (Seq)
import Data.Sequence.NonEmpty (NESeq)

import Data.FingerTree.RootMeasured.Strict as RMFT
import qualified Data.FingerTree.Strict as FT
import Data.Map.Diff.Strict (Diff (..), DiffEntry (..),
DiffHistory (..), Keys (..), NEDiffHistory (..),
Expand All @@ -24,9 +23,9 @@ import Ouroboros.Consensus.Storage.LedgerDB.HD (SeqUtxoDiff (..),
SudElement (..), SudMeasure (..), UtxoDiff (..),
UtxoEntryDiff (..), UtxoEntryDiffState (..), UtxoKeys (..),
UtxoValues (..))
import Ouroboros.Consensus.Storage.LedgerDB.HD.DiffSeq (DiffSeq (..),
Element (..), InternalMeasure (..), Length (..),
RootMeasure (..), SlotNoLB (..), SlotNoUB (..))
import Ouroboros.Consensus.Storage.LedgerDB.HD.DiffSeq (Element (..),
InternalMeasure (..), Length (..), RootMeasure (..),
SlotNoLB (..), SlotNoUB (..))

{------------------------------------------------------------------------------
StrictFingerTree
Expand Down
15 changes: 5 additions & 10 deletions ouroboros-consensus-test/test-consensus/Test/Consensus/Mempool.hs
Expand Up @@ -51,9 +51,6 @@ import Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import Ouroboros.Consensus.Mock.Ledger hiding (TxId)
import Ouroboros.Consensus.Node.ProtocolInfo (NumCoreNodes (..))
import Ouroboros.Consensus.Protocol.BFT
import Ouroboros.Consensus.Storage.LedgerDB.HD.BackingStore
import Ouroboros.Consensus.Storage.LedgerDB.OnDisk
(LedgerBackingStore (LedgerBackingStore))
import Ouroboros.Consensus.Util (repeatedly, repeatedlyM,
safeMaximumOn, (.:))
import Ouroboros.Consensus.Util.Condense (condense)
Expand Down Expand Up @@ -736,7 +733,7 @@ data TestMempool m = TestMempool
{ -- | A mempool with random contents.
--
-- Starts out synced with the ledger.
mempool :: Mempool m TestBlock TicketNo
mempool :: Mempool m TestBlock

-- | When called, obtains all events traced after opening the mempool at
-- the given state from oldest-to-newest.
Expand Down Expand Up @@ -785,11 +782,9 @@ withTestMempool setup@TestSetup {..} prop =

-- Set up the LedgerInterface
varCurrentLedgerState <- uncheckedNewTVarM testLedgerState
trivialLedgerBackingStore <- trivialBackingStore polyEmptyLedgerTables
let ledgerInterface = LedgerInterface
{ getCurrentLedgerAndChangelog = (, MempoolChangelog Origin polyEmptyLedgerTables) <$> readTVar varCurrentLedgerState
, getBackingStore = pure $ LedgerBackingStore trivialLedgerBackingStore
, withReadLock = id
{ getCurrentLedgerState = readTVar varCurrentLedgerState
, getLedgerTablesAtFor = \_ _ -> pure $ Right polyEmptyLedgerTables
}

-- Set up the Tracer
Expand Down Expand Up @@ -852,7 +847,7 @@ withTestMempool setup@TestSetup {..} prop =
-- | Check whether the transactions in the 'MempoolSnapshot' are valid
-- w.r.t. the current ledger state.
checkMempoolValidity :: LedgerState TestBlock EmptyMK
-> MempoolSnapshot TestBlock TicketNo
-> MempoolSnapshot TestBlock
-> Property
checkMempoolValidity ledgerState
MempoolSnapshot {
Expand Down Expand Up @@ -1173,7 +1168,7 @@ executeAction testMempool action = case action of
return $ mapMaybe extractor evs

currentTicketAssignment :: IOLike m
=> Mempool m TestBlock TicketNo -> m TicketAssignment
=> Mempool m TestBlock -> m TicketAssignment
currentTicketAssignment Mempool { syncWithLedger } = do
MempoolSnapshot { snapshotTxs } <- syncWithLedger
return $ Map.fromList
Expand Down
12 changes: 4 additions & 8 deletions ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs
Expand Up @@ -36,7 +36,6 @@ module Ouroboros.Consensus.Ledger.Abstract (

import Control.Monad.Except
import Data.Kind (Type)
import Data.Proxy
import GHC.Stack (HasCallStack)

import Ouroboros.Consensus.Block.Abstract
Expand Down Expand Up @@ -269,20 +268,17 @@ refoldLedger cfg = repeatedly (\blk state -> applyLedgerTablesDiffs state $ tick
Short-hand
-------------------------------------------------------------------------------}

-- | Wrapper around 'ledgerTipPoint' that uses a proxy to fix @blk@
--
-- This is occassionally useful to guide type inference
ledgerTipPoint ::
UpdateLedger blk
=> Proxy blk -> LedgerState blk mk -> Point blk
ledgerTipPoint _ = castPoint . getTip
=> LedgerState blk mk -> Point blk
ledgerTipPoint = castPoint . getTip

ledgerTipHash ::
forall blk mk. UpdateLedger blk
=> LedgerState blk mk -> ChainHash blk
ledgerTipHash = pointHash . (ledgerTipPoint (Proxy @blk))
ledgerTipHash = pointHash . ledgerTipPoint

ledgerTipSlot ::
forall blk mk. UpdateLedger blk
=> LedgerState blk mk -> WithOrigin SlotNo
ledgerTipSlot = pointSlot . (ledgerTipPoint (Proxy @blk))
ledgerTipSlot = pointSlot . ledgerTipPoint
2 changes: 0 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Mempool.hs
Expand Up @@ -3,6 +3,4 @@ module Ouroboros.Consensus.Mempool (module X) where

import Ouroboros.Consensus.Mempool.API as X
import Ouroboros.Consensus.Mempool.Impl as X
import Ouroboros.Consensus.Mempool.Impl.Types as X
(MempoolChangelog (..))
import Ouroboros.Consensus.Mempool.TxSeq as X (TicketNo)

0 comments on commit 321a24f

Please sign in to comment.