Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target Equinox 4.0.0-rc.12.8 #225

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12.8" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
62 changes: 32 additions & 30 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Fold =
| Events.Started { config = cfg; origin=originState } -> Running { config = cfg; state = originState }
| Events.Updated e | Events.Checkpointed e | Events.Overrode e -> Running { config = e.config; state = e.pos }
| Events.Snapshotted runningState -> Running runningState
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let fold: State -> Events.Event seq -> State = Seq.fold evolve // NOTE Leave as Seq for interop with COSMOSV3

let isOrigin _state = true // we can build a state from any of the events and/or an unfold

Expand Down Expand Up @@ -96,31 +96,40 @@ let decideStart establishOrigin at freq state = async {
| Fold.NotStarted ->
let! origin = establishOrigin
let config, checkpoint = mk at freq origin
return checkpoint.pos, [Events.Started { config = config; origin = checkpoint}]
return checkpoint.pos, [| Events.Started { config = config; origin = checkpoint } |]
| Fold.Running s ->
return s.state.pos, [] }
return s.state.pos, [||] }

let decideOverride at (freq : TimeSpan) pos = function
| Fold.Running s when s.state.pos = pos && s.config.checkpointFreqS = int freq.TotalSeconds -> []
| Fold.Running s when s.state.pos = pos && s.config.checkpointFreqS = int freq.TotalSeconds -> [||]
| _ ->
let config, checkpoint = mk at freq pos
[Events.Overrode { config = config; pos = checkpoint}]
[| Events.Overrode { config = config; pos = checkpoint } |]

let decideUpdate at pos = function
| Fold.NotStarted -> failwith "Cannot Commit a checkpoint for a series that has not been Started"
| Fold.Running state ->
| Fold.Running state -> [|
if at < state.state.nextCheckpointDue then
if pos = state.state.pos then [] // No checkpoint due, pos unchanged => No write
else // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
[Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }]
if pos <> state.state.pos then // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }
else // Checkpoint due => Force a write every N seconds regardless of whether the position has actually changed
let freq = TimeSpan.FromSeconds(float state.config.checkpointFreqS)
let config, checkpoint = mk at freq pos
[Events.Checkpointed { config = config; pos = checkpoint }]
Events.Checkpointed { config = config; pos = checkpoint } |]

type Decider<'e, 's> = Equinox.Decider<'e, 's>
#if COSMOSV3
module Equinox =
let AnyCachedValue = ()
type Equinox.Decider<'e, 's> with
member x.TransactAsync(decide, load : unit): Async<'r> =
x.TransactAsync(fun s -> async { let! r, es = decide s in return r, Array.toList es })
member x.Transact(decide, load : unit): Async<'r> =
x.Transact(decide >> function r, es -> r, Array.toList es)
member x.Transact(decide, ?load : unit): Async<unit> =
x.Transact(decide >> Array.toList)
#endif

type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =
type Service internal (resolve: SourceId * TrancheId * string -> Equinox.Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =

interface IFeedCheckpointStore with

Expand All @@ -129,21 +138,14 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events
member _.Start(source, tranche, establishOrigin, ct) : Task<Position> =
let decider = resolve (source, tranche, consumerGroupName)
let establishOrigin = match establishOrigin with None -> async { return Position.initial } | Some f -> Async.call f.Invoke
#if COSMOSV3
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency)
#else
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency, load = Equinox.AnyCachedValue)
#endif
|> Async.executeAsTask ct

/// Ingest a position update
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
member _.Commit(source, tranche, pos : Position, ct) =
let decider = resolve (source, tranche, consumerGroupName)
#if COSMOSV3
decider.Transact(decideUpdate DateTimeOffset.UtcNow pos)
#else
decider.Transact(decideUpdate DateTimeOffset.UtcNow pos, load = Equinox.AnyCachedValue)
#endif
|> Async.executeAsTask ct :> _

/// Override a checkpointing series with the supplied parameters
Expand All @@ -157,9 +159,9 @@ module MemoryStore =
open Equinox.MemoryStore

