Skip to content

Commit

Permalink
Even more on Frequency limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
jutaro committed May 13, 2021
1 parent c2335eb commit 4508a6f
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 141 deletions.
4 changes: 3 additions & 1 deletion trace-dispatcher/docs/trace-dispatcher.md
Expand Up @@ -2,7 +2,7 @@

The current iohk-monitoring-framework should be replaced with something simpler. The current framework shall be replaced, because it consumes too much resources, and as such too much influences the system it is monitoring.

We call everything that shall be logged or monitored a __Message__ in this context, and are aware that this message can carry a payload, so it is not just a String. We say __TraceIn__, when we refer to the incoming side of a stream of messages while we say __TraceOut__ when we refer to the outgoing side. The `traceWith` function is called on a TraceIn. A TraceOut is doing something effect-full with the messages. So a TraceOut is a backend that not only produces effects but also is always the end of the data flow. We say __Trace__ for a stream of messages, that originates at some TraceIn, is then plumbed via transformers and ends in one or more TraceOuts.
We call everything that shall be logged or monitored a __Message__ in this context, and are aware that this message can carry a payload, so it is not just a String. We say __TraceIn__, when we refer to the incoming side of a stream of messages while we say __TraceOut__ when we refer to the outgoing side. The `traceWith` function is called on a TraceIn. A TraceOut is doing something effect-full with the messages. So a TraceOut is a backend that not only produces effects but also is always the end of the data flow. We say __Trace__ for a stream of messages, that originates at some TraceIn, is then plumbed via transformers, which are implemented via contravariant functions and ends in one or more TraceOuts.

This library consists just of simple combinators and requires the messages just to implement one typeclass for formatting called `Logging`. It can be reconfigured at runtime with one procedure call. However, to work properly it puts the burden on the user to provide a default object for any message as is described in the section on configuration and documentation. This library build upon the arrow based contravariant tracer library contra-tracer.

Expand Down Expand Up @@ -273,9 +273,11 @@ class Logging a where
A.Object o -> o
s@(A.String _) -> HM.singleton "string" s
_ -> mempty

forHuman :: a -> Text
default forHuman :: Humanise a => a -> Text
forHuman v = humanise v

asMetrics :: a -> [Metric]
asMetrics v = []

Expand Down
228 changes: 88 additions & 140 deletions trace-dispatcher/src/Cardano/Logging/FrequencyLimiter.hs
Expand Up @@ -16,179 +16,127 @@ import Cardano.Logging.Types

data LimitingMessage =
StartLimiting Text Double
| ContinueLimiting Text Double -- Just for debugging
| ContinueLimiting Text Double -- Should be shown only for debugging
| StopLimiting Text

data FrequencyRec a = FrequencyRec {
frMessage :: Maybe a
, frLastTime :: UTCTime
, frMsgCount :: Int
, frTicks :: Int
, frActive :: Maybe Double
}

-- | Limits the frequency of messages to msgPer10Seconds
-- If the limiter detects more messages, it traces a StartLimiting message
-- with the current percentage given as a floating point number between 1.0 and 0.0
-- Then it randomly selects messages with the given percentage
-- until the frequency falls under the treshold.
-- Then it sends a StopLimiting message and traces all messages again.
-- Inbetween you can receive ContinueLimiting messages, with the current
-- percentage given as a floating point number between 1.0 and 0.0
-- | Limits the frequency of messages to nMsg which is given per minute.

-- If the limiter detects more messages, it traces randomly selected
-- messages with the given percentage
-- on the vtracer until the frequency falls under the treshold.

