-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
eqxFc: Location with Long Running Inventory #40
Open
bartelink
wants to merge
51
commits into
master
Choose a base branch
from
add-fc
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
51 commits
Select commit
Hold shift + click to select a range
6abe2d5
Add Location initial impl
bartelink f203680
Add Fc template wrapping
bartelink 7a3690e
Correct AccessStrategy
bartelink 2a7f6ba
Fix streamId ordering
bartelink 5828bb8
Tidy log output
bartelink 6aceccc
Tidy connection logic
bartelink 5bd3ccf
Add README
bartelink 2600c20
Move AggregateId to Events
bartelink 7c7db7d
Aggregate layout/naming consistency
bartelink d8dfa0e
Sync layout
bartelink 4f599a3
wip
bartelink ce36a09
wip
bartelink c7ef3d5
Complete Location, Inventory and LocationTests
bartelink 20605ee
Yes, comments
bartelink ff7b940
tmp
bartelink 4b6781d
WIP code snipped
bartelink 1c47ee1
Add missing README.md
bartelink 9229fde
Tidy accessStrategy
bartelink 0895776
Formatting
bartelink 2daf7f0
InventoryTransaction wip
bartelink e6b3e86
Handle FsCodec _ restriction
bartelink ca1d957
Remove inventoryId from Trans SN
bartelink 40d5b2b
Remove checkpoints from InventorySeries
bartelink 21f50c9
Tidying / formatting
bartelink eba71f9
Fix batch vs epoch naming in inventory
bartelink d05eab2
Complete Process Manager Apply
bartelink e3582b9
Target V2s
bartelink 2c34ca2
wip
bartelink ca7f60c
Process Manager wip
bartelink 577328b
Minor renames; tests not yet compiling
bartelink bd6ba8e
WIP
bartelink 2ae86f2
Cover denial of Remove action
bartelink 49ffb38
Add explicit Service action methods on Process Manager
bartelink 5dd465e
Add Watchdog
bartelink d91dc51
Add Watchdog to Fc sln
bartelink 947eb5d
Tidy wiring
bartelink 8d0e454
Apply style changes from 4.2
bartelink 099cdf7
Stragglers
bartelink e64fe37
wip
bartelink cbd6562
Fix typo
bartelink 755d18f
Repurpose Program.fs as skeleton host
bartelink f0b6e2a
Add ES wiring
bartelink 941032f
Casing
bartelink 3a2dfdc
Cleanup
bartelink 1985df2
Fix changelog
bartelink c2adeb0
Cleanup more
bartelink ff265c5
Clean PM naming
bartelink 9d49d38
Extract ProcessManager as Stock*
bartelink e6d514f
Tidy
bartelink 0314a16
Cleanup from Cosmos
bartelink d3d6338
Whitespace
bartelink File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"$schema": "http://json.schemastore.org/template", | ||
"author": "@jet @bartelink", | ||
"classifications": [ | ||
"Equinox", | ||
"Event Sourcing", | ||
"Fc", | ||
"Propulsion" | ||
], | ||
"tags": { | ||
"language": "F#" | ||
}, | ||
"identity": "Equinox.Fc", | ||
"name": "Equinox Fc Example", | ||
"shortName": "eqxFc", | ||
"sourceName": "Fc", | ||
"preferNameDirectory": true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>netcoreapp2.1</TargetFramework> | ||
<WarningLevel>5</WarningLevel> | ||
<IsPackable>false</IsPackable> | ||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<Compile Include="Fixtures.fs" /> | ||
<Compile Include="LocationSeriesTests.fs" /> | ||
<Compile Include="LocationEpochTests.fs" /> | ||
<Compile Include="LocationTests.fs" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" /> | ||
|
||
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00033" /> | ||
<PackageReference Include="Equinox.MemoryStore" Version="2.1.0" /> | ||
<PackageReference Include="FsCheck.xUnit" Version="2.14.2" /> | ||
<PackageReference Include="unquote" Version="5.0" /> | ||
<PackageReference Include="xunit" Version="2.4.1" /> | ||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\Domain\Domain.fsproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
[<AutoOpen>] | ||
module Fc.Domain.Tests.Fixtures | ||
|
||
open Serilog | ||
open System | ||
|
||
module EnvVar = | ||
|
||
let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj | ||
|
||
module EventStore = | ||
|
||
open Equinox.EventStore | ||
let connect () = | ||
match EnvVar.tryGet "EQUINOX_ES_HOST", EnvVar.tryGet "EQUINOX_ES_USERNAME", EnvVar.tryGet "EQUINOX_ES_PASSWORD" with | ||
| Some h, Some u, Some p -> | ||
let appName = "Domain.Tests" | ||
let discovery = Discovery.GossipDns h | ||
let connector = Connector(u, p, TimeSpan.FromSeconds 5., 5, Logger.SerilogNormal Serilog.Log.Logger) | ||
let connection = connector.Establish(appName, discovery, ConnectionStrategy.ClusterSingle NodePreference.Master) |> Async.RunSynchronously | ||
let context = Context(connection, BatchingPolicy(500)) | ||
let cache = Equinox.Cache (appName, 10) | ||
context, cache | ||
| h, u, p -> | ||
failwithf "Host, Username and Password EQUINOX_ES_* Environment variables are required (%b,%b,%b)" | ||
(Option.isSome h) (Option.isSome u) (Option.isSome p) | ||
|
||
module TestOutputLogger = | ||
|
||
/// Adapts the XUnit ITestOutputHelper to be a Serilog Sink | ||
type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = | ||
let template = "{Timestamp:HH:mm:ss.fff zzz} [{Level:u3}] {Message} {Properties}{NewLine}{Exception}" | ||
let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter(template, null); | ||
let writeSerilogEvent logEvent = | ||
use writer = new System.IO.StringWriter() | ||
formatter.Format(logEvent, writer) | ||
let messageLine = string writer | ||
testOutput.WriteLine messageLine | ||
System.Diagnostics.Debug.Write messageLine | ||
interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent | ||
|
||
let create output = | ||
let logger = TestOutputAdapter output | ||
LoggerConfiguration().Destructure.FSharpTypes().WriteTo.Sink(logger).CreateLogger() | ||
|
||
(* Generic FsCheck helpers *) | ||
|
||
let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag | ||
let inline mkId () = Guid.NewGuid() |> (|Id|) | ||
let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) | ||
let (|IdsAtLeastOne|) (Ids xs, Id x) = [| yield x; yield! xs |] | ||
let (|AtLeastOne|) (x, xs) = x::xs | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
module Fc.Domain.Tests.LocationEpochTests | ||
|
||
open FsCheck.Xunit | ||
open Swensen.Unquote | ||
|
||
open Fc.Domain.Location.Epoch | ||
|
||
let decide transactionId delta _balance = | ||
match delta with | ||
| 0 -> (), [] | ||
| delta when delta < 0 -> (), [Events.Removed {| delta = -delta; id = transactionId |}] | ||
| delta -> (), [Events.Added {| delta = delta; id = transactionId |}] | ||
|
||
let verifyDeltaEvent transactionId delta events = | ||
let dEvents = events |> List.filter (function Events.Added _ | Events.Removed _ -> true | _ -> false) | ||
test <@ decide transactionId delta (Unchecked.defaultof<_>) = ((), dEvents) @> | ||
|
||
let [<Property>] properties transactionId carriedForward delta1 closeImmediately delta2 close = | ||
|
||
(* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *) | ||
|
||
let initialShouldClose _state = closeImmediately | ||
let res, events = | ||
sync (Some carriedForward) (decide transactionId delta1) initialShouldClose Fold.initial | ||
let cfEvents events = events |> List.choose (function Events.CarriedForward e -> Some e | _ -> None) | ||
let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false) | ||
let state1 = Fold.fold Fold.initial events | ||
let expectedBalance = carriedForward.initial + delta1 | ||
// Only expect closing if it was requested | ||
let expectImmediateClose = closeImmediately | ||
let (Fold.Current bal) = res.history | ||
test <@ Option.isSome res.result | ||
&& expectedBalance = bal @> | ||
test <@ carriedForward = List.head (cfEvents events) | ||
&& (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @> | ||
verifyDeltaEvent transactionId delta1 events | ||
|
||
(* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *) | ||
|
||
let shouldClose _state = close | ||
let { isOpen = isOpen; result = worked; history = (Fold.Current bal) }, events = | ||
sync None (decide transactionId delta2) shouldClose state1 | ||
let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2 | ||
test <@ [] = cfEvents events | ||
&& (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @> | ||
test <@ (expectImmediateClose || close || isOpen) | ||
&& expectedBalance = bal @> | ||
if not expectImmediateClose then | ||
test <@ Option.isSome worked @> | ||
verifyDeltaEvent transactionId delta2 events | ||
|
||
let [<Property>] ``codec can roundtrip`` event = | ||
let ee = Events.codec.Encode(None, event) | ||
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) | ||
test <@ Some event = Events.codec.TryDecode ie @> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
module Fc.Domain.Tests.LocationSeriesTests | ||
|
||
open FsCheck.Xunit | ||
open FSharp.UMX | ||
open Swensen.Unquote | ||
|
||
open Fc.Domain.Location.Series | ||
|
||
let [<Property>] properties c1 c2 = | ||
let events = interpretAdvanceIngestionEpoch c1 Fold.initial | ||
let state1 = Fold.fold Fold.initial events | ||
let epoch0 = %0 | ||
match c1, events, state1 with | ||
// Started events are not written for < 0 | ||
| n, [], activeEpoch when n < epoch0 -> | ||
test <@ None = activeEpoch @> | ||
// Any >=0 value should trigger a Started event, initially | ||
| n, [Events.Started { epoch = ee }], Some activatedEpoch -> | ||
test <@ n >= epoch0 && n = ee && n = activatedEpoch @> | ||
// Nothing else should yield events | ||
| _, l, _ -> | ||
test <@ List.isEmpty l @> | ||
|
||
let events = interpretAdvanceIngestionEpoch c2 state1 | ||
let state2 = Fold.fold state1 events | ||
match state1, c2, events, state2 with | ||
// Started events are not written for < 0 | ||
| None, n, [], activeEpoch when n < epoch0 -> | ||
test <@ None = activeEpoch @> | ||
// Any >= 0 epochId should trigger a Started event if first command didnt do anything | ||
| None, n, [Events.Started { epoch = ee }], Some activatedEpoch -> | ||
let eEpoch = %ee | ||
test <@ n >= epoch0 && n = eEpoch && n = activatedEpoch @> | ||
// Any higher epochId should trigger a Started event (gaps are fine - we are only tying to reduce walks) | ||
| Some s1, n, [Events.Started { epoch = ee }], Some activatedEpoch -> | ||
let eEpoch = %ee | ||
test <@ n > s1 && n = eEpoch && n > epoch0 && n = activatedEpoch @> | ||
// Nothing else should yield events | ||
| _, _, l, _ -> | ||
test <@ List.isEmpty l @> | ||
|
||
let [<Property>] ``codec can roundtrip`` event = | ||
let ee = Events.codec.Encode(None, event) | ||
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) | ||
test <@ Some event = Events.codec.TryDecode ie @> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
module Fc.Domain.Tests.LocationTests | ||
|
||
open FsCheck.Xunit | ||
open FSharp.UMX | ||
open Swensen.Unquote | ||
open System | ||
|
||
open Fc.Domain | ||
open Fc.Domain.Location | ||
|
||
/// Helpers to match `module Cosmos/EventStore` wrapping inside the impl | ||
module Location = | ||
|
||
module MemoryStore = | ||
|
||
open Equinox.MemoryStore | ||
|
||
module Series = | ||
|
||
let resolve store = Resolver(store, Series.Events.codec, Series.Fold.fold, Series.Fold.initial).Resolve | ||
|
||
module Epoch = | ||
|
||
let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve | ||
|
||
let create (zeroBalance, toBalanceCarriedForward, shouldClose) store = | ||
let maxAttempts = Int32.MaxValue | ||
let series = Series.create (fun (id, _opt) -> Series.resolve store id) maxAttempts | ||
let epochs = Epoch.create (Epoch.resolve store) maxAttempts | ||
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) | ||
|
||
let run (service : Service) (IdsAtLeastOne locations, deltas : _[], transactionId) = Async.RunSynchronously <| async { | ||
let runId = mkId () // Need to make making state in store unique when replaying or shrinking | ||
let locations = locations |> Array.map (fun x -> % (sprintf "%O/%O" x runId)) | ||
|
||
let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache | ||
|
||
(* Apply random deltas *) | ||
|
||
let adjust delta (state : Epoch.Fold.State) = | ||
let (Epoch.Fold.Balance bal) = state | ||
let value = max -bal delta | ||
if value = 0 then 0, [] | ||
elif value < 0 then value, [Epoch.Events.Removed {| delta = -value; id = transactionId |}] | ||
else value, [Epoch.Events.Added {| delta = value; id = transactionId |}] | ||
let! appliedDeltas = seq { for loc, x in updates -> async { let! eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel | ||
let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l, xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq | ||
|
||
(* Verify loading yields identical state *) | ||
|
||
let! balances = seq { for loc in locations -> async { let! bal = service.Execute(loc,(fun (Epoch.Fold.Balance bal) -> bal, [])) in return loc,bal } } |> Async.Parallel | ||
test <@ expectedBalances = Set.ofSeq balances @> } | ||
|
||
let [<Property>] ``MemoryStore properties`` epochLen args = | ||
let store = Equinox.MemoryStore.VolatileStore() | ||
|
||
let epochLen, idsWindow = max 1 epochLen, 5 | ||
let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen | ||
|
||
let service = Location.MemoryStore.create (zero, cf, sc) store | ||
run service args | ||
|
||
type EventStore(testOutput) = | ||
|
||
let log = TestOutputLogger.create testOutput | ||
do Serilog.Log.Logger <- log | ||
|
||
let context, cache = EventStore.connect () | ||
|
||
let [<Property(MaxTest=5, MaxFail=1)>] properties epochLen args = | ||
let epochLen, idsWindow = max 1 epochLen, 5 | ||
let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen | ||
|
||
let service = Location.EventStore.create (zero, cf, sc) (context, cache, 50) | ||
run service args |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>netstandard2.0</TargetFramework> | ||
<WarningLevel>5</WarningLevel> | ||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<Compile Include="Infrastructure.fs" /> | ||
<Compile Include="LocationSeries.fs" /> | ||
<Compile Include="LocationEpoch.fs" /> | ||
<Compile Include="Location.fs" /> | ||
<Compile Include="InventoryEpoch.fs" /> | ||
<Compile Include="Inventory.fs" /> | ||
<Compile Include="StockTransaction.fs" /> | ||
<Compile Include="StockProcessManager.fs" /> | ||
</ItemGroup> | ||
<ItemGroup> | ||
<PackageReference Include="Equinox.EventStore" Version="2.1.0" /> | ||
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.1.0" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this HTML anchor tag a side effect from an IDE or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, just so I can link to a section within the readme, i.e. github.com/jet/equinox#eqxfc - changelog.md uses it also to provide an alias thats better than default generated ones