diff --git a/global.json b/global.json
index 3a7f067d5..21e27cede 100644
--- a/global.json
+++ b/global.json
@@ -1,5 +1,5 @@
 {
   "sdk": {
-    "version": "2.1.504"
+    "version": "2.1.500"
  }
 }
\ No newline at end of file
diff --git a/propulsion-all-projector/.template.config/template.json b/propulsion-all-projector/.template.config/template.json
new file mode 100644
index 000000000..27a29bf7f
--- /dev/null
+++ b/propulsion-all-projector/.template.config/template.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json.schemastore.org/template",
+  "author": "@jet @bartelink",
+  "classifications": [
+    "Event Sourcing",
+    "Equinox",
+    "Propulsion",
+    "EventStore",
+    "Kafka"
+  ],
+  "tags": {
+    "language": "F#"
+  },
+  "identity": "Propulsion.Template.AllProjector",
+  "name": "Propulsion EventStore $all Projector",
+  "shortName": "proAllProjector",
+  "sourceName": "ProjectorTemplate",
+  "preferNameDirectory": true,
+
+  "symbols": {
+    "kafka": {
+      "type": "parameter",
+      "datatype": "bool",
+      "isRequired": false,
+      "defaultValue": "false",
+      "description": "Include code projecting to Kafka."
+    }
+  }
+}
\ No newline at end of file
diff --git a/propulsion-all-projector/AllProjector.fsproj b/propulsion-all-projector/AllProjector.fsproj
new file mode 100644
index 000000000..5b68a3c30
--- /dev/null
+++ b/propulsion-all-projector/AllProjector.fsproj
@@ -0,0 +1,24 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.1</TargetFramework>
+    <WarningLevel>5</WarningLevel>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <Compile Include="Handler.fs" />
+    <Compile Include="Program.fs" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <!-- PackageReference entries (Argu, Equinox.Cosmos, Equinox.EventStore,
+         Propulsion.EventStore and Propulsion.Kafka, per the usages in Program.fs) -->
+  </ItemGroup>
+
+</Project>
diff --git a/propulsion-all-projector/Handler.fs b/propulsion-all-projector/Handler.fs
new file mode 100644
index 000000000..17a9eee19
--- /dev/null
+++ b/propulsion-all-projector/Handler.fs
@@ -0,0 +1,19 @@
+module ProjectorTemplate.Handler
+
+open Propulsion.EventStore
+
+/// Responsible for inspecting and then either dropping or tweaking events coming from EventStore
+// NB the `index` needs to be contiguous with existing events - IOW filtering needs to be at stream (and not event) level
+let tryMapEvent filterByStreamName (x : EventStore.ClientAPI.ResolvedEvent) =
+    match x.Event with
+    | e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (filterByStreamName e.EventStreamId) -> None
+    | PropulsionStreamEvent e -> Some e
+
+/// Responsible for wrapping a span of events for a specific stream into an envelope (we use the well-known Propulsion.Codec form)
+/// Most manipulation should take place before events enter the scheduler
+let render (stream : FsCodec.StreamName, span : Propulsion.Streams.StreamSpan<_>) = async {
+    let value =
+        span
+        |> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream
+        |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
+    return FsCodec.StreamName.toString stream, value }
\ No newline at end of file
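A note on `Handler.fs`: filtering happens per stream (not per event) so that the `index` stays contiguous, and `render` keys each outgoing message by stream name. A rough sketch of how the pair is used — the predicate here is an illustrative stand-in; the real one is built by `CmdParser.Arguments.FilterFunction()` in `Program.fs` below:

```fsharp
// illustrative stand-in for the FilterFunction()-built predicate
let filterByStreamName (streamName : string) = not (streamName.StartsWith "Inventory-")

// EventStoreSource pushes each ResolvedEvent through this; survivors are scheduled per stream,
// then Handler.render emits (streamName, RenderedSpan-JSON) pairs for the Kafka producer,
// the JSON looking roughly like {"s": "<streamName>", "i": 0, "e": [{"c": "EventType", "t": "...", "d": ..., "m": ...}]}
let tryMap = ProjectorTemplate.Handler.tryMapEvent filterByStreamName
```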
initEnvVar "EQUINOX_COSMOS_COLLECTION" "CONSUL KEY" readFromConsul + () // TODO add any custom logic preprocessing commandline arguments and/or gathering custom defaults from external sources, etc + +module CmdParser = + + exception MissingArg of string + let private getEnvVarForArgumentOrThrow varName argName = + match EnvVar.tryGet varName with + | None -> raise (MissingArg(sprintf "Please provide a %s, either as an argument or via the %s environment variable" argName varName)) + | Some x -> x + let private defaultWithEnvVar varName argName = function + | None -> getEnvVarForArgumentOrThrow varName argName + | Some x -> x + open Argu + open Equinox.Cosmos + open Equinox.EventStore + [] + type Parameters = + | [] ConsumerGroupName of string + | [] MaxReadAhead of int + | [] MaxWriters of int + | [] Verbose + | [] VerboseConsole + + | [] CategoryBlacklist of string + | [] CategoryWhitelist of string + + | [] Es of ParseResults + interface IArgParserTemplate with + member a.Usage = + match a with + | ConsumerGroupName _ -> "Projector consumer group name." + | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 16." + | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 64." + | Verbose -> "request Verbose Logging. Default: off." + | VerboseConsole -> "request Verbose Console Logging. Default: off." + | CategoryBlacklist _ -> "category whitelist" + | CategoryWhitelist _ -> "category blacklist" + | Es _ -> "specify EventStore input parameters." + and Arguments(a : ParseResults) = + member __.ConsumerGroupName = a.GetResult ConsumerGroupName + member __.Verbose = a.Contains Parameters.Verbose + member __.VerboseConsole = a.Contains VerboseConsole + member __.MaxReadAhead = a.GetResult(MaxReadAhead, 16) + member __.MaxConcurrentStreams = a.GetResult(MaxWriters, 64) + member __.StatsInterval = TimeSpan.FromMinutes 1. 
+    and [<NoEquality; NoComparison>] EsSourceParameters =
+        | FromTail
+        | Gorge of int
+        | Tail of intervalS: float
+        | ForceRestart
+        | BatchSize of int
+        | MinBatchSize of int
+        | Position of int64
+        | Chunk of int
+        | Percent of float
+
+        | Verbose
+        | Timeout of float
+        | Retries of int
+        | HeartbeatTimeout of float
+        | [<AltCommandLine "-g">] Host of string
+        | Port of int
+        | [<AltCommandLine "-u">] Username of string
+        | [<AltCommandLine "-p">] Password of string
+
+        | [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<CosmosParameters>
+        interface IArgParserTemplate with
+            member a.Usage = a |> function
+                | FromTail -> "Start the processing from the Tail"
+                | Gorge _ -> "Request Parallel readers phase during initial catchup, running one chunk (256MB) apart. Default: off"
+                | Tail _ -> "attempt to read from tail at specified interval in Seconds. Default: 1"
+                | ForceRestart -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed."
+                | BatchSize _ -> "maximum item count to request from feed. Default: 4096"
+                | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512"
+                | Position _ -> "EventStore $all Stream Position to commence from"
+                | Chunk _ -> "EventStore $all Chunk to commence from"
+                | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)"
+
+                | Verbose -> "Include low level Store logging."
+                | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned. (optional if environment variable EQUINOX_ES_HOST specified)"
+                | Port _ -> "specify a custom port. Defaults: envvar:EQUINOX_ES_PORT, 30778."
+                | Username _ -> "specify a username. (optional if environment variable EQUINOX_ES_USERNAME specified)"
+                | Password _ -> "specify a password. (optional if environment variable EQUINOX_ES_PASSWORD specified)"
+                | Timeout _ -> "specify operation timeout in seconds. Default: 20."
+                | Retries _ -> "specify operation retries. Default: 3."
+                | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5."
+
+                | Cosmos _ -> "CosmosDb Checkpoint Store parameters."
+    and EsSourceArguments(a : ParseResults<EsSourceParameters>) =
+        member __.Gorge = a.TryGetResult Gorge
+        member __.TailInterval = a.GetResult(Tail, 1.) |> TimeSpan.FromSeconds
+        member __.ForceRestart = a.Contains ForceRestart
+        member __.StartingBatchSize = a.GetResult(BatchSize, 4096)
+        member __.MinBatchSize = a.GetResult(MinBatchSize, 512)
+        member __.StartPos =
+            match a.TryGetResult Position, a.TryGetResult Chunk, a.TryGetResult Percent, a.Contains EsSourceParameters.FromTail with
+            | Some p, _, _, _ -> Absolute p
+            | _, Some c, _, _ -> StartPos.Chunk c
+            | _, _, Some p, _ -> Percentage p
+            | None, None, None, true -> StartPos.TailOrCheckpoint
+            | None, None, None, _ -> StartPos.StartOrCheckpoint
+
+        member __.Discovery = match __.Port with Some p -> Discovery.GossipDnsCustomPort (__.Host, p) | None -> Discovery.GossipDns __.Host
+        member __.Port = match a.TryGetResult Port with Some x -> Some x | None -> EnvVar.tryGet "EQUINOX_ES_PORT" |> Option.map int
+        member __.Host = a.TryGetResult Host |> defaultWithEnvVar "EQUINOX_ES_HOST" "Host"
+        member __.User = a.TryGetResult Username |> defaultWithEnvVar "EQUINOX_ES_USERNAME" "Username"
+        member __.Password = a.TryGetResult Password |> defaultWithEnvVar "EQUINOX_ES_PASSWORD" "Password"
+        member __.Retries = a.GetResult(EsSourceParameters.Retries, 3)
+        member __.Timeout = a.GetResult(EsSourceParameters.Timeout, 20.) |> TimeSpan.FromSeconds
+        member __.Heartbeat = a.GetResult(HeartbeatTimeout, 1.5) |> TimeSpan.FromSeconds
+        member x.Connect(log : ILogger, storeLog : ILogger, appName, connectionStrategy) =
+            let s (x : TimeSpan) = x.TotalSeconds
+            let discovery = x.Discovery
+            log.ForContext("host", x.Host).ForContext("port", x.Port)
+                .Information("EventStore {discovery} heartbeat: {heartbeat}s Timeout: {timeout}s Retries {retries}",
+                    discovery, s x.Heartbeat, s x.Timeout, x.Retries)
+            let log = if storeLog.IsEnabled Serilog.Events.LogEventLevel.Debug then Logger.SerilogVerbose storeLog else Logger.SerilogNormal storeLog
+            let tags = ["M", Environment.MachineName; "I", Guid.NewGuid() |> string]
+            Connector(x.User, x.Password, x.Timeout, x.Retries, log = log, heartbeatTimeout = x.Heartbeat, tags = tags)
+                .Establish(appName, discovery, connectionStrategy) |> Async.RunSynchronously
+
+        member __.CheckpointInterval = TimeSpan.FromHours 1.
+        member val CheckpointStore : CosmosArguments =
+            match a.TryGetSubCommand() with
+            | Some (EsSourceParameters.Cosmos cosmos) -> CosmosArguments cosmos
+            | _ -> raise (MissingArg "Must specify `cosmos` checkpoint store when source is `es`")
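+    // Sketch: how the StartPos member above resolves, first match winning:
+    //   Position 128  -> Absolute 128             (exact $all commit position)
+    //   Chunk 4       -> StartPos.Chunk 4         (chunks are 256MB apart)
+    //   Percent 75.0  -> Percentage 75.0          (of the current tail position)
+    //   FromTail      -> StartPos.TailOrCheckpoint
+    //   (none)        -> StartPos.StartOrCheckpoint (ForceRestart additionally discards any committed position)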
+ | BatchSize _ -> "maximum item count to request from feed. Default: 4096" + | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512" + | Position _ -> "EventStore $all Stream Position to commence from" + | Chunk _ -> "EventStore $all Chunk to commence from" + | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)" + + | Verbose -> "Include low level Store logging." + | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned. (optional if environment variable EQUINOX_ES_HOST specified)" + | Port _ -> "specify a custom port. Defaults: envvar:EQUINOX_ES_PORT, 30778." + | Username _ -> "specify a username. (optional if environment variable EQUINOX_ES_USERNAME specified)" + | Password _ -> "specify a Password. (optional if environment variable EQUINOX_ES_PASSWORD specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 20." + | Retries _ -> "specify operation retries. Default: 3." + | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5." + + | Cosmos _ -> "CosmosDb Checkpoint Store parameters." + and EsSourceArguments(a : ParseResults) = + member __.Gorge = a.TryGetResult Gorge + member __.TailInterval = a.GetResult(Tail, 1.) |> TimeSpan.FromSeconds + member __.ForceRestart = a.Contains ForceRestart + member __.StartingBatchSize = a.GetResult(BatchSize, 4096) + member __.MinBatchSize = a.GetResult(MinBatchSize, 512) + member __.StartPos = + match a.TryGetResult Position, a.TryGetResult Chunk, a.TryGetResult Percent, a.Contains EsSourceParameters.FromTail with + | Some p, _, _, _ -> Absolute p + | _, Some c, _, _ -> StartPos.Chunk c + | _, _, Some p, _ -> Percentage p + | None, None, None, true -> StartPos.TailOrCheckpoint + | None, None, None, _ -> StartPos.StartOrCheckpoint + + member __.Discovery = match __.Port with Some p -> Discovery.GossipDnsCustomPort (__.Host, p) | None -> Discovery.GossipDns __.Host + member __.Port = match a.TryGetResult Port with Some x -> Some x | None -> EnvVar.tryGet "EQUINOX_ES_PORT" |> Option.map int + member __.Host = a.TryGetResult Host |> defaultWithEnvVar "EQUINOX_ES_HOST" "Host" + member __.User = a.TryGetResult Username |> defaultWithEnvVar "EQUINOX_ES_USERNAME" "Username" + member __.Password = a.TryGetResult Password |> defaultWithEnvVar "EQUINOX_ES_PASSWORD" "Password" + member __.Retries = a.GetResult(EsSourceParameters.Retries, 3) + member __.Timeout = a.GetResult(EsSourceParameters.Timeout, 20.) |> TimeSpan.FromSeconds + member __.Heartbeat = a.GetResult(HeartbeatTimeout, 1.5) |> TimeSpan.FromSeconds + member x.Connect(log: ILogger, storeLog: ILogger, appName, connectionStrategy) = + let s (x : TimeSpan) = x.TotalSeconds + let discovery = x.Discovery + log.ForContext("host", x.Host).ForContext("port", x.Port) + .Information("EventStore {discovery} heartbeat: {heartbeat}s Timeout: {timeout}s Retries {retries}", + discovery, s x.Heartbeat, s x.Timeout, x.Retries) + let log=if storeLog.IsEnabled Serilog.Events.LogEventLevel.Debug then Logger.SerilogVerbose storeLog else Logger.SerilogNormal storeLog + let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] + Connector(x.User, x.Password, x.Timeout, x.Retries, log=log, heartbeatTimeout=x.Heartbeat, tags=tags) + .Establish(appName, discovery, connectionStrategy) |> Async.RunSynchronously + + member __.CheckpointInterval = TimeSpan.FromHours 1. 
+    and [<NoEquality; NoComparison>] KafkaSinkParameters =
+        | [<AltCommandLine "-b">] Broker of string
+        | [<AltCommandLine "-t">] Topic of string
+        interface IArgParserTemplate with
+            member a.Usage = a |> function
+                | Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable PROPULSION_KAFKA_BROKER specified)"
+                | Topic _ -> "specify Kafka Topic Id. (optional if environment variable PROPULSION_KAFKA_TOPIC specified)"
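+    // Sketch: as parsing is Argu-based, the full CLI contract can be inspected without
+    // running the projector, e.g.:
+    //   Argu.ArgumentParser.Create<Parameters>(programName = "ProjectorTemplate").PrintUsage() |> printfn "%s"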
+    and KafkaSinkArguments(a : ParseResults<KafkaSinkParameters>) =
+        member __.Broker = a.TryGetResult Broker |> defaultWithEnvVar "PROPULSION_KAFKA_BROKER" "Broker" |> Uri
+        member __.Topic = a.TryGetResult Topic |> defaultWithEnvVar "PROPULSION_KAFKA_TOPIC" "Topic"
+        member x.BuildTargetParams() = x.Broker, x.Topic
+
+    /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
+    let parse argv =
+        let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name
+        let parser = ArgumentParser.Create<Parameters>(programName = programName)
+        parser.ParseCommandLine argv |> Arguments
+
+module Logging =
+
+    let initialize verbose verboseConsole =
+        Log.Logger <-
+            LoggerConfiguration()
+                .Destructure.FSharpTypes()
+                .Enrich.FromLogContext()
+            |> fun c -> if verbose then c.MinimumLevel.Debug() else c
+            |> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId,2} {Message:lj} {NewLine}{Exception}"
+                        c.WriteTo.Console(theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate = t)
+            |> fun c -> c.CreateLogger()
+
+let [<Literal>] AppName = "ProjectorTemplate"
+
+module EventStoreContext =
+    let cache = Equinox.Cache(AppName, sizeMb = 10)
+    let create connection = Equinox.EventStore.Context(connection, Equinox.EventStore.BatchingPolicy(maxBatchSize = 500))
+
+let build (args : CmdParser.Arguments) =
+    let (srcE, cosmos, spec) = args.SourceParams()
+    let filterByStreamName = args.FilterFunction()
+    let (discovery, database, container, connector), (broker, topic) = cosmos.BuildConnectionDetails(), cosmos.Sink.BuildTargetParams()
+    let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic)
+
+    let cache = Equinox.Cache(AppName, sizeMb = 10)
+    let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously
+    let context = Equinox.Cosmos.Context(connection, database, container)
+
+    let resolveCheckpointStream =
+        let codec = FsCodec.NewtonsoftJson.Codec.Create()
+        let caching = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
+        let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute)
+        fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale)
+    let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext<Checkpoint.CheckpointSeries>(), resolveCheckpointStream)
+
+    let sink =
+        Propulsion.Kafka.StreamsProducerSink.Start(
+            Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, Handler.render, producer,
+            statsInterval = TimeSpan.FromMinutes 1., stateInterval = TimeSpan.FromMinutes 2.)
+
+    let connect () = let c = srcE.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.PreferSlave) in c.ReadConnection
+    sink, EventStoreSource.Run(
+        Log.Logger, sink, checkpoints, connect, spec, Handler.tryMapEvent filterByStreamName,
+        args.MaxReadAhead, args.StatsInterval)
+
+let run argv =
+    try let args = CmdParser.parse argv
+        Logging.initialize args.Verbose args.VerboseConsole
+        Settings.initialize ()
+        let projector, runSourcePipeline = build args
+        runSourcePipeline |> Async.Start
+        projector.AwaitCompletion() |> Async.RunSynchronously
+        if projector.RanToCompletion then 0 else 2
+    with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
+        | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1
+        | e -> Log.Fatal(e, "Exiting"); 1
+
+[<EntryPoint>]
+let main argv =
+    try run argv
+    finally Log.CloseAndFlush()
\ No newline at end of file
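Taken together, `build` wires the pipeline as follows (a sketch of the data flow, not additional code):

```fsharp
// EventStore $all
//   └─ EventStoreSource.Run              // reads per ReaderSpec; Gorge = parallel chunk readers during catchup
//       └─ Handler.tryMapEvent filterByStreamName
//       //   drops $-prefixed, non-JSON and filtered-out streams at the reader
//       └─ StreamsProducerSink.Start     // up to MaxReadAhead batches ahead, MaxConcurrentStreams streams
//           └─ Handler.render            // -> (streamName, RenderedSpan JSON)
//               └─ Propulsion.Kafka.Producer
// Checkpoints: Checkpoint.CheckpointSeries per ConsumerGroupName, persisted in the
// Cosmos container and committed on the CheckpointInterval (1h) cadence
```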
diff --git a/propulsion-all-projector/README.md b/propulsion-all-projector/README.md
new file mode 100644
index 000000000..da6df2588
--- /dev/null
+++ b/propulsion-all-projector/README.md
@@ -0,0 +1,62 @@
+# Propulsion EventStore $all -> Kafka Projector
+
+This project was generated using:
+
+    dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
+    dotnet new proAllProjector
+
+## Usage instructions
+
+0. establish connection strings etc. for the CosmosDb container in which the checkpoints are to be maintained per https://github.com/jet/equinox README
+
+        $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s
+        $env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d
+        $env:EQUINOX_COSMOS_CONTAINER="equinox-test" # or use -c
+
+1. Use the `eqx` tool to initialize the CosmosDb container that will hold the checkpoints
+
+        dotnet tool install -g Equinox.Tool # only needed once
+
+        # (either add environment variables as per step 0 or use -s/-d/-c to specify them)
+
+        # generate a cosmos container in which to store the checkpoints
+        eqx init -ru 1000 cosmos
+
+2. To run an instance of the Projector from EventStore
+
+        # (either add environment variables as per step 0 or use -s/-d/-c to specify them after the `cosmos` argument token)
+
+        $env:EQUINOX_ES_USERNAME="admin" # or use -u
+        $env:EQUINOX_ES_PASSWORD="changeit" # or use -p
+        $env:EQUINOX_ES_HOST="localhost" # or use -g
+
+        $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
+
+        # `-g default` defines the Projector Group identity - each group name maintains its own checkpoint position
+        # `es` specifies the source (if you have specified 3x EQUINOX_ES_* environment vars, no arguments are needed)
+        # `cosmos` specifies the checkpoint store (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
+        # `-t topic0` identifies the Kafka topic to which the Projector should write
+        dotnet run -- -g default es cosmos kafka -t topic0
+
+        # NB running more than one projector will cause them to duel, and is hence not advised
+
+3. To create a Consumer for the emitted spans, use `dotnet new proConsumer` (see the README therein for details)
\ No newline at end of file
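For step 3's consuming side: each message value produced by this projector round-trips from `Propulsion.Codec`'s `RenderedSpan` form. A minimal sketch of decoding one message, assuming the `Serdes.Deserialize` counterpart to the `Serdes.Serialize` call in `Handler.render`, and the compact `RenderedSpan` field names (`i` = base index, `e` = events):

```fsharp
let handle (streamName : string, spanJson : string) =
    // the message key carries the stream name; the value is the RenderedSpan JSON
    let span = Propulsion.Codec.NewtonsoftJson.Serdes.Deserialize<Propulsion.Codec.NewtonsoftJson.RenderedSpan> spanJson
    printfn "%s: %d event(s) from index %d" streamName span.e.Length span.i
```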