Skip to content

Commit

Permalink
Adjust number of active peers based on fetchmode
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Apr 6, 2021
1 parent 7de018d commit 77a9ece
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 30 deletions.
5 changes: 4 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -299,6 +299,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
ntnApps
ntcApps
nodeKernel
btime

llrnRunDataDiffusion registry diffusionApplications
where
Expand Down Expand Up @@ -347,11 +348,12 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString ()
)
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> BlockchainTime m
-> DiffusionApplications
addrNTN addrNTC
versionDataNTN versionDataNTC
m
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel =
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel btime =
DiffusionApplications {
daApplicationInitiatorMode = combineVersions
[ simpleSingletonVersions
Expand All @@ -378,6 +380,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, daRethrowPolicy = consensusRethrowPolicy (Proxy @blk)
, daLocalRethrowPolicy = mempty
, daLedgerPeersCtx = LedgerPeersConsensusInterface (getPeersFromCurrentLedgerAfterSlot kernel)
, daBlockFetchMode = getFetchMode (getChainDB kernel) btime
}

-- | Did the ChainDB already have existing clean-shutdown marker on disk?
Expand Down
47 changes: 29 additions & 18 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -22,6 +22,7 @@ module Ouroboros.Consensus.NodeKernel (
, getMempoolWriter
, getPeersFromCurrentLedger
, getPeersFromCurrentLedgerAfterSlot
, getFetchMode
) where


Expand Down Expand Up @@ -321,24 +322,7 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime
readCurrentChain = ChainDB.getCurrentChain chainDB

readFetchMode :: STM m FetchMode
readFetchMode = do
mCurSlot <- getCurrentSlot btime
case mCurSlot of
-- The current chain's tip far away from "now", so use bulk sync mode.
CurrentSlotUnknown -> return FetchModeBulkSync
CurrentSlot curSlot -> do
curChainSlot <- AF.headSlot <$> ChainDB.getCurrentChain chainDB
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync
readFetchMode = getFetchMode chainDB btime

readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = ChainDB.getIsFetched chainDB
Expand Down Expand Up @@ -794,3 +778,30 @@ getPeersFromCurrentLedgerAfterSlot kernel slotNo =
case ledgerTipSlot st of
Origin -> False
NotOrigin tip -> tip > slotNo

-- | Decide which block-fetch mode to use by comparing the slot of the current
-- chain's tip with the current wall-clock slot.
--
-- Yields 'FetchModeBulkSync' when the current slot is unknown or when the tip
-- is @maxSlotsBehind@ (1000) or more slots behind the current slot, and
-- 'FetchModeDeadline' otherwise.
getFetchMode
  :: forall m blk.
     ( IOLike m
     , BlockSupportsProtocol blk
     )
  => ChainDB m blk
  -> BlockchainTime m
  -> STM m FetchMode
getFetchMode chainDB btime = do
    mCurSlot <- getCurrentSlot btime
    case mCurSlot of
      -- The current chain's tip is far away from "now", so use bulk sync mode.
      CurrentSlotUnknown  -> return FetchModeBulkSync
      CurrentSlot curSlot -> do
        curChainSlot <- AF.headSlot <$> ChainDB.getCurrentChain chainDB
        let slotsBehind = case curChainSlot of
              -- There's nothing in the chain. If the current slot is 0, then
              -- we're 1 slot behind.
              Origin -> unSlotNo curSlot + 1
              NotOrigin slot
                -- The tip is at or ahead of the wall-clock slot (e.g. because
                -- of clock skew). Slot numbers are unsigned, so subtracting
                -- here would wrap around to a huge value and wrongly select
                -- bulk sync mode; we are simply not behind.
                | unSlotNo slot >= unSlotNo curSlot -> 0
                | otherwise -> unSlotNo curSlot - unSlotNo slot
            maxSlotsBehind = 1000
        return $ if slotsBehind < maxSlotsBehind
          -- When the current chain is near to "now", use deadline mode,
          -- when it is far away, use bulk sync mode.
          then FetchModeDeadline
          else FetchModeBulkSync
8 changes: 8 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Expand Up @@ -74,6 +74,7 @@ import Ouroboros.Network.Snocket ( FileDescriptor
)
import qualified Ouroboros.Network.Snocket as Snocket

import Ouroboros.Network.BlockFetch
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version
import Ouroboros.Network.Protocol.Handshake.Codec
Expand Down Expand Up @@ -379,6 +380,8 @@ data DiffusionApplications ntnAddr ntcAddr ntnVersionData ntcVersionData m =

, daLedgerPeersCtx :: LedgerPeersConsensusInterface m
-- ^ Interface used to get peers from the current ledger.
, daBlockFetchMode :: STM m FetchMode
-- ^ Used by churn-governor
}


Expand Down Expand Up @@ -575,6 +578,7 @@ runDataDiffusion tracers
, daMiniProtocolParameters
, daLocalRethrowPolicy
, daLedgerPeersCtx
, daBlockFetchMode
} =
-- We run two services: for /node-to-node/ and /node-to-client/. The
-- naming convention is that we use /local/ prefix for /node-to-client/
Expand Down Expand Up @@ -842,7 +846,9 @@ runDataDiffusion tracers
$ \governorThread ->
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->
Expand Down Expand Up @@ -978,7 +984,9 @@ runDataDiffusion tracers
$ \serverThread ->
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->
Expand Down
56 changes: 45 additions & 11 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Expand Up @@ -56,6 +56,7 @@ import qualified Ouroboros.Network.PeerSelection.Governor.KnownPeers as Kn
import qualified Ouroboros.Network.PeerSelection.Governor.Monitor as Monitor
import qualified Ouroboros.Network.PeerSelection.Governor.RootPeers as RootPeers
import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.BlockFetch (FetchMode (..))


