Skip to content

Commit

Permalink
inbound-governor: added hot and warm peer metric
Browse files Browse the repository at this point in the history
Track warm → hot transition in the inbound governor, which allows to
track remote hot peers, fcilities to track warm peers where already
there.

For the sake of this metric anode is a hot peer for the remote end if
all established and hot mini-protocols are running, otherwise it is
a considered warm (if it is connected).

Added exports to Diffusion.hs
  • Loading branch information
coot committed Oct 14, 2021
1 parent 49edbd0 commit 83cc22f
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 63 deletions.
141 changes: 93 additions & 48 deletions ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs
Expand Up @@ -62,6 +62,7 @@ import Ouroboros.Network.InboundGovernor.ControlChannel (ServerControl
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel



-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
Expand Down Expand Up @@ -109,11 +110,15 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
:: InboundGovernorState muxMode peerAddr m a b
-> m Void
inboundGovernorLoop !state = do
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state)

event
<- atomically $
(uncurry MuxFinished <$> firstMuxToFinish state)
<|> (MiniProtocolTerminated <$> firstMiniProtocolToFinish state)
<|> (AwakeRemote <$> firstPeerPromotedToWarm state)
<|> (RemotePromotedToHot <$> firstPeerPromotedToHot state)
<|> firstPeerDemotedToCold state
<|> (NewConnection <$> ControlChannel.readMessage
serverControlChannel)
Expand All @@ -130,46 +135,58 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout

traceWith tracer (TrNewConnection provenance connId)

-- update state and continue the recursive loop
(\igsConnections -> inboundGovernorLoop state { igsConnections })
=<< Map.alterF
igsConnections <- Map.alterF
(\case
-- connection
Nothing -> do
let csMiniProtocolMap =
foldr
(\miniProtocol ->
Map.insert (miniProtocolNum miniProtocol)
miniProtocol)
Map.empty
(concat muxBundle)
let csMPMHot =
[ ( miniProtocolNum mpH
, MiniProtocolData mpH Hot
)
| mpH <- projectBundle TokHot muxBundle
]
csMPMWarm =
[ ( miniProtocolNum mpW
, MiniProtocolData mpW Warm
)
| mpW <- projectBundle TokWarm muxBundle
]
csMPMEstablished =
[ ( miniProtocolNum mpE
, MiniProtocolData mpE Established
)
| mpE <- projectBundle TokEstablished muxBundle
]
csMiniProtocolMap =
Map.fromList
(csMPMHot ++ csMPMWarm ++ csMPMEstablished)

mCompletionMap
<-
foldM
(\acc miniProtocol -> do
(\acc MiniProtocolData { mpdMiniProtocol } -> do
result <- runResponder
csMux miniProtocol
csMux mpdMiniProtocol
Mux.StartOnDemand
case result of
-- synchronous exceptions when starting
-- a mini-protocol are non-recoverable; we
-- close the connection and allow the server
-- to continue.
Left err -> do
traceWith tracer (TrResponderStartFailure connId (miniProtocolNum miniProtocol) err)
traceWith tracer (TrResponderStartFailure connId (miniProtocolNum mpdMiniProtocol) err)
Mux.stopMux csMux
return Nothing

Right completion -> do
let acc' = Map.insert (miniProtocolNum miniProtocol)
let acc' = Map.insert (miniProtocolNum mpdMiniProtocol)
completion
<$> acc
-- force under lazy 'Maybe'
case acc' of
Just !_ -> return acc'
Nothing -> return acc'
)
)
(Just Map.empty)
csMiniProtocolMap

Expand Down Expand Up @@ -208,49 +225,69 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
connId
(igsConnections state)


-- update state and continue the recursive loop
let state' = state { igsConnections }
inboundGovernorLoop state'

MuxFinished connId merr -> do

case merr of
Nothing -> traceWith tracer (TrMuxCleanExit connId)
Just err -> traceWith tracer (TrMuxErrored connId err)

-- the connection manager does should realise this on itself.
inboundGovernorLoop (unregisterConnection connId state)
let state' = unregisterConnection connId state
inboundGovernorLoop state'

MiniProtocolTerminated
Terminated {
tConnId,
tMux,
tMiniProtocol,
tMiniProtocolData = MiniProtocolData { mpdMiniProtocol,
mpdMiniProtocolTemp
},
tResult
} ->
let num = miniProtocolNum tMiniProtocol in
let num = miniProtocolNum mpdMiniProtocol in
case tResult of
Left e -> do
-- a mini-protocol errored. In this case mux will shutdown, and
-- the connection manager will tear down the socket. We can just
-- forget the connection from 'InboundGovernorState'.
traceWith tracer $
TrResponderErrored tConnId num e
inboundGovernorLoop
(unregisterConnection
tConnId
state)

let state' = unregisterConnection tConnId state
inboundGovernorLoop state'

Right _ -> do
result
<- runResponder tMux tMiniProtocol Mux.StartOnDemand
<- runResponder tMux mpdMiniProtocol Mux.StartOnDemand
case result of
Right completionAction -> do
traceWith tracer (TrResponderRestarted tConnId num)
inboundGovernorLoop
(updateMiniProtocol tConnId num completionAction state)

let isHot = mpdMiniProtocolTemp == Hot
state' = ( if isHot
then updateRemoteState tConnId RemoteWarm
else id
)
. updateMiniProtocol tConnId num completionAction
$ state

