Skip to content

Commit

Permalink
inbound-governor: detect remote warm → hot transition
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed May 12, 2021
1 parent c36ede5 commit 210c3c8
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 158 deletions.
201 changes: 72 additions & 129 deletions ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs
Expand Up @@ -32,7 +32,7 @@ module Ouroboros.Network.InboundGovernor

import Control.Exception (SomeAsyncException (..), assert)
import Control.Applicative (Alternative (..), (<|>))
import Control.Monad (foldM)
import Control.Monad (foldM, when)
import Control.Monad.Class.MonadAsync
import qualified Control.Monad.Class.MonadSTM as LazySTM
import Control.Monad.Class.MonadSTM.Strict
Expand All @@ -44,11 +44,9 @@ import Control.Tracer (Tracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Void (Void)
import Data.List (sortOn)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified System.Random as Rnd
import Data.Bool (bool)

import qualified Network.Mux as Mux

Expand All @@ -62,7 +60,7 @@ import Ouroboros.Network.InboundGovernor.Event
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.InboundGovernor.ControlChannel (ServerControlChannel)
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel
import Network.Mux.Types (MiniProtocolStatus (..))



-- | Run the server, which consists of the following components:
Expand Down Expand Up @@ -98,119 +96,27 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionNumber m a
inboundGovernor tracer serverControlChannel protocolIdleTimeout
connectionManager observableStateVar = do
let state = InboundGovernorState {
igsConnections = Map.empty,
igsObservableVar = observableStateVar,
igsRemoteTemperatures = Map.empty,
igsCounters = InboundGovernorCounters 0 0 0
igsConnections = Map.empty,
igsObservableVar = observableStateVar
}
inboundGovernorLoop state
where
localCounterWithTemp
:: ConnectionId peerAddr
-> Map (ConnectionId peerAddr) ProtocolTemperature
-> Maybe ProtocolTemperature
-> InboundGovernorCounters
localCounterWithTemp cid m t =
case (Map.lookup cid m, t) of
-- ( Previous remote temperature, Current remote temperature)
(Nothing , Nothing) -> InboundGovernorCounters 0 0 0
(Nothing , Just Established) -> InboundGovernorCounters 1 0 0
(Nothing , Just Warm) -> InboundGovernorCounters 0 1 0
(Nothing , Just Hot) -> InboundGovernorCounters 0 0 1

(Just Established , Nothing) -> InboundGovernorCounters (-1) 0 0
(Just Established , Just Established) -> InboundGovernorCounters 0 0 0
(Just Established , Just Warm) -> InboundGovernorCounters (-1) 1 0
(Just Established , Just Hot) -> InboundGovernorCounters (-1) 0 1

(Just Warm , Nothing) -> InboundGovernorCounters 0 (-1) 0
(Just Warm , Just Established) -> InboundGovernorCounters 1 (-1) 0
(Just Warm , Just Warm) -> InboundGovernorCounters 0 0 0
(Just Warm , Just Hot) -> InboundGovernorCounters 0 (-1) 1

(Just Hot , Nothing) -> InboundGovernorCounters 0 0 (-1)
(Just Hot , Just Established) -> InboundGovernorCounters 1 0 (-1)
(Just Hot , Just Warm) -> InboundGovernorCounters 0 1 (-1)
(Just Hot , Just Hot) -> InboundGovernorCounters 0 0 0

connectionIdToCounter
:: ConnectionId peerAddr
-> InboundGovernorState muxMode peerAddr m a b
-> m (InboundGovernorCounters, Maybe ProtocolTemperature)
connectionIdToCounter connId InboundGovernorState
{ igsConnections
, igsRemoteTemperatures
, igsCounters } = do
(establishedProtocolsStatus, warmProtocolsStatus, hotProtocolsStatus) <-
atomically $
(,,) <$> getTempProtocolsStatus connId Established igsConnections
<*> getTempProtocolsStatus connId Warm igsConnections
<*> getTempProtocolsStatus connId Hot igsConnections

let
ePeerRemote :: Int
ePeerRemote = bool 0 1 $ all (== StatusRunning) establishedProtocolsStatus
wPeerRemote :: Int
wPeerRemote = bool 0 1 $ all (== StatusRunning) warmProtocolsStatus
hPeerRemote :: Int
hPeerRemote = bool 0 1 $ all (== StatusRunning) hotProtocolsStatus
total = ePeerRemote + wPeerRemote + hPeerRemote

return $
case Map.lookup connId igsConnections of
Nothing -> ( igsCounters <> localCounterWithTemp connId igsRemoteTemperatures Nothing
, Nothing )
Just _ ->
-- Either there should be only one state or none
assert (total == 1 || total == 0) $
let maybeCurrentTemp =
case (ePeerRemote, wPeerRemote, hPeerRemote) of
(0, 0, 0) -> Nothing
(1, 0, 0) -> Just Established
(0, 1, 0) -> Just Warm
(0, 0, 1) -> Just Hot
x -> error ("impossible happened! " ++ show x)
in ( igsCounters
<> localCounterWithTemp connId igsRemoteTemperatures maybeCurrentTemp
, maybeCurrentTemp )
where
getTempProtocolsStatus :: ConnectionId peerAddr
-> ProtocolTemperature
-> Map (ConnectionId peerAddr) (ConnectionState mode peerAddr m a b)
-> STM m [MiniProtocolStatus]
getTempProtocolsStatus cid temp m =
case Map.lookup cid m of
Nothing -> return [StatusIdle]
Just cs -> sequence $
let tempProtocols = Map.keys . Map.filter ((== temp) . mpdMiniProtocolTemp)
in Map.elems
. Map.filterWithKey (\(num, _) _ -> num `elem` tempProtocols (csMiniProtocolMap cs) )
. Mux.miniProtocolStateMap
. csMux
$ cs

updateState :: InboundGovernorState muxMode peerAddr m a b
-> ConnectionId peerAddr
-> m (InboundGovernorState muxMode peerAddr m a b)
updateState newState connId = do
(igsCounters, mCurTemp) <- connectionIdToCounter connId newState
let newRemoteConnsMap = Map.alter (const mCurTemp) connId (igsRemoteTemperatures newState)
return $ newState { igsCounters , igsRemoteTemperatures = newRemoteConnsMap }

-- The inbound protocol governor recursive loop. The 'igsConnections' is
-- updated as we recurs.
--
inboundGovernorLoop
:: InboundGovernorState muxMode peerAddr m a b
-> m Void
inboundGovernorLoop !state = do
traceWith tracer (TrInboundGovernorCounters (igsCounters state))
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state)

