Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Esdb+Sss Sources: Add fromTail, categoryFilter #173

Merged
merged 8 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/140)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.Cosmos" Version="[2.6.0, 2.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.4.0" />
</ItemGroup>

Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.CosmosStore/CosmosStoreParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module EquinoxSystemTextJsonParser =
unixEpoch.AddSeconds(ts.GetDouble())

/// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch
let tryParseEquinoxBatch streamFilter (d : System.Text.Json.JsonDocument) =
let tryParseEquinoxBatch categoryFilter (d : System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id : string) : ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
Expand All @@ -39,16 +39,16 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let streamName = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{aggregateId}" form (or we'll throw)
if streamFilter (FsCodec.StreamName.splitCategoryAndStreamId streamName) then ValueSome (struct (streamName, d.Cast<Batch>())) else ValueNone
if categoryFilter (FsCodec.StreamName.category streamName) then ValueSome (struct (streamName, d.Cast<Batch>())) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents struct (streamName, batch : Batch) : Default.StreamEvent seq =
batch.e |> Seq.mapi (fun offset x -> streamName, FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp = x.t))

/// Collects all events with a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumStreamEvents streamFilter d : Default.StreamEvent seq =
tryParseEquinoxBatch streamFilter d |> ValueOption.map enumEquinoxCosmosEvents |> ValueOption.defaultValue Seq.empty
let enumStreamEvents categoryFilter d : Default.StreamEvent seq =
tryParseEquinoxBatch categoryFilter d |> ValueOption.map enumEquinoxCosmosEvents |> ValueOption.defaultValue Seq.empty
#else
#if COSMOSV2
module EquinoxCosmosParser =
Expand Down
5 changes: 3 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type CosmosStoreSource =
static member Start
( log : ILogger,
monitored : Container, leases : Container, processorName, observer,
startFromTail, ?maxItems, ?lagReportFreq : TimeSpan, ?notifyError, ?customize) =
?maxItems, ?tailSleepInterval, ?startFromTail, ?lagReportFreq : TimeSpan, ?notifyError, ?customize) =
let databaseId, containerId = monitored.Database.Id, monitored.Id
#endif
let logLag (interval : TimeSpan) (remainingWork : (int*int64) list) = async {
Expand All @@ -158,7 +158,8 @@ type CosmosStoreSource =
let source =
ChangeFeedProcessor.Start
( log, monitored, leases, processorName, observer, ?notifyError=notifyError, ?customize=customize,
startFromTail=startFromTail, ?reportLagAndAwaitNextEstimation=maybeLogLag, ?maxItems=maxItems,
?maxItems = maxItems, ?feedPollDelay = tailSleepInterval, ?reportLagAndAwaitNextEstimation=maybeLogLag,
startFromTail = defaultArg startFromTail false,
leaseAcquireInterval=TimeSpan.FromSeconds 5., leaseRenewInterval=TimeSpan.FromSeconds 5., leaseTtl=TimeSpan.FromSeconds 10.)
lagReportFreq |> Option.iter (fun s -> log.Information("ChangeFeed {processorName} Lag stats interval {lagReportIntervalS:n0}s", processorName, s.TotalSeconds))
source
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
19 changes: 17 additions & 2 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@ module Events =
| Updated of Updated
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
#if MEMORYSTORE
let codec = FsCodec.Box.Codec.Create<Event>()
#else
#if DYNAMOSTORE
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Deflate.EncodeTryDeflate
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Deflate.EncodeUncompressed
#else
#if !COSMOSV3 && !COSMOSV2
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
#else
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
#endif
#endif
#endif

module Fold =

Expand Down Expand Up @@ -133,6 +137,16 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events
let decider = resolve (source, tranche, consumerGroupName)
decider.Transact(decideOverride DateTimeOffset.UtcNow defaultCheckpointFrequency pos)

#if MEMORYSTORE
module MemoryStore =

open Equinox.MemoryStore

let create log (consumerGroupName, defaultCheckpointFrequency) context =
let cat = MemoryStoreCategory(context, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.resolve log cat
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if DYNAMOSTORE
module DynamoStore =

Expand All @@ -143,7 +157,7 @@ module DynamoStore =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = DynamoStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if !COSMOSV2 && !COSMOSV3
module CosmosStore =
Expand Down Expand Up @@ -188,3 +202,4 @@ module CosmosStore =
#endif
#endif
#endif
#endif
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.27.0" />
</ItemGroup>

Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module Events =
| Ingested of Ingested
| Closed
interface TypeShape.UnionContract.IUnionContract
let codec = EventCodec.create<Event>()
let codec = EventCodec.gen<Event>

let next (x : Events.StreamSpan) = int x.i + x.c.Length
/// Aggregates all spans per stream into a single Span from the lowest index to the highest
Expand Down Expand Up @@ -109,7 +109,7 @@ type Service internal (shouldClose, resolve : struct (AppendsTrancheId * Appends
let decider = resolve (trancheId, epochId)
if Array.isEmpty spans then async { return { accepted = [||]; closed = false; residual = [||] } } else // special-case null round-trips

let isSelf p = IndexStreamId.toStreamName p |> FsCodec.StreamName.splitCategoryAndStreamId |> ValueTuple.fst = Category
let isSelf p = match IndexStreamId.toStreamName p with FsCodec.StreamName.Category c -> c = Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
decider.TransactEx((fun c -> (Ingest.decide (shouldClose (c.StreamEventBytes, c.Version))) spans c.State), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)

Expand Down
11 changes: 5 additions & 6 deletions src/Propulsion.DynamoStore/AppendsIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Events =
| Started of {| tranche : AppendsTrancheId; epoch : AppendsEpochId |}
| Snapshotted of {| active : Map<AppendsTrancheId, AppendsEpochId> |}
interface TypeShape.UnionContract.IUnionContract
let codec = EventCodec.create<Event>()
let codec = EventCodec.gen<Event>

module Fold =

Expand Down Expand Up @@ -52,10 +52,9 @@ type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.Stat

module Config =

let private createCategory store =
Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolveDecider log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamName >> resolveDecider log (context, Some cache))
let private createCategory store = Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamName >> resolve log (context, Some cache))

/// On the Reading Side, there's no advantage to caching (as we have snapshots, and it's Dynamo)
module Reader =
Expand All @@ -80,4 +79,4 @@ module Reader =
let decider = resolve ()
decider.Query(readIngestionEpochId trancheId)

let create log context = Service(streamName >> Config.resolveDecider log (context, None))
let create log context = Service(streamName >> Config.resolve log (context, None))
30 changes: 15 additions & 15 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -141,39 +141,39 @@ module private Impl =

[<NoComparison; NoEquality>]
type LoadMode =
| All
| Filtered of filter : (struct (string * string) -> bool)
| Hydrated of filter : (struct (string * string) -> bool)
/// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the index
| WithoutEventBodies of categoryFilter : (string -> bool)
/// Populates the Data/Meta fields for events; necessitates loads of all individual streams that pass the categoryFilter before they can be handled
| Hydrated of categoryFilter : (string -> bool)
* degreeOfParallelism : int
* /// Defines the Context to use when loading the bodies
* /// Defines the Context to use when loading the Event Data/Meta
storeContext : DynamoStoreContext
module internal LoadMode =
let private mapTimelineEvent = FsCodec.Core.TimelineEvent.Map FsCodec.Deflate.EncodedToUtf8
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) filter =
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
fun sn (i, cs : string array) ->
if filter (FsCodec.StreamName.splitCategoryAndStreamId sn) then
if categoryFilter (FsCodec.StreamName.category sn) then
ValueSome (async { let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, i, maxCount = cs.Length)
return events |> Array.map mapTimelineEvent })
else ValueNone
let private withoutBodies filter =
let private withoutBodies categoryFilter =
fun sn (i, cs) ->
let renderEvent offset c = FsCodec.Core.TimelineEvent.Create(i + int64 offset, eventType = c, data = Unchecked.defaultof<_>)
if filter (FsCodec.StreamName.splitCategoryAndStreamId sn) then ValueSome (async { return cs |> Array.mapi renderEvent }) else ValueNone
if categoryFilter (FsCodec.StreamName.category sn) then ValueSome (async { return cs |> Array.mapi renderEvent }) else ValueNone
let map storeLog : LoadMode -> _ = function
| All -> false, withoutBodies (fun _ -> true), 1
| Filtered filter -> false, withoutBodies filter, 1
| Hydrated (filter, dop, storeContext) ->
| WithoutEventBodies categoryFilter -> false, withoutBodies categoryFilter, 1
| Hydrated (categoryFilter, dop, storeContext) ->
let eventsContext = Equinox.DynamoStore.Core.EventsContext(storeContext, storeLog)
true, withBodies eventsContext filter, dop
true, withBodies eventsContext categoryFilter, dop