let create log (consumerGroupName, defaultCheckpointFrequency) context =
let cat = MemoryStoreCategory(context, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = MemoryStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if DYNAMOSTORE
module DynamoStore =
Expand All @@ -169,9 +171,9 @@ module DynamoStore =
let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = DynamoStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = DynamoStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if !COSMOSV3
module CosmosStore =
Expand All @@ -181,12 +183,12 @@ module CosmosStore =
let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
let private create log defaultCheckpointFrequency resolveStream =
let resolve id = Decider(log, resolveStream Equinox.AllowStale (streamName id), maxAttempts = 3)
let resolve id = Equinox.Decider(log, resolveStream Equinox.AllowStale (streamName id), maxAttempts = 3)
Service(resolve, null, defaultCheckpointFrequency)

#if COSMOSV3
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageReference Include="Equinox.CosmosStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10" />
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0-alpha.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.27.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.27.0" ExcludeAssets="contentfiles" />
</ItemGroup>

<ItemGroup>
Expand Down
20 changes: 10 additions & 10 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module Fold =
let private evolve (state : State) = function
| Events.Ingested e -> state.With(e)
| Events.Closed -> state.WithClosed()
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let fold = Array.fold evolve

module Ingest =

Expand Down Expand Up @@ -102,16 +102,16 @@ module Ingest =
| ({ closed = false; versions = cur } as state : Fold.State) ->
let closed, ingested, events =
match tryToIngested state inputs with
| None -> false, Array.empty, []
| None -> false, Array.empty, [||]
| Some diff ->
let closing = shouldClose (diff.app.Length + diff.add.Length + cur.Count)
let ingestEvent = Events.Ingested diff
let ingested = (seq { for x in diff.add -> x.p }, seq { for x in diff.app -> x.p }) ||> Seq.append |> Array.ofSeq
closing, ingested, [ ingestEvent ; if closing then Events.Closed ]
closing, ingested, [| ingestEvent ; if closing then Events.Closed |]
let res : ExactlyOnceIngester.IngestResult<_, _> = { accepted = ingested; closed = closed; residual = [||] }
res, events
| { closed = true } as state ->
{ accepted = [||]; closed = true; residual = removeDuplicates state inputs }, []
{ accepted = [||]; closed = true; residual = removeDuplicates state inputs }, [||]

type Service internal (shouldClose, resolve : AppendsPartitionId * AppendsEpochId -> Equinox.Decider<Events.Event, Fold.State>) =

Expand All @@ -125,15 +125,15 @@ type Service internal (shouldClose, resolve : AppendsPartitionId * AppendsEpochI

module Config =

let private createCategory (context, cache) = Config.createUnoptimized Events.codec Fold.initial Fold.fold (context, Some cache)
let private createCategory (context, cache) = Config.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache)
let create log (maxBytes : int, maxVersion : int64, maxStreams : int) store =
let resolve = createCategory store |> Equinox.Decider.resolve log
let resolve = createCategory store |> Equinox.Decider.forStream log
let shouldClose (totalBytes : int64 voption, version) totalStreams =
let closing = totalBytes.Value > maxBytes || version >= maxVersion || totalStreams >= maxStreams
if closing then log.Information("Epoch Closing v{version}/{maxVersion} {streams}/{maxStreams} streams {kib:f0}/{maxKib:f0} KiB",
version, maxVersion, totalStreams, maxStreams, float totalBytes.Value / 1024., float maxBytes / 1024.)
closing
Service(shouldClose, streamId >> resolve Category)
Service(shouldClose, streamId >> resolve)

/// Manages the loading of Ingested Span Batches in a given Epoch from a given position forward
/// In the case where we are polling the tail, this should mean we typically do a single round-trip for a point read of the Tip
Expand Down Expand Up @@ -166,8 +166,8 @@ module Reader =

module Config =

let private createCategory context minIndex = Config.createWithOriginIndex codec initial fold context minIndex
let private createCategory context minIndex = Config.createWithOriginIndex Category codec initial fold context minIndex
let create log context =
let resolve minIndex = Equinox.Decider.resolve log (createCategory context minIndex)
Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex Category)
let resolve minIndex = Equinox.Decider.forStream log (createCategory context minIndex)
Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex)
#endif
20 changes: 10 additions & 10 deletions src/Propulsion.DynamoStore/AppendsIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ module Fold =
type State = Map<AppendsPartitionId, AppendsEpochId>

