Skip to content

Commit

Permalink
T
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 15, 2019
1 parent d4c12e0 commit 0baa0d9
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 2 deletions.
130 changes: 130 additions & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,136 @@ type Service(log, resolveStream) =
stream.Read
```

# Equinox Handler API Usage Guide

The most terse walkthrough of what's involved in using Equinox to do a Synchronous Query and/or Execute a Decision process is in the [Programming Model section](#programming-model). In this section we’ll walk through how one implements common usage patterns using the Equinox Handler API in more detail (also using slightly less F# tricks!).

## ES, CQRS, Event-driven architectures etc

There are a plethora of [basics underlying Event Sourcing](https://eventstore.org/docs/event-sourcing-basics/index.html) and its cousin, the [CQRS architectural style](https://docs.microsoft.com/en-us/azure/architecture/guide/architecture-styles/cqrs). There are also myriad ways of arranging [event driven processing across a system](https://docs.microsoft.com/en-us/azure/architecture/guide/architecture-styles/event-driven).

The goal of [CQRS](https://martinfowler.com/bliki/CQRS.html) is to enable representation of the same data using multiple models. It’s [not a panacea, and most definitely not a top level architecture](https://www.youtube.com/watch?v=LDW0QWie21s&feature=youtu.be&t=448), but you should at least be [considering whether it’s relevant in your context](https://vladikk.com/2017/03/20/tackling-complexity-in-cqrs/). There are [various ways of applying the general CQRS concept as part of an architecture](https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs).

## Applying Equinox in an Event-sourced architecture

There are many tradeoffs to be considered along the journey from an initial proof of concept implementation to a working system and evolving that over time to a successful and maintainable model. There is no official magical combination of CQRS and ES that is always correct, so it’s important to look at the following information as a map of all the facilities available - it’s certainly not a checklist; not all achievements must be unlocked.

At a high level we have:
- _Aggregates_ - a set of information (Entities and Value Objects) within which Invariants need to be maintained, leading us us to logically group them in order to consider them when making a Decision
- _Events_ - Events that have been accepted into the model represent the basic Facts from which State or Projections can be derived
- _Commands_ - taking intent implied by an upstream request or other impetus (e.g., automated synchronization based on an upstream data source) driving a need to sync a system’s model to reflect that implied need (while upholding the Aggregate's Invariants). The Decision process is responsible proposing Events to be appended to the Stream representing the relevant events in the timeline of that Aggregate.
- _State_ - the State at any point is inferred by _folding_ the events in order; this state typically feeds into the Decision with the goal of ensuring Idempotent handling (if its a retry and/or the desired state already pertained, typically one would expect the decision to be "no-op, no Events to emit")
- _Projections_ - while most State we'll fold from the events will be dictated by what we need (in addition to a Command's arguments) to be able to make the Decision implied by the command, the same Events that are _necessary_ for Command Handling to be able to uphold the Invariants can also be used as the basis for various summaries at the single aggregate level, Rich Events exposed as notification feeds at the boundary of the Bounded Context, and/or as denormalized representations forming a [Materialized View](https://docs.microsoft.com/en-us/azure/architecture/patterns/materialized-view).
- Queries - as part of the processing, one might wish to expose the state before or after the Decision and/or a computation based on that to the caller as a result. In its simplest form (just reading the value and emitting it without any potential Decision/Command even applying), such a _Synchronous Query_ is a gross violation of CQRS - reads should ideally be served from a Read Model_

## Programming Model walkthrough

### Core elements

In the code handling a given Aggregate’s Commands and Synchronous Queries, the elements you write divide into:

- Events (`codec`, `encode`, `tryDecode`, etc.)
- State/Folds (`fold`, `evolve`)
- Storage Model helpers (`isOrigin`,`unfold` etc)

while these are not omnipresent, for the purposes of this discussion we’ll take them as givens. See the [Programming Model](#programming-model) for a drilldown into these elements and their roles.

### Contexts, Handlers and Services

Equinox’s Command Handling consists of <300 lines including comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements involved are:
- [`Target` DU](https://github.com/jet/equinox/blob/12e36643685ff4f1fb2d19a4b56b88065280eb2c/src/Equinox/Handler.fs#L73) - used to identify the Stream pertaining to the relevant Aggregate that `resolveStream` will use to hydrate a `Handler`
- [`type Handler`](https://github.com/jet/equinox/blob/12e36643685ff4f1fb2d19a4b56b88065280eb2c/src/Equinox/Handler.fs#L41) - surface API one uses to execute a `Flow` on a specific Stream
- [`type Context`](https://github.com/jet/equinox/blob/12e36643685ff4f1fb2d19a4b56b88065280eb2c/src/Equinox/Handler.fs#L7) - `type` representing stream State in an Application Service and/or Handler
- [`module Flow`](https://github.com/jet/equinox/blob/12e36643685ff4f1fb2d19a4b56b88065280eb2c/src/Equinox/Flow.fs#L31) - internal implementation of Optimistic Concurrency Control / retry loop used by Handler

Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the few hundred lines can tell many of the thousands of words about to follow!

### Favorites Walkthrough

In this example, we’ll use possibly the simplest toy example - an unbounded list of items a user has favorited (starred) in an e-commerce system.

See [samples/Tutorial/Favorites.fsx](samples/Tutorial/Favorites.fsx). It’s recommended to load this in Visual Studio and feed it into the F# Interactive REPL to observe it step by step. Here, we'll skip some steps and only discuss some aspects.

#### `Event`s + `initial`+`fold`

```fsharp
type Event =
| Added of string
| Removed of string
let initial : string list = []
let evolve state = function
| Added sku -> sku :: state
| Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs
```

Events are represented as an F# Discriminated Union; see the [article on the `UnionContractEncoder`](https://eiriktsarpalis.wordpress.com/2018/10/30/a-contract-pattern-for-schemaless-datastores/) for information about how that's typically used to map to/from an Event Type/Case in an underlying Event storage system.

The `evolve` function is responsible for computing the State implied by taking a given State and incorporating the effect that _single_ Event implies in that context and then yielding that result _without mutating either input_.

While the `evolve` function operates on a `state` and a _single_ event, `fold` (named for the standard FP operation of that name) walks a chain of events, propagating the running state into each evolve invocation. It is the `fold` operation that's typically used a) in tests and b) when passing a function to an Equinox Handler to manage the behavior

In order to fulfill the _without mutating either input_ constraint, typically `fold` and `evolve` either clone to a new array with space for the new events, or use a [persistent data structure, such as F#'s `list`] [https://en.wikipedia.org/wiki/Persistent_data_structure,]. The reason this is necessary is that the result from `fold` can also be used for one of the following reasons:

- computing a 'proposed' state which never materializes due to a failure to save and/or an Optimistic Concurrency failure
- the store can sometimes take a `state` from the cache and be `fold`ing in differnent `events` when the coflicting events are supplied after having been loaded for the retry in the loop
- concurrent executions against the stream may be taking place in parallel within the same process; this is permitted, Equinox makes no attempt to constrain the behavior in such a case

#### `Command`s + `interpret`

```fsharp
type Command =
| Add of string
| Remove of string
let interpret command state =
match command with
| Add sku -> if state |> List.contains sku then [] else [Added sku]
| Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]
```

Command handling should almost invariably be implemented in an [Idempotent](https://en.wikipedia.org/wiki/Idempotence) fashion. While in some cases a blind append can be .

#### `Handler`

```fsharp
type Handler(log, stream, ?maxAttempts) =
let inner = Equinox.Handler(fold, log, stream, maxAttempts = defaultArg maxAttempts 2)
member __.Execute command : Async<unit> =
inner.Decide <| fun ctx ->
ctx.Execute (interpret command)
member __.Read : Async<string list> =
inner.Query id
```

#### `Service`

```fsharp
type Service(log, resolveStream) =
let streamHandlerFor (clientId: string) =
let aggregateId = Equinox.AggregateId("Favorites", clientId)
let stream = resolveStream aggregateId
Handler(log, stream)
member __.Favorite(clientId, sku) =
let stream = streamHandlerFor clientId
stream.Execute(Add sku)
member __.Unfavorite(clientId, skus) =
let stream = streamHandlerFor clientId
stream.Execute(Remove skus)
member __.List(clientId): Async<string list> =
let stream = streamHandlerFor clientId
stream.Read
```

#### ‘Just’ reading the State

As a general rule, one should be considering whether the best way to manage the read side of your system in a CQRS manner by reading from a Projection to a Read Model. That said, there are plenty cases where Unfolds and/or the load on your system together with Caching will mean the right thing to do in order to obtain information can indeed be to do an inline read of the Events on the Aggregate’s Stream to get that data. (While this does impact scaling, a benefit its that the read is guaranteed to reflect the instantaneous state of the stream, versus being eventually consistent if you’re obtaining the same information via an asynchronous Projection)

Being able to read the state is also a key step in being able to run a Decision that upholds the Invariants of the Aggregate, which is why we’ll cover it first.

# Equinox Architectural Overview

There are virtually unlimited ways to build an event-sourced model. It's critical that, for any set of components to be useful, that they are designed in a manner where one combines small elements to compose a whole, [versus trying to provide a hardwired end-to-end 'framework'](https://youtu.be/LDW0QWie21s?t=1928).
Expand Down
11 changes: 9 additions & 2 deletions Equinox.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27110.0
# Visual Studio Version 16
VisualStudioVersion = 16.0.28531.58
MinimumVisualStudioVersion = 10.0.40219.1
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox", "src\Equinox\Equinox.fsproj", "{54CD058F-5B0A-4564-B732-1F6301E120AC}"
EndProject
Expand Down Expand Up @@ -67,6 +67,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Codec",
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Projection.Tests", "tests\Equinox.Projection.Tests\Equinox.Projection.Tests.fsproj", "{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Tutorial", "samples\Tutorial\Tutorial.fsproj", "{D82AAB2E-7264-421A-A893-63A37E5F08B6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -157,6 +159,10 @@ Global
{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{047F782D-DC37-4599-8FA0-F9B4D4C09C7B}.Release|Any CPU.Build.0 = Release|Any CPU
{D82AAB2E-7264-421A-A893-63A37E5F08B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D82AAB2E-7264-421A-A893-63A37E5F08B6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D82AAB2E-7264-421A-A893-63A37E5F08B6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D82AAB2E-7264-421A-A893-63A37E5F08B6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -171,6 +177,7 @@ Global
{1B0D4568-96FD-4083-8520-CD537C0B2FF0} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871} = {8CDE1CC3-8619-44DE-8B4D-4102CE476C35}
{8CDE1CC3-8619-44DE-8B4D-4102CE476C35} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
{D82AAB2E-7264-421A-A893-63A37E5F08B6} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {177E1E7B-E275-4FC6-AE3C-2C651ECCF71E}
Expand Down
150 changes: 150 additions & 0 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#r "bin/Debug/netstandard2.0/Serilog.dll"
#r "bin/Debug/netstandard2.0/Serilog.Sinks.Console.dll"
#r "bin/Debug/netstandard2.0/Equinox.dll"
#r "bin/Debug/netstandard2.0/Equinox.MemoryStore.dll"

module List =
let contains x = List.exists ((=) x)

(*
* EVENTS
*)

(* Define the events that will be saved in the Stream *)

type Event =
| Added of string
| Removed of string

let initial : string list = []
let evolve state = function
| Added sku -> sku :: state
| Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs

(* With the basic Events and `fold` defined, we have enough to build the state from the Events:- *)

let initialState = initial
//val initialState : string list = []

let favesCba = fold initialState [Added "a"; Added "b"; Added "c"]
//val favesCba : string list = ["c"; "b"; "a"]

(*
* COMMANDS
*)

(* Now we can build a State from the Events, we can interpret a Command in terms of how we'd represent that in the stream *)

type Command =
| Add of string
| Remove of string
let interpret command state =
match command with
| Add sku -> if state |> List.contains sku then [] else [Added sku]
| Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]

(* Note we don't yield events if they won't have a relevant effect - the interpret function makes the processing idempotent
if a retry of a command happens, it should not make a difference *)

let removeBEffect = interpret (Remove "b") favesCba
//val removeBEffect : Event list = [Removed "b"]

let favesCa = fold favesCba removeBEffect
// val favesCa : string list = ["c"; "a"]

let _removeBAgainEffect = interpret (Remove "b") favesCa
//val _removeBAgainEffect : Event list = []

(*
* HANDLER API
*)

(* Equinox.Handler provides low level functions against a Stream given
a) the fold function so it can maintain the state as we did above
b) a log to send metrics and store roundtrip info to
c) a maximum number of attempts to make if we clash with a conflicting write *)

type Handler(log, stream, ?maxAttempts) =
let inner = Equinox.Handler(fold, log, stream, maxAttempts = defaultArg maxAttempts 2)
member __.Execute command : Async<unit> =
inner.Decide <| fun ctx ->
ctx.Execute (interpret command)
member __.Read : Async<string list> =
inner.Query id

(* When we Execute a command, Equinox.Handler will use `fold` and `interpret` to Decide whether Events need to be written
Normally, we'll let failures percolate via exceptions, but not return a result (i.e. we don't say "your command caused 1 event") *)

// For now, write logs to the Console (in practice we'd connect it to a concrete log sink)
open Serilog
let log = LoggerConfiguration().WriteTo.Console(Serilog.Events.LogEventLevel.Debug).CreateLogger()

// related streams are termed a Category; Each client will have it's own Stream.
let categoryName = "Favorites"
let clientAFavoritesStreamId = Equinox.AggregateId(categoryName,"ClientA")

// For test purposes, we use the in-memory store
let store = Equinox.MemoryStore.VolatileStore()

// Each store has a Resolver which provides an IStream instance which binds to a specific stream in a specific store
// ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above
let stream streamName = Equinox.MemoryStore.MemResolver(store, fold, initial).Resolve(streamName)

// We hand the streamId to the resolver
let clientAStream = stream clientAFavoritesStreamId
// ... and pass the stream to the Handler
let handler = Handler(log, clientAStream)

(* Run some commands *)

handler.Execute (Add "a") |> Async.RunSynchronously
handler.Execute (Add "b") |> Async.RunSynchronously
// Idempotency comes into play if we run it twice:
handler.Execute (Add "b") |> Async.RunSynchronously

(* Read the current state *)

handler.Read |> Async.RunSynchronously
// val it : string list = ["b"; "a"]

(*
* SERVICES
*)

(* Building a service to package Command Handling and related functions
No, this is not doing CQRS! *)

type Service(log, resolveStream) =
let streamHandlerFor (clientId: string) =
let aggregateId = Equinox.AggregateId("Favorites", clientId)
let stream = resolveStream aggregateId
Handler(log, stream)

member __.Favorite(clientId, sku) =
let stream = streamHandlerFor clientId
stream.Execute(Add sku)

member __.Unfavorite(clientId, skus) =
let stream = streamHandlerFor clientId
stream.Execute(Remove skus)

member __.List(clientId): Async<string list> =
let stream = streamHandlerFor clientId
stream.Read

let resolveStream = Equinox.MemoryStore.MemResolver(store, fold, initial).Resolve

let service = Service(log, resolveStream)

let client = "ClientB"
service.Favorite(client, "a") |> Async.RunSynchronously
service.Favorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously
// val it : string list = ["b"; "a"]

service.Unfavorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously
//val it : string list = ["a"]
21 changes: 21 additions & 0 deletions samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>

<ItemGroup>
<None Include="Favorites.fsx" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>

</Project>

0 comments on commit 0baa0d9

Please sign in to comment.