Skip to content

Commit

Permalink
Adjust number of active peers based on fetch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Apr 8, 2021
1 parent 794e3c3 commit 2296155
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 31 deletions.
5 changes: 4 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -303,6 +303,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
ntcApps
nodeKernel
peerMetrics
btime

llrnRunDataDiffusion registry diffusionApplications
where
Expand Down Expand Up @@ -354,11 +355,12 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
)
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> PeerMetrics m addrNTN
-> BlockchainTime m
-> DiffusionApplications
addrNTN addrNTC
versionDataNTN versionDataNTC
m
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel peerMetrics =
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel peerMetrics btime =
DiffusionApplications {
daApplicationInitiatorMode = combineVersions
[ simpleSingletonVersions
Expand Down Expand Up @@ -386,6 +388,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, daLocalRethrowPolicy = mempty
, daLedgerPeersCtx = LedgerPeersConsensusInterface (getPeersFromCurrentLedgerAfterSlot kernel)
, daPeerMetrics = peerMetrics
, daBlockFetchMode = getFetchMode (getChainDB kernel) btime
}

-- | Did the ChainDB already have existing clean-shutdown marker on disk?
Expand Down
47 changes: 29 additions & 18 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -22,6 +22,7 @@ module Ouroboros.Consensus.NodeKernel (
, getMempoolWriter
, getPeersFromCurrentLedger
, getPeersFromCurrentLedgerAfterSlot
, getFetchMode
) where


Expand Down Expand Up @@ -321,24 +322,7 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime
readCurrentChain = ChainDB.getCurrentChain chainDB

readFetchMode :: STM m FetchMode
readFetchMode = do
mCurSlot <- getCurrentSlot btime
case mCurSlot of
-- The current chain's tip is far away from "now", so use bulk sync mode.
CurrentSlotUnknown -> return FetchModeBulkSync
CurrentSlot curSlot -> do
curChainSlot <- AF.headSlot <$> ChainDB.getCurrentChain chainDB
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync
readFetchMode = getFetchMode chainDB btime

readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = ChainDB.getIsFetched chainDB
Expand Down Expand Up @@ -794,3 +778,30 @@ getPeersFromCurrentLedgerAfterSlot kernel slotNo =
case ledgerTipSlot st of
Origin -> False
NotOrigin tip -> tip > slotNo

-- | Compute the current 'FetchMode' by comparing the tip of the current
-- chain against the wall-clock slot: deadline mode when the chain is near
-- "now", bulk sync mode when it lags far behind (or when the wall-clock
-- slot is not yet known).
getFetchMode
  :: forall m blk.
     ( IOLike m
     , BlockSupportsProtocol blk
     )
  => ChainDB m blk
  -> BlockchainTime m
  -> STM m FetchMode
getFetchMode chainDB btime = do
    wallClockSlot <- getCurrentSlot btime
    case wallClockSlot of
      -- Wall-clock slot unknown: the tip must be far away from "now",
      -- so use bulk sync mode.
      CurrentSlotUnknown -> return FetchModeBulkSync
      CurrentSlot now    -> do
        tipSlot <- AF.headSlot <$> ChainDB.getCurrentChain chainDB
        let lag = case tipSlot of
              -- There's nothing in the chain.  If the current slot is 0,
              -- then we're 1 slot behind.
              Origin         -> unSlotNo now + 1
              NotOrigin slot -> unSlotNo now - unSlotNo slot
        -- When the current chain is near to "now", use deadline mode,
        -- when it is far away, use bulk sync mode.
        return $ if lag < maxSlotsBehind
                   then FetchModeDeadline
                   else FetchModeBulkSync
  where
    -- Threshold (in slots) separating "near now" from "far behind".
    maxSlotsBehind = 1000
6 changes: 6 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Expand Up @@ -74,6 +74,7 @@ import Ouroboros.Network.Snocket ( FileDescriptor
)
import qualified Ouroboros.Network.Snocket as Snocket

import Ouroboros.Network.BlockFetch
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version
import Ouroboros.Network.Protocol.Handshake.Codec
Expand Down Expand Up @@ -381,6 +382,8 @@ data DiffusionApplications ntnAddr ntcAddr ntnVersionData ntcVersionData m =
, daLedgerPeersCtx :: LedgerPeersConsensusInterface m
-- ^ Interface used to get peers from the current ledger.
, daPeerMetrics :: PeerMetrics m ntnAddr
, daBlockFetchMode :: STM m FetchMode
-- ^ Used by churn-governor
}


Expand Down Expand Up @@ -578,6 +581,7 @@ runDataDiffusion tracers
, daLocalRethrowPolicy
, daLedgerPeersCtx
, daPeerMetrics
, daBlockFetchMode
} =
-- We run two services: for /node-to-node/ and /node-to-client/. The
-- naming convention is that we use /local/ prefix for /node-to-client/
Expand Down Expand Up @@ -849,6 +853,7 @@ runDataDiffusion tracers
dtTracePeerSelectionTracer
daPeerMetrics
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->
Expand Down Expand Up @@ -988,6 +993,7 @@ runDataDiffusion tracers
dtTracePeerSelectionTracer
daPeerMetrics
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->
Expand Down
67 changes: 55 additions & 12 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Expand Up @@ -59,6 +59,7 @@ import qualified Ouroboros.Network.PeerSelection.Governor.Monitor as Mo
import qualified Ouroboros.Network.PeerSelection.Governor.RootPeers as RootPeers
import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.PeerMetric
import Ouroboros.Network.BlockFetch (FetchMode (..))

