Skip to content

Commit

Permalink
Peer metrics for bytes downloaded with policy
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed May 4, 2021
1 parent 7dc765c commit e40d838
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 39 deletions.
3 changes: 2 additions & 1 deletion ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
Expand Up @@ -67,6 +67,7 @@ import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.PeerSelection.PeerMetric (nullMetric)
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
import Ouroboros.Network.Protocol.TxSubmission.Type
Expand Down Expand Up @@ -1010,7 +1011,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, intersectTimeout = waitForever
, mustReplyTimeout = waitForever
})
(\_ _ _ -> return ())
nullMetric
(NTN.mkHandlers nodeKernelArgs nodeKernel)

-- In practice, a robust wallet/user can persistently add a transaction
Expand Down
Expand Up @@ -56,7 +56,7 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
import qualified Ouroboros.Network.AnchoredSeq as AS
import Ouroboros.Network.Block (Tip, getTipBlockNo)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.PeerSelection.PeerMetric (ReportHeaderMetricsSTM)
import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportHeaderMetricsSTM)
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision

Expand Down
Expand Up @@ -55,7 +55,8 @@ import Ouroboros.Network.Driver
import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode
import Ouroboros.Network.PeerSelection.PeerMetric (ReportHeaderMetricsSTM)
import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportFetchedMetricsSTM,
ReportHeaderMetricsSTM, ReportPeerMetrics (..))
import Ouroboros.Network.Protocol.BlockFetch.Codec
import Ouroboros.Network.Protocol.BlockFetch.Server (BlockFetchServer,
blockFetchServerPeer)
Expand Down Expand Up @@ -125,6 +126,7 @@ data Handlers m peer blk = Handlers {
, hBlockFetchClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> ReportFetchedMetricsSTM m
-> BlockFetchClient (Header blk) blk m ()

, hBlockFetchServer
Expand Down Expand Up @@ -448,10 +450,10 @@ mkApps
-> Tracers m remotePeer blk e
-> Codecs blk e m bCS bCS bBF bBF bTX bTX2 bKA
-> m ChainSyncTimeout
-> (remotePeer -> ReportHeaderMetricsSTM m)
-> ReportPeerMetrics m remotePeer
-> Handlers m remotePeer blk
-> Apps m remotePeer bCS bBF bTX bTX2 bKA ()
mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Handlers {..} =
mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout ReportPeerMetrics {..} Handlers {..} =
Apps {..}
where
aChainSyncClient
Expand Down Expand Up @@ -484,7 +486,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Hand
channel
$ chainSyncClientPeerPipelined
$ hChainSyncClient them version controlMessageSTM
(addHeaderMetrics them) varCandidate
(reportHeader them) varCandidate
return ((), trailing)

aChainSyncServer
Expand Down Expand Up @@ -523,7 +525,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout addHeaderMetrics Hand
(byteLimitsBlockFetch (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsBlockFetch
channel
$ hBlockFetchClient version controlMessageSTM clientCtx
$ hBlockFetchClient version controlMessageSTM (reportFetch them) clientCtx

aBlockFetchServer
:: NodeToNodeVersion
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -74,7 +74,7 @@ import Ouroboros.Network.NodeToNode (DiffusionMode,
RemoteAddress, combineVersions,
defaultMiniProtocolParameters)
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..),
newPeerMetric, addHeaderMetric)
newPeerMetric, reportMetric)
import Ouroboros.Network.Protocol.Limits (shortWait)

import Ouroboros.Consensus.Block
Expand Down Expand Up @@ -328,7 +328,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
rnTraceNTN
(NTN.defaultCodecs codecConfig version)
llrnChainSyncTimeout
(addHeaderMetric peerMetrics)
(reportMetric peerMetrics)
(NTN.mkHandlers nodeKernelArgs nodeKernel)

mkNodeToClientApps
Expand Down
3 changes: 2 additions & 1 deletion ouroboros-network/demo/chain-sync.hs
Expand Up @@ -296,7 +296,8 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
nullTracer -- (contramap (show . TraceLabelPeer connectionId) stdoutTracer)
codecBlockFetch
channel
(blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO)) clientCtx)
(blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO))
(\_ _ _ -> return ()) clientCtx)

