-
Notifications
You must be signed in to change notification settings - Fork 16
/
Program.fs
133 lines (117 loc) · 8.01 KB
/
Program.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
module FeedConsumerTemplate.Program
open Serilog
open System
type Configuration(tryGet) =
let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}"
member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION"
member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE"
member _.CosmosContainer = get "EQUINOX_COSMOS_CONTAINER"
member _.BaseUri = get "API_BASE_URI"
member _.Group = get "API_CONSUMER_GROUP"
module Args =
open Argu
[<NoEquality; NoComparison; RequireSubcommand>]
type Parameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-g"; Unique>] Group of string
| [<AltCommandLine "-s"; Unique>] SourceId of string
| [<AltCommandLine "-f"; Unique>] BaseUri of string
| [<AltCommandLine "-r"; Unique>] MaxReadAhead of int
| [<AltCommandLine "-w"; Unique>] FcsDop of int
| [<AltCommandLine "-t"; Unique>] TicketsDop of int
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<CosmosParameters>
interface IArgParserTemplate with
member p.Usage = p |> function
| Verbose -> "request verbose logging."
| Group _ -> "specify Api Consumer Group Id. (optional if environment variable API_CONSUMER_GROUP specified)"
| SourceId _ -> "specify Api SourceId. Default: 'default'"
| BaseUri _ -> "specify Api endpoint. (optional if environment variable API_BASE_URI specified)"
| MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 8."
| FcsDop _ -> "maximum number of FCs to process in parallel. Default: 4"
| TicketsDop _ -> "maximum number of Tickets to process in parallel (per FC). Default: 4"
| Cosmos _ -> "Cosmos Store parameters."
and Arguments(c: Configuration, p: ParseResults<Parameters>) =
member val Verbose = p.Contains Parameters.Verbose
member val GroupId = p.GetResult(Group, fun () -> c.Group)
member val SourceId = p.GetResult(SourceId, "default") |> Propulsion.Feed.SourceId.parse
member val BaseUri = p.GetResult(BaseUri, fun () -> c.BaseUri) |> Uri
member val MaxReadAhead = p.GetResult(MaxReadAhead, 8)
member val FcsDop = p.GetResult(FcsDop, 4)
member val TicketsDop = p.GetResult(TicketsDop, 4)
member val StatsInterval = TimeSpan.FromMinutes 1.
member val StateInterval = TimeSpan.FromMinutes 5.
member val CheckpointInterval = TimeSpan.FromHours 1.
member val TailSleepInterval = TimeSpan.FromSeconds 1.
member val Cosmos: CosmosArguments =
match p.GetSubCommand() with
| Cosmos cosmos -> CosmosArguments(c, cosmos)
| _ -> p.Raise "Must specify cosmos"
and [<NoEquality; NoComparison; RequireSubcommand>] CosmosParameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-cm">] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-d">] Database of string
| [<AltCommandLine "-c">] Container of string
| [<AltCommandLine "-o">] Timeout of float
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesWaitTime of float
interface IArgParserTemplate with
member p.Usage = p |> function
| Verbose -> "request verbose logging."
| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)"
| Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
| Container _ -> "specify a container name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
| Timeout _ -> "specify operation timeout in seconds (default: 30)."
| Retries _ -> "specify operation retries (default: 9)."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 30)"
and CosmosArguments(c: Configuration, p: ParseResults<CosmosParameters>) =
let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString
let mode = p.TryGetResult ConnectionMode
let timeout = p.GetResult(Timeout, 30.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 9)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode=mode)
let database = p.GetResult(Database, fun () -> c.CosmosDatabase)
let container = p.GetResult(Container, fun () -> c.CosmosContainer)
member val Verbose = p.Contains Verbose
member _.Connect(maxEvents) = connector.ConnectContext("Main", database, container, maxEvents)
/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse tryGetConfigValue argv =
let programName = Reflection.Assembly.GetEntryAssembly().GetName().Name
let parser = ArgumentParser.Create<Parameters>(programName = programName)
Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv)
let [<Literal>] AppName = "FeedConsumerTemplate"
let build (args: Args.Arguments) =
let cache = Equinox.Cache(AppName, sizeMb = 10)
let context = args.Cosmos.Connect(maxEvents = 256) |> Async.RunSynchronously
let sink =
let stats = Ingester.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let handle = Ingester.handle args.TicketsDop
Ingester.Factory.StartSink(Log.Logger, stats, args.FcsDop, handle, args.MaxReadAhead)
let source =
let checkpoints = Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Store.log (args.GroupId, args.CheckpointInterval) (context, cache)
let feed = ApiClient.TicketsFeed args.BaseUri
let source =
Propulsion.Feed.FeedSource(
Log.Logger, args.StatsInterval, args.SourceId, args.TailSleepInterval,
checkpoints, sink)
source.Start(feed.ReadTranches, fun t p -> feed.Poll(t, p))
sink, source
let run args = async {
let sink, source = build args
use _ = sink
use _ = source
do! Async.Parallel [ source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ] |> Async.Ignore<unit[]>
return if sink.RanToCompletion then 0 else 3
}
[<EntryPoint>]
let main argv =
try let args = Args.parse EnvVar.tryGet argv
try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) args.SourceId
Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger()
try run args |> Async.RunSynchronously
with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintf $"Exception %s{e.Message}"; 1