Skip to content

Commit

Permalink
Restrict updating CDF counters after restart.
Browse files Browse the repository at this point in the history
After restart the block delayes doesn't reflect the node's network
condition. This change waits until the node sees a block with a delay
less than 20s. Then we collect 45 samples (about 15 minutes) until
we start to provide values for the CDF counters.
  • Loading branch information
karknu committed Nov 30, 2021
1 parent 3bba19a commit 1cbd1e7
Showing 1 changed file with 71 additions and 40 deletions.
111 changes: 71 additions & 40 deletions cardano-node/src/Cardano/Tracing/Tracers.hs
Expand Up @@ -771,18 +771,47 @@ incCdfCounter v size = cdfCounter v size 1
decCdfCounter :: Ord a => Num a => KnownNat l => a -> Int -> STM.TVar (CdfCounter l) -> STM Double
decCdfCounter v size = cdfCounter v size (-1)

traceBlockFetchClientMetrics
:: forall blk remotePeer.
( )
=> Maybe EKGDirect
-> STM.TVar (IntPSQ SlotNo NominalDiffTime)

-- Track the fraction of times forgeDelay was above 1s, 3s, and 5s.
-- Only the first sample per slot number is counted.
cdf135Counters
:: Integral a
=> STM.TVar (IntPSQ a NominalDiffTime)
-> STM.TVar (CdfCounter 1)
-> STM.TVar (CdfCounter 3)
-> STM.TVar (CdfCounter 5)
-> Tracer IO (TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)))
-> Tracer IO (TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)))
traceBlockFetchClientMetrics Nothing _ _ _ _ tracer = tracer
traceBlockFetchClientMetrics (Just ekgDirect) slotMapVar cdf1sVar cdf3sVar cdf5sVar tracer = Tracer bfTracer
-> a
-> NominalDiffTime
-> STM (Bool, Double, Double, Double)
cdf135Counters slotMapVar cdf1sVar cdf3sVar cdf5sVar slotNo forgeDelay = do
slotMap <- STM.readTVar slotMapVar
if Pq.null slotMap && forgeDelay > 20
then return (False, 0, 0, 0) -- During startup wait until we are in sync
else case Pq.lookup (fromIntegral slotNo) slotMap of
Nothing -> do
let slotMap' = Pq.insert (fromIntegral slotNo) slotNo forgeDelay slotMap
if Pq.size slotMap' > 1080 -- TODO k/2, should come from config file
then
case Pq.minView slotMap' of
Nothing -> return (False, 0, 0, 0) -- Err. We just inserted an element!
Just (_, minSlotNo, minDelay, slotMap'') ->
if minSlotNo == slotNo
then return (False, 0, 0, 0) -- Nothing to do
else do
decCdfs minDelay (Pq.size slotMap'')
(cdf1s, cdf3s, cdf5s) <- incCdfs forgeDelay (Pq.size slotMap'')
STM.writeTVar slotMapVar slotMap''
return (True, cdf1s, cdf3s, cdf5s)
else do
(cdf1s, cdf3s, cdf5s) <- incCdfs forgeDelay (Pq.size slotMap')
STM.writeTVar slotMapVar slotMap'
-- Wait until we have at least 45 samples before we start providing
-- cdf estimates.
if Pq.size slotMap >= 45
then return (True, cdf1s, cdf3s, cdf5s)
else return (True, -1, -1, -1)

Just _ -> return (False, 0, 0, 0) -- dupe, we only track the first

where
incCdfs :: NominalDiffTime -> Int -> STM (Double, Double, Double)
Expand All @@ -799,56 +828,58 @@ traceBlockFetchClientMetrics (Just ekgDirect) slotMapVar cdf1sVar cdf3sVar cdf5s
>> decCdfCounter delay size cdf5sVar
>> return ()

traceBlockFetchClientMetrics
:: forall blk remotePeer.
( )
=> Maybe EKGDirect
-> STM.TVar (IntPSQ Word64 NominalDiffTime)
-> STM.TVar (CdfCounter 1)
-> STM.TVar (CdfCounter 3)
-> STM.TVar (CdfCounter 5)
-> Tracer IO (TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)))
-> Tracer IO (TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)))
traceBlockFetchClientMetrics Nothing _ _ _ _ tracer = tracer
traceBlockFetchClientMetrics (Just ekgDirect) slotMapVar cdf1sVar cdf3sVar cdf5sVar tracer = Tracer bfTracer

