Skip to content

Commit

Permalink
Port to Equinox4rc16, Propulsion3rc10
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored and oskardudycz committed May 17, 2024
1 parent 5a0b6e1 commit 9e13479
Show file tree
Hide file tree
Showing 22 changed files with 500 additions and 399 deletions.
22 changes: 10 additions & 12 deletions Sample/ECommerce.Equinox/ECommerce.Api/Program.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module ECommerce.Api.Program

open ECommerce
open ECommerce.Infrastructure // Args etc
open Microsoft.AspNetCore.Hosting
open Microsoft.Extensions.DependencyInjection
open Serilog
Expand All @@ -25,26 +24,26 @@ module Args =
| [<CliPrefix(CliPrefix.None); Last>] Sss of ParseResults<Args.Sss.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose _ -> "request verbose logging."
| Verbose -> "request verbose logging."
| PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)"
| Cosmos _ -> "specify CosmosDB input parameters"
| Dynamo _ -> "specify DynamoDB input parameters"
| Esdb _ -> "specify EventStore input parameters"
| Sss _ -> "specify SqlStreamStore input parameters"
and [<RequireQualifiedAccess>]
Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Verbose
member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
Arguments(c : Configuration, p : ParseResults<Parameters>) =
member val Verbose = p.Contains Verbose
member val PrometheusPort = p.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
member val CacheSizeMb = 10
member val StoreArgs : Args.StoreArgs =
match a.TryGetSubCommand() with
match p.TryGetSubCommand() with
| Some (Parameters.Cosmos cosmos) -> Args.StoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos))
| Some (Parameters.Dynamo dynamo) -> Args.StoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo))
| Some (Parameters.Esdb es) -> Args.StoreArgs.Esdb (Args.Esdb.Arguments(c, es))
| Some (Parameters.Sss sss) -> Args.StoreArgs.Sss (Args.Sss.Arguments(c, sss))
| _ -> Args.missingArg "Must specify one of cosmos, dynamo, esdb or sss for store"
| _ -> p.Raise "Must specify one of cosmos, dynamo, esdb or sss for store"
member x.VerboseStore = Args.StoreArgs.verboseRequested x.StoreArgs
member x.Connect() : Domain.Config.Store<_> =
member x.Connect(): Store.Config =
let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb)
Args.StoreArgs.connectTarget x.StoreArgs cache

Expand Down Expand Up @@ -77,8 +76,7 @@ let main argv =
.Sinks(metrics, args.VerboseStore)
.CreateLogger()
try run args; 0
with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2
with e -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with Args.MissingArg msg -> eprintfn $"%s{msg}"; 1
| :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintfn "Exception %s" e.Message; 1
with:? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintfn $"Exception %s{e.Message}"; 1
44 changes: 22 additions & 22 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
/// Each successive epoch is identified by an index, i.e. ConfirmedEpoch-0_0, then ConfirmedEpoch-0_1
module ECommerce.Domain.ConfirmedEpoch

