Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 27, 2023
1 parent d683018 commit 2a2831c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
20 changes: 11 additions & 9 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,24 @@ module Stats =
type LatencyStatsSet() =
let buckets = Dictionary<string, ResizeArray<TimeSpan>>()
let emit log names =
let maxGroupLen = names |> Seq.map String.length |> Seq.max
let maxGroupLen = names |> Seq.map String.length |> Seq.max // NOTE caller must guarantee >1 item
fun (label: string) -> dumpStats log (label.PadRight maxGroupLen)
member _.Record(bucket, value: TimeSpan) =
match buckets.TryGetValue bucket with
| false, _ -> let n = ResizeArray() in n.Add value; buckets.Add(bucket, n)
| true, buf -> buf.Add value
member _.Dump(log: Serilog.ILogger, ?labelSortOrder) =
let emit = emit log buckets.Keys
for name in Seq.sortBy (defaultArg labelSortOrder id) buckets.Keys do
emit name buckets[name]
if buckets.Count <> 0 then
let emit = emit log buckets.Keys
for name in Seq.sortBy (defaultArg labelSortOrder id) buckets.Keys do
emit name buckets[name]
member _.DumpGrouped(bucketGroup, log: Serilog.ILogger, ?totalLabel) =
let clusters = buckets |> Seq.groupBy (fun kv -> bucketGroup kv.Key) |> Seq.sortBy fst |> Seq.toArray
let emit = emit log (clusters |> Seq.map fst)
totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value)))
for name, items in clusters do
emit name (items |> Seq.collect (fun kv -> kv.Value))
if buckets.Count <> 0 then
let clusters = buckets |> Seq.groupBy (fun kv -> bucketGroup kv.Key) |> Seq.sortBy fst |> Seq.toArray
let emit = emit log (clusters |> Seq.map fst)
totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value)))
for name, items in clusters do
emit name (items |> Seq.collect (fun kv -> kv.Value))
member _.Clear() = buckets.Clear()

type LogEventLevel = Serilog.Events.LogEventLevel
Expand Down
26 changes: 13 additions & 13 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ module Buffer =

type [<RequireQualifiedAccess; Struct; NoEquality; NoComparison>] OutcomeKind = Ok | Tagged of string | Exn
module OutcomeKind =
let [<Literal>] OkTag = "STREAMS"
let [<Literal>] OkTag = "ok"
let [<Literal>] ExnTag = "exception"
let Timeout = OutcomeKind.Tagged "timeout"
let RateLimited = OutcomeKind.Tagged "rateLimited"
Expand Down Expand Up @@ -405,8 +405,8 @@ module Scheduling =
/// Collates all state and reactions to manage the list of busy streams based on callbacks/notifications from the Dispatcher
type Monitor() =
let active, failing, stuck = Active(), Repeating(), Repeating()
let emit (log : ILogger) state struct (streams, attempts) struct (oldest : TimeSpan, newest : TimeSpan) =
log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts",
let emit (log: ILogger) level state struct (streams, attempts) struct (oldest: TimeSpan, newest: TimeSpan) =
log.Write(level, " {state,7} {streams,3} for {newest:n1}-{oldest:n1}s, {attempts} attempts",
state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts)
member _.HandleStarted(sn, ts) =
active.HandleStarted(sn, ts)
Expand All @@ -426,19 +426,19 @@ module Scheduling =
member _.IsFailing(failingThreshold: TimeSpan) =
failing.OldestIsOlderThan failingThreshold || stuck.OldestIsOlderThan TimeSpan.Zero
member _.DumpState(log : ILogger) =
let dump state struct (streams, attempts) ages =
let dump level state struct (streams, attempts) ages =
if streams <> 0 then
emit log state (streams, attempts) ages
active.State ||> dump "active"
failing.State ||> dump "failing"
stuck.State ||> dump "stalled"
emit log level state (streams, attempts) ages
stuck.State ||> dump LogEventLevel.Error "stalled"
failing.State ||> dump LogEventLevel.Warning "failing"
active.State ||> dump LogEventLevel.Information "active"
member _.EmitMetrics(log : ILogger) =
let report state struct (streams, attempts) struct (oldest : TimeSpan, newest : TimeSpan) =
let m = Log.Metric.StreamsBusy (state, streams, oldest.TotalSeconds, newest.TotalSeconds)
emit (log |> Log.withMetric m) state (streams, attempts) (oldest, newest)
active.State ||> report "active"
failing.State ||> report "failing"
emit (log |> Log.withMetric m) LogEventLevel.Information state (streams, attempts) (oldest, newest)
stuck.State ||> report "stalled"
failing.State ||> report "failing"
active.State ||> report "active"

type [<NoComparison; NoEquality>] Timers() =
let mutable results, dispatch, merge, ingest, stats, sleep = 0L, 0L, 0L, 0L, 0L, 0L
Expand Down Expand Up @@ -499,14 +499,14 @@ module Scheduling =
log.Information("Scheduler {cycles} cycles {@states} Running {busy}/{processors}",
cycles, stateStats.StatsDescending, dispatchActive, dispatchMax)
cycles <- 0; stateStats.Clear()
monitor.DumpState x.Log
lats.Dump(log, function OutcomeKind.OkTag -> String.Empty | x -> x)
lats.Clear()
let batchesCompleted = Interlocked.Exchange(&batchesCompleted, 0)
log.Information(" Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e skipped {streamsSkipped:n0}s {eventsSkipped:n0}e completed {completed} Running {active}",
batchesWaiting, batchesStarted, streamsStarted, eventsStarted, streamsWrittenAhead, eventsWrittenAhead, batchesCompleted, batchesRunning)
batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; (*batchesCompleted <- 0*)
x.Timers.Dump log
monitor.DumpState x.Log
x.DumpStats()

member _.IsFailing = monitor.IsFailing failThreshold
Expand Down Expand Up @@ -548,7 +548,7 @@ module Scheduling =
abstract member Handle : Res<Result<'R, 'E>> -> unit

member private _.RecordOutcomeKind(r, k) =
let progressed = r.index = r.index'
let progressed = r.index' > r.index
let inline updateMonitor succeeded = monitor.HandleResult(r.stream, succeeded = succeeded, progressed = progressed)
let kindTag =
match k with
Expand Down

0 comments on commit 2a2831c

Please sign in to comment.