Skip to content

Commit

Permalink
p2p-governor: TimedDecisions
Browse files Browse the repository at this point in the history
Some decisions needs access to the current time rather than the time
when the governor loop blocked.  This patch is using `TimedDecision
~ Time -> Decision` to achieve this.  Completed async jobs are also
using current time instead of the `blockedAt` value.
  • Loading branch information
coot committed Jan 20, 2021
1 parent 758d484 commit 949cea5
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 43 deletions.
41 changes: 22 additions & 19 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Expand Up @@ -472,46 +472,49 @@ peerSelectionGovernorLoop tracer debugTracer actions policy jobPool =
where
loop :: PeerSelectionState peeraddr peerconn -> m Void
loop !st = assert (invariantPeerSelectionState st) $ do
now <- getMonotonicTime
let knownPeers' = KnownPeers.setCurrentTime now (knownPeers st)
blockedAt <- getMonotonicTime
let knownPeers' = KnownPeers.setCurrentTime blockedAt (knownPeers st)
st' = st { knownPeers = knownPeers' }

decision <- evalGuardedDecisions now st'
timedDecision <- evalGuardedDecisions blockedAt st'

let Decision { decisionTrace, decisionJobs, decisionState } = decision
-- get the current time after the governor returned from the blocking
-- 'evalGuardedDecisions' call.
now <- getMonotonicTime
let Decision { decisionTrace, decisionJobs, decisionState } = timedDecision now
traceWith tracer decisionTrace
mapM_ (JobPool.forkJob jobPool) decisionJobs
loop decisionState

evalGuardedDecisions :: Time
-> PeerSelectionState peeraddr peerconn
-> m (Decision m peeraddr peerconn)
evalGuardedDecisions now st =
case guardedDecisions now st of
-> m (TimedDecision m peeraddr peerconn)
evalGuardedDecisions blockedAt st =
case guardedDecisions blockedAt st of
GuardedSkip _ ->
-- impossible since guardedDecisions always has something to wait for
error "peerSelectionGovernorLoop: impossible: nothing to do"

Guarded Nothing decisionAction -> do
traceWith debugTracer (TraceGovernorState now Nothing st)
traceWith debugTracer (TraceGovernorState blockedAt Nothing st)
atomically decisionAction

Guarded (Just (Min wakeupAt)) decisionAction -> do
let wakeupIn = diffTime wakeupAt now
traceWith debugTracer (TraceGovernorState now (Just wakeupIn) st)
let wakeupIn = diffTime wakeupAt blockedAt
traceWith debugTracer (TraceGovernorState blockedAt (Just wakeupIn) st)
wakupTimeout <- newTimeout wakeupIn
let wakeup = awaitTimeout wakupTimeout >> pure (wakeupDecision st)
decision <- atomically (decisionAction <|> wakeup)
timedDecision <- atomically (decisionAction <|> wakeup)
cancelTimeout wakupTimeout
return decision
return timedDecision

guardedDecisions :: Time
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
guardedDecisions now st =
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
guardedDecisions blockedAt st =
-- All the alternative non-blocking internal decisions.
RootPeers.belowTarget actions now st
<> KnownPeers.belowTarget actions now policy st
RootPeers.belowTarget actions blockedAt st
<> KnownPeers.belowTarget actions policy st
<> KnownPeers.aboveTarget policy st
<> EstablishedPeers.belowTarget actions policy st
<> EstablishedPeers.aboveTarget actions policy st
Expand All @@ -521,7 +524,7 @@ peerSelectionGovernorLoop tracer debugTracer actions policy jobPool =
-- All the alternative potentially-blocking decisions.
<> Monitor.targetPeers actions st
<> Monitor.localRoots actions st
<> Monitor.jobs jobPool st now
<> Monitor.jobs jobPool st
<> Monitor.connections actions st

-- There is no rootPeersAboveTarget since the roots target is one sided.
Expand All @@ -536,8 +539,8 @@ peerSelectionGovernorLoop tracer debugTracer actions policy jobPool =


