Commit e4fd061

Feed: Temp replace taskSeq with asyncSeq (#214)
* remove task seq because of resource leak issues
* fix msgdb test
nordfjord committed May 15, 2023
1 parent f58bdc7 commit e4fd061
Showing 13 changed files with 116 additions and 39 deletions.
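
The pattern applied across these files: each lazily evaluated taskSeq { ... } producer is replaced either by an eagerly materialized task { ... } (the CosmosStore lag estimator) or by an asyncSeq { ... } bridged back to IAsyncEnumerable via AsyncSeq.toAsyncEnum, because of the resource leak issues the commit message cites. A minimal before/after sketch of the bridge shape; openResource and readAll are illustrative stand-ins, not APIs from this repo:

  open System.Collections.Generic
  open FSharp.Control // asyncSeq builder; before this change, also the taskSeq builder from FSharp.Control.TaskSeq

  // Before: a lazy IAsyncEnumerable built with the taskSeq builder (FSharp.Control.TaskSeq)
  let before (openResource, readAll) ct : IAsyncEnumerable<'page> = taskSeq {
      use resource = openResource ()
      yield! readAll resource ct }

  // After: the same producer as an asyncSeq, converted back to IAsyncEnumerable at the boundary
  let after (openResource, readAll) : IAsyncEnumerable<'page> =
      AsyncSeq.toAsyncEnum (asyncSeq {
          use resource = openResource ()
          yield! readAll resource })
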
55 changes: 55 additions & 0 deletions .github/workflows/ci.yaml
@@ -0,0 +1,55 @@
name: Run tests

on:
push:
branches:
- '**'
tags-ignore:
- '*'

jobs:
build-test-deploy:
name: Build and test
runs-on: ubuntu-latest
services:
postgres:
image: postgres:12
env:
POSTGRES_HOST_AUTH_METHOD: trust
ports:
# will assign a random free host port
- 5432:5432
# needed because the postgres container does not provide a healthcheck
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- name: Checkout
uses: actions/checkout@v2

- uses: actions/setup-dotnet@v1
with:
dotnet-version: '6.0.x'

- name: Install message-db
env:
MESSAGE_DB_VERSION: 1.3.0
PGHOST: localhost
PGUSER: postgres
PGPASSWORD: postgres
PGPORT: '5432'
run: |
mkdir -p /tmp/eventide
curl -L https://github.com/message-db/message-db/archive/refs/tags/v$MESSAGE_DB_VERSION.tar.gz -o /tmp/eventide/message-db.tgz
tar -xf /tmp/eventide/message-db.tgz --directory /tmp/eventide
(cd /tmp/eventide/message-db-${MESSAGE_DB_VERSION}/database && ./install.sh)
- name: Restore
run: dotnet restore Propulsion.sln

- name: Build
run: dotnet build Propulsion.sln --configuration Release --no-restore

- name: Run Tests
env:
MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store"
CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres"
run: dotnet test Propulsion.sln --no-restore --verbosity minimal
1 change: 1 addition & 0 deletions Propulsion.sln
@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".project", ".project", "{6E
LICENSE = LICENSE
README.md = README.md
SECURITY.md = SECURITY.md
.github\workflows\ci.yaml = .github\workflows\ci.yaml
EndProjectSection
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion", "src\Propulsion\Propulsion.fsproj", "{0F72360F-1C14-46E3-9A60-B6BF87BD726D}"
1 change: 1 addition & 0 deletions build.proj
@@ -34,6 +34,7 @@

<Target Name="VSTest">
<Exec Command="dotnet test tests/Propulsion.Tests $(Cfg) $(TestOptions)" />
<Exec Command="dotnet test tests/Propulsion.MessageDb.Integration $(Cfg) $(TestOptions)" />
<!-- NB previously, when there were multiple integration tests running concurrently, they failed on CI (i.e. doing dotnet test on the .sln causes various hangs etc.)-->
<!-- Wild guess says this is down to having two rdkafkas in a single process but who knows.-->
<Exec Command="dotnet test tests/Propulsion.Kafka.Integration $(Cfg) $(TestOptions)" />
11 changes: 5 additions & 6 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
@@ -152,16 +152,15 @@ type ChangeFeedProcessor
let estimator = monitored.GetChangeFeedEstimator(processorName_, leases)
let emitLagMetrics (ct : CancellationToken) = task {
while not ct.IsCancellationRequested do
let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : IAsyncEnumerable<'u> = taskSeq {
let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : Task<'u seq> = task {
// earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it
use query = estimator.GetCurrentStateIterator() // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable
let result = ResizeArray()
while query.HasMoreResults do
let! res = query.ReadNextAsync(ct)
for x in res do
yield map x }
let! leasesState =
feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag)
|> TaskSeq.toArrayAsync
for x in res do result.Add(map x)
return result :> 'u seq }
let! leasesState = feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag)
do! lagMonitorCallback (Seq.sortBy fst leasesState |> List.ofSeq) }
emitLagMetrics)
let wrap (f : unit -> Task) () = task { return! f () }
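
Rather than lazily yielding mapped states, the estimator now drains the iterator inside a single task and hands back a fully materialized sequence, so the use binding on the iterator disposes when the task completes. The same shape in isolation, a sketch against the Cosmos SDK's FeedIterator (map is arbitrary):

  open System.Threading
  open System.Threading.Tasks
  open Microsoft.Azure.Cosmos

  let drain (iterator : FeedIterator<'t>) (map : 't -> 'u) (ct : CancellationToken) : Task<'u seq> = task {
      use query = iterator // disposed when the task completes, however the caller consumes the result
      let result = ResizeArray()
      while query.HasMoreResults do
          let! page = query.ReadNextAsync ct // Task<FeedResponse<'t>>; FeedResponse is enumerable
          for x in page do result.Add(map x)
      return result :> 'u seq }
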
11 changes: 6 additions & 5 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -2,6 +2,7 @@

open Equinox.DynamoStore
open FSharp.Control
open Propulsion.Infrastructure
open Propulsion.Internal
open System
open System.Threading
@@ -44,10 +45,10 @@ module private Impl =
// Includes optional hydrating of events with event bodies and/or metadata (controlled via hydrating/maybeLoad args)
let materializeIndexEpochAsBatchesOfStreamEvents
(log : Serilog.ILogger, sourceId, storeLog) (hydrating, maybeLoad : _ -> _ -> (CancellationToken -> Task<_>) voption, loadDop) batchCutoff (context : DynamoStoreContext)
(AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = taskSeq {
(AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = AsyncSeq.toAsyncEnum(asyncSeq {
let epochs = AppendsEpoch.Reader.Config.create storeLog context
let sw = Stopwatch.start ()
let! _maybeSize, version, state = epochs.Read(pid, epochId, offset) |> Async.startImmediateAsTask ct
let! _maybeSize, version, state = epochs.Read(pid, epochId, offset)
let totalChanges = state.changes.Length
sw.Stop()
let totalStreams, chosenEvents, totalEvents, streamEvents =
@@ -103,15 +104,15 @@
for i, spans in state.changes do
let pending = spans |> Array.filter (fun (span : AppendsEpoch.Events.StreamSpan) -> streamEvents.ContainsKey(span.p))
if buffer.Count <> 0 && buffer.Count + pending.Length > batchCutoff then
let! hydrated = materializeSpans ct
let! hydrated = materializeSpans ct |> Async.AwaitTaskCorrect
report (Some i) hydrated.Length
yield struct (sw.Elapsed, sliceBatch epochId i hydrated) // not i + 1 as the batch does not include these changes
sw.Reset()
buffer.Clear()
buffer.AddRange(pending)
let! hydrated = materializeSpans ct
let! hydrated = materializeSpans ct |> Async.AwaitTaskCorrect
report None hydrated.Length
yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) }
yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) })

/// Defines the strategy to use for hydrating the events prior to routing them to the Handler
[<NoComparison; NoEquality>]
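
Async.AwaitTaskCorrect comes from the Infrastructure.fs that this commit links into the project (see the fsproj change below): it awaits a Task while surfacing the original exception, where stock Async.AwaitTask wraps failures in an AggregateException. Roughly the following, a sketch of the widely used pattern rather than the file's exact text:

  open System.Threading.Tasks

  type Async with
      // Await a Task<'T>, propagating the original exception rather than an AggregateException
      static member AwaitTaskCorrect (t : Task<'T>) : Async<'T> =
          Async.FromContinuations(fun (sc, ec, _) ->
              t.ContinueWith(fun (t : Task<'T>) ->
                  if t.IsFaulted then
                      let e = t.Exception.Flatten()
                      ec (if e.InnerExceptions.Count = 1 then e.InnerExceptions[0] else e :> exn)
                  elif t.IsCanceled then ec (TaskCanceledException t :> exn)
                  else sc t.Result)
              |> ignore)
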
1 change: 1 addition & 0 deletions src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
@@ -6,6 +6,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion\Infrastructure.fs" Link="Infrastructure.fs" />
<Compile Include="Types.fs" />
<Compile Include="ExactlyOnceIngester.fs" />
<Compile Include="AppendsIndex.fs" />
14 changes: 9 additions & 5 deletions src/Propulsion.Feed/FeedReader.fs
@@ -2,6 +2,7 @@ namespace Propulsion.Feed.Core

open FSharp.Control
open Propulsion.Feed
open Propulsion.Infrastructure
open Propulsion.Internal
open Serilog
open System
@@ -125,7 +126,7 @@ type FeedReader
renderPos,
?logCommitFailure,
// If supplied, an isTail Batch stops the reader loop and waits for supplied cleanup function. Default is a perpetual read loop.
?awaitIngesterShutdown) =
?awaitIngesterShutdown: CancellationToken -> Task<struct(int * int)>) =

let stats = Stats(partition, source, tranche, renderPos)

@@ -161,10 +162,13 @@
stats.UpdateCommittedPosition(initialPosition)
let mutable currentPos, lastWasTail = initialPosition, false
while not (ct.IsCancellationRequested || (lastWasTail && Option.isSome awaitIngesterShutdown)) do
for readLatency, batch in crawl (lastWasTail, currentPos) ct do
do! submitPage (readLatency, batch)
currentPos <- batch.checkpoint
lastWasTail <- batch.isTail
do! AsyncSeq.ofAsyncEnum (crawl (lastWasTail, currentPos) ct)
|> AsyncSeq.iterAsync (fun struct(readLatency, batch) -> async {
do! submitPage (readLatency, batch) |> Async.AwaitTaskCorrect
currentPos <- batch.checkpoint
lastWasTail <- batch.isTail })
|> Async.startImmediateAsTask ct
|> Task.ignore<unit>

Check warning on line 171 in src/Propulsion.Feed/FeedReader.fs (GitHub Actions / Build and test, reported three times): The method or function 'ignore' should not be given explicit type argument(s) because it does not declare its type parameters explicitly
match awaitIngesterShutdown with
| Some a when not ct.IsCancellationRequested ->
let completionTimer = Stopwatch.start ()
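
The reworked loop pulls the crawl's IAsyncEnumerable through AsyncSeq.ofAsyncEnum, folds the per-batch bookkeeping into AsyncSeq.iterAsync, and surfaces the whole computation as a Task again (Async.startImmediateAsTask and Task.ignore are Propulsion.Internal helpers; the latter is added by this commit). The bridge in isolation, with a hypothetical handler:

  open System.Collections.Generic
  open System.Threading
  open System.Threading.Tasks
  open FSharp.Control

  let consume (source : IAsyncEnumerable<'t>) (handle : 't -> Task) (ct : CancellationToken) : Task<unit> =
      AsyncSeq.ofAsyncEnum source
      |> AsyncSeq.iterAsync (fun x -> async { do! handle x |> Async.AwaitTask })
      |> fun a -> Async.StartImmediateAsTask(a, cancellationToken = ct)
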
24 changes: 13 additions & 11 deletions src/Propulsion.Feed/FeedSource.fs
@@ -267,24 +267,25 @@ type TailingFeedSource
inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, renderPos,
?logCommitFailure = logCommitFailure, ?readersStopAtTail = readersStopAtTail)

let crawl trancheId (wasLast, startPos) ct = taskSeq {
if wasLast then do! Task.delay tailSleepInterval ct
try let batches = crawl.Invoke(trancheId, startPos, ct)
let crawl trancheId (wasLast, startPos) ct = AsyncSeq.toAsyncEnum(asyncSeq {
if wasLast then do! Async.Sleep tailSleepInterval
try let batches = crawl.Invoke(trancheId, startPos, ct) |> AsyncSeq.ofAsyncEnum
for batch in batches do
yield batch
with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
match readFailureSleepInterval with None -> () | Some interval -> do! Async.Sleep interval
})

member _.Pump(readTranches, ct) =
base.Pump(readTranches, crawl, ct)

module TailingFeedSource =

let readOne readBatch cat pos ct = taskSeq {
let readOne readBatch cat pos ct = AsyncSeq.toAsyncEnum (asyncSeq {
let sw = Stopwatch.start ()
let! b = readBatch struct (cat, pos, ct)
yield struct (sw.Elapsed, b) }
let! b = readBatch struct (cat, pos, ct) |> Async.AwaitTaskCorrect
yield struct (sw.Elapsed, b) })

/// Drives reading and checkpointing from a source that aggregates data from multiple streams as a singular source
/// without shards/physical partitions (tranches), such as the SqlStreamStore and EventStoreDB $all feeds
@@ -341,6 +342,7 @@ open Propulsion.Internal
open System
open System.Threading
open System.Threading.Tasks
open Propulsion.Infrastructure

[<NoComparison; NoEquality>]
type Page<'F> = { items : FsCodec.ITimelineEvent<'F>[]; checkpoint : Position; isTail : bool }
@@ -357,13 +359,13 @@ type FeedSource

let crawl (readPage: Func<TrancheId,Position,CancellationToken,Task<Page<_>>>) trancheId =
let streamName = FsCodec.StreamName.compose "Messages" [SourceId.toString sourceId; TrancheId.toString trancheId]
fun (wasLast, pos) ct -> taskSeq {
fun (wasLast, pos) ct -> AsyncSeq.toAsyncEnum(asyncSeq {
if wasLast then
do! Task.delay tailSleepInterval ct
do! Async.Sleep tailSleepInterval
let readTs = Stopwatch.timestamp ()
let! page = readPage.Invoke(trancheId, pos, ct)
let! page = (readPage.Invoke(trancheId, pos, ct)) |> Async.AwaitTaskCorrect
let items' = page.items |> Array.map (fun x -> struct (streamName, x))
yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) }
yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) })

member internal _.Pump(readTranches: Func<CancellationToken, Task<TrancheId[]>>,
readPage: Func<TrancheId, Position, CancellationToken, Task<Page<Propulsion.Sinks.EventBody>>>, ct): Task<unit> =
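
Task.delay interval ct becomes Async.Sleep interval inside the asyncSeq blocks; the enumerator that AsyncSeq.toAsyncEnum produces runs each step as an Async, so cancellation is expected to flow from the consumer rather than an explicitly threaded token. The tailing shape in miniature (a sketch; readBatch stands in for the real crawl):

  open System
  open FSharp.Control

  // Pause only after a read that reached the tail, then read and yield the next batch
  let rec tail (wasTail : bool) (readBatch : unit -> Async<int[]>) : AsyncSeq<int[]> = asyncSeq {
      if wasTail then do! Async.Sleep(TimeSpan.FromSeconds 1.)
      let! batch = readBatch ()
      yield batch
      yield! tail (Array.isEmpty batch) readBatch }
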
8 changes: 4 additions & 4 deletions src/Propulsion.Feed/PeriodicSource.fs
@@ -56,11 +56,11 @@ type PeriodicSource
inherit Core.FeedSourceBase(log, statsInterval, sourceId, checkpoints, None, sink, defaultArg renderPos DateTimeOffsetPosition.render)

// We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl.
let crawlInternal (read : Func<_, IAsyncEnumerable<struct (_ * _)>>) trancheId (_wasLast, position) ct : IAsyncEnumerable<struct (TimeSpan * Core.Batch<_>)> = taskSeq {
let crawlInternal (read : Func<_, IAsyncEnumerable<struct (_ * _)>>) trancheId (_wasLast, position) ct : IAsyncEnumerable<struct (TimeSpan * Core.Batch<_>)> = AsyncSeq.toAsyncEnum(asyncSeq {
let startDate = DateTimeOffsetPosition.getDateTimeOffset position
let dueDate = startDate + refreshInterval
match dueDate - DateTimeOffset.UtcNow with
| waitTime when waitTime.Ticks > 0L -> do! Task.delay waitTime ct
| waitTime when waitTime.Ticks > 0L -> do! Async.Sleep waitTime
| _ -> ()

let basePosition = DateTimeOffset.UtcNow |> DateTimeOffsetPosition.ofDateTimeOffset
@@ -70,7 +70,7 @@ type PeriodicSource
let buffer = ResizeArray()
let mutable index = 0L
let mutable elapsed = TimeSpan.Zero
for ts, xs in read.Invoke trancheId do
for ts, xs in AsyncSeq.ofAsyncEnum (read.Invoke trancheId) do
elapsed <- elapsed + ts
let streamEvents : Propulsion.Sinks.StreamEvent seq = seq {
for si in xs ->
@@ -91,7 +91,7 @@ type PeriodicSource
match buffer.ToArray() with
| [||] as noItems -> noItems, basePosition
| finalItem -> finalItem, let struct (_s, e) = Array.last finalItem in e |> Core.TimelineEvent.toCheckpointPosition
yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) }
yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) })

member internal _.Pump(readTranches: Func<CancellationToken, Task<TrancheId[]>>,
// The <c>TaskSeq</c> is expected to manage its own resilience strategy (retries etc). <br/>
6 changes: 4 additions & 2 deletions src/Propulsion.Feed/Propulsion.Feed.fsproj
@@ -5,16 +5,18 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion\Infrastructure.fs">
<Link>Infrastructure.fs</Link>
</Compile>
<Compile Include="FeedReader.fs" />
<Compile Include="FeedSource.fs" />
<Compile Include="PeriodicSource.fs" />
<Compile Include="FeedPrometheus.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Control.AsyncSeq" Version="3.2.1" />
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Control.TaskSeq" Version="0.3.0" />
</ItemGroup>

<ItemGroup>
1 change: 1 addition & 0 deletions src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
@@ -4,6 +4,7 @@ open Dapper
open FSharp.Control
open Microsoft.Data.SqlClient
open Propulsion.Feed
open Propulsion.Internal
open System
open System.Data

5 changes: 5 additions & 0 deletions src/Propulsion/Internal.fs
@@ -116,6 +116,11 @@ module Task =
let parallelUnlimited ct xs : Task<'t []> =
parallel_ 0 ct xs

let inline ignore (a: Task<'T>): Task = task {
let! _ = a
return ()
}

type Sem(max) =
let inner = new SemaphoreSlim(max)
member _.HasCapacity = inner.CurrentCount <> 0
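
The new Task.ignore adapts a Task<'T> to a plain Task by awaiting it and discarding the result; FeedReader uses it to flatten its Task<unit> read loop (its call site is what the compiler warnings above flag for the redundant explicit type argument). A hypothetical usage:

  open System.Threading.Tasks

  let countRows () : Task<int> = task { return 42 } // stand-in for real work
  let awaitOnly () : Task = countRows () |> Task.ignore // completion preserved, result discarded
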
17 changes: 11 additions & 6 deletions tests/Propulsion.MessageDb.Integration/Tests.fs
@@ -27,8 +27,15 @@ let createStreamMessage streamName =
cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore
cmd

[<Literal>]
let ConnectionString = "Host=localhost; Port=5433; Username=message_store; Password=;"
let ConnectionString =
match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with
| null -> "Host=localhost; Database=message_store; Port=5432; Username=message_store"
| s -> s
let CheckpointConnectionString =
match Environment.GetEnvironmentVariable "CHECKPOINT_CONNECTION_STRING" with
| null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres"
| s -> s


let connect () = task {
let conn = new NpgsqlConnection(ConnectionString)
@@ -54,10 +61,9 @@ let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, T
member _.HandleExn(log, x) = () }

let makeCheckpoints consumerGroup = task {
let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10)
let checkpoints = ReaderCheckpoint.CheckpointStore(CheckpointConnectionString, "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10)
do! checkpoints.CreateSchemaIfNotExists()
return checkpoints
}
return checkpoints }

[<Fact>]
let ``It processes events for a category`` () = task {
@@ -136,7 +142,6 @@ let ``It doesn't read the tail event again`` () = task {
checkpoints, sink, [| category |])

use capture = new ActivityCapture()
use _src = source.Start()

do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task

