Skip to content

Commit

Permalink
Add MessageDb feed source (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
nordfjord committed Nov 18, 2022
1 parent 7f579b5 commit a689ef3
Show file tree
Hide file tree
Showing 16 changed files with 498 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -25,6 +25,7 @@ packages.config
## JetBrains Rider
.idea/
*.sln.iml
*.DotSettings.user

## CodeRush
.cr/
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.EventStoreDb`: Ported `EventStore` to target `Equinox.EventStore` >= `4.0.0` (using the gRPC interface) [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/140) [#143](https://github.com/jet/propulsion/pull/143) [#177](https://github.com/jet/propulsion/pull/177)
- `Propulsion.MessageDb`: `FeedSource` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
Expand Down
12 changes: 12 additions & 0 deletions Propulsion.sln
Expand Up @@ -55,6 +55,10 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MemoryStore", "s
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore.Lambda", "src\Propulsion.DynamoStore.Lambda\Propulsion.DynamoStore.Lambda.fsproj", "{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb", "src\Propulsion.MessageDb\Propulsion.MessageDb.fsproj", "{BCAEFE2C-8D09-4F4E-B27E-62077497C752}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.MessageDb.Integration", "tests\Propulsion.MessageDb.Integration\Propulsion.MessageDb.Integration.fsproj", "{9738D2C1-EE7C-400F-8B14-31B5B7B66839}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -149,6 +153,14 @@ Global
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7AEA3BB7-E5C4-4653-ABBF-F6C8476E77AF}.Release|Any CPU.Build.0 = Release|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BCAEFE2C-8D09-4F4E-B27E-62077497C752}.Release|Any CPU.Build.0 = Release|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9738D2C1-EE7C-400F-8B14-31B5B7B66839}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
7 changes: 6 additions & 1 deletion README.md
Expand Up @@ -90,6 +90,10 @@ If you're looking for a good discussion forum on these kinds of topics, look no

- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`, `Serilog`

- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181), maintaining checkpoints in a Postgres table. [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `6.0.7`
1. `MessageDbSource`: reading from one or more MessageDb categories into a `Propulsion.Sink`
2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`)

- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog`

1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink`
Expand All @@ -106,8 +110,9 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.

- CosmosDB: Initialize `-aux` Container for ChangeFeedProcessor
- CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints
- CosmosDB/DynamoStore/EventStoreDB: walk change feeds/indexes and/or project to Kafka
- CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka
- DynamoStore: validate and/or reindex DynamoStore Index
- MessageDb: Initialize a checkpoints table in a Postgres Database

## Deprecated components

Expand Down
1 change: 1 addition & 0 deletions build.proj
Expand Up @@ -23,6 +23,7 @@
<Exec Command="dotnet pack src/Propulsion.Feed $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MemoryStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MessageDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.SqlStreamStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack tools/Propulsion.Tool $(Cfg) $(PackOptions)" />

Expand Down
13 changes: 13 additions & 0 deletions src/Propulsion.MessageDb/Internal.fs
@@ -0,0 +1,13 @@
namespace Propulsion.MessageDb

open FSharp.UMX
open Npgsql

/// Supplies the default SourceId stamped on checkpoints/metrics when the caller does not nominate one
module internal FeedSourceId =
    // UMX.tag converts the raw string into the units-of-measure tagged SourceId type
    let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "messageDb"

/// Connection helpers for the underlying Npgsql driver
module internal Npgsql =
    /// Creates and opens a fresh connection for the supplied connection string.
    /// Caller owns the returned connection and is responsible for disposing it.
    let connect connectionString ct = task {
        let connection = new NpgsqlConnection(connectionString)
        do! connection.OpenAsync(ct)
        return connection }
121 changes: 121 additions & 0 deletions src/Propulsion.MessageDb/MessageDbSource.fs
@@ -0,0 +1,121 @@
namespace Propulsion.MessageDb

open FSharp.Control
open FsCodec
open FsCodec.Core
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Feed.Core
open Propulsion.Internal
open System
open System.Data.Common
open System.Diagnostics


module Core =
    /// Low-level reader over MessageDb category streams. Opens a fresh connection per call
    /// (connection pooling is delegated to Npgsql).
    type MessageDbCategoryClient(connectionString) =
        let connect = Npgsql.connect connectionString
        // Maps one result row to a (streamName, TimelineEvent) pair.
        // Column order mirrors the SELECT below: 0=position, 1=type, 2=data, 3=metadata,
        // 4=id, 5=$correlationId, 6=$causationId, 7=time, 8=stream_name, 9=global_position
        let parseRow (reader : DbDataReader) =
            let readNullableString idx = if reader.IsDBNull idx then None else Some (reader.GetString idx)
            let streamName = reader.GetString 8
            let event = TimelineEvent.Create(
                index = reader.GetInt64 0,
                eventType = reader.GetString 1,
                data = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 2)),
                meta = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 3)),
                eventId = reader.GetGuid 4,
                ?correlationId = readNullableString 5,
                ?causationId = readNullableString 6,
                // global_position travels via Context so the reader loop can checkpoint on it
                context = reader.GetInt64 9,
                // MessageDb stores times as UTC without offset info; mark them explicitly
                timestamp = DateTimeOffset(DateTime.SpecifyKind(reader.GetDateTime 7, DateTimeKind.Utc)))

            struct (StreamName.parse streamName, event)

        /// Reads a batch of up to batchSize messages for the category, starting from
        /// fromPositionInclusive (a global_position value)
        member _.ReadCategoryMessages(category : TrancheId, fromPositionInclusive : int64, batchSize : int, ct) = task {
            use! conn = connect ct
            // All inputs travel as parameters; nothing is interpolated into the SQL text
            use command = conn.CreateCommand(CommandText = "select position, type, data, metadata, id::uuid,
                                 (metadata::jsonb->>'$correlationId')::text,
                                 (metadata::jsonb->>'$causationId')::text,
                                 time, stream_name, global_position
                               from get_category_messages(@Category, @Position, @BatchSize);")
            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore
            command.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, fromPositionInclusive) |> ignore
            command.Parameters.AddWithValue("BatchSize", NpgsqlDbType.Bigint, int64 batchSize) |> ignore

            use! reader = command.ExecuteReaderAsync(ct)
            let events = [| while reader.Read() do yield parseRow reader |]

            // Checkpoint on the last event's global_position; with an empty batch, hold position
            let checkpoint =
                match Array.tryLast events with
                | Some (_, ev) -> unbox<int64> ev.Context
                | None -> fromPositionInclusive

            return { checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } }

        /// Determines the highest global_position for the category; 0L when the category has no messages
        member _.ReadCategoryLastVersion(category : TrancheId, ct) = task {
            use! conn = connect ct
            use command = conn.CreateCommand(CommandText = "select max(global_position) from messages where category(stream_name) = @Category;")
            command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore

            use! reader = command.ExecuteReaderAsync(ct)
            // max() always yields exactly one row, holding SQL NULL when the category is empty;
            // GetInt64 would throw on that NULL, so guard with IsDBNull
            return if reader.Read() && not (reader.IsDBNull 0) then reader.GetInt64 0 else 0L }

module private Impl =
    open Core
    open Propulsion.Infrastructure // AwaitTaskCorrect

    /// Adapts the Task-based client read into the Async-based Batch contract the feed source expects
    let readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async<Propulsion.Feed.Core.Batch<_>> = async {
        let! ct = Async.CancellationToken
        let inclusivePos = Position.toInt64 pos
        return! store.ReadCategoryMessages(category, inclusivePos, batchSize, ct) |> Async.AwaitTaskCorrect }

    /// Resolves the current tail position of a tranche (category), for startFromTail mode
    let readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async<Propulsion.Feed.Position> = async {
        let! ct = Async.CancellationToken
        let! tailPosition = store.ReadCategoryLastVersion(trancheId, ct) |> Async.AwaitTaskCorrect
        return Position.parse tailPosition }

/// Feed source that tails one or more MessageDb categories (modelled as Tranches) into a Sink,
/// checkpointing via the supplied IFeedCheckpointStore
type MessageDbSource
    ( log : Serilog.ILogger, statsInterval,
    client: Core.MessageDbCategoryClient, batchSize, tailSleepInterval,
    checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
    // Category names to read; each becomes a TrancheId via ListTranches
    categories,
    // Override default start position to be at the tail of the index. Default: Replay all events.
    ?startFromTail,
    // Overrides the SourceId used for checkpoints. Default: FeedSourceId.wellKnownId ("messageDb")
    ?sourceId) =
    inherit Propulsion.Feed.Core.TailingFeedSource
        ( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval, checkpoints,
        // Tail-position resolver is only supplied when startFromTail is explicitly true
        ( if startFromTail <> Some true then None
        else Some (Impl.readTailPositionForTranche client)),
        sink,
        // crawl function: yields each batch together with the time its read took
        (fun req -> asyncSeq {
        let sw = Stopwatch.StartNew()
        let! b = Impl.readBatch batchSize client req
        yield sw.Elapsed, b }),
        string)
    /// Convenience overload that builds the MessageDbCategoryClient from a raw connection string
    new (log, statsInterval, connectionString, batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail, ?sourceId) =
        MessageDbSource(log, statsInterval, Core.MessageDbCategoryClient(connectionString),
        batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail=startFromTail, ?sourceId=sourceId)

    /// Enumerates the tranches (categories) to be read; overridable for custom partitioning
    abstract member ListTranches : unit -> Async<Propulsion.Feed.TrancheId array>
    default _.ListTranches() = async { return categories |> Array.map TrancheId.parse }

    abstract member Pump : unit -> Async<unit>
    default x.Pump() = base.Pump(x.ListTranches)

    abstract member Start : unit -> Propulsion.SourcePipeline<Propulsion.Feed.Core.FeedMonitor>
    default x.Start() = base.Start(x.Pump())


    /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
    member x.RunUntilCaughtUp(timeout : TimeSpan, statsInterval : IntervalTimer) = task {
        let sw = Stopwatch.start ()
        use pipeline = x.Start()

        // Fire-and-forget stop timer; NOTE(review): the continuation's result is discarded, so any
        // exception from pipeline.Stop() here would be swallowed — confirm Stop() cannot throw
        try System.Threading.Tasks.Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore

            let initialReaderTimeout = TimeSpan.FromMinutes 1.
            do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
            pipeline.Stop()

            if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
            // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
            return! x.Checkpoint()
        finally statsInterval.SleepUntilTriggerCleared() }
28 changes: 28 additions & 0 deletions src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <GenerateDocumentationFile>true</GenerateDocumentationFile>
  </PropertyGroup>

  <ItemGroup>
    <!-- Linked (not referenced) so this package shares AwaitTaskCorrect etc. without a hard dependency on internals -->
    <Compile Include="..\Propulsion\Infrastructure.fs">
      <Link>Infrastructure.fs</Link>
    </Compile>
    <!-- F# compilation order matters: Internal.fs supplies helpers the later files use -->
    <Compile Include="Internal.fs" />
    <Compile Include="MessageDbSource.fs" />
    <Compile Include="ReaderCheckpoint.fs" />
    <Content Include="Readme.md" />
  </ItemGroup>

  <ItemGroup>
    <!-- MinVer derives the package version from git tags at pack time only -->
    <PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
    <PackageReference Include="Npgsql" Version="6.0.7" />
  </ItemGroup>


  <ItemGroup>
    <ProjectReference Include="..\Propulsion.Feed\Propulsion.Feed.fsproj" />
  </ItemGroup>

</Project>
73 changes: 73 additions & 0 deletions src/Propulsion.MessageDb/ReaderCheckpoint.fs
@@ -0,0 +1,73 @@
module Propulsion.MessageDb.ReaderCheckpoint

open Npgsql
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Infrastructure


// Name of the checkpoints table; always qualified with the caller-supplied schema when used
let table = "propulsion_checkpoint"

/// Idempotently creates the checkpoints table in the nominated schema.
/// The schema/table names are interpolated (they cannot be parameterized in DDL);
/// schema is operator-supplied configuration, not untrusted input.
let createIfNotExists (conn : NpgsqlConnection, schema: string) = async {
    let! ct = Async.CancellationToken
    // use: NpgsqlCommand is IDisposable; the original leaked it
    use cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{table} (
                                                    source text not null,
                                                    tranche text not null,
                                                    consumer_group text not null,
                                                    position bigint not null,
                                                    primary key (source, tranche, consumer_group));")
    // Honor the ambient cancellation token (the original ignored it)
    do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore<int> }

/// Upserts the checkpoint position for (source, tranche, consumerGroup)
let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64)
    = async {
    // use: ensure the command is disposed (the original leaked it)
    use cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{table}(source, tranche, consumer_group, position)
                                                 values (@Source, @Tranche, @ConsumerGroup, @Position)
                                                 on conflict (source, tranche, consumer_group)
                                                 do update set position = @Position;")
    cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
    cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
    cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
    cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore

    let! ct = Async.CancellationToken
    do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore<int> }

/// Reads the stored checkpoint position for (source, tranche, consumerGroup), if any
let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) = async {
    // use: ensure the command is disposed (the original leaked it)
    use cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{table}
                                                 where source = @Source
                                                 and tranche = @Tranche
                                                 and consumer_group = @ConsumerGroup")
    cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
    cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
    cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore

    let! ct = Async.CancellationToken
    use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTaskCorrect
    // Primary key guarantees at most one row
    return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }

/// Postgres-backed implementation of IFeedCheckpointStore, keyed by (source, tranche, consumer group).
/// A fresh connection is opened (and disposed) per operation.
type CheckpointStore(connString : string, schema: string, consumerGroupName, defaultCheckpointFrequency) =
    let connect = Npgsql.connect connString

    /// Idempotently provisions the checkpoints table (see createIfNotExists)
    member _.CreateSchemaIfNotExists() = async {
        let! ct = Async.CancellationToken
        use! conn = connect ct |> Async.AwaitTaskCorrect
        return! createIfNotExists (conn, schema) }

    interface IFeedCheckpointStore with

        member _.Start(source, tranche, ?establishOrigin) = async {
            let! ct = Async.CancellationToken
            use! conn = connect ct |> Async.AwaitTaskCorrect
            match! tryGetPosition (conn, schema) source tranche consumerGroupName with
            | ValueSome storedPos ->
                return defaultCheckpointFrequency, Position.parse storedPos
            | ValueNone ->
                // No checkpoint recorded yet: defer to the caller-supplied origin, else replay from the start
                let! origin =
                    match establishOrigin with
                    | Some f -> f
                    | None -> async { return Position.initial }
                return defaultCheckpointFrequency, origin }

        member _.Commit(source, tranche, pos) = async {
            let! ct = Async.CancellationToken
            use! conn = connect ct |> Async.AwaitTaskCorrect
            return! commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos) }


0 comments on commit a689ef3

Please sign in to comment.