let initial = Map.empty
let evolve state = function
let private evolve state = function
| Events.Started e -> state |> Map.add e.partition e.epoch
| Events.Snapshotted e -> e.active
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let fold = Array.fold evolve

let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot s = Events.Snapshotted {| active = s |}
Expand All @@ -43,11 +43,11 @@ let readEpochId partitionId (state : Fold.State) =
state
|> Map.tryFind partitionId

let interpret (partitionId, epochId) (state : Fold.State) =
[if state |> readEpochId partitionId |> Option.forall (fun cur -> cur < epochId) && epochId >= AppendsEpochId.initial then
yield Events.Started { partition = partitionId; epoch = epochId }]
let interpret (partitionId, epochId) (state : Fold.State) = [|
if state |> readEpochId partitionId |> Option.forall (fun cur -> cur < epochId) && epochId >= AppendsEpochId.initial then
Events.Started { partition = partitionId; epoch = epochId } |]

type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve: unit -> Equinox.Decider<Events.Event, Fold.State>) =

/// Determines the current active epoch for the specified Partition
member _.ReadIngestionEpochId(partitionId) : Async<AppendsEpochId> =
Expand All @@ -62,9 +62,9 @@ type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.Stat

module Config =

let private createCategory store = Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamId >> resolve log (context, Some cache) Category)
let private createCategory store = Config.createSnapshotted Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.forStream log
let create log (context, cache) = Service(streamId >> resolve log (context, Some cache))

/// On the Reading Side, there's no advantage to caching (as we have snapshots, and it's Dynamo)
module Reader =
Expand All @@ -89,5 +89,5 @@ module Reader =
let decider = resolve ()
decider.Query(readIngestionEpochId partitionId)

let create log context = Service(streamId >> Config.resolve log (context, None) Category)
let create log context = Service(streamId >> Config.resolve log (context, None))
#endif
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12.8" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
16 changes: 8 additions & 8 deletions src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,27 @@ module internal Config =

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20.

let private create codec initial fold accessStrategy (context, cache) =
let private create name codec initial fold accessStrategy (context, cache) =
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
DynamoStoreCategory(context, codec, fold, initial, cs, accessStrategy)
DynamoStoreCategory(context, name, codec, fold, initial, accessStrategy, cs)

let createSnapshotted codec initial fold (isOrigin, toSnapshot) (context, cache) =
let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot)
create codec initial fold accessStrategy (context, cache)
create name codec initial fold accessStrategy (context, cache)

let createUnoptimized codec initial fold (context, cache) =
let createUnoptimized name codec initial fold (context, cache) =
let accessStrategy = AccessStrategy.Unoptimized
create codec initial fold accessStrategy (context, cache)
create name codec initial fold accessStrategy (context, cache)

let createWithOriginIndex codec initial fold context minIndex =
let createWithOriginIndex name codec initial fold context minIndex =
// TOCONSIDER include way to limit item count being read
// TOCONSIDER implement a loader hint to pass minIndex to the query as an additional filter
let isOrigin struct (i, _) = i <= minIndex
// There _should_ always be an event at minIndex - if there isn't for any reason, the load might go back one event too far
// Here we trim it for correctness (although Propulsion would technically ignore it)
let trimPotentialOverstep = Seq.filter (fun struct (i, _e) -> i >= minIndex)
let accessStrategy = AccessStrategy.MultiSnapshot (isOrigin, fun _ -> failwith "writing not applicable")
create codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None)
create name codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None)

module internal EventCodec =

Expand Down
Loading
Loading