event
<- atomically $
(uncurry MuxFinished <$> firstMuxToFinish state)
<|> (MiniProtocolTerminated <$> firstMiniProtocolToFinish state)
<|> (AwakeRemote <$> firstPeerPromotedToWarm state)
<|> (RemotePromotedToHot <$> firstPeerPromotedToHot state)
<|> firstPeerDemotedToCold state
<|> (NewConnection <$> ControlChannel.readMessage
serverControlChannel)
Expand Down Expand Up @@ -278,7 +184,7 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
case acc' of
Just !_ -> return acc'
Nothing -> return acc'
)
)
(Just Map.empty)
csMiniProtocolMap

Expand Down Expand Up @@ -319,8 +225,8 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout


-- update state and continue the recursive loop
newState <- updateState (state { igsConnections }) connId
inboundGovernorLoop newState
let state' = state { igsConnections }
inboundGovernorLoop state'

MuxFinished connId merr -> do

Expand All @@ -329,14 +235,18 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
Just err -> traceWith tracer (TrMuxErrored connId err)

-- the connection manager does should realise this on itself.
newState <- updateState (unregisterConnection connId state) connId
inboundGovernorLoop newState
let state' = unregisterConnection connId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

MiniProtocolTerminated
Terminated {
tConnId,
tMux,
tMiniProtocolData = MiniProtocolData { mpdMiniProtocol },
tMiniProtocolData = MiniProtocolData { mpdMiniProtocol,
mpdMiniProtocolTemp
},
tResult
} ->
let num = miniProtocolNum mpdMiniProtocol in
Expand All @@ -348,8 +258,10 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
traceWith tracer $
TrResponderErrored tConnId num e

newState <- updateState (unregisterConnection tConnId state) tConnId
inboundGovernorLoop newState
let state' = unregisterConnection tConnId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

Right _ -> do
result
Expand All @@ -358,9 +270,18 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
Right completionAction -> do
traceWith tracer (TrResponderRestarted tConnId num)

newState <- updateState (updateMiniProtocol tConnId num completionAction state)
tConnId
inboundGovernorLoop newState
let isHot = mpdMiniProtocolTemp == Hot
state' = ( if isHot
then updateRemoteState tConnId RemoteWarm
else id
)
. updateMiniProtocol tConnId num completionAction
$ state

when isHot $
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

Left err -> do
-- there is no way to recover from synchronous exceptions; we
Expand All @@ -369,8 +290,11 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
traceWith tracer (TrResponderStartFailure tConnId num err)
Mux.stopMux tMux

newState <- updateState (unregisterConnection tConnId state) tConnId
inboundGovernorLoop newState
let state' = unregisterConnection tConnId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'


WaitIdleRemote connId -> do
-- @
Expand All @@ -384,9 +308,8 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
let timeoutSTM :: STM m ()
!timeoutSTM = LazySTM.readTVar v >>= check

newState <- updateState (updateRemoteState connId (RemoteIdle timeoutSTM) state)
connId
inboundGovernorLoop newState
let state' = updateRemoteState connId (RemoteIdle timeoutSTM) state
inboundGovernorLoop state'

-- @
-- PromotedToWarm^{Duplex}_{Remote}
Expand All @@ -401,9 +324,20 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
(remoteAddress connId)
traceWith tracer (TrPromotedToWarmRemote connId res)
assert (resultInState res /= UnknownConnectionSt) $ do
newState <- updateState (updateRemoteState connId RemoteEstablished state)
connId
inboundGovernorLoop newState
let state' = updateRemoteState
connId
RemoteWarm
state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

RemotePromotedToHot connId -> do
traceWith tracer (TrPromotedToHotRemote connId)
let state' = updateRemoteState connId RemoteHot state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

CommitRemote connId -> do
res <- unregisterInboundConnection connectionManager
Expand All @@ -418,8 +352,10 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
-- @'InOutboundState' 'Unidirectional'@,
-- @'InTerminatingState'@,
-- @'InTermiantedState'@.
newState <- updateState (unregisterConnection connId state) connId
inboundGovernorLoop newState
let state' = unregisterConnection connId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

OperationSuccess transition ->
case transition of
Expand All @@ -430,16 +366,20 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
-- Commit^{dataFlow}_{Remote} : InboundIdleState dataFlow
-- → TerminatingState
-- @
newState <- updateState (unregisterConnection connId state) connId
inboundGovernorLoop newState
let state' = unregisterConnection connId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

DemotedToColdRemoteTr -> do
-- @
-- Commit^{dataFlow}_{Remote} : InboundIdleState dataFlow
-- → TerminatingState
-- @
newState <- updateState (unregisterConnection connId state) connId
inboundGovernorLoop newState
let state' = unregisterConnection connId state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'

-- the connection is still used by p2p-governor, carry on but put
-- it in 'RemoteCold' state. This will ensure we keep ready to
Expand All @@ -459,9 +399,10 @@ inboundGovernor tracer serverControlChannel protocolIdleTimeout
-- idleness expires rather than as soon as the connection
-- manager was requested outbound connection.
KeepTr -> do
newState <- updateState (updateRemoteState connId RemoteCold state)
connId
inboundGovernorLoop newState
let state' = updateRemoteState connId RemoteCold state
traceWith tracer ( TrInboundGovernorCounters
$ inboundGovernorCounters state')
inboundGovernorLoop state'


-- | Run a responder mini-protocol.
Expand Down Expand Up @@ -554,8 +495,10 @@ data InboundGovernorTrace peerAddr
| TrResponderRestarted !(ConnectionId peerAddr) !MiniProtocolNum
| TrResponderStartFailure !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
| TrResponderErrored !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
| TrResponderStarted !(ConnectionId peerAddr) !MiniProtocolNum
| TrResponderTerminated !(ConnectionId peerAddr) !MiniProtocolNum
| TrPromotedToWarmRemote !(ConnectionId peerAddr) !(OperationResult AbstractState)
| TrPromotedToHotRemote !(ConnectionId peerAddr)
| TrDemotedToColdRemote !(ConnectionId peerAddr) !(OperationResult DemotedToColdRemoteTr)
-- ^ All mini-protocols terminated. The boolean is true if this connection
-- was not used by p2p-governor, and thus the connection will be terminated.
Expand Down

0 comments on commit 210c3c8

Please sign in to comment.