diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index bfa952c4..8bd60b96 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -322,6 +322,37 @@ module Stats = emit name (items |> Seq.collect _.Value) member _.Clear() = buckets.Clear() + /// Not thread-safe, i.e. suitable for use in a Stats handler only + type CategoryCounters() = + let cats = Dictionary() + member _.Ingest(category, counts) = + let cat = + match cats.TryGetValue category with + | false, _ -> let acc = Counters() in cats.Add(category, acc); acc + | true, acc -> acc + for event, count : int in counts do cat.Ingest(event, count) + member _.Categories = cats.Keys + member _.StatsDescending cat = + match cats.TryGetValue cat with + | true, acc -> acc.StatsDescending + | false, _ -> Seq.empty + member _.DumpGrouped(log: Serilog.ILogger, totalLabel) = + if cats.Count <> 0 then + dumpCounterSet log totalLabel cats + member _.Clear() = cats.Clear() + + /// Not thread-safe, i.e. suitable for use in a Stats handler only + type EventTypeLatencies() = + let inner = LatencyStatsSet() + member _.Record(category: string, eventType: string, latency) = + let key = $"{category}/{eventType}" + inner.Record(key, latency) + member _.Dump(log, totalLabel) = + let inline catFromKey (key: string) = key.Substring(0, key.IndexOf '/') + inner.DumpGrouped(catFromKey, log, totalLabel = totalLabel) + inner.Dump log + member _.Clear() = inner.Clear() + type LogEventLevel = Serilog.Events.LogEventLevel module Log = diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index 48c34960..d9256356 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -1,7 +1,7 @@ module Propulsion.Tool.Program open Argu -open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException +open Propulsion.Internal open Serilog [] @@ -13,7 +13,7 @@ type Parameters = | [] InitPg of ParseResults | [] Index of ParseResults | [] Checkpoint of ParseResults - | [] Project of ParseResults + | [] Sync of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -23,7 +23,7 @@ type Parameters = | InitPg _ -> "Initialize a postgres checkpoint store" | Index _ -> "Validate index (optionally, ingest events from a DynamoDB JSON S3 export to remediate missing events)." | Checkpoint _ -> "Display or override checkpoints in Cosmos or Dynamo" - | Project _ -> "Project from store specified as the last argument." + | Sync _ -> "Project from store specified as the last argument." and [] CheckpointParameters = | [] Source of Propulsion.Feed.SourceId | [] Tranche of Propulsion.Feed.TrancheId @@ -92,7 +92,7 @@ type Arguments(c: Args.Configuration, p: ParseResults) = | InitPg a -> do! Args.Mdb.Arguments(c, a).CreateCheckpointStoreTable() |> Async.ofTask | Checkpoint a -> do! Checkpoints.readOrOverride(c, a, CancellationToken.None) |> Async.ofTask | Index a -> do! Args.Dynamo.index (c, a) - | Project a -> do! Project.run AppName (c, a) + | Sync a -> do! Sync.run AppName (c, a) | x -> p.Raise $"unexpected subcommand %A{x}" } static member Parse argv = let parseResults = ArgumentParser.Create().ParseCommandLine argv @@ -110,6 +110,6 @@ let main argv = try a.ExecuteSubCommand() |> Async.RunSynchronously; 0 with e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with x when x = Project.eofSignalException -> printfn "Processing COMPLETE"; 0 + with x when x = Sync.eofSignalException -> printfn "Processing COMPLETE"; 0 | :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 | e -> eprintfn $"EXCEPTION: %s{e.Message}"; 1 diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index b58722b0..145d78a3 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -1,4 +1,4 @@ -module Propulsion.Tool.Project +module Propulsion.Tool.Sync open Argu open Infrastructure @@ -115,7 +115,7 @@ and [] CosmosParameters = | Retries _ -> "specify operation retries. Default: 0." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128." - | From _ -> "Specify Source." + | From _ -> "Specify Source." and CosmosArguments(c: Args.Configuration, p: ParseResults) = let source = SourceArguments(c, p.GetResult CosmosParameters.From) let connection = match source.Store with @@ -170,40 +170,11 @@ module Outcome = let share = TimeSpan.seconds (match Array.length ham with 0 -> 0 | count -> elapsedS / float count) create sn (ham |> Array.map (fun x -> struct (eventType x, share))) (eventCounts spam) -type CategoryCounters() = - let cats = System.Collections.Generic.Dictionary() - member _.Ingest(category, counts) = - let cat = - match cats.TryGetValue category with - | false, _ -> let acc = Propulsion.Internal.Stats.Counters() in cats.Add(category, acc); acc - | true, acc -> acc - for event, count : int in counts do cat.Ingest(event, count) - member _.Categories = cats.Keys - member _.StatsDescending cat = - match cats.TryGetValue cat with - | true, acc -> acc.StatsDescending - | false, _ -> Seq.empty - member _.DumpGrouped(log: ILogger, totalLabel) = - if cats.Count <> 0 then - Propulsion.Internal.Stats.dumpCounterSet log totalLabel cats - member _.Clear() = cats.Clear() - -type EventTypeLatencies() = - let inner = Propulsion.Internal.Stats.LatencyStatsSet() - member _.Record(category: string, eventType: string, latency) = - let key = $"{category}/{eventType}" - inner.Record(key, latency) - member _.Dump(log, totalLabel) = - let inline catFromKey (key: string) = key.Substring(0, key.IndexOf '/') - inner.DumpGrouped(catFromKey, log, totalLabel = totalLabel) - inner.Dump log - member _.Clear() = inner.Clear() - type Stats(log, statsInterval, stateInterval, verboseStore, logExternalStats) = inherit StatsBase(log, statsInterval, stateInterval, verboseStore, logExternalStats) let mutable handled, ignored = 0, 0 - let accHam, accSpam = CategoryCounters(), CategoryCounters() - let intervalLats, accEventTypeLats = EventTypeLatencies(), EventTypeLatencies() + let accHam, accSpam = Stats.CategoryCounters(), Stats.CategoryCounters() + let intervalLats, accEventTypeLats = Stats.EventTypeLatencies(), Stats.EventTypeLatencies() override _.HandleOk((category, ham, spam)) = accHam.Ingest(category, ham |> Seq.countBy ValueTuple.fst) accSpam.Ingest(category, spam)