{- $overview
Expand Down Expand Up @@ -575,40 +576,62 @@ peerChurnGovernor :: forall m peeraddr.
=> Tracer m (TracePeerSelection peeraddr)
-> PeerMetrics m peeraddr
-> StdGen
-> STM m FetchMode
-> PeerSelectionTargets
-> StrictTVar m PeerSelectionTargets
-> m Void
peerChurnGovernor tracer metrics inRng base peerSelectionVar = do
peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do
-- Wait a while so that not only the closest peers have had the time
-- to become warm.
startTs0 <- getMonotonicTime
threadDelay 3
atomically $ modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers = targetNumberOfActivePeers base
})
atomically increaseActivePeers
endTs0 <- getMonotonicTime
fuzzyDelay inRng (Time $ diffTime endTs0 startTs0) >>= go

where

-- Raise the active-peer target: the full configured target in deadline
-- mode, but at most 2 while in bulk sync mode.
increaseActivePeers :: STM m ()
increaseActivePeers = do
    mode <- getFetchMode
    let newTarget = case mode of
          FetchModeDeadline -> targetNumberOfActivePeers base
          FetchModeBulkSync -> min 2 (targetNumberOfActivePeers base)
    modifyTVar peerSelectionVar
      (\targets -> targets { targetNumberOfActivePeers = newTarget })

-- Lower the active-peer target: the configured target reduced by
-- 'decrease' in deadline mode, but at most 1 while in bulk sync mode.
decreaseActivePeers :: STM m ()
decreaseActivePeers = do
    mode <- getFetchMode
    let newTarget = case mode of
          FetchModeDeadline -> decrease $ targetNumberOfActivePeers base
          FetchModeBulkSync -> min 1 (targetNumberOfActivePeers base - 1)
    modifyTVar peerSelectionVar
      (\targets -> targets { targetNumberOfActivePeers = newTarget })


go :: StdGen -> m Void
go rng = do
startTs <- getMonotonicTime

headerScores <- upstreamyness <$> (atomically $ getHeaderMetrics metrics)
traceWith tracer $ TraceXXX $ "Header: " ++ (show $ sortOn snd $ Map.toList headerScores)

-- Purge the worst active peer(s).
atomically $ modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers = decrease (targetNumberOfActivePeers base)
})
atomically decreaseActivePeers

-- Short delay, we may have no active peers right now
threadDelay 1

-- Pick new active peer(s) based on the best performing established
-- peers.
atomically $ modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers = targetNumberOfActivePeers base
})
atomically increaseActivePeers

-- Give the promotion process time to start
threadDelay 1
Expand Down Expand Up @@ -637,15 +660,35 @@ peerChurnGovernor tracer metrics inRng base peerSelectionVar = do
-- Randomly delay between churnInterval and churnInterval + maxFuzz seconds.
fuzzyDelay :: StdGen -> Time -> m StdGen
fuzzyDelay rng execTime = do
let (fuzz, rng') = randomR (0, 600 :: Double) rng
threadDelay $ (realToFrac fuzz) + (diffTime churnInterval execTime)
mode <- atomically getFetchMode
case mode of
FetchModeDeadline -> longDelay rng execTime
FetchModeBulkSync -> shortDelay rng execTime

-- Sleep for @baseDelay@ less the time already spent executing, plus a
-- random fuzz drawn from [0, maxFuzz] seconds; trace the chosen delay
-- and return the advanced RNG.
fuzzyDelay' :: Time -> Double -> StdGen -> Time -> m StdGen
fuzzyDelay' baseDelay maxFuzz rng execTime = do
    let (fuzzSecs, rng') = randomR (0, maxFuzz :: Double) rng
        !waitTime        = realToFrac fuzzSecs + diffTime baseDelay execTime
    traceWith tracer (TraceChurnWait waitTime)
    threadDelay waitTime
    return rng'


-- Churn delay used in deadline mode: churnInterval base, up to 600s fuzz.
longDelay :: StdGen -> Time -> m StdGen
longDelay = fuzzyDelay' churnInterval 600


-- Churn delay used in bulk sync mode: churnIntervalBulk base, up to 60s fuzz.
shortDelay :: StdGen -> Time -> m StdGen
shortDelay = fuzzyDelay' churnIntervalBulk 60

-- The min time between running the churn governor.
churnInterval :: Time
churnInterval = Time 3300


-- The min time between churn governor runs while in bulk sync mode.
churnIntervalBulk :: Time
churnIntervalBulk = Time 300

-- Replace 20% or at least one peer every churnInterval.  Clamped at
-- zero so that a configured target of 0 cannot produce a negative
-- active-peer target (original returned -1 for input 0).
decrease :: Int -> Int
decrease v = max 0 (v - max 1 (v `div` 5))
Expand Down
Expand Up @@ -460,6 +460,7 @@ data TracePeerSelection peeraddr =
| TraceDemoteHotDone Int Int peeraddr
| TraceDemoteAsynchronous (Map peeraddr PeerStatus)
| TraceGovernorWakeup
| TraceChurnWait DiffTime
| TraceXXX String
deriving Show

Expand Down

0 comments on commit 2296155

Please sign in to comment.