From 8e4255a0183d3753ee10aa614dfd37c3d645a327 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 19 Sep 2022 13:03:06 +0100 Subject: [PATCH] Target Equinox 4.0.0-rc.6 (inc FSharp.Core 6.0.7) --- src/Propulsion.CosmosStore/CosmosStoreSink.fs | 6 +++++ .../Propulsion.CosmosStore.fsproj | 3 ++- .../DynamoStoreSource.fs | 4 ++- .../Propulsion.DynamoStore.fsproj | 3 ++- src/Propulsion.DynamoStore/Types.fs | 26 +++---------------- src/Propulsion.EventStore/EventStoreSink.fs | 7 ++++- .../Propulsion.EventStore.fsproj | 2 +- .../Propulsion.EventStoreDb.fsproj | 2 +- .../Propulsion.MemoryStore.fsproj | 2 +- .../ParallelThrottledValidation.fs | 3 +-- .../Propulsion.Tests/Propulsion.Tests.fsproj | 2 +- 11 files changed, 27 insertions(+), 33 deletions(-) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index c2bf6334..69801d13 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -2,6 +2,7 @@ namespace Propulsion.CosmosStore open Equinox.CosmosStore.Core open FsCodec +open Propulsion.Infrastructure // AwaitTaskCorrect open Propulsion.Internal open Propulsion.Streams open Serilog @@ -55,7 +56,12 @@ module Internal = let write (log : ILogger) (ctx : EventsContext) stream (span : Default.StreamSpan) = async { log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) +#if COSMOSV3 let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _)) +#else + let! ct = Async.CancellationToken + let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct) |> Async.AwaitTaskCorrect +#endif let res' = match res with | AppendResult.Ok pos -> Ok pos.index diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index d6a45984..1e50442e 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -20,8 +20,9 @@ - + + diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 51d145dd..960bf177 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -2,6 +2,7 @@ open Equinox.DynamoStore open FSharp.Control +open Propulsion.Infrastructure // AwaitTaskCorrect open Propulsion.Internal open System @@ -126,7 +127,8 @@ module internal LoadMode = let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter = fun sn (i, cs : string array) -> if categoryFilter (FsCodec.StreamName.category sn) then - ValueSome (async { let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, i, maxCount = cs.Length) + ValueSome (async { let! ct = Async.CancellationToken + let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, ct, i, maxCount = cs.Length) |> Async.AwaitTaskCorrect return events |> Array.map mapTimelineEvent }) else ValueNone let private withoutBodies categoryFilter = diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 6ce816c2..d5ad894d 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -25,8 +25,9 @@ - + + diff --git a/src/Propulsion.DynamoStore/Types.fs b/src/Propulsion.DynamoStore/Types.fs index 336720fc..b47cdce6 100644 --- a/src/Propulsion.DynamoStore/Types.fs +++ b/src/Propulsion.DynamoStore/Types.fs @@ -112,28 +112,8 @@ module internal EventCodec = module internal Async = - open Propulsion.Infrastructure // AwaitTaskCorrect - - type Async with - static member Throttle degreeOfParallelism = - let s = new System.Threading.SemaphoreSlim(degreeOfParallelism) - fun computation -> async { - let! ct = Async.CancellationToken - do! s.WaitAsync ct |> Async.AwaitTaskCorrect - try return! computation - finally s.Release() |> ignore } - let private parallelThrottledUnsafe dop computations = // https://github.com/dotnet/fsharp/issues/13165 + // NOTE there's a bug in FSharp.Core >= 6.0.6 that can trigger a process exit with StackOverflowException + // if more than ~1200 computations are supplied and abort quickly + let parallelThrottled dop computations = // https://github.com/dotnet/fsharp/issues/13165 Async.Parallel(computations, maxDegreeOfParallelism = dop) - // NOTE as soon as a non-preview Async.Parallel impl in FSharp.Core includes the fix (e.g. 6.0.5 does not, 6.0.5-beta.22329.3 does), - // we can remove this shimming and replace it with the body of parallelThrottledUnsafe - let parallelThrottled dop computations = async { - let throttle = Async.Throttle dop // each batch of 1200 gets the full potential dop - we internally limit what actually gets to run concurrently here - let! allResults = - computations - |> Seq.map throttle - |> Seq.chunkBySize 1200 - |> Seq.map (parallelThrottledUnsafe dop) - |> Async.Parallel - return Array.concat allResults - } #endif diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index 7c9be1a6..790b7b2e 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -42,7 +42,12 @@ module Internal = let write (log : ILogger) (context : EventStoreContext) stream (span : Default.StreamSpan) = async { log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) - let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _)) +#if EVENTSTORE_LEGACY + let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _)) |> Async.AwaitTaskCorrect +#else + let! ct = Async.CancellationToken + let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _), ct) |> Async.AwaitTaskCorrect +#endif let ress = match res with | GatewaySyncResult.Written (Token.Unpack pos') -> diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 2a1be5b7..3878339c 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -19,7 +19,7 @@ - + diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 8e8aa497..d2498dff 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -18,7 +18,7 @@ - + diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index 40c0a440..b2ac752e 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -19,7 +19,7 @@ - + diff --git a/tests/Propulsion.Tests/ParallelThrottledValidation.fs b/tests/Propulsion.Tests/ParallelThrottledValidation.fs index eab09028..ced18a86 100644 --- a/tests/Propulsion.Tests/ParallelThrottledValidation.fs +++ b/tests/Propulsion.Tests/ParallelThrottledValidation.fs @@ -3,9 +3,8 @@ module Propulsion.Tests.ParallelThrottledValidation open Swensen.Unquote open Xunit -// Fixed in 6.0.5-beta.22329.3 (which we explicitly target in this test suite) +// Fixed in 6.0.5-beta.22329.3 and 6.0.6 (which we explicitly target in this test suite) // 6.0.5 is built from a 6.0.4 tag so does not include the fix (but it would win over the beta version in semver) -// => Propulsion.DynamoStore needs to depend on on 6.0.0 for now let [] ``Async.Parallel blows stack when cancelling many <= in FSharp.Core 6.0.5`` () = let gen (i : int) = async { if i = 0 then diff --git a/tests/Propulsion.Tests/Propulsion.Tests.fsproj b/tests/Propulsion.Tests/Propulsion.Tests.fsproj index 49c68056..db56b2e0 100644 --- a/tests/Propulsion.Tests/Propulsion.Tests.fsproj +++ b/tests/Propulsion.Tests/Propulsion.Tests.fsproj @@ -23,7 +23,7 @@ - +