Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 15, 2023
1 parent dd25f0e commit df10c5d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/Propulsion/Pipeline.fs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type [<AbstractClass; Sealed>] PipelineFactory private () =
if tcs.TrySetException(exn) then
log.Warning(exn, "Cancelling processing due to faulted scheduler or health checks")
abended <- true
// Health check can flag need to abend multiple times; first one has to win
elif not abended then log.Information(exn, "Failed setting abend exn")
// NB cancel needs to be after TSE or the Register(TSE) will win
cts.Cancel()
Expand Down
9 changes: 5 additions & 4 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,13 @@ module OutcomeKind =
/// Details of name, time since first failure, and number of attempts for a Failing or Stuck stream
type FailingStreamDetails = (struct (FsCodec.StreamName * TimeSpan * int))

/// Raised by <c>Stats</c> to conclude the processing when the <c>abendThreshold</c> has been exceeded.
/// <summary>Raised by <c>Stats</c>'s <c>HealthCheck</c> to terminate a Sink's processing when the <c>abendThreshold</c> has been exceeded.</summary>
type HealthCheckException(oldestStuck, oldestFailing, stuckStreams, failingStreams) =
inherit exn()
override x.Message = $"Failure Threshold exceeded; Oldest stuck stream: {oldestStuck}, Oldest failing stream {oldestFailing}"
/// Duration for which oldest stream that's active but failing has failed to progress
member val TimeStuck: TimeSpan = oldestStuck
/// Duration for which oldest stream that's active but failing to progress (though not erroring)
member val OldestFailing: TimeSpan = oldestFailing
/// Details of name, time since first attempt, and number of attempts for Streams not making progress
member val StuckStreams: FailingStreamDetails[] = stuckStreams
Expand Down Expand Up @@ -381,9 +383,8 @@ module Scheduling =
let currentTs = Stopwatch.timestamp ()
[| for kv in state ->
let sn = kv.Key
let v = kv.Value
let age = (currentTs - v.ts) |> Stopwatch.ticksToTimeSpan
struct (sn, age, v.count) |]
let age = currentTs - kv.Value.ts |> Stopwatch.ticksToTimeSpan
struct (sn, age, kv.Value.count) |]
let private renderState agesAndCounts =
let mutable oldest, newest, streams, attempts = Int64.MinValue, Int64.MaxValue, 0, 0
for struct (diff, count) in agesAndCounts do
Expand Down

0 comments on commit df10c5d

Please sign in to comment.