let [<Literal>] Category = "ConfirmedEpoch"
let streamId epochId = Equinox.StreamId.gen2 ConfirmedSeriesId.toString ConfirmedEpochId.toString (ConfirmedSeriesId.wellKnownId, epochId)
let [<Literal>] CategoryName = "ConfirmedEpoch"
let streamId epochId = FsCodec.StreamId.gen2 ConfirmedSeriesId.toString ConfirmedEpochId.toString (ConfirmedSeriesId.wellKnownId, epochId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand All @@ -19,11 +19,11 @@ module Events =
| Ingested of Ingested
| Closed
interface TypeShape.UnionContract.IUnionContract
let codec = Config.EventCodec.gen<Event>
let codecJsonElement = Config.EventCodec.genJsonElement<Event>
let codec = Store.Codec.gen<Event>
let codecJsonElement = Store.Codec.genJsonElement<Event>

let ofShoppingCartView cartId (view : ShoppingCart.Details.View) : Events.Cart =
{ cartId = cartId; items = [| for i in view.items -> { productId = i.productId; unitPrice = i.unitPrice; quantity = i.quantity }|] }
{ cartId = cartId; items = [| for i in view.items -> { productId = i.productId; unitPrice = i.unitPrice; quantity = i.quantity } |] }

let itemId (x : Events.Cart) : CartId = x.cartId
let (|ItemIds|) : Events.Cart[] -> CartId[] = Array.map itemId
Expand All @@ -45,20 +45,20 @@ let notAlreadyIn (ids : CartId seq) =
/// Manages ingestion of only items not already in the list
/// Yields residual net of items already present in this epoch
// NOTE See feedSource template for more advanced version handling splitting large input requests where epoch limit is strict
let decide shouldClose candidates (currentIds, closed as state) : ExactlyOnceIngester.IngestResult<_,_> * Events.Event list =
let decide shouldClose candidates (currentIds, closed as state) : ExactlyOnceIngester.IngestResult<_,_> * Events.Event[] =
match closed, candidates |> Array.filter (notAlreadyIn currentIds) with
| false, fresh ->
let added, events =
match fresh with
| [||] -> [||], []
| [||] -> [||], [||]
| ItemIds freshIds ->
let closing = shouldClose currentIds freshIds
let ingestEvent = Events.Ingested { carts = fresh }
freshIds, if closing then [ ingestEvent ; Events.Closed ] else [ ingestEvent ]
freshIds, if closing then [| ingestEvent ; Events.Closed |] else [| ingestEvent |]
let _, closed = Fold.fold state events
{ accepted = added; closed = closed; residual = [||] }, events
| true, fresh ->
{ accepted = [||]; closed = true; residual = fresh }, []
{ accepted = [||]; closed = true; residual = fresh }, [||]

// NOTE see feedSource for example of separating Service logic into Ingestion and Read Services in order to vary the folding and/or state held
type Service internal
Expand All @@ -72,7 +72,7 @@ type Service internal
// NOTE decider which will initially transact against potentially stale cached state, which will trigger a
// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk
// of writes are presumed to be coming from within this same process
decider.Transact(decide shouldClose carts, load = Equinox.AllowStale)
decider.Transact(decide shouldClose carts, load = Equinox.LoadOption.AnyCachedValue)

/// Returns all the items currently held in the stream (Not using AllowStale on the assumption this needs to see updates from other apps)
member _.Read epochId : Async<Fold.State> =
Expand All @@ -81,13 +81,13 @@ type Service internal

module Config =

let private create_ shouldClose cat = Service(shouldClose, streamId >> Config.createDecider cat Category)
let private create_ shouldClose cat = Service(shouldClose, streamId >> Store.createDecider cat)
let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store
| Config.Store.Cosmos (context, cache) -> Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache)
| Config.Store.Dynamo (context, cache) -> Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
| Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
| Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codecJsonElement Fold.initial Fold.fold (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let shouldClose maxItemsPerEpoch candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch
let create maxItemsPerEpoch (Category cat) = create_ (shouldClose maxItemsPerEpoch) cat

Expand Down Expand Up @@ -116,9 +116,9 @@ module Reader =
module Config =

let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec initial fold store
| Config.Store.Cosmos (context, cache) -> Config.Cosmos.createUnoptimized Events.codecJsonElement initial fold (context, cache)
| Config.Store.Dynamo (context, cache) -> Config.Dynamo.createUnoptimized Events.codec initial fold (context, cache)
| Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec initial fold (context, cache)
| Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec initial fold (context, cache)
let create (Category cat) = Service(streamId >> Config.createDecider cat Category)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec initial fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codecJsonElement initial fold (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createUnoptimized CategoryName Events.codec initial fold (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec initial fold (context, cache)
| Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec initial fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
30 changes: 15 additions & 15 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
/// As an Epoch is marked `Closed`, the Ingester will mark a new Epoch `Started` on this aggregate via MarkIngestionEpochId
module ECommerce.Domain.ConfirmedSeries

let [<Literal>] Category = "ConfirmedSeries"
let [<Literal>] CategoryName = "ConfirmedSeries"
// TOCONSIDER: if you need multiple lists series/epochs in a single system, the Series and Epoch streams should have a SeriesId in the stream name
// See also the implementation in the feedSource template, where the Series aggregate also functions as an index of series held in the system
let streamId () = Equinox.StreamId.gen ConfirmedSeriesId.toString ConfirmedSeriesId.wellKnownId
let streamId () = FsCodec.StreamId.gen ConfirmedSeriesId.toString ConfirmedSeriesId.wellKnownId

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand All @@ -16,8 +16,8 @@ module Events =
| Started of {| epochId : ConfirmedEpochId |}
| Snapshotted of {| active : ConfirmedEpochId |}
interface TypeShape.UnionContract.IUnionContract
let codec = Config.EventCodec.gen<Event>
let codecJsonElement = Config.EventCodec.genJsonElement<Event>
let codec = Store.Codec.gen<Event>
let codecJsonElement = Store.Codec.genJsonElement<Event>

module Fold =

Expand All @@ -31,9 +31,9 @@ module Fold =
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot s = Events.Snapshotted {| active = Option.get s |}

let interpret epochId (state : Fold.State) =
[if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ConfirmedEpochId.initial then
yield Events.Started {| epochId = epochId |}]
let interpret epochId (state : Fold.State) = [|
if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ConfirmedEpochId.initial then
yield Events.Started {| epochId = epochId |}|]

type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.State>) =

Expand All @@ -46,15 +46,15 @@ type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.Stat
/// Mark specified `epochId` as live for the purposes of ingesting
/// Writers are expected to react to having writes to an epoch denied (due to it being Closed) by anointing a successor via this
member _.MarkIngestionEpochId epochId : Async<unit> =
let decider = resolve()
decider.Transact(interpret epochId, load = Equinox.AllowStale)
let decider = resolve ()
decider.Transact(interpret epochId, load = Equinox.LoadOption.AnyCachedValue)

module Config =

let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store
| Config.Store.Cosmos (context, cache) -> Config.Cosmos.createSnapshotted Events.codecJsonElement Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Config.Store.Dynamo (context, cache) -> Config.Dynamo.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
| Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Config.createDecider cat Category)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJsonElement Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
18 changes: 10 additions & 8 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

<ItemGroup>
<Compile Include="ExactlyOnceIngester.fs" />
<Compile Include="Config.fs" />
<Compile Include="Store.fs" />
<Compile Include="Streams.fs" />
<Compile Include="Types.fs" />
<Compile Include="RandomProductPriceCalculator.fs" />
<Compile Include="ShoppingCart.fs" />
Expand All @@ -17,13 +18,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.6" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.6" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.6" />
<PackageReference Include="Equinox.SqlStreamStore" Version="4.0.0-rc.6" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.9" />
<PackageReference Include="Propulsion" Version="3.0.0-beta.7" />
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.16" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.16" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.16" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.16" />
<PackageReference Include="Equinox.SqlStreamStore" Version="4.0.0-rc.16" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.16" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.15" />
<PackageReference Include="Propulsion" Version="3.0.0-rc.10" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Service<[<Measure>]'id, 'req, 'res, 'outcome> internal
/// a) back-off, re-read and retry if there's a concurrent write Optimistic Concurrency Check failure when writing the stream
/// b) enter a prolonged period of retries if multiple concurrent writes trigger rate limiting and 429s from CosmosDB
/// c) readers will less frequently encounter sustained 429s on the batch
let batchedIngest = Equinox.Core.AsyncBatchingGate(tryIngest, linger)
let batchedIngest = Equinox.Core.Batching.Batcher(tryIngest, linger)

/// Run the requests over a chain of epochs.
/// Returns the subset that actually got handled this time around (exclusive of items that did not trigger events per idempotency rules).
Expand Down
Loading

0 comments on commit 9e13479

Please sign in to comment.