Skip to content

Commit

Permalink
p2p-governor: peer metrics for bytes downloaded with policy
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu authored and coot committed Oct 26, 2021
1 parent 4d0f950 commit 803ac6e
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 26 deletions.
3 changes: 2 additions & 1 deletion ouroboros-network/demo/chain-sync.hs
Expand Up @@ -298,7 +298,8 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
nullTracer -- (contramap (show . TraceLabelPeer connectionId) stdoutTracer)
codecBlockFetch
channel
(blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO)) clientCtx)
(blockFetchClient NodeToNodeV_1 (continueForever (Proxy :: Proxy IO))
(\_ _ _ -> return ()) clientCtx)

blockFetchPolicy :: BlockFetchConsensusInterface
LocalConnectionId BlockHeader Block IO
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Expand Up @@ -79,6 +79,7 @@ library
Ouroboros.Network.PeerSelection.LedgerPeers
Ouroboros.Network.PeerSelection.LocalRootPeers
Ouroboros.Network.PeerSelection.PeerMetric
Ouroboros.Network.PeerSelection.PeerMetric.Type
Ouroboros.Network.PeerSelection.PeerStateActions
Ouroboros.Network.PeerSelection.RelayAccessPoint
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
Expand Down
9 changes: 8 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs
Expand Up @@ -56,6 +56,7 @@ import Ouroboros.Network.BlockFetch.ClientState
, rejectedFetchBatch )
import Ouroboros.Network.BlockFetch.DeltaQ
( PeerGSV(..), PeerFetchInFlightLimits(..) )
import Ouroboros.Network.PeerSelection.PeerMetric.Type (ReportFetchedMetricsSTM)