type DynamoStoreSource
( log : Serilog.ILogger, statsInterval,
indexClient : DynamoStoreClient, batchSizeCutoff, tailSleepInterval,
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
// If the Handler does not utilize the bodies of the events, we can avoid loading them from the Store
// If the Handler does not utilize the Data/Meta of the events, we can avoid loading them from the Store
loadMode : LoadMode,
// Override default start position to be at the tail of the index (Default: Always replay all events)
?fromTail,
?startFromTail,
// Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
?storeLog,
?readFailureSleepInterval,
Expand All @@ -185,7 +185,7 @@ type DynamoStoreSource
(log, defaultArg sourceId FeedSourceId.wellKnownId, defaultArg storeLog log)
(LoadMode.map (defaultArg storeLog log) loadMode) batchSizeCutoff (DynamoStoreContext indexClient),
checkpoints,
( if fromTail <> Some true then None
( if startFromTail <> Some true then None
else Some (Impl.readTailPositionForTranche (defaultArg storeLog log) (DynamoStoreContext indexClient))),
sink,
Impl.renderPos,
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 4 additions & 2 deletions src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ module internal Config =

open Equinox.DynamoStore

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20.

let private create codec initial fold accessStrategy (context, cache) =
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
DynamoStoreCategory(context, codec, fold, initial, cs, accessStrategy)

let createSnapshotted codec initial fold (isOrigin, toSnapshot) (context, cache) =
Expand All @@ -74,7 +76,7 @@ module internal Config =

module internal EventCodec =

let create<'t when 't :> TypeShape.UnionContract.IUnionContract> () =
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Deflate.EncodeTryDeflate
let private withUpconverter<'c, 'e when 'c :> TypeShape.UnionContract.IUnionContract> up : FsCodec.IEventCodec<'e, _, _> =
let down (_ : 'e) = failwith "Unexpected"
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading