Skip to content

Commit

Permalink
Target Equinox 4.0.0-rc.6 (inc FSharp.Core 6.0.7)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 9, 2022
1 parent 4d62f9d commit 8e4255a
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 33 deletions.
6 changes: 6 additions & 0 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core
open FsCodec
open Propulsion.Infrastructure // AwaitTaskCorrect
open Propulsion.Internal
open Propulsion.Streams
open Serilog
Expand Down Expand Up @@ -55,7 +56,12 @@ module Internal =

let write (log : ILogger) (ctx : EventsContext) stream (span : Default.StreamSpan) = async {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
#if COSMOSV3
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
#else
let! ct = Async.CancellationToken
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct) |> Async.AwaitTaskCorrect
#endif
let res' =
match res with
| AppendResult.Ok pos -> Ok pos.index
Expand Down
3 changes: 2 additions & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.5.2" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.9" />
<PackageReference Include="FSharp.Control.AsyncSeq" Version="3.2.1" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 3 additions & 1 deletion src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

open Equinox.DynamoStore
open FSharp.Control
open Propulsion.Infrastructure // AwaitTaskCorrect
open Propulsion.Internal
open System

Expand Down Expand Up @@ -126,7 +127,8 @@ module internal LoadMode =
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
fun sn (i, cs : string array) ->
if categoryFilter (FsCodec.StreamName.category sn) then
ValueSome (async { let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, i, maxCount = cs.Length)
ValueSome (async { let! ct = Async.CancellationToken
let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, ct, i, maxCount = cs.Length) |> Async.AwaitTaskCorrect
return events |> Array.map mapTimelineEvent })
else ValueNone
let private withoutBodies categoryFilter =
Expand Down
3 changes: 2 additions & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.5.2" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.9" />
<PackageReference Include="FSharp.Core" Version="6.0.6" />
</ItemGroup>

<ItemGroup>
Expand Down
26 changes: 3 additions & 23 deletions src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,8 @@ module internal EventCodec =

module internal Async =

open Propulsion.Infrastructure // AwaitTaskCorrect

type Async with
static member Throttle degreeOfParallelism =
let s = new System.Threading.SemaphoreSlim(degreeOfParallelism)
fun computation -> async {
let! ct = Async.CancellationToken
do! s.WaitAsync ct |> Async.AwaitTaskCorrect
try return! computation
finally s.Release() |> ignore }
let private parallelThrottledUnsafe dop computations = // https://github.com/dotnet/fsharp/issues/13165
// NOTE there's a bug in FSharp.Core >= 6.0.6 that can trigger a process exit with StackOverflowException
// if more than ~1200 computations are supplied and abort quickly
let parallelThrottled dop computations = // https://github.com/dotnet/fsharp/issues/13165
Async.Parallel(computations, maxDegreeOfParallelism = dop)
// NOTE as soon as a non-preview Async.Parallel impl in FSharp.Core includes the fix (e.g. 6.0.5 does not, 6.0.5-beta.22329.3 does),
// we can remove this shimming and replace it with the body of parallelThrottledUnsafe
let parallelThrottled dop computations = async {
let throttle = Async.Throttle dop // each batch of 1200 gets the full potential dop - we internally limit what actually gets to run concurrently here
let! allResults =
computations
|> Seq.map throttle
|> Seq.chunkBySize 1200
|> Seq.map (parallelThrottledUnsafe dop)
|> Async.Parallel
return Array.concat allResults
}
#endif
7 changes: 6 additions & 1 deletion src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ module Internal =

let write (log : ILogger) (context : EventStoreContext) stream (span : Default.StreamSpan) = async {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _))
#if EVENTSTORE_LEGACY
let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _)) |> Async.AwaitTaskCorrect
#else
let! ct = Async.CancellationToken
let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _), ct) |> Async.AwaitTaskCorrect
#endif
let ress =
match res with
| GatewaySyncResult.Written (Token.Unpack pos') ->
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.5.2" />
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.9" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.5.2" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.6" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.9" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.5.2" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.6" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions tests/Propulsion.Tests/ParallelThrottledValidation.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ module Propulsion.Tests.ParallelThrottledValidation
open Swensen.Unquote
open Xunit

// Fixed in 6.0.5-beta.22329.3 (which we explicitly target in this test suite)
// Fixed in 6.0.5-beta.22329.3 and 6.0.6 (which we explicitly target in this test suite)
// 6.0.5 is built from a 6.0.4 tag so does not include the fix (but it would win over the beta version in semver)
// => Propulsion.DynamoStore needs to depend on on 6.0.0 for now
let [<Fact>] ``Async.Parallel blows stack when cancelling many <= in FSharp.Core 6.0.5`` () =
let gen (i : int) = async {
if i = 0 then
Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Tests/Propulsion.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="FsCheck.Xunit" Version="2.16.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />
<PackageReference Include="FSharp.Core" Version="6.0.5-beta.22329.3" />
<PackageReference Include="FSharp.Core" Version="6.0.6" />
<PackageReference Include="unquote" Version="6.1.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
Expand Down

0 comments on commit 8e4255a

Please sign in to comment.