diff --git a/.gitignore b/.gitignore
index 6486412d..9612cb6e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,7 @@ packages.config
## JetBrains Rider
.idea/
*.sln.iml
+*.DotSettings.user
## CodeRush
.cr/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf1a2cea..86554491 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.EventStoreDb`: Ported `EventStore` to target `Equinox.EventStore` >= `4.0.0` (using the gRPC interface) [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/143) [#140](https://github.com/jet/propulsion/pull/143) [#177](https://github.com/jet/propulsion/pull/177)
+- `Propulsion.MessageDb`: `FeedSource` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
diff --git a/Propulsion.sln b/Propulsion.sln
index ef90beba..91c6dbcc 100644
--- a/Propulsion.sln
+++ b/Propulsion.sln
@@ -55,6 +55,10 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MemoryStore", "s
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore.Lambda", "src\Propulsion.DynamoStore.Lambda\Propulsion.DynamoStore.Lambda.fsproj", "{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}"
EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb", "src\Propulsion.MessageDb\Propulsion.MessageDb.fsproj", "{BCAEFE2C-8D09-4F4E-B27E-62077497C752}"
+EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb.Integration", "tests\Propulsion.MessageDb.Integration\Propulsion.MessageDb.Integration.fsproj", "{9738D2C1-EE7C-400F-8B14-31B5B7B66839}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -149,6 +153,14 @@ Global
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/README.md b/README.md
index d649f35d..1fd7766d 100644
--- a/README.md
+++ b/README.md
@@ -90,6 +90,10 @@ If you're looking for a good discussion forum on these kinds of topics, look no
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`, `Serilog`
+- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181), maintaining checkpoints in a Postgres table. [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `6.0.7`
+ 1. `MessageDbSource`: reading from one or more MessageDb categories into a `Propulsion.Sink`
+ 2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`)
+
- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog`
1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink`
@@ -106,8 +110,9 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.
- CosmosDB: Initialize `-aux` Container for ChangeFeedProcessor
- CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints
- - CosmosDB/DynamoStore/EventStoreDB: walk change feeds/indexes and/or project to Kafka
+ - CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka
- DynamoStore: validate and/or reindex DynamoStore Index
+ - MessageDb: Initialize a checkpoints table in a Postgres Database
## Deprecated components
diff --git a/build.proj b/build.proj
index df64beac..68a15169 100644
--- a/build.proj
+++ b/build.proj
@@ -23,6 +23,7 @@
+
diff --git a/src/Propulsion.MessageDb/Internal.fs b/src/Propulsion.MessageDb/Internal.fs
new file mode 100644
index 00000000..8b842704
--- /dev/null
+++ b/src/Propulsion.MessageDb/Internal.fs
@@ -0,0 +1,13 @@
+namespace Propulsion.MessageDb
+
+open FSharp.UMX
+open Npgsql
+
+module internal FeedSourceId =
+ // SourceId that MessageDbSource falls back to when the caller does not supply a sourceId
+ let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "messageDb"
+
+module internal Npgsql =
+    /// Creates an NpgsqlConnection for `connectionString` and opens it, observing `ct`.
+    /// The caller owns the returned (open) connection and is responsible for disposing it.
+    let connect connectionString ct = task {
+        let conn = new NpgsqlConnection(connectionString)
+        let mutable opened = false
+        try
+            do! conn.OpenAsync(ct)
+            opened <- true
+        finally
+            // If opening throws or is cancelled the caller never receives `conn`, so it has to be released here
+            if not opened then conn.Dispose()
+        return conn }
diff --git a/src/Propulsion.MessageDb/MessageDbSource.fs b/src/Propulsion.MessageDb/MessageDbSource.fs
new file mode 100644
index 00000000..5e02322b
--- /dev/null
+++ b/src/Propulsion.MessageDb/MessageDbSource.fs
@@ -0,0 +1,121 @@
+namespace Propulsion.MessageDb
+
+open FSharp.Control
+open FsCodec
+open FsCodec.Core
+open NpgsqlTypes
+open Propulsion.Feed
+open Propulsion.Feed.Core
+open Propulsion.Internal
+open System
+open System.Data.Common
+open System.Diagnostics
+
+
+module Core =
+    /// Reads messages from a MessageDb store one category at a time; every call opens (and disposes) its own connection
+    type MessageDbCategoryClient(connectionString) =
+        let connect = Npgsql.connect connectionString
+        // Maps the current row of `reader` to a (stream name, event) pair.
+        // Column ordinals follow the select list used in ReadCategoryMessages.
+        let parseRow (reader: DbDataReader) =
+            let readNullableString idx = if reader.IsDBNull(idx) then None else Some (reader.GetString idx)
+            // message-db allows data/metadata to be SQL NULL; GetString would throw on those rows,
+            // so a NULL is surfaced as an empty body instead
+            let readUtf8Body idx =
+                let bytes = if reader.IsDBNull(idx) then Array.empty else Text.Encoding.UTF8.GetBytes(reader.GetString idx)
+                ReadOnlyMemory(bytes)
+            let streamName = reader.GetString(8)
+            let event = TimelineEvent.Create(
+                            index = reader.GetInt64(0),
+                            eventType = reader.GetString(1),
+                            data = readUtf8Body 2,
+                            meta = readUtf8Body 3,
+                            eventId = reader.GetGuid(4),
+                            ?correlationId = readNullableString 5,
+                            ?causationId = readNullableString 6,
+                            // global_position travels as the event Context; ReadCategoryMessages derives the checkpoint from it
+                            context = reader.GetInt64(9),
+                            timestamp = DateTimeOffset(DateTime.SpecifyKind(reader.GetDateTime(7), DateTimeKind.Utc)))
+
+            struct(StreamName.parse streamName, event)
+        /// Reads up to `batchSize` messages of `category` via get_category_messages, starting at `fromPositionInclusive`.
+        /// The batch checkpoint is the global_position of the last message read, or `fromPositionInclusive` when nothing was read.
+        /// `isTail` is set only when the read returned no messages.
+        member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) = task {
+            use! conn = connect ct
+            use command = conn.CreateCommand(CommandText = "select position, type, data, metadata, id::uuid,
+                                                                   (metadata::jsonb->>'$correlationId')::text,
+                                                                   (metadata::jsonb->>'$causationId')::text,
+                                                                   time, stream_name, global_position
+                                                            from get_category_messages(@Category, @Position, @BatchSize);")
+            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore
+            command.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, fromPositionInclusive) |> ignore
+            command.Parameters.AddWithValue("BatchSize", NpgsqlDbType.Bigint, int64 batchSize) |> ignore
+
+            use! reader = command.ExecuteReaderAsync(ct)
+            let events = [| while reader.Read() do yield parseRow reader |]
+
+            let checkpoint : int64 = match Array.tryLast events with Some (_, ev) -> unbox ev.Context | None -> fromPositionInclusive
+
+            return { checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } }
+        /// Reads the highest global_position recorded for `category`; yields 0 when the category holds no messages
+        member _.ReadCategoryLastVersion(category: TrancheId, ct) = task {
+            use! conn = connect ct
+            use command = conn.CreateCommand(CommandText = "select max(global_position) from messages where category(stream_name) = @Category;")
+            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore
+
+            use! reader = command.ExecuteReaderAsync(ct)
+            // max() over zero rows still produces one row whose value is NULL, so the NULL check is what detects an empty category
+            return if reader.Read() && not (reader.IsDBNull 0) then reader.GetInt64(0) else 0L }
+
+module private Impl =
+ open Core
+ open Propulsion.Infrastructure // AwaitTaskCorrect
+
+ // Reads one batch for a (category, position) request through the client,
+ // passing the cancellation token of the calling Async on to the underlying Task
+ let readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async> = async {
+ let! ct = Async.CancellationToken
+ let positionInclusive = Position.toInt64 pos
+ let! x = store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect
+ return x }
+
+ // Converts the value returned by ReadCategoryLastVersion for the tranche into a Position
+ let readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async = async {
+ let! ct = Async.CancellationToken
+ let! lastEventPos = store.ReadCategoryLastVersion(trancheId, ct) |> Async.AwaitTaskCorrect
+ return Position.parse lastEventPos }
+
+/// Feed source that reads the supplied MessageDb categories (one tranche per category) through `client`
+/// and hands the resulting batches to `sink`, using `checkpoints` to track progress
+type MessageDbSource
+ ( log : Serilog.ILogger, statsInterval,
+ client: Core.MessageDbCategoryClient, batchSize, tailSleepInterval,
+ checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
+ categories,
+ // Override default start position to be at the tail of the index. Default: Replay all events.
+ ?startFromTail,
+ ?sourceId) =
+ inherit Propulsion.Feed.Core.TailingFeedSource
+ ( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval, checkpoints,
+ // A tail-position reader is only passed to the base source when startFromTail is explicitly `Some true`
+ ( if startFromTail <> Some true then None
+ else Some (Impl.readTailPositionForTranche client)),
+ sink,
+ // Each request yields exactly one batch, paired with the time the read took
+ (fun req -> asyncSeq {
+ let sw = Stopwatch.StartNew()
+ let! b = Impl.readBatch batchSize client req
+ yield sw.Elapsed, b }),
+ string)
+ /// Overload that builds the MessageDbCategoryClient from a connection string
+ new (log, statsInterval, connectionString, batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail, ?sourceId) =
+ MessageDbSource(log, statsInterval, Core.MessageDbCategoryClient(connectionString),
+ batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail=startFromTail, ?sourceId=sourceId)
+
+ /// Tranches to read; the default implementation parses one TrancheId per entry in `categories`
+ abstract member ListTranches : unit -> Async
+ default _.ListTranches() = async { return categories |> Array.map TrancheId.parse }
+
+ /// Runs the base source's Pump over the tranches reported by ListTranches
+ abstract member Pump : unit -> Async
+ default x.Pump() = base.Pump(x.ListTranches)
+
+ /// Starts the source by handing Pump to the base source's Start
+ abstract member Start : unit -> Propulsion.SourcePipeline
+ default x.Start() = base.Start(x.Pump())
+
+
+ /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
+ member x.RunUntilCaughtUp(timeout : TimeSpan, statsInterval : IntervalTimer) = task {
+ let sw = Stopwatch.start ()
+ use pipeline = x.Start()
+
+ // Fire-and-forget timer that stops the pipeline once `timeout` elapses.
+ // NOTE(review): the timer is not cancelled when completion happens first, so Stop() can also run after the pipeline was disposed - confirm that is harmless
+ try System.Threading.Tasks.Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore
+
+ let initialReaderTimeout = TimeSpan.FromMinutes 1.
+ do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
+ pipeline.Stop()
+
+ // Only request a stats emission when the run took more than 2 seconds
+ if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
+ // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
+ return! x.Checkpoint()
+ finally statsInterval.SleepUntilTriggerCleared() }
diff --git a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj
new file mode 100644
index 00000000..3d4bde9c
--- /dev/null
+++ b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj
@@ -0,0 +1,28 @@
+
+
+
+ net6.0
+ true
+
+
+
+
+ Infrastructure.fs
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Propulsion.MessageDb/ReaderCheckpoint.fs b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
new file mode 100644
index 00000000..e20fc2f4
--- /dev/null
+++ b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
@@ -0,0 +1,73 @@
+module Propulsion.MessageDb.ReaderCheckpoint
+
+open Npgsql
+open NpgsqlTypes
+open Propulsion.Feed
+open Propulsion.Infrastructure
+
+
+/// Name of the checkpoints table; the statements in this module always qualify it with the caller-supplied schema
+let table = "propulsion_checkpoint"
+
+/// Creates the checkpoints table in `schema` unless it already exists.
+/// NOTE `schema` is spliced into the statement text as an identifier (identifiers cannot be bound as parameters),
+/// so it must originate from trusted configuration, never from user input.
+let createIfNotExists (conn : NpgsqlConnection, schema: string) = async {
+    use cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{table} (
+                                                   source text not null,
+                                                   tranche text not null,
+                                                   consumer_group text not null,
+                                                   position bigint not null,
+                                                   primary key (source, tranche, consumer_group));")
+    // Observe the ambient cancellation token, as commitPosition and tryGetPosition do.
+    // Wrapping in async also defers execution until the Async is actually run, instead of when it is constructed.
+    let! ct = Async.CancellationToken
+    do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore }
+
+/// Upserts the checkpoint row keyed by (source, tranche, consumer group):
+/// inserts it with `position`, or overwrites the stored position when the key already exists
+let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64)
+ = async {
+ let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{table}(source, tranche, consumer_group, position)
+ values (@Source, @Tranche, @ConsumerGroup, @Position)
+ on conflict (source, tranche, consumer_group)
+ do update set position = @Position;")
+ // Values are bound as parameters; only the schema and table names are interpolated into the statement text
+ cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
+ cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
+ cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
+ cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore
+
+ let! ct = Async.CancellationToken
+ do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore }
+
+/// Looks up the stored position for (source, tranche, consumer group).
+/// Yields ValueSome with the position of the first matching row, or ValueNone when no row matches
+let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) = async {
+ let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{table}
+ where source = @Source
+ and tranche = @Tranche
+ and consumer_group = @ConsumerGroup")
+ cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
+ cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
+ cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
+
+ let! ct = Async.CancellationToken
+ use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTaskCorrect
+ // The three filter columns form the table's primary key (see createIfNotExists), so at most one row can match
+ return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }
+
+/// IFeedCheckpointStore backed by the checkpoints table in `schema`.
+/// Every operation opens its own connection and disposes it when done.
+type CheckpointStore(connString : string, schema: string, consumerGroupName, defaultCheckpointFrequency) =
+    // Opens a connection, observing the cancellation token of the calling Async
+    let openConnection () = async {
+        let! ct = Async.CancellationToken
+        return! Npgsql.connect connString ct |> Async.AwaitTaskCorrect }
+
+    /// Creates the checkpoints table within `schema` when it is not already present
+    member _.CreateSchemaIfNotExists() = async {
+        use! conn = openConnection ()
+        return! createIfNotExists (conn, schema) }
+
+    interface IFeedCheckpointStore with
+
+        /// Yields the default checkpoint frequency together with the position to start from:
+        /// the stored position when there is one, else the result of `establishOrigin` when supplied, else Position.initial
+        member _.Start(source, tranche, ?establishOrigin) = async {
+            use! conn = openConnection ()
+            let! stored = tryGetPosition (conn, schema) source tranche consumerGroupName
+            let! pos =
+                match stored with
+                | ValueSome p -> async.Return(Position.parse p)
+                | ValueNone -> defaultArg establishOrigin (async.Return Position.initial)
+            return defaultCheckpointFrequency, pos }
+
+        /// Stores `pos` as the position for (source, tranche) under this store's consumer group
+        member _.Commit(source, tranche, pos) = async {
+            use! conn = openConnection ()
+            let position = Position.toInt64 pos
+            return! commitPosition (conn, schema) source tranche consumerGroupName position }
+
+
diff --git a/src/Propulsion.MessageDb/Readme.md b/src/Propulsion.MessageDb/Readme.md
new file mode 100644
index 00000000..d7c6cd14
--- /dev/null
+++ b/src/Propulsion.MessageDb/Readme.md
@@ -0,0 +1,51 @@
+# Propulsion.MessageDb
+
+This project houses a Propulsion source for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/).
+
+## Quickstart
+
+The smallest possible sample looks like this; it is intended to give an overview of how the different pieces relate.
+For a more production-ready example, take a look at [Jet's templates](https://github.com/jet/dotnet-templates).
+
+```fsharp
+let quickStart log stats categories handle = async {
+ // The group is used as a key to store and retrieve checkpoints
+ let groupName = "MyGroup"
+ // The checkpoint store will receive the highest version
+ // that has been handled and flushes it to the
+ // table on an interval
+ let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Port=5433; Username=postgres; Password=postgres", "public", groupName, TimeSpan.FromSeconds 10)
+ // Creates the checkpoint table in the schema
+ // You can also create this manually
+ do! checkpoints.CreateSchemaIfNotExists()
+
+ let client = MessageDbCategoryClient("Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;")
+ let maxReadAhead = 100
+ let maxConcurrentStreams = 2
+ use sink =
+ Propulsion.Streams.Default.Config.Start(
+ log, maxReadAhead, maxConcurrentStreams,
+ handle, stats, TimeSpan.FromMinutes 1)
+
+ use src =
+ MessageDbSource(
+ log, statsInterval = TimeSpan.FromMinutes 1,
+ client, batchSize = 1000,
+ // Controls the time to wait once fully caught up
+ // before requesting a new batch of events
+ tailSleepInterval = TimeSpan.FromMilliseconds 100,
+ checkpoints, sink,
+ // An array of message-db categories to subscribe to
+ // Propulsion guarantees that events within streams are
+ // handled in order, it makes no guarantees across streams (Even within categories)
+ categories
+ ).Start()
+
+ do! src.AwaitShutdown() }
+
+let handle struct(stream, evts: StreamSpan<_>) = async {
+ // process the events
+ return struct (Propulsion.Streams.SpanResult.AllProcessed, ()) }
+
+quickStart Log.Logger (createStats ()) [| category |] handle
+```
diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs
index 56fa8967..e4215faa 100644
--- a/src/Propulsion/Internal.fs
+++ b/src/Propulsion/Internal.fs
@@ -127,13 +127,18 @@ module ValueOption =
module Seq =
- let tryPickV f xs = Seq.tryPick (f >> ValueOption.toOption) xs |> ValueOption.ofOption
- let inline chooseV f = Seq.choose (f >> ValueOption.toOption)
+ let tryPickV f (xs: _ seq) =
+ use e = xs.GetEnumerator()
+ let mutable res = ValueNone
+ while (ValueOption.isNone res && e.MoveNext()) do
+ res <- f e.Current
+ res
+ let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () }
module Array =
let inline any xs = (not << Array.isEmpty) xs
- let inline chooseV f = Array.choose (f >> ValueOption.toOption)
+ let inline chooseV f xs = [| for item in xs do match f item with ValueSome v -> yield v | ValueNone -> () |]
module Stats =
diff --git a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
new file mode 100644
index 00000000..28a4f670
--- /dev/null
+++ b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
@@ -0,0 +1,32 @@
+
+
+
+ net6.0
+
+
+
+
+ Infrastructure.fs
+
+
+
+
+
+
+
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs
new file mode 100644
index 00000000..de960f1b
--- /dev/null
+++ b/tests/Propulsion.MessageDb.Integration/Tests.fs
@@ -0,0 +1,83 @@
+module Tests
+
+open System
+open System.Collections.Generic
+open System.Threading.Tasks
+open FSharp.Control
+open Npgsql
+open NpgsqlTypes
+open Propulsion.Feed
+open Propulsion.Streams
+open TypeShape.UnionContract
+open Xunit
+open Propulsion.MessageDb
+open Swensen.Unquote
+open Propulsion.Infrastructure
+open Propulsion.Internal
+
+// Minimal event contract for the test: `Hello` is the only event type, matching the "Hello" messages written by writeMessagesToCategory
+module Simple =
+ type Hello = {name: string}
+ type Event =
+ | Hello of Hello
+ interface IUnionContract
+ // Codec the test uses (via TryDecode) to turn stored events back into Event values
+ let codec = FsCodec.SystemTextJson.Codec.Create()
+
+/// Writes 50 freshly named streams of 20 `Hello` messages each (1000 messages in total) to `category`,
+/// submitting all the write_message calls as one batch
+let writeMessagesToCategory category = task {
+    use conn = new NpgsqlConnection("Host=localhost; Port=5433; Username=message_store; Password=;")
+    do! conn.OpenAsync()
+    // The batch is disposable, so release it once it has executed (previously it was never disposed)
+    use batch = conn.CreateBatch()
+    for _ in 1..50 do
+        let streamName = $"{category}-{Guid.NewGuid():N}"
+        for _ in 1..20 do
+            let cmd = NpgsqlBatchCommand()
+            cmd.CommandText <- "select 1 from write_message(@Id::text, @StreamName, @EventType, @Data, 'null', null)"
+            cmd.Parameters.AddWithValue("Id", NpgsqlDbType.Uuid, Guid.NewGuid()) |> ignore
+            cmd.Parameters.AddWithValue("StreamName", NpgsqlDbType.Text, streamName) |> ignore
+            cmd.Parameters.AddWithValue("EventType", NpgsqlDbType.Text, "Hello") |> ignore
+            cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore
+
+            batch.BatchCommands.Add(cmd)
+    do! batch.ExecuteNonQueryAsync() :> Task }
+
+[]
+/// Writes 1000 messages to each of two fresh categories, runs a MessageDbSource over both, and checks that
+/// all 2000 events are handled, 20 per stream, in index order within each stream
+let ``It processes events for a category`` () = async {
+    let log = Serilog.Log.Logger
+    let consumerGroup = $"{Guid.NewGuid():N}"
+    let category1 = $"{Guid.NewGuid():N}"
+    let category2 = $"{Guid.NewGuid():N}"
+    do! writeMessagesToCategory category1 |> Async.AwaitTaskCorrect
+    do! writeMessagesToCategory category2 |> Async.AwaitTaskCorrect
+    let reader = Core.MessageDbCategoryClient("Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;")
+    let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10)
+    do! checkpoints.CreateSchemaIfNotExists()
+    let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
+                      with member _.HandleExn(log, x) = ()
+                           member _.HandleOk x = () }
+    let stop = ref (fun () -> ())
+    let handled = HashSet<_>()
+    let handle struct(stream, evts: StreamSpan<_>) = async {
+        // Record the events and take the count under the same lock: HashSet is not safe for concurrent use,
+        // and the count was previously read outside the lock while another handler could be adding
+        let handledCount = lock handled (fun _ ->
+            for evt in evts do handled.Add((stream, evt.Index)) |> ignore
+            handled.Count)
+        test <@ Array.chooseV Simple.codec.TryDecode evts |> Array.forall ((=) (Simple.Hello {name = "world"})) @>
+        if handledCount >= 2000 then
+            stop.contents()
+        return struct (Propulsion.Streams.SpanResult.AllProcessed, ()) }
+    use sink = Propulsion.Streams.Default.Config.Start(log, 2, 2, handle, stats, TimeSpan.FromMinutes 1)
+    let source = MessageDbSource(
+                     log, TimeSpan.FromMinutes 1,
+                     reader, 1000, TimeSpan.FromMilliseconds 100,
+                     checkpoints, sink, [| category1; category2 |])
+    use src = source.Start()
+    // who says you can't do backwards referencing in F#
+    stop.contents <- src.Stop
+
+    // Safety net: stop after 30 seconds even if the expected number of events never arrives
+    Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore
+
+    do! src.AwaitShutdown()
+    // 2000 total events
+    test <@ handled.Count = 2000 @>
+    // 20 in each stream
+    test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
+    // they were handled in order within streams
+    let ordering = handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.map snd)
+    test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @>
+}
diff --git a/tools/Propulsion.Tool/Args.fs b/tools/Propulsion.Tool/Args.fs
index 1b54404a..303a128f 100644
--- a/tools/Propulsion.Tool/Args.fs
+++ b/tools/Propulsion.Tool/Args.fs
@@ -29,6 +29,10 @@ module Configuration =
let [] BROKER = "PROPULSION_KAFKA_BROKER"
let [] TOPIC = "PROPULSION_KAFKA_TOPIC"
+ module Mdb =
+ let [] CONNECTION_STRING = "MDB_CONNECTION_STRING"
+ let [] SCHEMA = "MDB_SCHEMA"
+
type Configuration(tryGet : string -> string option) =
member val tryGet = tryGet
@@ -50,6 +54,9 @@ type Configuration(tryGet : string -> string option) =
member x.KafkaBroker = x.get Configuration.Kafka.BROKER
member x.KafkaTopic = x.get Configuration.Kafka.TOPIC
+ member x.MdbConnectionString = x.get Configuration.Mdb.CONNECTION_STRING
+ member x.MdbSchema = x.get Configuration.Mdb.SCHEMA
+
module Cosmos =
open Configuration.Cosmos
@@ -253,3 +260,36 @@ module Dynamo =
member x.CreateCheckpointStore(group, cache, storeLog) =
let context = DynamoStoreContext.create indexReadClient.Value
Propulsion.Feed.ReaderCheckpoint.DynamoStore.create storeLog (group, checkpointInterval) (context, cache)
+
+// Command line arguments for the MessageDb source and its Postgres checkpoint store
+module Mdb =
+ open Configuration.Mdb
+ open Npgsql
+ type [] Parameters =
+ | [] ConnectionString of string
+ | [] CheckpointConnectionString of string
+ | [] Schema of string
+ | [] Category of string
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | ConnectionString _ -> $"Connection string for the postgres database housing message-db. (Optional if environment variable {CONNECTION_STRING} is defined)"
+ | CheckpointConnectionString _ -> "Connection string used for the checkpoint store. If not specified, defaults to the connection string argument"
+ // NOTE(review): this message lacks the punctuation/parentheses used by the ConnectionString text above
+ | Schema _ -> $"Schema that should contain the checkpoints table Optional if environment variable {SCHEMA} is defined"
+ | Category _ -> "The message-db categories to load"
+
+ // Resolves each setting from the command line first, falling back to Configuration (or, for the checkpoint connection, to the main connection string)
+ type Arguments(c : Configuration, p : ParseResults) =
+ let conn = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString)
+ let checkpointConn = p.TryGetResult CheckpointConnectionString |> Option.defaultValue conn
+ let schema = p.TryGetResult Schema |> Option.defaultWith (fun () -> c.MdbSchema)
+
+ // Yields the requested categories together with a client for the message-db connection string
+ member x.CreateClient() = Array.ofList (p.GetResults Category), Propulsion.MessageDb.Core.MessageDbCategoryClient(conn)
+
+ // Checkpoint store for `group`, with a fixed 5 second checkpoint frequency
+ member x.CreateCheckpointStore(group) =
+ Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConn, schema, group, TimeSpan.FromSeconds 5.)
+ // Creates the checkpoints table, logging the connection string with its password removed
+ member x.CreateCheckpointStoreTable() = async {
+ let log = Log.Logger
+ let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConn, Password = null)
+ log.Information("Authenticating with postgres using {connectionString}", connStringWithoutPassword.ToString())
+ log.Information("Creating checkpoints table as {table}", $"{schema}.{Propulsion.MessageDb.ReaderCheckpoint.table}")
+ // "nil" is a placeholder group name; only the table creation is invoked on this store instance
+ let checkpointStore = x.CreateCheckpointStore("nil")
+ do! checkpointStore.CreateSchemaIfNotExists()
+ log.Information("Table created") }
diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs
index df5ef632..73c56125 100644
--- a/tools/Propulsion.Tool/Program.fs
+++ b/tools/Propulsion.Tool/Program.fs
@@ -1,6 +1,7 @@
module Propulsion.Tool.Program
open Argu
+open Propulsion.Feed
open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException
open Propulsion.Tool.Args
open Serilog
@@ -15,6 +16,7 @@ type Parameters =
| [] VerboseConsole
| [] VerboseStore
| [] Init of ParseResults
+ | [] InitPg of ParseResults
| [] Index of ParseResults
| [] Checkpoint of ParseResults
| [] Project of ParseResults
@@ -24,6 +26,7 @@ type Parameters =
| VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
| VerboseStore -> "Include low level Store logging"
| Init _ -> "Initialize auxiliary store (Supported for `cosmos` Only)."
+ | InitPg _ -> "Initialize a postgres checkpoint store"
| Index _ -> "Validate index (optionally, ingest events from a DynamoDB JSON S3 export to remediate missing events)."
| Checkpoint _ -> "Display or override checkpoints in Cosmos or Dynamo"
| Project _ -> "Project from store specified as the last argument."
@@ -109,19 +112,23 @@ and [] KafkaParameters =
| [] Broker of string
| [] Cosmos of ParseResults
| [] Dynamo of ParseResults
+ | [] Mdb of ParseResults
interface IArgParserTemplate with
member a.Usage = a |> function
| Topic _ -> "Specify target topic. Default: Use $env:PROPULSION_KAFKA_TOPIC"
| Broker _ -> "Specify target broker. Default: Use $env:PROPULSION_KAFKA_BROKER"
| Cosmos _ -> "Specify CosmosDB parameters."
| Dynamo _ -> "Specify DynamoDB parameters."
+ | Mdb _ -> "Specify MessageDb parameters."
and [] StatsParameters =
| [] Cosmos of ParseResults
| [] Dynamo of ParseResults
+ | [] Mdb of ParseResults
interface IArgParserTemplate with
member a.Usage = a |> function
| Cosmos _ -> "Specify CosmosDB parameters."
| Dynamo _ -> "Specify DynamoDB parameters."
+ | Mdb _ -> "Specify MessageDb parameters."
let [] appName = "propulsion-tool"
@@ -273,15 +280,17 @@ module Project =
member _.Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.KafkaTopic)
member val StoreArgs =
match p.GetSubCommand() with
- | KafkaParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p))
- | KafkaParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p))
+ | KafkaParameters.Cosmos p -> Choice1Of3 (Args.Cosmos.Arguments (c, p))
+ | KafkaParameters.Dynamo p -> Choice2Of3 (Args.Dynamo.Arguments (c, p))
+ | KafkaParameters.Mdb p -> Choice3Of3 (Args.Mdb.Arguments (c, p))
| x -> missingArg $"unexpected subcommand %A{x}"
type StatsArguments(c, p : ParseResults) =
member val StoreArgs =
match p.GetSubCommand() with
- | StatsParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p))
- | StatsParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p))
+ | StatsParameters.Cosmos p -> Choice1Of3 (Args.Cosmos.Arguments (c, p))
+ | StatsParameters.Dynamo p -> Choice2Of3 (Args.Dynamo.Arguments (c, p))
+ | StatsParameters.Mdb p -> Choice3Of3 (Args.Mdb.Arguments (c, p))
type Arguments(c, p : ParseResults) =
member val IdleDelay = TimeSpan.FromMilliseconds 10.
@@ -304,8 +313,9 @@ module Project =
let a = Arguments(c, p)
let storeArgs, dumpStoreStats =
match a.StoreArgs with
- | Choice1Of2 sa -> Choice1Of2 sa, Equinox.CosmosStore.Core.Log.InternalMetrics.dump
- | Choice2Of2 sa -> Choice2Of2 sa, Equinox.DynamoStore.Core.Log.InternalMetrics.dump
+ | Choice1Of3 sa -> Choice1Of3 sa, Equinox.CosmosStore.Core.Log.InternalMetrics.dump
+ | Choice2Of3 sa -> Choice2Of3 sa, Equinox.DynamoStore.Core.Log.InternalMetrics.dump
+ | Choice3Of3 sa -> Choice3Of3 sa, (fun _ -> ())
let group, startFromTail, maxItems = p.GetResult ConsumerGroupName, p.Contains FromTail, p.TryGetResult MaxItems
match maxItems with None -> () | Some bs -> Log.Information("ChangeFeed Max items Count {changeFeedMaxItems}", bs)
if startFromTail then Log.Warning("ChangeFeed (If new projector group) Skipping projection of all existing events.")
@@ -332,7 +342,7 @@ module Project =
let source =
let nullFilter _ = true
match storeArgs with
- | Choice1Of2 sa ->
+ | Choice1Of3 sa ->
let monitored = sa.MonitoredContainer()
let leases = sa.ConnectLeases()
let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents nullFilter
@@ -340,7 +350,7 @@ module Project =
Propulsion.CosmosStore.CosmosStoreSource.Start
( Log.Logger, monitored, leases, group, observer,
startFromTail = startFromTail, ?maxItems = maxItems, ?lagReportFreq = sa.MaybeLogLagInterval)
- | Choice2Of2 sa ->
+ | Choice2Of3 sa ->
let (indexStore, indexFilter), maybeHydrate = sa.MonitoringParams()
let checkpoints =
let cache = Equinox.Cache (appName, sizeMb = 1)
@@ -356,6 +366,14 @@ module Project =
checkpoints, sink, loadMode, startFromTail = startFromTail, storeLog = Log.forMetrics,
?trancheIds = indexFilter
).Start()
+            | Choice3Of3 sa ->
+                let checkpoints = sa.CreateCheckpointStore(group)
+                let categories, client = sa.CreateClient()
+                // Pass startFromTail through, as the DynamoStore branch does; it was previously parsed and logged but never applied for MessageDb
+                Propulsion.MessageDb.MessageDbSource(
+                    Log.Logger, stats.StatsInterval,
+                    client, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5,
+                    checkpoints, sink, categories, startFromTail = startFromTail
+                ).Start()
let work = [
Async.AwaitKeyboardInterruptAsTaskCanceledException()
sink.AwaitWithStopOnCancellation()
@@ -377,6 +395,7 @@ let main argv =
let c = Args.Configuration(Environment.GetEnvironmentVariable >> Option.ofObj)
try match a.GetSubCommand() with
| Init a -> CosmosInit.aux (c, a) |> Async.Ignore |> Async.RunSynchronously
+ | InitPg a -> Mdb.Arguments(c, a).CreateCheckpointStoreTable() |> Async.RunSynchronously
| Checkpoint a -> Checkpoints.readOrOverride (c, a) |> Async.RunSynchronously
| Index a -> Indexer.run (c, a) |> Async.RunSynchronously
| Project a -> Project.run (c, a) |> Async.RunSynchronously
diff --git a/tools/Propulsion.Tool/Propulsion.Tool.fsproj b/tools/Propulsion.Tool/Propulsion.Tool.fsproj
index 13113cfc..eaa73513 100644
--- a/tools/Propulsion.Tool/Propulsion.Tool.fsproj
+++ b/tools/Propulsion.Tool/Propulsion.Tool.fsproj
@@ -21,6 +21,7 @@
+