Skip to content

Commit

Permalink
Use GetResult
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 11, 2023
1 parent c6effae commit e45d301
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
60 changes: 29 additions & 31 deletions tools/Propulsion.Tool/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ open System
exception MissingArg of message: string with override this.Message = this.message
let missingArg msg = raise (MissingArg msg)

type ParseResults<'T when 'T :> IArgParserTemplate> with
member x.GetResult([<ReflectedDefinition>] e: Quotations.Expr<'Fields -> 'T>, f: unit -> 'Fields) = x.TryGetResult e |> Option.defaultWith f

module Configuration =

module Cosmos =
Expand Down Expand Up @@ -87,24 +90,22 @@ module Cosmos =
| LagFreqM _ -> "Specify frequency to dump lag stats. Default: off"

type Arguments(c: Configuration, p: ParseResults<Parameters>) =
let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection)
let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection
let mode = p.TryGetResult ConnectionMode
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
let databaseId = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase)
let connector =
let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
let mode = p.TryGetResult ConnectionMode
Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
let databaseId = p.GetResult(Database, fun () -> c.CosmosDatabase)
let containerId = p.GetResult(Container, fun () -> c.CosmosContainer)
let leasesContainerName = p.GetResult(LeaseContainer, fun () -> containerId + p.GetResult(Suffix, "-aux"))
let checkpointInterval = TimeSpan.FromHours 1.
member val ContainerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer)
member val LeaseContainerId = p.TryGetResult LeaseContainer
member x.LeasesContainerName = match x.LeaseContainerId with Some x -> x | None -> x.ContainerId + p.GetResult(Suffix, "-aux")
member x.CreateLeasesContainer() = connector.CreateLeasesContainer(databaseId, x.LeasesContainerName)
member x.ConnectFeed() = connector.ConnectFeed(databaseId, x.ContainerId, x.LeasesContainerName)
member _.MaybeLogLagInterval = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes

member val MaybeLogLagInterval = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member _.CreateLeasesContainer() = connector.CreateLeasesContainer(databaseId, leasesContainerName)
member _.ConnectFeed() = connector.ConnectFeed(databaseId, containerId, leasesContainerName)
member x.CreateCheckpointStore(group, cache, storeLog) = async {
let! context = connector.ConnectContext("Checkpoints", databaseId, x.ContainerId, 256)
let! context = connector.ConnectContext("Checkpoints", databaseId, containerId, 256)
return Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (group, checkpointInterval) (context, cache) }

module Dynamo =
Expand Down Expand Up @@ -162,18 +163,15 @@ module Dynamo =
| Some systemName ->
Choice1Of2 systemName
| None ->
let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl)
let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey)
let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey)
let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl)
let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey)
let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey)
Choice2Of2 (serviceUrl, accessKey, secretKey)
let connector timeout retries = match conn with
| Choice1Of2 systemName -> Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries)
| Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
let indexSuffix = p.GetResult(IndexSuffix, "-index")
let indexTable = p.TryGetResult IndexTable
|> Option.orElseWith (fun () -> c.DynamoIndexTable)
|> Option.defaultWith (fun () -> c.DynamoTable + indexSuffix)

let indexTable = p.GetResult(IndexTable, fun () -> c.DynamoIndexTable
|> Option.defaultWith (fun () -> c.DynamoTable + p.GetResult(IndexSuffix, "-index")))
let writeConnector = let timeout = p.GetResult(RetriesTimeoutS, 120.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 10)
connector timeout retries
Expand Down Expand Up @@ -206,7 +204,7 @@ module Dynamo =
Propulsion.DynamoStore.EventLoadMode.IndexOnly
| Some streamsDop ->
Log.Information("DynamoStoreSource WithData, parallelism limit {streamsDop}", streamsDop)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
let table = p.GetResult(Table, fun () -> c.DynamoTable)
let context = readClient.Value.CreateContext("Store", table)
Propulsion.DynamoStore.EventLoadMode.WithData (streamsDop, context)
indexProps, loadMode
Expand Down Expand Up @@ -235,18 +233,18 @@ module Mdb =
| Category _ -> "The message-db category to load (must specify >1 when projecting)"

type Arguments(c: Configuration, p: ParseResults<Parameters>) =
let conn () = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString)
let checkpointConn () = p.TryGetResult CheckpointConnectionString |> Option.defaultWith conn
let schema = p.TryGetResult CheckpointSchema |> Option.defaultWith (fun () -> c.MdbSchema)
let connectionString () = p.GetResult(ConnectionString, fun () -> c.MdbConnectionString)
let checkpointConnectionString () = p.GetResult(CheckpointConnectionString, connectionString)
let schema = p.GetResult(CheckpointSchema, fun () -> c.MdbSchema)

member x.CreateClient() =
Array.ofList (p.GetResults Category), conn ()
Array.ofList (p.GetResults Category), connectionString ()

member x.CreateCheckpointStore(group) =
Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConn (), schema, group)
Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConnectionString (), schema, group)

member x.CreateCheckpointStoreTable([<O; D null>] ?ct) = task {
let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConn (), Password = null)
let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConnectionString (), Password = null)
Log.Information("Authenticating with postgres using {connectionString}", connStringWithoutPassword.ToString())
Log.Information("Creating checkpoints table as {table}", $"{schema}.{Propulsion.MessageDb.ReaderCheckpoint.TableName}")
let checkpointStore = x.CreateCheckpointStore("nil")
Expand Down
6 changes: 3 additions & 3 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ and [<NoComparison; NoEquality>] InitAuxParameters =
| Cosmos _ -> "Cosmos Connection parameters."
and CosmosModeType = Container | Db | Serverless
and CosmosInitArguments(p: ParseResults<InitAuxParameters>) =
let rusOrDefault value = p.GetResult(Rus, value)
let rusOrDefault (value: int) = p.GetResult(Rus, value)
let throughput auto = if auto then CosmosInit.Throughput.Autoscale (rusOrDefault 4000)
else CosmosInit.Throughput.Manual (rusOrDefault 400)
member val ProvisioningMode =
Expand Down Expand Up @@ -283,8 +283,8 @@ module Indexer =
module Project =

type KafkaArguments(c, p: ParseResults<KafkaParameters>) =
member _.Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.KafkaBroker)
member _.Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.KafkaTopic)
member _.Broker = p.GetResult(Broker, fun () -> c.KafkaBroker)
member _.Topic = p.GetResult(Topic, fun () -> c.KafkaTopic)
member val StoreArgs =
match p.GetSubCommand() with
| KafkaParameters.Cosmos p -> Choice1Of3 (Args.Cosmos.Arguments (c, p))
Expand Down

0 comments on commit e45d301

Please sign in to comment.