inboundGovernorLoop state'

Left err -> do
-- there is no way to recover from synchronous exceptions; we
-- stop mux which allows to close resources held by
-- connection manager.
traceWith tracer (TrResponderStartFailure tConnId num err)
Mux.stopMux tMux
inboundGovernorLoop
(unregisterConnection tConnId state)

let state' = unregisterConnection tConnId state
inboundGovernorLoop state'


WaitIdleRemote connId -> do
-- @
Expand All @@ -263,10 +300,9 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
v <- registerDelay protocolIdleTimeout
let timeoutSTM :: STM m ()
!timeoutSTM = LazySTM.readTVar v >>= check
inboundGovernorLoop
(updateRemoteState connId
(RemoteIdle timeoutSTM)
state)

let state' = updateRemoteState connId (RemoteIdle timeoutSTM) state
inboundGovernorLoop state'

-- @
-- PromotedToWarm^{Duplex}_{Remote}
Expand All @@ -280,47 +316,53 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
res <- promotedToWarmRemote connectionManager
(remoteAddress connId)
traceWith tracer (TrPromotedToWarmRemote connId res)
assert (resultInState res /= UnknownConnectionSt) $
inboundGovernorLoop
(updateRemoteState connId
RemoteEstablished
state)
assert (resultInState res /= UnknownConnectionSt) $ do
let state' = updateRemoteState
connId
RemoteWarm
state
inboundGovernorLoop state'

RemotePromotedToHot connId -> do
traceWith tracer (TrPromotedToHotRemote connId)
let state' = updateRemoteState connId RemoteHot state
inboundGovernorLoop state'

CommitRemote connId -> do
res <- unregisterInboundConnection connectionManager
(remoteAddress connId)
traceWith tracer $ TrDemotedToColdRemote connId res
case res of
UnsupportedState {} ->
UnsupportedState {} -> do
-- 'inState' can be either:
-- @'UnknownConnection'@,
-- @'InReservedOutboundState'@,
-- @'InUnnegotiatedState',
-- @'InOutboundState' 'Unidirectional'@,
-- @'InTerminatingState'@,
-- @'InTermiantedState'@.
inboundGovernorLoop
(unregisterConnection connId state)
let state' = unregisterConnection connId state
inboundGovernorLoop state'

OperationSuccess transition ->
case transition of
-- the following two cases are when the connection was not used
-- by p2p-governor, the connection will be closed.
CommitTr ->
CommitTr -> do
-- @
-- Commit^{dataFlow}_{Remote} : InboundIdleState dataFlow
-- → TerminatingState
-- @
inboundGovernorLoop
(unregisterConnection connId state)
let state' = unregisterConnection connId state
inboundGovernorLoop state'

DemotedToColdRemoteTr ->
DemotedToColdRemoteTr -> do
-- @
-- Commit^{dataFlow}_{Remote} : InboundIdleState dataFlow
-- → TerminatingState
-- @
inboundGovernorLoop
(unregisterConnection connId state)
let state' = unregisterConnection connId state
inboundGovernorLoop state'

-- the connection is still used by p2p-governor, carry on but put
-- it in 'RemoteCold' state. This will ensure we keep ready to
Expand All @@ -339,9 +381,9 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
-- edge triggered. The server state is updated once protocol
-- idleness expires rather than as soon as the connection
-- manager was requested outbound connection.
KeepTr ->
inboundGovernorLoop
(updateRemoteState connId RemoteCold state)
KeepTr -> do
let state' = updateRemoteState connId RemoteCold state
inboundGovernorLoop state'


-- | Run a responder mini-protocol.
Expand Down Expand Up @@ -435,12 +477,15 @@ data InboundGovernorTrace peerAddr
| TrResponderRestarted !(ConnectionId peerAddr) !MiniProtocolNum
| TrResponderStartFailure !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
| TrResponderErrored !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
| TrResponderStarted !(ConnectionId peerAddr) !MiniProtocolNum
| TrResponderTerminated !(ConnectionId peerAddr) !MiniProtocolNum
| TrPromotedToWarmRemote !(ConnectionId peerAddr) !(OperationResult AbstractState)
| TrPromotedToHotRemote !(ConnectionId peerAddr)
| TrDemotedToColdRemote !(ConnectionId peerAddr) !(OperationResult DemotedToColdRemoteTr)
-- ^ All mini-protocols terminated. The boolean is true if this connection
-- was not used by p2p-governor, and thus the connection will be terminated.
| TrWaitIdleRemote !(ConnectionId peerAddr) !(OperationResult AbstractState)
| TrMuxCleanExit !(ConnectionId peerAddr)
| TrMuxErrored !(ConnectionId peerAddr) SomeException
| TrInboundGovernorCounters !InboundGovernorCounters
deriving Show

0 comments on commit 83cc22f

Please sign in to comment.