-- Before this the ltracer gets a StartLimiting message with the
-- current percentage given as a floating point number between 1.0 and 0.0.
-- Inbetween you can receive ContinueLimiting messages on the ltracer,
-- with the current percentage.
-- Finally it sends a StopLimiting message on the ltracer and traces all
-- messages on the vtracer again.
limitFrequency
:: forall a acc m . MonadIO m
=> Int
-> Text
-> Trace m a
-> Trace m LimitingMessage
-> m (Trace m a)
limitFrequency limiting limiterName vtracer ltracer = do
timeNow <- liftIO getCurrentTime
let initialMessage = Nothing
let initial = FrequencyRec initialMessage timeNow 0 Nothing
let vtr = T.contramap prepare (filterTraceMaybe vtracer)
foldMTraceM cata initial vtr
limitFrequency nMsg limiterName vtracer ltracer =
let ticks = max 1 (round (5.0 * (60.0 / fromIntegral nMsg)))
treshold = (fromIntegral nMsg / 60.0) * fromIntegral ticks
in do
timeNow <- liftIO getCurrentTime
foldMTraceM
(cata ticks treshold)
(FrequencyRec Nothing timeNow 0 0 Nothing)
(T.contramap prepare (filterTraceMaybe vtracer))
where
-- prepare ::
-- (LoggingContext, Either TraceControl (Folding a (FrequencyRec a)))
-- -> (LoggingContext, Either TraceControl (Maybe a))
prepare ::
(LoggingContext, Either TraceControl (Folding a (FrequencyRec a)))
-> (LoggingContext, Either TraceControl (Maybe a))
prepare (lc, Left c) = (lc, Left c)
prepare (lc, Right (Folding FrequencyRec {..})) = (lc, Right frMessage)

-- cata :: FrequencyRec a -> a -> m (FrequencyRec a)
cata fs@FrequencyRec {..} message = do
cata :: Int -> Double -> FrequencyRec a -> a -> m (FrequencyRec a)
cata ticks treshold fs@FrequencyRec {..} message = do
timeNow <- liftIO getCurrentTime
let timeDiffPico = nominalDiffTimeToSeconds (diffUTCTime timeNow frLastTime)
let timeDiffSec = timeDiffPico * 1000000000000.00
case frActive of
Nothing -> -- not active
if timeDiffSec > 1
then -- more then a second has passed
if frMsgCount > limiting
then do -- start limiting
let limitingFactor = fromIntegral limiting / fromIntegral frMsgCount
traceWith (setSeverity Info ltracer) (StartLimiting limiterName limitingFactor)
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frActive = Just limitingFactor}
else -- continue new second without limiting
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0}
-- Not active, not at second boundary, jsut pass and count
if timeDiffSec > 1.0
then -- ticking
if frTicks + 1 >= ticks
then -- in a check cycle
if fromIntegral frMsgCount > treshold
then do -- start limiting
let limitingFactor = treshold / fromIntegral frMsgCount
traceWith
(setSeverity Info ltracer)
(StartLimiting limiterName limitingFactor)
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frTicks = 0
, frActive = Just limitingFactor}
else -- in a check cycle, but stay inactive
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frTicks = 0
, frActive = Nothing}
else -- ticking but not in a check cycle
if fromIntegral frMsgCount > treshold
then do -- start limiting inbetween
let preFactor = fromIntegral ticks / fromIntegral (frTicks + 1)
let limitingFactor = (treshold * preFactor) / fromIntegral frMsgCount
traceWith
(setSeverity Info ltracer)
(StartLimiting limiterName limitingFactor)
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frTicks = 0
, frActive = Just limitingFactor}
else
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frTicks = frTicks + 1}
-- Not active, not at second boundary, just pass and count
else pure $ fs { frMessage = Just message
, frMsgCount = frMsgCount + 1}
Just percentage ->
if timeDiffSec > 1
then
if frMsgCount > limiting
then do -- Continue Limiting
let limitingFactor = fromIntegral limiting / fromIntegral frMsgCount
traceWith (setSeverity Debug ltracer) (ContinueLimiting limiterName limitingFactor)
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frActive = Just limitingFactor}
else do -- stop limiting
Just percentage -> -- Active
if (timeDiffSec > 1.0) && (frTicks + 1 >= ticks)
then -- active, second and ticking
(if fromIntegral frMsgCount > treshold
then do -- continue
let limitingFactor = treshold / fromIntegral frMsgCount
traceWith
(setSeverity Debug ltracer)
(ContinueLimiting limiterName limitingFactor)
pure fs
{frMessage = Just message, frLastTime = timeNow, frMsgCount = 0,
frTicks = 0, frActive = Just limitingFactor}
else do -- stop
traceWith (setSeverity Info ltracer) (StopLimiting limiterName)
pure fs { frMessage = Just message
, frLastTime = timeNow
, frMsgCount = 0
, frActive = Nothing}
else do -- with active limiting
pure fs
{frMessage = Just message, frLastTime = timeNow, frMsgCount = 0,
frTicks = 0, frActive = Nothing})
else do
rnd :: Double <- liftIO randomIO
let newTicks = if timeDiffSec > 1.0 then frTicks + 1 else frTicks
if percentage > rnd
then -- sending the message
pure $ fs { frMessage = Just message
, frTicks = newTicks
, frMsgCount = frMsgCount + 1}
else -- suppress the message
pure $ fs { frMessage = Nothing
, frTicks = newTicks
, frMsgCount = frMsgCount + 1}