wakeupDecision :: PeerSelectionState peeraddr peerconn
-> Decision m peeraddr peerconn
wakeupDecision st =
-> TimedDecision m peeraddr peerconn
wakeupDecision st _now =
Decision {
decisionTrace = TraceGovernorWakeup,
decisionState = st,
Expand Down
Expand Up @@ -73,7 +73,7 @@ belowTarget actions
let selectedToPromote' :: Map peeraddr peerconn
selectedToPromote' = establishedPeers
`Map.restrictKeys` selectedToPromote
return Decision {
return $ \_now -> Decision {
decisionTrace = TracePromoteWarmPeers
targetNumberOfActivePeers
numActivePeers
Expand Down Expand Up @@ -187,7 +187,7 @@ aboveTarget actions
selectedToDemote' = establishedPeers
`Map.restrictKeys` selectedToDemote

return Decision {
return $ \_now -> Decision {
decisionTrace = TraceDemoteHotPeers
targetNumberOfActivePeers
numActivePeers
Expand Down
Expand Up @@ -76,7 +76,7 @@ belowTarget actions
policyPickColdPeersToPromote
availableToPromote
numPeersToPromote
return Decision {
return $ \_now -> Decision {
decisionTrace = TracePromoteColdPeers
targetNumberOfEstablishedPeers
numEstablishedPeers
Expand Down Expand Up @@ -225,7 +225,7 @@ aboveTarget actions
selectedToDemote' = establishedPeers
`Map.restrictKeys` selectedToDemote

return Decision {
return $ \_now -> Decision {
decisionTrace = TraceDemoteWarmPeers
targetNumberOfEstablishedPeers
numEstablishedPeers
Expand Down
Expand Up @@ -35,9 +35,8 @@ import Ouroboros.Network.PeerSelection.Governor.Types
--
belowTarget :: (MonadAsync m, MonadTimer m, Ord peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> Time
-> MkGuardedDecision peeraddr peerconn m
belowTarget actions now
belowTarget actions
policy@PeerSelectionPolicy {
policyMaxInProgressGossipReqs,
policyPickKnownPeersForGossip,
Expand Down Expand Up @@ -66,7 +65,7 @@ belowTarget actions now
`Map.restrictKeys` availableForGossip)
numGossipReqsPossible
let numGossipReqs = Set.size selectedForGossip
return Decision {
return $ \now -> Decision {
decisionTrace = TraceGossipRequests
targetNumberOfKnownPeers
numKnownPeers
Expand Down Expand Up @@ -292,7 +291,7 @@ aboveTarget PeerSelectionPolicy {
policyPickColdPeersToForget
availableToForget
numPeersToForget
return Decision {
return $ \_now -> Decision {
decisionTrace = TraceForgetColdPeers
targetNumberOfKnownPeers
numKnownPeers
Expand Down
Expand Up @@ -22,7 +22,6 @@ import Data.Set (Set)
import Control.Concurrent.JobPool (JobPool)
import qualified Control.Concurrent.JobPool as JobPool
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime
import Control.Exception (assert)

import qualified Ouroboros.Network.PeerSelection.KnownPeers as KnownPeers
Expand All @@ -38,7 +37,7 @@ import Ouroboros.Network.PeerSelection.Governor.ActivePeers (jobDemote
targetPeers :: MonadSTM m
=> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
targetPeers PeerSelectionActions{readPeerSelectionTargets}
st@PeerSelectionState{
localRootPeers,
Expand All @@ -54,7 +53,7 @@ targetPeers PeerSelectionActions{readPeerSelectionTargets}
let localRootPeers' = Map.take (targetNumberOfKnownPeers targets')
localRootPeers

return Decision {
return $ \_now -> Decision {
decisionTrace = TraceTargetsChanged targets targets',
decisionJobs = [],
decisionState = assert (sanePeerSelectionTargets targets')
Expand All @@ -70,14 +69,13 @@ targetPeers PeerSelectionActions{readPeerSelectionTargets}
jobs :: MonadSTM m
=> JobPool m (Completion m peeraddr peerconn)
-> PeerSelectionState peeraddr peerconn
-> Time
-> Guarded (STM m) (Decision m peeraddr peerconn)
jobs jobPool st now =
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
jobs jobPool st =
-- This case is simple because the job pool returns a 'Completion' which is
-- just a function from the current state to a new 'Decision'.
Guarded Nothing $ do
Completion completion <- JobPool.collect jobPool
return $! completion st now
return (completion st)


-- | Monitor connections.
Expand All @@ -86,7 +84,7 @@ connections :: forall m peeraddr peerconn.
(MonadSTM m, Ord peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
connections PeerSelectionActions{peerStateActions = PeerStateActions {monitorPeerConnection}}
st@PeerSelectionState {
activePeers,
Expand All @@ -101,7 +99,7 @@ connections PeerSelectionActions{peerStateActions = PeerStateActions {monitorPee
establishedStatus'
check (not (Map.null demotions))
let (demotedToWarm, demotedToCold) = Map.partition (==PeerWarm) demotions
return Decision {
return $ \_now -> Decision {
decisionTrace = TraceDemoteAsynchronous demotions,
decisionJobs = [],
decisionState = st {
Expand Down Expand Up @@ -152,7 +150,7 @@ localRoots :: forall peeraddr peerconn m.
(MonadSTM m, Ord peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
localRoots actions@PeerSelectionActions{readLocalRootPeers}
st@PeerSelectionState{
localRootPeers,
Expand Down Expand Up @@ -205,7 +203,7 @@ localRoots actions@PeerSelectionActions{readLocalRootPeers}
selectedToDemote = activePeers `Set.intersection` removedSet
selectedToDemote' = establishedPeers
`Map.restrictKeys` selectedToDemote
return Decision {
return $ \_now -> Decision {
decisionTrace = TraceLocalRootPeersChanged localRootPeers
localRootPeers',
decisionState = st {
Expand Down
Expand Up @@ -27,9 +27,9 @@ belowTarget :: (MonadSTM m, Ord peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> Time
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
belowTarget actions
now
blockedAt
st@PeerSelectionState {
localRootPeers,
publicRootPeers,
Expand All @@ -46,9 +46,9 @@ belowTarget actions
, not inProgressPublicRootsReq

-- We limit how frequently we make requests, are we allowed to do it yet?
, now >= publicRootRetryTime
, blockedAt >= publicRootRetryTime
= Guarded Nothing $
return Decision {
return $ \_now -> Decision {
decisionTrace = TracePublicRootsRequest
targetNumberOfRootPeers
numRootPeers,
Expand Down
Expand Up @@ -394,14 +394,18 @@ data Decision m peeraddr peerconn = Decision {
decisionJobs :: [Job m (Completion m peeraddr peerconn)]
}

-- | Decision which has access to the current time, rather than the time when
-- the governor's loop blocked to make a decision.
--
type TimedDecision m peeraddr peerconn = Time -> Decision m peeraddr peerconn

-- | Type alias for function types which are used to create governor decisions.
-- Allmost all decisions are following this pattern.
--
type MkGuardedDecision peeraddr peerconn m
= PeerSelectionPolicy peeraddr m
-> PeerSelectionState peeraddr peerconn
-> Guarded (STM m) (Decision m peeraddr peerconn)
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)


newtype Completion m peeraddr peerconn =
Expand Down

0 comments on commit 949cea5

Please sign in to comment.