diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index 48c34960..525901ba 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -1,7 +1,7 @@ module Propulsion.Tool.Program open Argu -open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException +open Propulsion.Internal open Serilog [] @@ -13,7 +13,7 @@ type Parameters = | [] InitPg of ParseResults | [] Index of ParseResults | [] Checkpoint of ParseResults - | [] Project of ParseResults + | [] Sync of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -23,7 +23,7 @@ type Parameters = | InitPg _ -> "Initialize a postgres checkpoint store" | Index _ -> "Validate index (optionally, ingest events from a DynamoDB JSON S3 export to remediate missing events)." | Checkpoint _ -> "Display or override checkpoints in Cosmos or Dynamo" - | Project _ -> "Project from store specified as the last argument." + | Sync _ -> "Project from store specified as the last argument. Supports kafka, cosmos, or just gathering `stats`." and [] CheckpointParameters = | [] Source of Propulsion.Feed.SourceId | [] Tranche of Propulsion.Feed.TrancheId @@ -92,7 +92,7 @@ type Arguments(c: Args.Configuration, p: ParseResults) = | InitPg a -> do! Args.Mdb.Arguments(c, a).CreateCheckpointStoreTable() |> Async.ofTask | Checkpoint a -> do! Checkpoints.readOrOverride(c, a, CancellationToken.None) |> Async.ofTask | Index a -> do! Args.Dynamo.index (c, a) - | Project a -> do! Project.run AppName (c, a) + | Sync a -> do! Sync.run AppName (c, a) | x -> p.Raise $"unexpected subcommand %A{x}" } static member Parse argv = let parseResults = ArgumentParser.Create().ParseCommandLine argv @@ -110,6 +110,6 @@ let main argv = try a.ExecuteSubCommand() |> Async.RunSynchronously; 0 with e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with x when x = Project.eofSignalException -> printfn "Processing COMPLETE"; 0 + with x when x = Sync.eofSignalException -> printfn "Processing COMPLETE"; 0 | :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 | e -> eprintfn $"EXCEPTION: %s{e.Message}"; 1 diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index b58722b0..b6b4365e 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -1,4 +1,4 @@ -module Propulsion.Tool.Project +module Propulsion.Tool.Sync open Argu open Infrastructure @@ -115,7 +115,7 @@ and [] CosmosParameters = | Retries _ -> "specify operation retries. Default: 0." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128." - | From _ -> "Specify Source." + | From _ -> "Specify Source." and CosmosArguments(c: Args.Configuration, p: ParseResults) = let source = SourceArguments(c, p.GetResult CosmosParameters.From) let connection = match source.Store with