Skip to content

Commit

Permalink
refactor(Tool)!: Move Stats helpers to Internal
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 21, 2024
1 parent 89b00ad commit 74197fc
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
31 changes: 31 additions & 0 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,37 @@ module Stats =
emit name (items |> Seq.collect _.Value)
member _.Clear() = buckets.Clear()

/// Not thread-safe, i.e. suitable for use in a Stats handler only
type CategoryCounters() =
let cats = Dictionary<string, Counters>()
member _.Ingest(category, counts) =
let cat =
match cats.TryGetValue category with
| false, _ -> let acc = Counters() in cats.Add(category, acc); acc
| true, acc -> acc
for event, count : int in counts do cat.Ingest(event, count)
member _.Categories = cats.Keys
member _.StatsDescending cat =
match cats.TryGetValue cat with
| true, acc -> acc.StatsDescending
| false, _ -> Seq.empty
member _.DumpGrouped(log: Serilog.ILogger, totalLabel) =
if cats.Count <> 0 then
dumpCounterSet log totalLabel cats
member _.Clear() = cats.Clear()

/// Not thread-safe, i.e. suitable for use in a Stats handler only
type EventTypeLatencies() =
let inner = LatencyStatsSet()
member _.Record(category: string, eventType: string, latency) =
let key = $"{category}/{eventType}"
inner.Record(key, latency)
member _.Dump(log, totalLabel) =
let inline catFromKey (key: string) = key.Substring(0, key.IndexOf '/')
inner.DumpGrouped(catFromKey, log, totalLabel = totalLabel)
inner.Dump log
member _.Clear() = inner.Clear()

type LogEventLevel = Serilog.Events.LogEventLevel

module Log =
Expand Down
10 changes: 5 additions & 5 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Propulsion.Tool.Program

open Argu
open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException
open Propulsion.Internal
open Serilog

[<NoEquality; NoComparison; RequireSubcommand>]
Expand All @@ -13,7 +13,7 @@ type Parameters =
| [<CliPrefix(CliPrefix.None); Last; Unique>] InitPg of ParseResults<Args.Mdb.Parameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Index of ParseResults<Args.Dynamo.IndexParameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Checkpoint of ParseResults<CheckpointParameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Project of ParseResults<Project.Parameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Sync of ParseResults<Sync.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
Expand All @@ -23,7 +23,7 @@ type Parameters =
| InitPg _ -> "Initialize a postgres checkpoint store"
| Index _ -> "Validate index (optionally, ingest events from a DynamoDB JSON S3 export to remediate missing events)."
| Checkpoint _ -> "Display or override checkpoints in Cosmos or Dynamo"
| Project _ -> "Project from store specified as the last argument."
| Sync _ -> "Project from store specified as the last argument."
and [<NoEquality; NoComparison; RequireSubcommand>] CheckpointParameters =
| [<AltCommandLine "-s"; Mandatory>] Source of Propulsion.Feed.SourceId
| [<AltCommandLine "-t"; Mandatory>] Tranche of Propulsion.Feed.TrancheId
Expand Down Expand Up @@ -92,7 +92,7 @@ type Arguments(c: Args.Configuration, p: ParseResults<Parameters>) =
| InitPg a -> do! Args.Mdb.Arguments(c, a).CreateCheckpointStoreTable() |> Async.ofTask
| Checkpoint a -> do! Checkpoints.readOrOverride(c, a, CancellationToken.None) |> Async.ofTask
| Index a -> do! Args.Dynamo.index (c, a)
| Project a -> do! Project.run AppName (c, a)
| Sync a -> do! Sync.run AppName (c, a)
| x -> p.Raise $"unexpected subcommand %A{x}" }
static member Parse argv =
let parseResults = ArgumentParser.Create().ParseCommandLine argv
Expand All @@ -110,6 +110,6 @@ let main argv =
try a.ExecuteSubCommand() |> Async.RunSynchronously; 0
with e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with x when x = Project.eofSignalException -> printfn "Processing COMPLETE"; 0
with x when x = Sync.eofSignalException -> printfn "Processing COMPLETE"; 0
| :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintfn $"EXCEPTION: %s{e.Message}"; 1
37 changes: 4 additions & 33 deletions tools/Propulsion.Tool/Sync.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Propulsion.Tool.Project
module Propulsion.Tool.Sync

open Argu
open Infrastructure
Expand Down Expand Up @@ -115,7 +115,7 @@ and [<NoEquality; NoComparison; RequireSubcommand>] CosmosParameters =
| Retries _ -> "specify operation retries. Default: 0."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
| MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128."
| From _ -> "Specify Source."
| From _ -> "Specify Source."
and CosmosArguments(c: Args.Configuration, p: ParseResults<CosmosParameters>) =
let source = SourceArguments(c, p.GetResult CosmosParameters.From)
let connection = match source.Store with
Expand Down Expand Up @@ -170,40 +170,11 @@ module Outcome =
let share = TimeSpan.seconds (match Array.length ham with 0 -> 0 | count -> elapsedS / float count)
create sn (ham |> Array.map (fun x -> struct (eventType x, share))) (eventCounts spam)

type CategoryCounters() =
let cats = System.Collections.Generic.Dictionary<string, Propulsion.Internal.Stats.Counters>()
member _.Ingest(category, counts) =
let cat =
match cats.TryGetValue category with
| false, _ -> let acc = Propulsion.Internal.Stats.Counters() in cats.Add(category, acc); acc
| true, acc -> acc
for event, count : int in counts do cat.Ingest(event, count)
member _.Categories = cats.Keys
member _.StatsDescending cat =
match cats.TryGetValue cat with
| true, acc -> acc.StatsDescending
| false, _ -> Seq.empty
member _.DumpGrouped(log: ILogger, totalLabel) =
if cats.Count <> 0 then
Propulsion.Internal.Stats.dumpCounterSet log totalLabel cats
member _.Clear() = cats.Clear()

type EventTypeLatencies() =
let inner = Propulsion.Internal.Stats.LatencyStatsSet()
member _.Record(category: string, eventType: string, latency) =
let key = $"{category}/{eventType}"
inner.Record(key, latency)
member _.Dump(log, totalLabel) =
let inline catFromKey (key: string) = key.Substring(0, key.IndexOf '/')
inner.DumpGrouped(catFromKey, log, totalLabel = totalLabel)
inner.Dump log
member _.Clear() = inner.Clear()

type Stats(log, statsInterval, stateInterval, verboseStore, logExternalStats) =
inherit StatsBase<Outcome>(log, statsInterval, stateInterval, verboseStore, logExternalStats)
let mutable handled, ignored = 0, 0
let accHam, accSpam = CategoryCounters(), CategoryCounters()
let intervalLats, accEventTypeLats = EventTypeLatencies(), EventTypeLatencies()
let accHam, accSpam = Stats.CategoryCounters(), Stats.CategoryCounters()
let intervalLats, accEventTypeLats = Stats.EventTypeLatencies(), Stats.EventTypeLatencies()
override _.HandleOk((category, ham, spam)) =
accHam.Ingest(category, ham |> Seq.countBy ValueTuple.fst)
accSpam.Ingest(category, spam)
Expand Down

0 comments on commit 74197fc

Please sign in to comment.