-- type FrequencyLimited a = (Maybe a, Maybe LimitingMessage)
--
-- data FrequencyStructure a = FrequencyStructure {
-- fsMessage :: FrequencyLimited a
-- , fsLastTime :: UTCTime
-- , fsMsgCount :: Int
-- , fsActive :: Maybe Double
-- }
--
-- -- | Limits the frequency of messages to msgPer10Seconds
-- -- If the limiter detects more messages, it traces a StartLimiting messages
-- -- and then randomly selects messages until the frequency falls under the
-- -- treshold. Then it sends a StopLimiting message and traces all messages again.
-- -- Inbetween you can receive continue limiting messages, with the current
-- -- percentage given as a floating point number between 1.0 and 0.0
-- limitFrequency'
-- :: forall a acc m . MonadIO m
-- => Int
-- -> Text
-- -> Trace m (FrequencyLimited a)
-- -> m (Trace m a)
-- limitFrequency' limiting limiterName tr = do
-- timeNow <- liftIO getCurrentTime
-- let initialMessage = (Nothing, Nothing)
-- let initial = FrequencyStructure initialMessage timeNow 0 Nothing
-- let tr' = T.contramap prepare tr
-- foldMTraceM cata initial tr'
-- where
-- prepare ::
-- (LoggingContext, Either TraceControl (Folding a (FrequencyStructure a)))
-- -> (LoggingContext, Either TraceControl (FrequencyLimited a))
-- prepare (lc, Left c) = (lc, Left c)
-- prepare (lc, Right (Folding FrequencyStructure {..})) = (lc, Right fsMessage)
--
-- cata :: FrequencyStructure a -> a -> m (FrequencyStructure a)
-- cata fs@FrequencyStructure {..} message = do
-- timeNow <- liftIO getCurrentTime
-- let timeDiffPico = nominalDiffTimeToSeconds (diffUTCTime timeNow fsLastTime)
-- let timeDiffSec = timeDiffPico * 1000000000000.00
-- case fsActive of
-- Nothing -> -- not active
-- if timeDiffSec > 1
-- then -- more then a second has passed
-- if fsMsgCount > limiting
-- then -- start limiting
-- pure $
-- let limitingFactor = fromIntegral limiting / fromIntegral fsMsgCount
-- in fs { fsMessage = (Just message,
-- Just (StartLimiting limiterName limitingFactor))
-- , fsLastTime = timeNow
-- , fsMsgCount = 0
-- , fsActive = Just limitingFactor}
-- else -- continue new second without limiting
-- pure $ fs { fsMessage = (Just message, Nothing)
-- , fsLastTime = timeNow
-- , fsMsgCount = 0}
-- -- Not active, not at second boundary, jsut pass and count
-- else pure $ fs { fsMessage = (Just message, Nothing)
-- , fsMsgCount = fsMsgCount + 1}
-- Just percentage ->
-- if timeDiffSec > 1
-- then
-- if fsMsgCount > limiting
-- then -- Continue Limiting
-- pure $
-- let limitingFactor = fromIntegral limiting
-- / fromIntegral fsMsgCount
-- in fs { fsMessage = (Just message,
-- Just (ContinueLimiting limiterName limitingFactor))
-- , fsLastTime = timeNow
-- , fsMsgCount = 0
-- , fsActive = Just limitingFactor}
-- else -- stop limiting
-- pure $
-- fs { fsMessage = (Just message, Just (StopLimiting limiterName))
-- , fsLastTime = timeNow
-- , fsMsgCount = 0
-- , fsActive = Nothing}
-- else do -- with active limiting
-- rnd :: Double <- liftIO randomIO
-- if percentage > rnd
-- then -- sending the message
-- pure $ fs { fsMessage = (Just message, Nothing)
-- , fsMsgCount = fsMsgCount + 1}
-- else -- suppress the message
-- pure $ fs { fsMessage = (Nothing, Nothing)
-- , fsMsgCount = fsMsgCount + 1}

0 comments on commit 4508a6f

Please sign in to comment.