blockFetchPolicy :: BlockFetchConsensusInterface
LocalConnectionId BlockHeader Block IO
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Expand Up @@ -70,6 +70,7 @@ library
Ouroboros.Network.PeerSelection.LedgerPeers
Ouroboros.Network.PeerSelection.LocalRootPeers
Ouroboros.Network.PeerSelection.PeerMetric
Ouroboros.Network.PeerSelection.PeerMetric.Type
Ouroboros.Network.PeerSelection.PeerStateActions
Ouroboros.Network.PeerSelection.RootPeersDNS
Ouroboros.Network.PeerSelection.Governor
Expand Down
9 changes: 8 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs
Expand Up @@ -56,6 +56,7 @@ import Ouroboros.Network.BlockFetch.ClientState
, rejectedFetchBatch )
import Ouroboros.Network.BlockFetch.DeltaQ
( PeerGSV(..), PeerFetchInFlightLimits(..) )
import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportFetchedMetricsSTM)


data BlockFetchProtocolFailure =
Expand Down Expand Up @@ -84,9 +85,10 @@ blockFetchClient :: forall header block m.
HeaderHash header ~ HeaderHash block)
=> NodeToNodeVersion
-> ControlMessageSTM m
-> ReportFetchedMetricsSTM m
-> FetchClientContext header block m
-> PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m ()
blockFetchClient _version controlMessageSTM
blockFetchClient _version controlMessageSTM reportFetched
FetchClientContext {
fetchClientCtxTracer = tracer,
fetchClientCtxPolicy = FetchClientPolicy {
Expand Down Expand Up @@ -266,6 +268,7 @@ blockFetchClient _version controlMessageSTM

(MsgBlock block, header:headers') -> ReceiverEffect $ do
now <- getCurrentTime
nowMono <- getMonotonicTime
--TODO: consider how to enforce expected block size limit.
-- They've lied and are sending us a massive amount of data.
-- Resource consumption attack.
Expand Down Expand Up @@ -297,6 +300,10 @@ blockFetchClient _version controlMessageSTM
forgeTime <- atomically $ blockForgeUTCTime $ FromConsensus block
let blockDelay = diffUTCTime now forgeTime

let hf = getHeaderFields header
slotNo = headerFieldSlot hf
atomically $ reportFetched (blockFetchSize header) slotNo nowMono

-- Note that we add the block to the chain DB /before/ updating our
-- current status and in-flight stats. Otherwise blocks will
-- disappear from our in-flight set without yet appearing in the
Expand Down
11 changes: 8 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Expand Up @@ -95,7 +95,8 @@ import Ouroboros.Network.PeerSelection.RootPeersDNS ( DomainAddress
import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..))
import Ouroboros.Network.InboundGovernor.State (InboundGovernorCounters (..))
import qualified Ouroboros.Network.PeerSelection.Governor as Governor
import Ouroboros.Network.PeerSelection.Governor.Types ( TracePeerSelection (..)
import Ouroboros.Network.PeerSelection.Governor.Types ( ChurnMode (..)
, TracePeerSelection (..)
, DebugPeerSelection (..)
, PeerSelectionCounters (..)
)
Expand Down Expand Up @@ -656,6 +657,8 @@ runDataDiffusion tracers
(policyRng, churnRng) = split rng'
policyRngVar <- newTVarIO policyRng

churnModeVar <- newTVarIO ChurnModeNormal

-- Request interface, supply the number of peers desired.
ledgerPeersReq <- newEmptyTMVarIO :: IO (StrictTMVar IO NumberOfPeers)
-- Response interface, returns a Set of peers. Nothing indicates that the
Expand Down Expand Up @@ -866,12 +869,13 @@ runDataDiffusion tracers
dtTracePeerSelectionCounters
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy
policyRngVar daPeerMetrics))
policyRngVar (readTVar churnModeVar) daPeerMetrics))
$ \governorThread ->
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daPeerMetrics
churnModeVar
churnRng
daBlockFetchMode
daPeerSelectionTargets
Expand Down Expand Up @@ -979,7 +983,7 @@ runDataDiffusion tracers
dtTracePeerSelectionCounters
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy
policyRngVar daPeerMetrics))
policyRngVar (readTVar churnModeVar) daPeerMetrics))
$ \governorThread -> do
let mkAddr :: AddrInfo -> (Socket.Family, SockAddr)
mkAddr addr = ( Socket.addrFamily addr
Expand Down Expand Up @@ -1014,6 +1018,7 @@ runDataDiffusion tracers
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daPeerMetrics
churnModeVar
churnRng
daBlockFetchMode
daPeerSelectionTargets
Expand Down
10 changes: 8 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs
Expand Up @@ -40,9 +40,10 @@ closeConnectionTimeout = 120

simplePeerSelectionPolicy :: forall m. MonadSTM m
=> StrictTVar m StdGen
-> STM m ChurnMode
-> PeerMetrics m SockAddr
-> PeerSelectionPolicy SockAddr m
simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy {
simplePeerSelectionPolicy rngVar getChurnMode metrics = PeerSelectionPolicy {
policyPickKnownPeersForGossip = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
Expand Down Expand Up @@ -72,7 +73,12 @@ simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy {

hotDemotionPolicy :: PickPolicy SockAddr m
hotDemotionPolicy available pickNum = do
scores <- upstreamyness <$> (getHeaderMetrics metrics)
mode <- getChurnMode
scores <- case mode of
ChurnModeNormal ->
upstreamyness <$> (getHeaderMetrics metrics)
ChurnModeBulkSync ->
fetchyness <$> (getFetchedMetrics metrics)
available' <- addRand available
return $ Set.fromList
. map fst
Expand Down
44 changes: 29 additions & 15 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Expand Up @@ -580,43 +580,52 @@ peerChurnGovernor :: forall m peeraddr.
)
=> Tracer m (TracePeerSelection peeraddr)
-> PeerMetrics m peeraddr
-> StrictTVar m ChurnMode
-> StdGen
-> STM m FetchMode
-> PeerSelectionTargets
-> StrictTVar m PeerSelectionTargets
-> m Void
peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do
peerChurnGovernor tracer metrics churnModeVar inRng getFetchMode base peerSelectionVar = do
-- Wait a while so that not only the closest peers have had the time
-- to become warm.
startTs0 <- getMonotonicTime
threadDelay 3
atomically increaseActivePeers
mode <- atomically updateChurnMode
atomically $ increaseActivePeers mode
endTs0 <- getMonotonicTime
fuzzyDelay inRng (Time $ diffTime endTs0 startTs0) >>= go

where

increaseActivePeers :: STM m ()
increaseActivePeers = do
mode <- getFetchMode
updateChurnMode :: STM m ChurnMode
updateChurnMode = do
fm <- getFetchMode
let mode = case fm of
FetchModeDeadline -> ChurnModeNormal
FetchModeBulkSync -> ChurnModeBulkSync
writeTVar churnModeVar mode
return mode

increaseActivePeers :: ChurnMode -> STM m ()
increaseActivePeers mode = do
modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers =
case mode of
FetchModeDeadline ->
ChurnModeNormal ->
targetNumberOfActivePeers base
FetchModeBulkSync ->
ChurnModeBulkSync ->
min 2 (targetNumberOfActivePeers base)
})

decreaseActivePeers :: STM m ()
decreaseActivePeers = do
mode <- getFetchMode
decreaseActivePeers :: ChurnMode -> STM m ()
decreaseActivePeers mode = do
modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers =
case mode of
FetchModeDeadline ->
ChurnModeNormal ->
decrease $ targetNumberOfActivePeers base
FetchModeBulkSync ->
ChurnModeBulkSync ->
min 1 (targetNumberOfActivePeers base - 1)
})

