Add MessageDb feed source #181

Merged
merged 44 commits into from Nov 18, 2022
Changes from 42 commits
ac34e7a
Add MessageDb feed source
nordfjord Nov 12, 2022
5e8e0e1
spaces
nordfjord Nov 12, 2022
47bc516
formatting
nordfjord Nov 12, 2022
496746b
fix bug
nordfjord Nov 12, 2022
32c13ad
remove conn helper
nordfjord Nov 12, 2022
06422dd
add to build
nordfjord Nov 12, 2022
2e6aff3
fix bugs with positions
nordfjord Nov 12, 2022
5fd81a6
fix typo
nordfjord Nov 12, 2022
be656ab
space
nordfjord Nov 12, 2022
cac1768
Add readme and changelog
nordfjord Nov 12, 2022
2a3f73f
clarify
nordfjord Nov 12, 2022
9f69cf9
add assert on the order of events
nordfjord Nov 12, 2022
c51bd81
perf: voptions shouldn't be turned into options
nordfjord Nov 12, 2022
dee0116
use the internal chooseV
nordfjord Nov 12, 2022
5394519
fixes from code review
nordfjord Nov 15, 2022
a001b00
fix
nordfjord Nov 15, 2022
e2aec05
Update CHANGELOG.md
nordfjord Nov 15, 2022
c7db625
Update README.md
nordfjord Nov 15, 2022
5781820
take categories as an argument
nordfjord Nov 15, 2022
2fea86a
change the test to make sure it reads across categories
nordfjord Nov 15, 2022
7fc78dc
add tail semantics when reaching an empty page
nordfjord Nov 15, 2022
d305f11
reviews
nordfjord Nov 16, 2022
30a7991
add initpg command
nordfjord Nov 17, 2022
2df580f
docs
nordfjord Nov 17, 2022
e4a6d38
Review
nordfjord Nov 17, 2022
f440284
rename reader to client
nordfjord Nov 18, 2022
f5ff105
renames
nordfjord Nov 18, 2022
7828420
move to rider
nordfjord Nov 18, 2022
3abd627
Add constructor with connection string
nordfjord Nov 18, 2022
33494a2
Update README.md
nordfjord Nov 18, 2022
8d1d0cc
revert silly
nordfjord Nov 18, 2022
22f2778
wire up kafka and project command
nordfjord Nov 18, 2022
9a5e3cc
fix
nordfjord Nov 18, 2022
4eb0a9e
fix tests
nordfjord Nov 18, 2022
f7e105e
rename
nordfjord Nov 18, 2022
70b0143
add separate arg for checkpoint conn string
nordfjord Nov 18, 2022
0fd95eb
The big Mdb rename
nordfjord Nov 18, 2022
2e4b6eb
add a readme specifically for messagedb, mostly for myself
nordfjord Nov 18, 2022
9e58018
review
nordfjord Nov 18, 2022
e17d446
command text in style
nordfjord Nov 18, 2022
171e2a2
move the client into a core module
nordfjord Nov 18, 2022
5206572
fix missing core naming
nordfjord Nov 18, 2022
22a4df5
rename tranches to categories for mdb args
nordfjord Nov 18, 2022
f004c71
yes, it should be category
nordfjord Nov 18, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -25,6 +25,7 @@ packages.config
## JetBrains Rider
.idea/
*.sln.iml
*.DotSettings.user

## CodeRush
.cr/
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.EventStoreDb`: Ported `EventStore` to target `Equinox.EventStore` >= `4.0.0` (using the gRPC interface) [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/143) [#140](https://github.com/jet/propulsion/pull/143) [#177](https://github.com/jet/propulsion/pull/177)
- `Propulsion.MessageDb`: `FeedSource` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
12 changes: 12 additions & 0 deletions Propulsion.sln
@@ -55,6 +55,10 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MemoryStore", "s
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore.Lambda", "src\Propulsion.DynamoStore.Lambda\Propulsion.DynamoStore.Lambda.fsproj", "{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb", "src\Propulsion.MessageDb\Propulsion.MessageDb.fsproj", "{BCAEFE2C-8D09-4F4E-B27E-62077497C752}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb.Integration", "tests\Propulsion.MessageDb.Integration\Propulsion.MessageDb.Integration.fsproj", "{9738D2C1-EE7C-400F-8B14-31B5B7B66839}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -149,6 +153,14 @@ Global
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.Build.0 = Release|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.Build.0 = Release|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
7 changes: 6 additions & 1 deletion README.md
@@ -90,6 +90,10 @@ If you're looking for a good discussion forum on these kinds of topics, look no

- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`, `Serilog`

- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181), maintaining checkpoints in a Postgres table. [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `6.0.7`
1. `MessageDbSource`: reading from one or more MessageDb categories into a `Propulsion.Sink`
2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`)

- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog`

1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink`
@@ -106,8 +110,9 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.

- CosmosDB: Initialize `-aux` Container for ChangeFeedProcessor
- CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints
- CosmosDB/DynamoStore/EventStoreDB: walk change feeds/indexes and/or project to Kafka
- CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka
- DynamoStore: validate and/or reindex DynamoStore Index
- MessageDb: Initialize a checkpoints table in a Postgres Database

## Deprecated components

1 change: 1 addition & 0 deletions build.proj
@@ -23,6 +23,7 @@
<Exec Command="dotnet pack src/Propulsion.Feed $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MemoryStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MessageDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.SqlStreamStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack tools/Propulsion.Tool $(Cfg) $(PackOptions)" />

13 changes: 13 additions & 0 deletions src/Propulsion.MessageDb/Internal.fs
@@ -0,0 +1,13 @@
namespace Propulsion.MessageDb

open FSharp.UMX
open Npgsql

module internal FeedSourceId =
    let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "messageDb"

module internal Npgsql =
    let connect connectionString ct = task {
        let conn = new NpgsqlConnection(connectionString)
        do! conn.OpenAsync(ct)
        return conn }
121 changes: 121 additions & 0 deletions src/Propulsion.MessageDb/MessageDbSource.fs
@@ -0,0 +1,121 @@
namespace Propulsion.MessageDb

open FSharp.Control
open FsCodec
open FsCodec.Core
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Feed.Core
open Propulsion.Internal
open System
open System.Data.Common
open System.Diagnostics

module Core =
    type MessageDbCategoryClient(connectionString) =
        let connect = Npgsql.connect connectionString
        let parseRow (reader: DbDataReader) =
            let readNullableString idx = if reader.IsDBNull(idx) then None else Some (reader.GetString idx)
            let streamName = reader.GetString(8)
            let event = TimelineEvent.Create(
                index = reader.GetInt64(0),
                eventType = reader.GetString(1),
                data = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 2)),
                meta = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 3)),
                eventId = reader.GetGuid(4),
                ?correlationId = readNullableString 5,
                ?causationId = readNullableString 6,
                context = reader.GetInt64(9),
                timestamp = DateTimeOffset(DateTime.SpecifyKind(reader.GetDateTime(7), DateTimeKind.Utc)))

            struct(StreamName.parse streamName, event)
        member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) = task {
            use! conn = connect ct
            let command = conn.CreateCommand(CommandText = "select position, type, data, metadata, id::uuid,
                                                                   (metadata::jsonb->>'$correlationId')::text,
                                                                   (metadata::jsonb->>'$causationId')::text,
                                                                   time, stream_name, global_position
                                                            from get_category_messages(@Category, @Position, @BatchSize);")
            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore
            command.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, fromPositionInclusive) |> ignore
            command.Parameters.AddWithValue("BatchSize", NpgsqlDbType.Bigint, int64 batchSize) |> ignore

            let mutable checkpoint = fromPositionInclusive

            use! reader = command.ExecuteReaderAsync(ct)
            let events = [| while reader.Read() do yield parseRow reader |]

            checkpoint <- match Array.tryLast events with Some (_, ev) -> unbox<int64> ev.Context | None -> checkpoint

            return { checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } }
        member _.ReadCategoryLastVersion(category: TrancheId, ct) = task {
            use! conn = connect ct
            let command = conn.CreateCommand(CommandText = "select max(global_position) from messages where category(stream_name) = @Category;")
            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore

            use! reader = command.ExecuteReaderAsync(ct)
            return if reader.Read() then reader.GetInt64(0) else 0L }

module private Impl =
    open Core
    open Propulsion.Infrastructure // AwaitTaskCorrect

    let readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async<Propulsion.Feed.Core.Batch<_>> = async {
        let! ct = Async.CancellationToken
        let positionInclusive = Position.toInt64 pos
        let! x = store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect
        return x }

    let readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async<Propulsion.Feed.Position> = async {
        let! ct = Async.CancellationToken
        let! lastEventPos = store.ReadCategoryLastVersion(trancheId, ct) |> Async.AwaitTaskCorrect
        return Position.parse lastEventPos }

type MessageDbSource
    (   log : Serilog.ILogger, statsInterval,
        client: Core.MessageDbCategoryClient, batchSize, tailSleepInterval,
        checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
        categories,
        // Override default start position to be at the tail of the index. Default: Replay all events.
        ?startFromTail,
        ?sourceId) =
    inherit Propulsion.Feed.Core.TailingFeedSource
        (   log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval, checkpoints,
            (   if startFromTail <> Some true then None
                else Some (Impl.readTailPositionForTranche client)),
            sink,
            (fun req -> asyncSeq {
                let sw = Stopwatch.StartNew()
                let! b = Impl.readBatch batchSize client req
                yield sw.Elapsed, b }),
            string)
    new (log, statsInterval, connectionString, batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail, ?sourceId) =
        MessageDbSource(log, statsInterval, Core.MessageDbCategoryClient(connectionString),
                        batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail=startFromTail, ?sourceId=sourceId)

    abstract member ListTranches : unit -> Async<Propulsion.Feed.TrancheId array>
    default _.ListTranches() = async { return categories |> Array.map TrancheId.parse }

    abstract member Pump : unit -> Async<unit>
    default x.Pump() = base.Pump(x.ListTranches)

    abstract member Start : unit -> Propulsion.SourcePipeline<Propulsion.Feed.Core.FeedMonitor>
    default x.Start() = base.Start(x.Pump())

    /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
    member x.RunUntilCaughtUp(timeout : TimeSpan, statsInterval : IntervalTimer) = task {
        let sw = Stopwatch.start ()
        use pipeline = x.Start()

        try System.Threading.Tasks.Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore

            let initialReaderTimeout = TimeSpan.FromMinutes 1.
            do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
            pipeline.Stop()

            if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
            // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
            return! x.Checkpoint()
        finally statsInterval.SleepUntilTriggerCleared() }
28 changes: 28 additions & 0 deletions src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion\Infrastructure.fs">
<Link>Infrastructure.fs</Link>
</Compile>
<Compile Include="Internal.fs" />
<Compile Include="MessageDbSource.fs" />
<Compile Include="ReaderCheckpoint.fs" />
<Content Include="Readme.md" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
<PackageReference Include="Npgsql" Version="6.0.7" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\Propulsion.Feed\Propulsion.Feed.fsproj" />
</ItemGroup>

</Project>
73 changes: 73 additions & 0 deletions src/Propulsion.MessageDb/ReaderCheckpoint.fs
@@ -0,0 +1,73 @@
module Propulsion.MessageDb.ReaderCheckpoint

open Npgsql
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Infrastructure

let table = "propulsion_checkpoint"

let createIfNotExists (conn : NpgsqlConnection, schema: string) =
    let cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{table} (
                                                   source text not null,
                                                   tranche text not null,
                                                   consumer_group text not null,
                                                   position bigint not null,
                                                   primary key (source, tranche, consumer_group));")
    cmd.ExecuteNonQueryAsync() |> Async.AwaitTaskCorrect |> Async.Ignore<int>

let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64)
    = async {
        let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{table}(source, tranche, consumer_group, position)
                                                     values (@Source, @Tranche, @ConsumerGroup, @Position)
                                                     on conflict (source, tranche, consumer_group)
                                                     do update set position = @Position;")
        cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
        cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
        cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
        cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore

        let! ct = Async.CancellationToken
        do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore<int> }

let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) = async {
    let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{table}
                                                 where source = @Source
                                                 and tranche = @Tranche
                                                 and consumer_group = @ConsumerGroup")
    cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
    cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
    cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore

    let! ct = Async.CancellationToken
    use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTaskCorrect
    return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }

type CheckpointStore(connString : string, schema: string, consumerGroupName, defaultCheckpointFrequency) =
    let connect = Npgsql.connect connString

    member _.CreateSchemaIfNotExists() = async {
        let! ct = Async.CancellationToken
        use! conn = connect ct |> Async.AwaitTaskCorrect
        return! createIfNotExists (conn, schema) }

    interface IFeedCheckpointStore with

        member _.Start(source, tranche, ?establishOrigin) = async {
            let! ct = Async.CancellationToken
            use! conn = connect ct |> Async.AwaitTaskCorrect
            let! maybePos = tryGetPosition (conn, schema) source tranche consumerGroupName
            let! pos =
                match maybePos, establishOrigin with
                | ValueSome pos, _ -> async { return Position.parse pos }
                | ValueNone, Some f -> f
                | ValueNone, None -> async { return Position.initial }
            return defaultCheckpointFrequency, pos }

        member _.Commit(source, tranche, pos) = async {
            let! ct = Async.CancellationToken
            use! conn = connect ct |> Async.AwaitTaskCorrect
            return! commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos) }
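
Reviewer note: the start-position precedence in `CheckpointStore.Start` (stored checkpoint first, then the supplied `establishOrigin`, then the start of the feed) can be checked without a database. The snippet below is a minimal sketch, not code from this PR: `Position` is reduced to a plain `int64`, and `resolveStart`, `initialPosition` and `tail` are hypothetical names introduced only for illustration.

```fsharp
// Stand-in for Propulsion.Feed.Position (assumption: a plain int64 is enough for illustration)
type Position = int64
let initialPosition : Position = 0L

/// Same precedence as the match in CheckpointStore.Start:
/// 1. a stored checkpoint, 2. the caller-supplied origin, 3. the start of the feed
let resolveStart (stored : int64 voption) (establishOrigin : Async<Position> option) : Async<Position> =
    match stored, establishOrigin with
    | ValueSome pos, _ -> async { return pos }
    | ValueNone, Some f -> f
    | ValueNone, None -> async { return initialPosition }

// Hypothetical origin, standing in for Impl.readTailPositionForTranche
let tail : Async<Position> = async { return 42L }

let fromStored = resolveStart (ValueSome 7L) (Some tail) |> Async.RunSynchronously
let fromTail = resolveStart ValueNone (Some tail) |> Async.RunSynchronously
let fromStart = resolveStart ValueNone None |> Async.RunSynchronously
printfn "stored=%d tail=%d start=%d" fromStored fromTail fromStart
```

One consequence worth keeping in mind: once a checkpoint row exists for a given source, tranche and consumer group, `startFromTail` no longer has any effect for that combination, because the stored position always wins.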