Skip to content

Commit

Permalink
Added PeerSelectionCounters and respective tracers.
Browse files Browse the repository at this point in the history
Added counter null tracer to Diffusion.hs nullTracers
  • Loading branch information
bolt12 committed Apr 8, 2021
1 parent 80d70e4 commit 70a53b1
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 12 deletions.
Expand Up @@ -123,6 +123,11 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError
type ConnectionManagerState peerAddr handle handleError version m
= Map peerAddr (StrictTVar m (ConnectionState peerAddr handle handleError version m))

connectionManagerStateToCounters
:: Map peerAddr (ConnectionState peerAddr handle handleError version m)
-> ConnectionManagerCounters
connectionManagerStateToCounters =
Map.foldMapWithKey (const abstractStateToCounters) . (abstractState . Known <$>)

-- | State of a connection.
--
Expand Down Expand Up @@ -447,6 +452,11 @@ withConnectionManager ConnectionManagerArguments {
traverse_ cancel (getConnThread connState) )
state
where
traceCounters :: StrictTMVar m (ConnectionManagerState peerAddr handle handleError version m) -> m ()
traceCounters stateVar = do
mState <- atomically $ readTMVar stateVar >>= traverse readTVar
traceWith tracer (TrConnectionManagerCounters (connectionManagerStateToCounters mState))

countConnections :: ConnectionManagerState peerAddr handle handleError version m
-> STM m Int
countConnections state =
Expand Down Expand Up @@ -546,7 +556,8 @@ withConnectionManager ConnectionManagerArguments {
)

