Skip to content

Commit

Permalink
Merge #2893
Browse files Browse the repository at this point in the history
2893: More explicit resource management in ChainSync client and server r=nfrisby a=nfrisby

Fixes #2892.

Co-authored-by: Nicolas Frisby <nick.frisby@iohk.io>
  • Loading branch information
iohk-bors[bot] and nfrisby committed Jan 26, 2021
2 parents 1efb25d + bcf8af8 commit 4f2e9ca
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 139 deletions.
29 changes: 11 additions & 18 deletions ouroboros-consensus-test/src/Test/Util/LogicalClock.hs
Expand Up @@ -16,7 +16,7 @@ module Test.Util.LogicalClock (
, new
, sufficientTimeFor
-- * Scheduling actions
, onEachTick
, tickWatcher
, onTick
, blockUntilTick
) where
Expand Down Expand Up @@ -83,23 +83,16 @@ tickDelay = 0.5
-------------------------------------------------------------------------------}

-- | Execute action on every clock tick
--
-- Returns a handle to cancel the thread.
onEachTick :: (IOLike m, HasCallStack)
=> ResourceRegistry m
-> LogicalClock m
-> String
-> (Tick -> m ())
-> m (m ())
onEachTick registry clock threadLabel action =
cancelThread <$>
onEachChange
registry
threadLabel
id
Nothing
(getCurrentTick clock)
action
tickWatcher :: LogicalClock m
-> (Tick -> m ())
-> Watcher m Tick Tick
tickWatcher clock action =
Watcher {
wFingerprint = id
, wInitial = Nothing
, wNotify = action
, wReader = getCurrentTick clock
}

-- | Execute action once at the specified tick
onTick :: (IOLike m, HasCallStack)
Expand Down
Expand Up @@ -30,6 +30,7 @@ import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (withWatcher)
import Ouroboros.Consensus.Util.Time

import Test.Util.Orphans.Arbitrary ()
Expand Down Expand Up @@ -293,24 +294,25 @@ testOverrideDelay :: forall m. (IOLike m, MonadTime m, MonadDelay (OverrideDelay
-> Int -- ^ Number of slots to collect
-> OverrideDelay m [SlotNo]
testOverrideDelay systemStart slotLength maxClockRewind numSlots = do
result <- withRegistry $ \registry -> do
time <- simpleBlockchainTime
registry
(defaultSystemTime systemStart nullTracer)
slotLength
maxClockRewind
bracketWithPrivateRegistry
(\registry -> simpleBlockchainTime
registry
(defaultSystemTime systemStart nullTracer)
slotLength
maxClockRewind)
(\_btime -> pure ())
$ \btime -> do
slotsVar <- uncheckedNewTVarM []
cancelCollection <-
onKnownSlotChange registry time "testOverrideDelay" $ \slotNo ->
atomically $ modifyTVar slotsVar (slotNo :)
-- Wait to collect the required number of slots
slots <- atomically $ do
slots <- readTVar slotsVar
when (length slots < numSlots) $ retry
return slots
cancelCollection
return $ reverse slots
return result
withWatcher
"testOverrideDelay"
( knownSlotWatcher btime $ \slotNo -> do
atomically $ modifyTVar slotsVar (slotNo :)
) $ do
-- Wait to collect the required number of slots
atomically $ do
slots <- readTVar slotsVar
when (length slots < numSlots) $ retry
return $ reverse slots

{-------------------------------------------------------------------------------
Test-programmable time
Expand Down
Expand Up @@ -62,7 +62,7 @@ import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (Fingerprint (..),
WithFingerprint (..))
WithFingerprint (..), forkLinkedWatcher)

import Test.Util.LogicalClock (LogicalClock, NumTicks (..), Tick (..))
import qualified Test.Util.LogicalClock as LogicalClock
Expand Down Expand Up @@ -305,7 +305,12 @@ runChainSync securityParam (ClientUpdates clientUpdates)

-- Schedule updates of the client and server chains
varLastUpdate <- uncheckedNewTVarM 0
void $ LogicalClock.onEachTick registry clock "scheduled updates" $ \tick -> do
let forkLinkedTickWatcher :: (Tick -> m ()) -> m ()
forkLinkedTickWatcher =
void
. forkLinkedWatcher registry "scheduled updates"
. LogicalClock.tickWatcher clock
forkLinkedTickWatcher $ \tick -> do
-- Stop updating the client and server chains when the chain sync client
-- has thrown an exception or has gracefully terminated, so that at the
-- end, we can read the chains in the states they were in when the
Expand Down
32 changes: 15 additions & 17 deletions ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs
Expand Up @@ -9,17 +9,15 @@
module Ouroboros.Consensus.BlockchainTime.API (
BlockchainTime(..)
, CurrentSlot(..)
, onKnownSlotChange
, knownSlotWatcher
) where

import GHC.Generics (Generic)
import GHC.Stack
import NoThunks.Class (OnlyCheckWhnfNamed (..))

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (onEachChange)
import Ouroboros.Consensus.Util.STM (Watcher (..))

{-------------------------------------------------------------------------------
API
Expand Down Expand Up @@ -54,22 +52,22 @@ data CurrentSlot =
Derived functionality
-------------------------------------------------------------------------------}

-- | Spawn a thread to run an action each time the slot changes
-- | Watches for changes in the current slot
--
-- The action will not be called until the current slot becomes known
-- (if the tip of our ledger is too far away from the current wallclock time,
-- we may not know what the current 'SlotId' is).
--
-- Returns a handle to kill the thread.
onKnownSlotChange :: forall m. (IOLike m, HasCallStack)
=> ResourceRegistry m
-> BlockchainTime m
-> String -- ^ Label for the thread
-> (SlotNo -> m ()) -- ^ Action to execute
-> m (m ())
onKnownSlotChange registry btime label =
fmap cancelThread
. onEachChange registry label id Nothing getCurrentSlot'
-- we may not know what the current 'SlotNo' is).
knownSlotWatcher :: forall m. IOLike m
=> BlockchainTime m
-> (SlotNo -> m ()) -- ^ Action to execute
-> Watcher m SlotNo SlotNo
knownSlotWatcher btime notify =
Watcher {
wFingerprint = id
, wInitial = Nothing
, wNotify = notify
, wReader = getCurrentSlot'
}
where
getCurrentSlot' :: STM m SlotNo
getCurrentSlot' = do
Expand Down
14 changes: 8 additions & 6 deletions ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs
Expand Up @@ -44,7 +44,7 @@ import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import Ouroboros.Consensus.Util (repeatedly)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (onEachChange)
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)

