Skip to content

Commit

Permalink
feat(Latency): Add RecordExn
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 11, 2024
1 parent 26b5041 commit 4f061bf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -15,7 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
- `Streams.Stats.abendThreshold`: Abends Sink's processing with a `HealthCheckException` if a stream continually fails to progress, or continually errors with non-transient exceptions [#246](https://github.com/jet/propulsion/pull/246)
- `Streams.Stats.Categorize`: Break down `ok` handler outcomes by stream category [#258](https://github.com/jet/propulsion/pull/258)
- `Streams.Stats.Latency.Record`: Gather custom Handler latency metrics [#258](https://github.com/jet/propulsion/pull/258)
- `Streams.Stats.Latency.Record(Ok|Outcom|Exn)`: Gather custom Handler latency metrics [#258](https://github.com/jet/propulsion/pull/258)
- `StreamFilter`: Generic logic for filtering source events based on Category name, Stream name or Event type [#252](https://github.com/jet/propulsion/pull/252)
- `Ingester, Sinks`: Expose optional `ingesterStateInterval` and `commitInterval` control on Sink factories [#154](https://github.com/jet/propulsion/pull/154) [#239](https://github.com/jet/propulsion/pull/239)
- `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194)
Expand Down
39 changes: 20 additions & 19 deletions src/Propulsion/Streams.fs
Expand Up @@ -513,35 +513,36 @@ module Scheduling =
type Res<'R> = { duration: TimeSpan; stream: FsCodec.StreamName; index: int64; event: string; index': int64; result: 'R }

type LatencyStats() =
let outcomes = Stats.LatencyStatsSet()
let okCats = Stats.LatencyStatsSet()
let okCatsAcc = Stats.LatencyStatsSet()
let outcomes, outcomesAcc = Stats.LatencyStatsSet(), Stats.LatencyStatsSet()
let okCats, exnCats = Stats.LatencyStatsSet(), Stats.LatencyStatsSet()
let okCatsAcc, exnCatsAcc = Stats.LatencyStatsSet(), Stats.LatencyStatsSet()
member val Categorize = false with get, set
member _.Record(category, duration) =
member _.RecordOutcome(tag, duration) =
outcomes.Record(tag, duration)
outcomesAcc.Record(tag, duration)
member _.RecordOk(category, duration) =
okCats.Record(category, duration)
okCatsAcc.Record(category, duration)
member _.RecordExn(tag, duration) =
exnCats.Record(tag, duration)
exnCatsAcc.Record(tag, duration)
member internal x.DumpStats log =
if x.Categorize then
okCats.Dump(log, totalLabel = "OK")
outcomes.Dump(log)
else
okCats.Dump log
outcomes.Dump(log, labelSortOrder = function OutcomeKind.OkTag -> String.Empty | x -> x)
okCats.Clear()
outcomes.Clear()
outcomes.Dump(log, labelSortOrder = function OutcomeKind.OkTag -> String.Empty | x -> x)
okCats.Dump(log, totalLabel = "OK")
exnCats.Dump(log, totalLabel = "ERROR")
outcomes.Clear(); okCats.Clear(); exnCats.Clear()
member internal x.DumpState(log, purge) =
if x.Categorize then
okCatsAcc.Dump(log, totalLabel = "ΣOK")
else
okCatsAcc.Dump log
if purge then okCatsAcc.Clear()
outcomesAcc.Dump(log, labelSortOrder = function OutcomeKind.OkTag -> String.Empty | x -> x)
okCatsAcc.Dump(log, totalLabel = "ΣOK")
exnCatsAcc.Dump(log, totalLabel = "ΣERROR")
if purge then outcomesAcc.Clear(); okCatsAcc.Clear(); exnCatsAcc.Clear()
member internal x.RecordOutcome(streamName, kind, duration) =
let tag = OutcomeKind.tag kind
if tag = OutcomeKind.OkTag && x.Categorize then
let cat = StreamName.categorize streamName
x.Record(cat, duration)
x.RecordOk(cat, duration)
else
outcomes.Record(tag, duration)
x.RecordOutcome(tag, duration)
tag

/// Gathers stats pertaining to the core projection/ingestion activity
Expand Down

0 comments on commit 4f061bf

Please sign in to comment.