where
bfTracer :: TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)) -> IO ()
bfTracer e@(TraceLabelPeer _ (CompletedBlockFetch p _ _ _ delay blockSize)) = do
traceWith tracer e
case pointSlot p of
Origin -> return () -- Nothing to do.
At slotNo -> do
(fresh, cdf1s, cdf3s, cdf5s) <- atomically $ do
slotMap <- STM.readTVar slotMapVar
case Pq.lookup (slotMapKey slotNo) slotMap of
Nothing -> do
let slotMap' = Pq.insert (slotMapKey slotNo) slotNo delay slotMap
if Pq.size slotMap' > 1080 -- TODO k/2, should come from config file
then
case Pq.minView slotMap' of
Nothing -> return (False, 0, 0, 0) -- Err. We just inserted an element!
Just (_, minSlotNo, minDelay, slotMap'') ->
if minSlotNo == slotNo
then return (False, 0, 0, 0) -- Nothing to do
else do
decCdfs minDelay (Pq.size slotMap'')
(cdf1s, cdf3s, cdf5s) <- incCdfs delay (Pq.size slotMap'')
STM.writeTVar slotMapVar slotMap''
return (True, cdf1s, cdf3s, cdf5s)
else do
(cdf1s, cdf3s, cdf5s) <- incCdfs delay (Pq.size slotMap')
STM.writeTVar slotMapVar slotMap'
return (True, cdf1s, cdf3s, cdf5s)

Just _ -> return (False, 0, 0, 0) -- dupe, we only track the first
(fresh, cdf1s, cdf3s, cdf5s) <- atomically $
cdf135Counters slotMapVar cdf1sVar cdf3sVar cdf5sVar (slotMapKey slotNo) delay

when fresh $ do
-- TODO: Revisit ekg counter access once there is a faster way.
sendEKGDirectDouble ekgDirect "cardano.node.metrics.blockfetchclient.blockdelay.s"
$ realToFrac delay
sendEKGDirectInt ekgDirect "cardano.node.metrics.blockfetchclient.blocksize"
blockSize
sendEKGDirectDouble ekgDirect "cardano.node.metrics.blockfetchclient.blockdelay.cdfOne"
cdf1s
sendEKGDirectDouble ekgDirect "cardano.node.metrics.blockfetchclient.blockdelay.cdfThree"
cdf3s
sendEKGDirectDouble ekgDirect "cardano.node.metrics.blockfetchclient.blockdelay.cdfFive"
cdf5s
when (cdf1s >= 0) $
sendEKGDirectDouble ekgDirect
"cardano.node.metrics.blockfetchclient.blockdelay.cdfOne"
cdf1s

when (cdf3s >= 0) $
sendEKGDirectDouble ekgDirect
"cardano.node.metrics.blockfetchclient.blockdelay.cdfThree"
cdf3s

when (cdf5s >= 0) $
sendEKGDirectDouble ekgDirect
"cardano.node.metrics.blockfetchclient.blockdelay.cdfFive"
cdf5s
when (delay > 5) $
sendEKGDirectCounter ekgDirect "cardano.node.metrics.blockfetchclient.lateblocks"

bfTracer e =
traceWith tracer e

slotMapKey :: SlotNo -> Int
slotMapKey (SlotNo s) = fromIntegral s
slotMapKey :: SlotNo -> Word64
slotMapKey (SlotNo s) = s


traceLeadershipChecks ::
forall blk
Expand Down

0 comments on commit 1cbd1e7

Please sign in to comment.