{-------------------------------------------------------------------------------
Top-level API
Expand Down Expand Up @@ -280,13 +280,15 @@ forkSyncStateOnTipPointChange :: forall m blk. (
-> MempoolEnv m blk
-> m ()
forkSyncStateOnTipPointChange registry menv =
void $ onEachChange
void $ forkLinkedWatcher
registry
"Mempool.syncStateOnTipPointChange"
id
Nothing
getCurrentTip
action
Watcher {
wFingerprint = id
, wInitial = Nothing
, wNotify = action
, wReader = getCurrentTip
}
where
action :: Point blk -> m ()
action _tipPoint = void $ implSyncWithLedger menv
Expand Down
Expand Up @@ -73,9 +73,8 @@ import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Assert (assertWithMsg)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint (..),
onEachChange)
import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..),
WithFingerprint (..), withWatcher)

import Ouroboros.Consensus.Storage.ChainDB (ChainDB,
InvalidBlockReason)
Expand Down Expand Up @@ -129,23 +128,27 @@ bracketChainSyncClient
-> m a
bracketChainSyncClient tracer ChainDbView { getIsInvalidBlock } varCandidates
peer body =
withRegistry $ \registry ->
bracket register unregister $ \varCandidate -> do
rejectInvalidBlocks
tracer
registry
getIsInvalidBlock
(readTVar varCandidate)
body varCandidate
bracket newCandidateVar releaseCandidateVar
$ \varCandidate ->
withWatcher
"ChainSync.Client.rejectInvalidBlocks"
(invalidBlockWatcher varCandidate)
$ body varCandidate
where
register = do
newCandidateVar = do
varCandidate <- newTVarIO $ AF.Empty AF.AnchorGenesis
atomically $ modifyTVar varCandidates $ Map.insert peer varCandidate
return varCandidate

unregister _ = do
releaseCandidateVar _ = do
atomically $ modifyTVar varCandidates $ Map.delete peer

invalidBlockWatcher varCandidate =
invalidBlockRejector
tracer
getIsInvalidBlock
(readTVar varCandidate)

-- Our task: after connecting to an upstream node, try to maintain an
-- up-to-date header-only fragment representing their chain. We maintain
-- such candidate chains in a map with upstream nodes as keys.
Expand Down Expand Up @@ -960,32 +963,30 @@ attemptRollback rollBackPoint (frag, state) = do
-- node could have rolled back such that its candidate chain no longer
-- contains the invalid block, in which case we do not disconnect from it.
--
-- This function spawns a background thread using the given 'ResourceRegistry'.
--
-- The cost of this check is \( O(cand * check) \) where /cand/ is the size of
-- the candidate fragment and /check/ is the cost of checking whether a block
-- is invalid (typically \( O(\log(invalid)) \) where /invalid/ is the number
-- of invalid blocks).
rejectInvalidBlocks
invalidBlockRejector
:: forall m blk.
( IOLike m
, BlockSupportsProtocol blk
, LedgerSupportsProtocol blk
)
=> Tracer m (TraceChainSyncClientEvent blk)
-> ResourceRegistry m
-> STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))
-- ^ Get the invalid block checker
-> STM m (AnchoredFragment (Header blk))
-> m ()
rejectInvalidBlocks tracer registry getIsInvalidBlock getCandidate =
void $ onEachChange
registry
"ChainSync.Client.rejectInvalidBlocks"
getFingerprint
Nothing
getIsInvalidBlock
(checkInvalid . forgetFingerprint)
-> Watcher m
(WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))
Fingerprint
invalidBlockRejector tracer getIsInvalidBlock getCandidate =
Watcher {
wFingerprint = getFingerprint
, wInitial = Nothing
, wNotify = checkInvalid . forgetFingerprint
, wReader = getIsInvalidBlock
}
where
checkInvalid :: (HeaderHash blk -> Maybe (InvalidBlockReason blk)) -> m ()
checkInvalid isInvalidBlock = do
Expand Down
Expand Up @@ -359,17 +359,16 @@ mkApps kernel Tracers {..} Codecs {..} Handlers {..} =
-> m ((), Maybe bCS)
aChainSyncServer them channel = do
labelThisThread "LocalChainSyncServer"
withRegistry $ \registry ->
bracket
(chainSyncBlockServerFollower (getChainDB kernel) registry)
ChainDB.followerClose
(\flr -> runPeer
bracketWithPrivateRegistry
(chainSyncBlockServerFollower (getChainDB kernel))
ChainDB.followerClose
$ \flr ->
runPeer
(contramap (TraceLabelPeer them) tChainSyncTracer)
cChainSyncCodec
channel
$ chainSyncServerPeer
$ hChainSyncServer flr
)

aTxSubmissionServer
:: localPeer
Expand Down
Expand Up @@ -487,20 +487,19 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> m ((), Maybe bCS)
aChainSyncServer version them channel = do
labelThisThread "ChainSyncServer"
withRegistry $ \registry -> do
chainSyncTimeout <- genChainSyncTimeout
bracket
(chainSyncHeaderServerFollower (getChainDB kernel) registry)
ChainDB.followerClose
(\flr -> runPeerWithLimits
chainSyncTimeout <- genChainSyncTimeout
bracketWithPrivateRegistry
(chainSyncHeaderServerFollower (getChainDB kernel))
ChainDB.followerClose
$ \flr ->
runPeerWithLimits
(contramap (TraceLabelPeer them) tChainSyncSerialisedTracer)
cChainSyncCodecSerialised
(byteLimitsChainSync (const 0)) -- TODO: Real Bytelimits, see #1727
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncServerPeer
$ hChainSyncServer flr version
)

aBlockFetchClient
:: NodeToNodeVersion
Expand Down
6 changes: 4 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -349,8 +349,10 @@ forkBlockForging
-> BlockForging m blk
-> m ()
forkBlockForging maxTxCapacityOverride IS{..} blockForging =
void $ onKnownSlotChange registry btime threadLabel $
withEarlyExit_ . go
void
$ forkLinkedWatcher registry threadLabel
$ knownSlotWatcher btime
$ withEarlyExit_ . go
where
threadLabel :: String
threadLabel =
Expand Down

0 comments on commit 4f2e9ca

Please sign in to comment.