data BlockFetchProtocolFailure =
Expand Down Expand Up @@ -84,9 +85,10 @@ blockFetchClient :: forall header block m.
HeaderHash header ~ HeaderHash block)
=> NodeToNodeVersion
-> ControlMessageSTM m
-> ReportFetchedMetricsSTM m
-> FetchClientContext header block m
-> PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m ()
blockFetchClient _version controlMessageSTM
blockFetchClient _version controlMessageSTM reportFetched
FetchClientContext {
fetchClientCtxTracer = tracer,
fetchClientCtxPolicy = FetchClientPolicy {
Expand Down Expand Up @@ -267,6 +269,7 @@ blockFetchClient _version controlMessageSTM

(MsgBlock block, header:headers') -> ReceiverEffect $ do
now <- getCurrentTime
nowMono <- getMonotonicTime
--TODO: consider how to enforce expected block size limit.
-- They've lied and are sending us a massive amount of data.
-- Resource consumption attack.
Expand Down Expand Up @@ -298,6 +301,10 @@ blockFetchClient _version controlMessageSTM
forgeTime <- atomically $ blockForgeUTCTime $ FromConsensus block
let blockDelay = diffUTCTime now forgeTime

let hf = getHeaderFields header
slotNo = headerFieldSlot hf
atomically $ reportFetched (blockFetchSize header) slotNo nowMono

-- Note that we add the block to the chain DB /before/ updating our
-- current status and in-flight stats. Otherwise blocks will
-- disappear from our in-flight set without yet appearing in the
Expand Down
10 changes: 8 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs
Expand Up @@ -40,9 +40,10 @@ closeConnectionTimeout = 120

simplePeerSelectionPolicy :: forall m. MonadSTM m
=> StrictTVar m StdGen
-> STM m ChurnMode
-> PeerMetrics m SockAddr
-> PeerSelectionPolicy SockAddr m
simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy {
simplePeerSelectionPolicy rngVar getChurnMode metrics = PeerSelectionPolicy {
policyPickKnownPeersForGossip = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
Expand Down Expand Up @@ -72,7 +73,12 @@ simplePeerSelectionPolicy rngVar metrics = PeerSelectionPolicy {

hotDemotionPolicy :: PickPolicy SockAddr m
hotDemotionPolicy _ _ available pickNum = do
scores <- upstreamyness <$> (getHeaderMetrics metrics)
mode <- getChurnMode
scores <- case mode of
ChurnModeNormal ->
upstreamyness <$> (getHeaderMetrics metrics)
ChurnModeBulkSync ->
fetchyness <$> (getFetchedMetrics metrics)
available' <- addRand available
return $ Set.fromList
. map fst
Expand Down
40 changes: 26 additions & 14 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Expand Up @@ -587,47 +587,56 @@ peerChurnGovernor :: forall m peeraddr.
)
=> Tracer m (TracePeerSelection peeraddr)
-> PeerMetrics m peeraddr
-> StrictTVar m ChurnMode
-> StdGen
-> STM m FetchMode
-> PeerSelectionTargets
-> StrictTVar m PeerSelectionTargets
-> m Void
peerChurnGovernor tracer _metrics inRng getFetchMode base peerSelectionVar = do
peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelectionVar = do
-- Wait a while so that not only the closest peers have had the time
-- to become warm.
startTs0 <- getMonotonicTime
-- TODO: revisit the policy once we have local root peers in the governor.
-- The intention is to give local root peers a head start and avoid
-- giving an advantage to hostile and quick root peers.
threadDelay 3
atomically increaseActivePeers
mode <- atomically updateChurnMode
atomically $ increaseActivePeers mode
endTs0 <- getMonotonicTime
fuzzyDelay inRng (endTs0 `diffTime` startTs0) >>= go

where

-- Read the current fetch mode, translate it into the matching churn mode,
-- store that mode in 'churnModeVar' and return it.  Everything happens in
-- the caller's STM transaction.
updateChurnMode :: STM m ChurnMode
updateChurnMode = do
    churnMode <- toChurnMode <$> getFetchMode
    writeTVar churnModeVar churnMode
    pure churnMode
  where
    -- Deadline fetching maps to normal churn, bulk sync to bulk-sync churn.
    toChurnMode :: FetchMode -> ChurnMode
    toChurnMode FetchModeDeadline = ChurnModeNormal
    toChurnMode FetchModeBulkSync = ChurnModeBulkSync

-- TODO: #3396 revisit the policy for genesis
increaseActivePeers :: STM m ()
increaseActivePeers = do
mode <- getFetchMode
increaseActivePeers :: ChurnMode -> STM m ()
increaseActivePeers mode = do
modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers =
case mode of
FetchModeDeadline ->
ChurnModeNormal ->
targetNumberOfActivePeers base
FetchModeBulkSync ->
ChurnModeBulkSync ->
min 2 (targetNumberOfActivePeers base)
})

decreaseActivePeers :: STM m ()
decreaseActivePeers = do
mode <- getFetchMode
decreaseActivePeers :: ChurnMode -> STM m ()
decreaseActivePeers mode = do
modifyTVar peerSelectionVar (\targets -> targets {
targetNumberOfActivePeers =
case mode of
FetchModeDeadline ->
ChurnModeNormal ->
decrease $ targetNumberOfActivePeers base
FetchModeBulkSync ->
ChurnModeBulkSync ->
min 1 (targetNumberOfActivePeers base - 1)
})

Expand All @@ -636,15 +645,18 @@ peerChurnGovernor tracer _metrics inRng getFetchMode base peerSelectionVar = do
go !rng = do
startTs <- getMonotonicTime

churnMode <- atomically updateChurnMode
traceWith tracer $ TraceChurnMode churnMode

-- Purge the worst active peer(s).
atomically decreaseActivePeers
atomically $ decreaseActivePeers churnMode

-- Short delay, we may have no active peers right now
threadDelay 1

-- Pick new active peer(s) based on the best performing established
-- peers.
atomically increaseActivePeers
atomically $ increaseActivePeers churnMode

-- Give the promotion process time to start
threadDelay 1
Expand Down
Expand Up @@ -19,6 +19,7 @@ module Ouroboros.Network.PeerSelection.Governor.Types
-- These records are needed to run the peer selection.
, PeerStateActions (..)
, PeerSelectionActions (..)
, ChurnMode (..)

-- * P2P governor internals
, PeerSelectionState (..)
Expand Down Expand Up @@ -598,10 +599,15 @@ data TracePeerSelection peeraddr =
| TraceDemoteAsynchronous (Map peeraddr PeerStatus)
| TraceGovernorWakeup
| TraceChurnWait DiffTime
| TraceChurnMode ChurnMode
deriving Show

data DebugPeerSelection peeraddr peerconn =
TraceGovernorState Time -- blocked time
(Maybe DiffTime) -- wait time
(PeerSelectionState peeraddr peerconn)
deriving (Show, Functor)

-- | Mode in which the peer churn governor operates.  The current mode is
-- reported through the 'TraceChurnMode' trace event.
data ChurnMode
  = ChurnModeBulkSync
    -- ^ Chosen by the churn governor while the fetch mode is
    -- @FetchModeBulkSync@.
  | ChurnModeNormal
    -- ^ Chosen by the churn governor while the fetch mode is
    -- @FetchModeDeadline@.
  deriving Show

Expand Up @@ -15,7 +15,10 @@ import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime

import Cardano.Slotting.Slot (SlotNo (..))
import Ouroboros.Network.DeltaQ ( SizeInBytes )
import Ouroboros.Network.NodeToNode ( ConnectionId (..))
import Ouroboros.Network.PeerSelection.PeerMetric.Type


-- The maximum number of slots we will store data for.
-- On some chains sometimes this corresponds to 1h
Expand All @@ -24,27 +27,54 @@ maxSlotsToTrack :: Int
maxSlotsToTrack = 180


type ReportHeaderMetricsSTM m = (SlotNo -> Time -> STM m ())

type SlotMetric p = IntPSQ SlotNo (p, Time)

-- | Shared, mutable per-slot measurements collected about peers.
data PeerMetrics m p = PeerMetrics
  { headerMetrics  :: StrictTVar m (SlotMetric p)
    -- ^ Header measurements; written by 'addHeaderMetric' and read by
    -- 'getHeaderMetrics'.
  , fetchedMetrics :: StrictTVar m (SlotMetric (p, SizeInBytes))
    -- ^ Fetched-block measurements, pairing the peer with the number of
    -- bytes fetched; written by 'addFetchedMetric' and read by
    -- 'getFetchedMetrics'.
  }

-- | Build the reporting callbacks backed by the given 'PeerMetrics':
-- header reports are recorded with 'addHeaderMetric' and fetch reports
-- with 'addFetchedMetric'.
reportMetric
  :: forall m p.
     ( MonadSTM m )
  => PeerMetrics m p
  -> ReportPeerMetrics m (ConnectionId p)
reportMetric peerMetrics =
    ReportPeerMetrics
      { reportHeader = addHeaderMetric peerMetrics
      , reportFetch  = addFetchedMetric peerMetrics
      }

-- | Reporting callbacks that ignore their arguments and record nothing.
nullMetric
  :: MonadSTM m
  => ReportPeerMetrics m p
nullMetric =
    ReportPeerMetrics
      { reportHeader = \_ _ _   -> pure ()
      , reportFetch  = \_ _ _ _ -> pure ()
      }

-- | Convert a 'SlotNo' to an 'Int' by unwrapping it and applying
-- 'fromIntegral'.
--
-- NOTE(review): presumably this is the 'Int' key of the 'SlotMetric'
-- priority queue -- confirm against 'addMetrics'.
slotMetricKey :: SlotNo -> Int
slotMetricKey = fromIntegral . unSlotNo

addHeaderMetric
:: forall m p.
( MonadSTM m )
=> PeerMetrics m p
-> (ConnectionId p)
-> ConnectionId p
-> SlotNo
-> Time
-> STM m ()
addHeaderMetric PeerMetrics{headerMetrics} con slotNo time =
addMetrics headerMetrics (remoteAddress con) slotNo time
addHeaderMetric PeerMetrics{headerMetrics} con =
addMetrics headerMetrics (remoteAddress con)

-- | Record a fetched-block measurement in the 'fetchedMetrics' variable.
--
-- The entry is attributed to the remote address of the connection and
-- carries the reported size in bytes; the slot number and time are passed
-- on to 'addMetrics' unchanged.
addFetchedMetric
  :: forall m p.
     ( MonadSTM m )
  => PeerMetrics m p
  -> ConnectionId p
  -> SizeInBytes
  -> SlotNo
  -> Time
  -> STM m ()
addFetchedMetric PeerMetrics{fetchedMetrics = metricsVar} conn size slotNo time =
    addMetrics metricsVar (remoteAddress conn, size) slotNo time


getHeaderMetrics
Expand All @@ -53,6 +83,12 @@ getHeaderMetrics
-> STM m (SlotMetric p)
getHeaderMetrics PeerMetrics{headerMetrics} = readTVar headerMetrics

-- | Read the current contents of the fetched-block metrics variable.
getFetchedMetrics
  :: MonadSTM m
  => PeerMetrics m p
  -> STM m (SlotMetric (p, SizeInBytes))
getFetchedMetrics = readTVar . fetchedMetrics

addMetrics
:: forall m p. ( MonadSTM m )
=> StrictTVar m (SlotMetric p)
Expand Down Expand Up @@ -84,7 +120,8 @@ newPeerMetric
=> m (PeerMetrics m p)
newPeerMetric = do
hs <- newTVarIO Pq.empty
return $ PeerMetrics hs
bs <- newTVarIO Pq.empty
return $ PeerMetrics hs bs

-- Returns a Map which counts the number of times a given peer
-- was the first to present us with a block/header.
Expand All @@ -107,4 +144,25 @@ upstreamyness = Pq.fold' count Map.empty
fn (Just c) = Just $! c + 1


-- Sums, per peer, the byte counts of all entries currently held in the
-- fetched-metrics queue.  Peers without any entry are absent from the
-- resulting Map.
fetchyness
  :: forall p. ( Ord p )
  => SlotMetric (p, SizeInBytes)
  -> Map p Int
fetchyness = Pq.fold' accumulate Map.empty
  where
    -- Fold step: the queue key, the slot and the time are ignored; only
    -- the peer and the byte count contribute to the total.
    accumulate :: Int
               -> SlotNo
               -> ((p, SizeInBytes), Time)
               -> Map p Int
               -> Map p Int
    accumulate _key _slot ((peer, bytes), _time) =
        Map.alter addBytes peer
      where
        size :: Int
        size = fromIntegral bytes

        -- The first entry for a peer starts its total; later entries are
        -- added to the running total, which is forced on each update.
        addBytes :: Maybe Int -> Maybe Int
        addBytes = maybe (Just size) (\total -> Just $! total + size)



@@ -0,0 +1,15 @@
module Ouroboros.Network.PeerSelection.PeerMetric.Type where

import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime

import Cardano.Slotting.Slot (SlotNo (..))
import Ouroboros.Network.DeltaQ ( SizeInBytes )

-- | STM callback for reporting a header measurement, given a slot number
-- and a time.
type ReportHeaderMetricsSTM m = SlotNo -> Time -> STM m ()

-- | STM callback for reporting a fetched block, given its size in bytes,
-- its slot number and a time.
type ReportFetchedMetricsSTM m = SizeInBytes -> SlotNo -> Time -> STM m ()

-- | Pair of metric-reporting callbacks, each parameterised by a peer
-- identifier of type @p@.
data ReportPeerMetrics m p = ReportPeerMetrics
  { reportHeader :: p -> ReportHeaderMetricsSTM m
    -- ^ Header-reporting callback for the given peer.
  , reportFetch  :: p -> ReportFetchedMetricsSTM m
    -- ^ Fetched-block-reporting callback for the given peer.
  }
Expand Up @@ -88,7 +88,7 @@ blockFetchExample0 decisionTracer clientStateTracer clientMsgTracer
(contramap (TraceLabelPeer peerno) serverMsgTracer)
clientDelay serverDelay
registry peerno
(blockFetchClient NodeToNodeV_1 controlMessageSTM)
(blockFetchClient NodeToNodeV_1 controlMessageSTM (\_ _ _ -> pure ()))
(mockBlockFetchServer1 candidateChain)

fetchAsync <- async $ do
Expand Down Expand Up @@ -194,7 +194,7 @@ blockFetchExample1 decisionTracer clientStateTracer clientMsgTracer
(contramap (TraceLabelPeer peerno) serverMsgTracer)
clientDelay serverDelay
registry peerno
(blockFetchClient NodeToNodeV_1 controlMessageSTM)
(blockFetchClient NodeToNodeV_1 controlMessageSTM (\_ _ _ -> pure ()))
(mockBlockFetchServer1 candidateChain)
| (peerno, candidateChain) <- zip [1..] candidateChains
]
Expand Down
Expand Up @@ -488,6 +488,7 @@ traceNum TraceDemoteHotDone{} = 22
traceNum TraceDemoteAsynchronous{} = 23
traceNum TraceGovernorWakeup{} = 24
traceNum TraceChurnWait{} = 25
traceNum TraceChurnMode{} = 26

allTraceNames :: Map Int String
allTraceNames =
Expand Down

0 comments on commit 803ac6e

Please sign in to comment.