Expand All @@ -626,17 +635,22 @@ peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do
startTs <- getMonotonicTime

headerScores <- upstreamyness <$> (atomically $ getHeaderMetrics metrics)
fetchScores <- fetchyness <$> (atomically $ getFetchedMetrics metrics)
traceWith tracer $ TraceXXX $ "Header: " ++ (show $ sortOn snd $ Map.toList headerScores)
traceWith tracer $ TraceXXX $ "Bytes: " ++ (show $ sortOn snd $ Map.toList fetchScores)

churnMode <- atomically updateChurnMode
traceWith tracer $ TraceChurnMode churnMode

-- Purge the worst active peer(s).
atomically decreaseActivePeers
atomically $ decreaseActivePeers churnMode

-- Short delay, we may have no active peers right now
threadDelay 1

-- Pick new active peer(s) based on the best performing established
-- peers.
atomically increaseActivePeers
atomically $ increaseActivePeers churnMode

-- Give the promotion process time to start
threadDelay 1
Expand Down Expand Up @@ -692,7 +706,7 @@ peerChurnGovernor tracer metrics inRng getFetchMode base peerSelectionVar = do


churnIntervalBulk :: Time
churnIntervalBulk = Time 300
churnIntervalBulk = Time 600

-- Replace 20% or at least on peer every churnInterval.
decrease :: Int -> Int
Expand Down
Expand Up @@ -512,6 +512,7 @@ data TracePeerSelection peeraddr =
| TraceDemoteAsynchronous (Map peeraddr PeerStatus)
| TraceGovernorWakeup
| TraceChurnWait DiffTime
| TraceChurnMode ChurnMode
| TraceXXX String
deriving Show

Expand All @@ -520,3 +521,7 @@ data DebugPeerSelection peeraddr peerconn =
(Maybe DiffTime)
(PeerSelectionState peeraddr peerconn)
deriving (Show, Functor)

data ChurnMode = ChurnModeBulkSync
| ChurnModeNormal deriving Show

0 comments on commit e40d838

Please sign in to comment.