case mConnVar of
Left !connState ->
Left !connState -> do
traceCounters stateVar
traceWith trTracer (TransitionTrace peerAddr
Transition
{ fromState = connState
Expand Down Expand Up @@ -585,6 +596,7 @@ withConnectionManager ConnectionManagerArguments {
)
let connState = maybe Unknown Known mConnState
kConnState' = Known connState'

case updated of
Nothing ->
return [ Transition { fromState = connState
Expand All @@ -599,6 +611,8 @@ withConnectionManager ConnectionManagerArguments {
, toState = kConnState'
}
]

traceCounters stateVar
traverse_ (traceWith trTracer . TransitionTrace peerAddr) trs

-- start connection thread
Expand Down Expand Up @@ -685,6 +699,7 @@ withConnectionManager ConnectionManagerArguments {
(Just handleError)
HandshakeProtocolViolation -> TerminatedState (Just handleError)
modifyTMVarPure_ stateVar (Map.delete peerAddr)
traceCounters stateVar
return (Disconnected connId (Just handleError))

Right (handle, version) -> do
Expand Down Expand Up @@ -748,7 +763,7 @@ withConnectionManager ConnectionManagerArguments {
(connectionDataFlow version)
writeTVar connVar connState'
return (mkTransition connState connState')

traceCounters stateVar
-- Note that we don't set a timeout thread here which would perform
-- @
-- Commit^{dataFlow}
Expand Down Expand Up @@ -865,6 +880,7 @@ withConnectionManager ConnectionManagerArguments {
, UnsupportedState TerminatedSt )

traverse_ cancel mbThread
traceCounters stateVar
traverse_ (traceWith trTracer . TransitionTrace peerAddr) mbTransition
return result

Expand Down Expand Up @@ -1033,6 +1049,7 @@ withConnectionManager ConnectionManagerArguments {
else Just connVar')
peerAddr)
return (mkTransition connState connState')
traceCounters stateVar
traceWith trTracer (TransitionTrace peerAddr tr)

)
Expand Down Expand Up @@ -1071,6 +1088,7 @@ withConnectionManager ConnectionManagerArguments {
let connState' = UnnegotiatedState provenance connId connThread
writeTVar connVar connState'
return (mkTransition connState connState')
traceCounters stateVar
traceWith trTracer (TransitionTrace peerAddr tr)

res <- atomically (readPromise reader)
Expand All @@ -1088,6 +1106,7 @@ withConnectionManager ConnectionManagerArguments {
(Just handleError)
HandshakeProtocolViolation ->
TerminatedState (Just handleError)

return ( Map.update
(\connVar' ->
if eqTVar (Proxy :: Proxy m) connVar' connVar
Expand Down Expand Up @@ -1123,6 +1142,7 @@ withConnectionManager ConnectionManagerArguments {
newOutboundConnection controlChannel connId dataFlow handle
NotInResponderMode -> return ()
return (mkTransition connState connState')
traceCounters stateVar
traceWith
trTracer
(TransitionTrace peerAddr transition)
Expand Down Expand Up @@ -1212,10 +1232,12 @@ withConnectionManager ConnectionManagerArguments {
case etr of
Left tr' -> traceWith trTracer tr'
Right tr' -> traceWith tracer tr'
traceCounters stateVar
return connected

-- Connection manager has a connection which can be reused.
Right (Here connected) ->
Right (Here connected) -> do
traceCounters stateVar
return connected


Expand Down Expand Up @@ -1350,6 +1372,7 @@ withConnectionManager ConnectionManagerArguments {
TerminatedState _handleError ->
return (DemoteToColdLocalNoop (mkTransition connState connState))

traceCounters stateVar
case transition of
DemotedToColdLocal _connId connThread tr -> do
traceWith trTracer (TransitionTrace peerAddr tr)
Expand Down Expand Up @@ -1424,6 +1447,8 @@ withConnectionManager ConnectionManagerArguments {
TerminatedState {} ->
return (UnsupportedState TerminatedSt)

traceCounters stateVar

-- trace transition
case result of
OperationSuccess tr ->
Expand Down Expand Up @@ -1485,6 +1510,7 @@ withConnectionManager ConnectionManagerArguments {
TerminatedState {} ->
return (UnsupportedState TerminatedSt)

traceCounters stateVar
-- trace transition
case result of
OperationSuccess tr ->
Expand Down
Expand Up @@ -125,6 +125,9 @@ module Ouroboros.Network.ConnectionManager.Types
, ConnectionManagerError (..)
, SomeConnectionManagerError (..)
, AbstractState (..)
-- * Counters
, ConnectionManagerCounters (..)
, abstractStateToCounters
-- * Mux types
, WithMuxMode (..)
, withInitiatorMode
Expand Down Expand Up @@ -635,6 +638,47 @@ data AbstractState
| TerminatedSt
deriving (Eq, Show, Typeable)

-- | Counters for tracing and analysis purposes
--
data ConnectionManagerCounters = ConnectionManagerCounters {
numberConns :: !Int, -- ^ number of connections relevant for pruning
duplexConns :: !Int, -- ^ number of negotiated duplex connections (including DuplexState connections)
uniConns :: !Int, -- ^ number of negotiated unidirectional connections
incomingConns :: !Int, -- ^ number of inbound connections
outgoingConns :: !Int -- ^ number of outbound connections
} deriving Show

instance Semigroup ConnectionManagerCounters where
ConnectionManagerCounters c1 d1 s1 i1 o1 <> ConnectionManagerCounters c2 d2 s2 i2 o2 =
ConnectionManagerCounters (c1 + c2) (d1 + d2) (s1 + s2) (i1 + i2) (o1 + o2)

instance Monoid ConnectionManagerCounters where
mempty = ConnectionManagerCounters 0 0 0 0 0

-- | Perform counting from an 'AbstractState'
abstractStateToCounters :: AbstractState -> ConnectionManagerCounters
abstractStateToCounters state =
case state of
UnknownConnectionSt -> mempty
ReservedOutboundSt -> mempty
(UnnegotiatedSt Inbound) -> conn <> incomingConns
(UnnegotiatedSt Outbound) -> outgoingConn
(InboundIdleSt Unidirectional) -> conn <> uniConn <> incomingConn
(InboundIdleSt Duplex) -> conn <> duplexConn <> incomingConn
(InboundSt Unidirectional) -> conn <> uniConn <> incommingConn
(InboundSt Duplex) -> conn <> duplexConn <> incommingConn
OutboundUniSt -> uniConn <> outgoingConns
(OutboundDupSt _) -> conn <> duplexConn <> outgoingConn
DuplexSt -> conn <> duplexConn <> incommingConn <> outgoingConn
WaitRemoteIdleSt -> mempty
TerminatingSt -> mempty
TerminatedSt -> mempty
where
conn = ConnectionManagerCounters 1 0 0 0 0
duplexConn = ConnectionManagerCoutners 0 1 0 0 0
uniConn = ConnectionManagerCoutners 0 0 1 0 0
incommingConn = ConnectionManagerCounters 0 0 0 1 0
outgoingConn = ConnectionManagerCounters 0 0 0 0 1

-- | Exceptions used by 'ConnectionManager'.
--
Expand Down Expand Up @@ -789,7 +833,7 @@ data ConnectionManagerTrace peerAddr handlerTrace
| TrConnectionCleanup !(ConnectionId peerAddr)
| TrConnectionTimeWait !(ConnectionId peerAddr)
| TrConnectionTimeWaitDone !(ConnectionId peerAddr)
| TrDebugState !(Map peerAddr AbstractState)
| TrConnectionManagerCounters !ConnectionManagerCounters
deriving Show


Expand Down
17 changes: 13 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Expand Up @@ -31,8 +31,10 @@ module Ouroboros.Network.Diffusion
, TracePeerSelection (..)
, DebugPeerSelection (..)
, PeerSelectionActionsTrace (..)
, PeerSelectionCounters (..)
, ConnectionManagerTrace (..)
, ConnectionHandlerTrace (..)
, ConnectionManagerCounters (..)
, ServerTrace (..)
)
where
Expand Down Expand Up @@ -90,6 +92,7 @@ import Ouroboros.Network.PeerSelection.RootPeersDNS ( DomainAddress
import qualified Ouroboros.Network.PeerSelection.Governor as Governor
import Ouroboros.Network.PeerSelection.Governor.Types ( TracePeerSelection (..)
, DebugPeerSelection (..)
, PeerSelectionCounters (..)
)
import Ouroboros.Network.PeerSelection.LedgerPeers ( LedgerPeersConsensusInterface (..)
, TraceLedgerPeers
Expand Down Expand Up @@ -183,6 +186,8 @@ data DiffusionTracers = DiffusionTracers {
SockAddr
(NodeToNodePeerConnectionHandle InitiatorResponderMode ()))

, dtTracePeerSelectionCounters
:: Tracer IO PeerSelectionCounters

, dtPeerSelectionActionsTracer
:: Tracer IO (PeerSelectionActionsTrace SockAddr)
Expand Down Expand Up @@ -235,6 +240,7 @@ nullTracers = DiffusionTracers {
, dtTracePeerSelectionTracer = nullTracer
, dtDebugPeerSelectionInitiatorTracer = nullTracer
, dtDebugPeerSelectionInitiatorResponderTracer = nullTracer
, dtTracePeerSelectionCounters = nullTracer
, dtPeerSelectionActionsTracer = nullTracer
, dtConnectionManagerTracer = nullTracer
, dtServerTracer = nullTracer
Expand Down Expand Up @@ -354,7 +360,7 @@ data DiffusionApplications ntnAddr ntcAddr ntnVersionData ntcVersionData m =
(OuroborosBundle
InitiatorResponderMode ntnAddr
ByteString m () ())


-- | NodeToClient responder application (server role)
--
Expand Down Expand Up @@ -837,6 +843,7 @@ runDataDiffusion tracers
(Governor.peerSelectionGovernor
dtTracePeerSelectionTracer
dtDebugPeerSelectionInitiatorTracer
dtTracePeerSelectionCounters
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy policyRngVar))
$ \governorThread ->
Expand Down Expand Up @@ -945,6 +952,7 @@ runDataDiffusion tracers
(Governor.peerSelectionGovernor
dtTracePeerSelectionTracer
dtDebugPeerSelectionInitiatorResponderTracer
dtTracePeerSelectionCounters
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy policyRngVar))
$ \governorThread -> do
Expand Down Expand Up @@ -1005,6 +1013,7 @@ runDataDiffusion tracers
, dtTracePeerSelectionTracer
, dtDebugPeerSelectionInitiatorTracer
, dtDebugPeerSelectionInitiatorResponderTracer
, dtTracePeerSelectionCounters
, dtPeerSelectionActionsTracer
, dtTraceLocalRootPeersTracer
, dtTracePublicRootPeersTracer
Expand All @@ -1024,7 +1033,7 @@ runDataDiffusion tracers
miniProtocolBundleInitiatorResponderMode =
combineMiniProtocolBundles miniProtocolBundleInitiatorMode
miniProtocolBundleResponderMode

-- node-to-node responder bundle; it is only used in combination with
-- the node-to-node initiator bundle defined below.
--
Expand Down Expand Up @@ -1217,9 +1226,9 @@ withLocalSocket iocp tracer localAddress k =
)
-- We close the socket here, even if it was provided to us.
(\case
Left (sn, sd) -> Snocket.close sn sd
Left (sn, sd) -> Snocket.close sn sd
Right (sn, sd, _) -> Snocket.close sn sd)
$ \case
$ \case
-- unconfigured socket
Right (sn, sd, addr) -> do
traceWith tracer . ConfiguringLocalSocket addr
Expand Down
Expand Up @@ -31,6 +31,7 @@ module Ouroboros.Network.PeerSelection.Governor (
sanePeerSelectionTargets,
establishedPeersStatus,
PeerSelectionState(..),
PeerSelectionCounters(..)
) where

import Data.Void (Void)
Expand All @@ -44,7 +45,7 @@ import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer(..), traceWith)
import Control.Tracer (Tracer(..), traceWith, contramap)
import System.Random

import Ouroboros.Network.Diffusion.Policies (closeConnectionTimeout)
Expand Down Expand Up @@ -432,16 +433,20 @@ peerSelectionGovernor :: (MonadAsync m, MonadMask m, MonadTime m, MonadTimer m,
Ord peeraddr)
=> Tracer m (TracePeerSelection peeraddr)
-> Tracer m (DebugPeerSelection peeraddr peerconn)
-> Tracer m PeerSelectionCounters
-> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionPolicy peeraddr m
-> m Void
peerSelectionGovernor tracer debugTracer actions policy =
peerSelectionGovernor tracer debugTracer countersTracer actions policy =
JobPool.withJobPool $ \jobPool ->
peerSelectionGovernorLoop
tracer debugTracer
tracer (debugTracer <> contramap transform countersTracer)
actions policy
jobPool
emptyPeerSelectionState
where
transform :: Ord peeraddr => DebugPeerSelection peeraddr peerconn -> PeerSelectionCounters
transform (TraceGovernorState _ _ st) = peerStateToCounters st


-- | Our pattern here is a loop with two sets of guarded actions:
Expand Down
Expand Up @@ -254,6 +254,20 @@ data PeerSelectionState peeraddr peerconn = PeerSelectionState {
}
deriving (Show, Functor)

data PeerSelectionCounters = PeerSelectionCounters {
coldPeers :: !Int,
warmPeers :: !Int,
hotPeers :: !Int
} deriving Show

peerStateToCounters :: Ord peeraddr => PeerSelectionState peeraddr peerconn -> PeerSelectionCounters
peerStateToCounters st = PeerSelectionCounters { coldPeers, warmPeers, hotPeers }
where
knownPeersSet = KnownPeers.toSet (knownPeers st)
establishedPeersSet = EstablishedPeers.toSet (establishedPeers st)
coldPeers = Set.size $ knownPeersSet Set.\\ establishedPeersSet
warmPeers = Set.size $ establishedPeersSet Set.\\ activePeers st
hotPeers = Set.size $ activePeers st

emptyPeerSelectionState :: PeerSelectionState peeraddr peerconn
emptyPeerSelectionState =
Expand Down
Expand Up @@ -267,6 +267,7 @@ runGovernorInMockEnvironment mockEnv =
peerSelectionGovernor
tracerTracePeerSelection
tracerDebugPeerSelection
tracerTracePeerSelectionCounters
actions
policy

Expand Down Expand Up @@ -731,6 +732,7 @@ prop_governor_connstatus (GovernorMockEnvironmentWAD env) =

data TestTraceEvent = GovernorDebug (DebugPeerSelection PeerAddr ())
| GovernorEvent (TracePeerSelection PeerAddr)
| GovernorCounters PeerSelectionCounters
| MockEnvEvent TraceMockEnv
deriving Show

Expand All @@ -741,6 +743,9 @@ tracerDebugPeerSelection :: Tracer (IOSim s) (DebugPeerSelection PeerAddr peerco
tracerDebugPeerSelection = contramap (GovernorDebug . fmap (const ()))
tracerTestTraceEvent

tracerTracePeerSelectionCounters :: Tracer (IOSim s) PeerSelectionCounters
tracerTracePeerSelectionCounters = contramap GovernorCounters tracerTestTraceEvent

tracerMockEnv :: Tracer (IOSim s) TraceMockEnv
tracerMockEnv = contramap MockEnvEvent tracerTestTraceEvent

Expand Down Expand Up @@ -1180,7 +1185,7 @@ _governorFindingPublicRoots targetNumberOfRootPeers domains =
domains $ \requestPublicRootPeers ->

peerSelectionGovernor
tracer tracer
tracer tracer tracer
actions { requestPublicRootPeers }
policy
where
Expand Down

0 comments on commit 70a53b1

Please sign in to comment.