{- $overview
Expand Down Expand Up @@ -563,27 +564,42 @@ $peer-churn-governor

-- |
--
peerChurnGovernor :: forall m.
peerChurnGovernor :: forall m peeraddr.
( MonadSTM m
, MonadMonotonicTime m
, MonadDelay m
)
=> StdGen
=> Tracer m (TracePeerSelection peeraddr)
-> StdGen
-> STM m FetchMode
-> PeerSelectionTargets
-> StrictTVar m PeerSelectionTargets
-> m Void
peerChurnGovernor inRng base peerSelectionVar = do
peerChurnGovernor tracer inRng getFetchMode base peerSelectionVar = do
-- Wait a while so that not only the closest peers have had the time
-- to become warm.
startTs0 <- getMonotonicTime
threadDelay 3
atomically $ modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers = targetNumberOfActivePeers base
})
atomically updateActivePeers
endTs0 <- getMonotonicTime
fuzzyDelay inRng (Time $ diffTime endTs0 startTs0) >>= go

where

-- | Re-read the fetch mode and set the target number of active peers
-- accordingly: the base target in deadline mode, and at most 2 of the base
-- target while in bulk sync mode.
updateActivePeers :: STM m ()
updateActivePeers = do
    fetchMode <- getFetchMode
    let baseActive = targetNumberOfActivePeers base
        newActive  = case fetchMode of
          FetchModeDeadline -> baseActive
          FetchModeBulkSync -> min 2 baseActive
    modifyTVar peerSelectionVar $ \targets ->
      targets { targetNumberOfActivePeers = newActive }

go :: StdGen -> m Void
go rng = do
startTs <- getMonotonicTime

Expand All @@ -597,9 +613,7 @@ peerChurnGovernor inRng base peerSelectionVar = do

-- Pick new active peer(s) based on the best performing established
-- peers.
atomically $ modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers = targetNumberOfActivePeers base
})
atomically $ updateActivePeers

-- Give the promotion process time to start
threadDelay 1
Expand Down Expand Up @@ -628,15 +642,35 @@ peerChurnGovernor inRng base peerSelectionVar = do
-- Randomly delay between churnInterval and churnInterval + maxFuzz seconds.
fuzzyDelay :: StdGen -> Time -> m StdGen
fuzzyDelay rng execTime = do
let (fuzz, rng') = randomR (0, 600 :: Double) rng
threadDelay $ (realToFrac fuzz) + (diffTime churnInterval execTime)
mode <- atomically getFetchMode
case mode of
FetchModeDeadline -> longDelay rng execTime
FetchModeBulkSync -> shortDelay rng execTime

-- | Sleep for @baseDelay@ minus the already elapsed @execTime@, plus a random
-- amount drawn from @[0, maxFuzz]@. The chosen delay is traced as
-- 'TraceChurnWait' before sleeping. Returns the advanced random generator.
fuzzyDelay' :: Time -> Double -> StdGen -> Time -> m StdGen
fuzzyDelay' baseDelay maxFuzz rng execTime = do
    let (jitter, nextRng) = randomR (0, maxFuzz :: Double) rng
        remaining         = diffTime baseDelay execTime
        -- Forced before tracing so the traced value is already evaluated.
        !waitTime         = realToFrac jitter + remaining
    traceWith tracer (TraceChurnWait waitTime)
    threadDelay waitTime
    return nextRng


-- Delay used by 'fuzzyDelay' in deadline mode: based on 'churnInterval' with
-- up to 600 of random fuzz (presumably seconds -- confirm against 'Time').
longDelay :: StdGen -> Time -> m StdGen
longDelay = fuzzyDelay' churnInterval 600


-- Delay used by 'fuzzyDelay' in bulk sync mode: based on 'churnIntervalBulk'
-- with up to 60 of random fuzz (presumably seconds -- confirm against 'Time').
shortDelay :: StdGen -> Time -> m StdGen
shortDelay = fuzzyDelay' churnIntervalBulk 60

-- The min time between running the churn governor; base delay for
-- 'longDelay'.
churnInterval :: Time
churnInterval = Time 3300


-- Base delay for 'shortDelay', i.e. the shorter churn interval used while in
-- bulk sync mode.
churnIntervalBulk :: Time
churnIntervalBulk = Time 300

-- Replace 20% or at least one peer every churnInterval.
--
-- The result is clamped at 0 so that a target of 0 (or a negative input)
-- never produces a negative number of peers.
decrease :: Int -> Int
decrease v = max 0 $ v - max 1 (v `div` 5)
Expand Down
Expand Up @@ -460,6 +460,7 @@ data TracePeerSelection peeraddr =
| TraceDemoteHotDone Int Int peeraddr
| TraceDemoteAsynchronous (Map peeraddr PeerStatus)
| TraceGovernorWakeup
| TraceChurnWait DiffTime
deriving Show

data DebugPeerSelection peeraddr peerconn =
Expand Down

0 comments on commit 77a9ece

Please sign in to comment.