# Event-Orientation Demo

Event-Orientation (EO) strukturiert die Verarbeitung von CQNS-Nachrichten in einer Pipeline auf der Basis von Event Sourcing. Das Ziel ist die Entkopplung von Nachrichtenverarbeitungen, die Vermeidung vorzeitiger Optimierungen und ein größerer Fokus auf das Wesentliche.

In [1]:
// Allgemeine CQS Message Hierarchie
public abstract class Message{}

public abstract class Request : Message{}
public abstract class Response : Message{}

public abstract class Command : Request{}
public abstract class CommandStatus : Response{}
public class Success : CommandStatus {}
public class Failure : CommandStatus { public string ErrorMessage; }

public abstract class Query : Request{}
public abstract class QueryResult : Response{}

Eine Nachricht (Message) ist vor allem Command und Query, die zu Command Status (Success und Failure) bzw. Query Result führen. Ihre Verarbeitung zieht den Zustand repräsentiert durch Events in einem Eventstrom heran. Bei der Verarbeitung von Commands entstehen daraus weitere Events, die in den Eventstrom fließen.

![](images/EO1.jpg)

Der für eine Nachrichtenverarbeitung relevante Zustand liegt im Eventstrom in Form einer Untermenge aller Events vor, dem sog. Kontext.

Der Kontext wird der Nachrichtenverarbeitung jedoch nicht roh angeboten, sondern in Form eines mehr oder weniger umfangreichen Modells, dem Nachrichtenmodell (Message Model).

Die Nachrichtenverarbeitung (Handling) findet daher im Rahmen einer Pipeline statt:

1. Schritt _Load_: Nachrichtenmodell für eine Nachricht beschaffen.
2. Schritt _Handle_: Nachricht und Nachrichtenmodell verarbeiten (Processing); in einem Command entstehen neue Events.
3. Schritt _Update_: Neue Events im Eventstrom aufzeichnen und damit alle Nachrichtenmodelle aktaualisieren.

![](images/EO2.jpg)

_Load_ und _Update_: stellen das Message Model Management dar.

Mit _Load_ ist die Beschaffung eines Nachrichtenmodells aus dem eigentlichen Message Processing herausgezogen. Mit _Update_ ist die Aktualisierung eines gechacheten Modells aus dem Processing herausgezogen.

## Event Store

Der Anwendungszustand wird im Event Store in Form von Events gespeichert.

Der Event Store ist eine einfache append-only Datenstruktur, deren Inhalt in der Reihenfolge der Aufzeichnung (also chronologisch) wieder abgespielt werden kann.

In [2]:
public abstract class Event {}

public interface IEventStore {
    void Record(params Event[] events);
    
    IEnumerable<Event> Replay();
    IEnumerable<Event> Replay(int fromIndex);
    
    int Count {get;}
    
    event Action<IEnumerable<Event>> OnRecorded;
}

In [3]:
class InMemEventStore : IEventStore {
    private List<Event> _events = new List<Event>();
    
    public InMemEventStore() {}
    public InMemEventStore(IEnumerable<Event> events) {
        foreach(var e in events)
            Record(e);
    }
    
    
    public void Record(params Event[] events) {
        _events.AddRange(events);
        OnRecorded(events);
    }
    
    public IEnumerable<Event> Replay() {
        return _events;
    }
    
    public IEnumerable<Event> Replay(int fromIndex) {
        return _events.Where((_,i) => i>=fromIndex);
    }
    
    public int Count => _events.Count;
    
    public event Action<IEnumerable<Event>> OnRecorded = _ => {};
}

In [4]:
// Beispiel für die Nutzung des Event Store
class MyEvent : Event {
    public string Value;
}

var es = new InMemEventStore();
es.OnRecorded += events => display($"{events.Count()} event(s) added");

es.Record(
    new MyEvent{Value="a"},
    new MyEvent{Value="b"},
    new MyEvent{Value="c"}
);

display("All events after the first:");
foreach(var e in es.Replay(1))
    display(e);

3 event(s) added

All events after the first:

Value
b


Value
c


## Beispiel: Bounded Queue

Eine Bounded Queue hat eine begrenzte Länge. Solange die nicht erreicht ist, können Elemente in sie eingestellt werden. Sobald sie jedoch "voll ist", schlägt eine weitere Befüllung fehl.

In [5]:
public interface IBoundedQueue<T> {
    bool TryEnqueue(T value);
    
    T Dequeue();
    
    int Capacity {get;}
    int Count {get;}
}

Der Zustand der Bounded Queue (ihre Einträge) soll mittels EO verwaltet werden. Er wird daher in einem Event Store gehalten, in dem drei Events aufgezeichnet werden:

In [6]:
class Enqueued<T> : Event {
    public T Value;
}

class Dequeued : Event {}

class CapacityChanged : Event {
    public int Capacity;
}

Das Interface `IBoundedQueue<T>` ist eine Fassade, die die EO-Nachrichtenverarbeitung verbirgt. Die zu verarbeitenden Nachrichten sind:

* Commands
  * Enqueue // fügt ein Element ans Ende der Queue an
    * erzeugt `Enqueued` Event
    * schlägt fehl, falls Kapazität schon erreicht
  * Dequeue // entfernt das vorderste Element in der Queue
    * erzeugt `Dequeued` Event
    * schlägt fehlt, falls Queue leer
  
* Queries
  * Head // liefert das vorderste Element in der Queue
  * Count // zählt die Elemente in der Queue
  * Capacity // max. Anzahl von Elementen in der Queue

In [7]:
// Bounded Queue Messages
class Enqueue<T> : Command { public T Value; }
class Dequeue : Command {}

class Head : Query{}
class HeadResult<T> : QueryResult { public T Value; }

class Count : Query{}
class CountResult : QueryResult { public int Count; }

class Capacity : Query{}
class CapacityResult : QueryResult { public int Capacity; }

Alle Nachrichten werden im Rahmen eines Kontextes verarbeitet, der zu Nachrichtenmodellen verdichtet vorgelegt wird.

### Enqueue Nachrichtenmodell


Damit ein Element erfolgreich angefügt werden kann, muss der Verarbeitung nur bekannt sein, wieviele Elemente schon in der Queue sind. Außerdem muss die Kapazität bekannt sein.

Kontext:
* `Enqueued<T>`
* `Dequeueed`
* `CapacityChanged`

In [8]:
class EnqueueModel {
    public int Count;
    public int Capacity;
}

### Dequeue Nachrichtenmodell

Für das Entfernen eines Elementes ist lediglich nötig zu wissen, ob es überhaupt ein Element in der Queue gibt. Ein boolscher Wert könnte hier ausreichen. Aber mit einer Anzahl kann die Nachrichtenverarbeitung auch arbeiten.

Kontext:
* `Enqueued<T>`
* `Dequeueed`

In [9]:
class DequeueModel {
    int Count;
}

### Head Nachrichtenmodell

Nur das aktuelle/vorderste Element der Queue ist im Modell nötig.

Kontext:
* `Enqueued<T>`
* `Dequeueed`

In [10]:
class HeadModel<T> {
    public T Head;
}

### Count Nachrichtenmodell

Die Anzahl der Elemente in der Queue.

Kontext:
* `Enqueued<T>`
* `Dequeueed`

In [11]:
class CountModel {
    public int Count;
}

### Capacity Nachrichtenmodell

Die max. Anzahl der Elemente in der Queue.

Kontext:
* `CapacityChanged`

In [12]:
class CapacityModel {
    public int Capacity;
}

### Optimierung der Nachrichtenmodelle

Es zeigt sich, dass sich die Modelldaten mehrerer Nachrichten überschneiden. Auch wenn dadurch Abhängigkeiten entstehen, also Nachrichtenverarbeitungen über Events hinaus gekoppelte werden, könnte es anzeigt sein, mehrere Modells zusammenzufassen, z.B. `EnqueueModel`, `DequeueModel`, `CountModel` und `CapacityModel` zu:

In [13]:
class StatsModel {
    public int Count;
    public int Capacity;
}

### Model Manager

Ein Message Model Manager besteht aus zwei Methoden: `Load` und `Update`.

Über `Load` wird das Message Model am Anfang der Pipeline beschafft. Und `Update` hält es im Lichte aktueller Events auf dem Stand. Achtung: Das ist eine Optimierung!

In [14]:
public interface IMessageModelManagement<TMessage,TMessageModel> where TMessage : Request {
    TMessageModel Load(TMessage message);
    void Update(IEnumerable<Event> events);
}

Konkrete Manager für die einzelnen Messages implementieren das Interface. Jeder ist damit konzentriert auf nur ein Model.

Der erste entscheidet sich dafür, das Modell zu cachen und mit neuen Events zu aktualisieren:

In [15]:
class StatsModelManager<T> : IMessageModelManagement<Request,StatsModel> {
    private StatsModel _model = new StatsModel();
    
    public StatsModel Load(Request _)
        => _model;
    
    public void Update(IEnumerable<Event> events)
        => events.ToList().ForEach(Apply);
    
    void Apply(Event e) {
        switch(e) {
            case Enqueued<T> _:
                _model.Count++;
                break;
            case Dequeued _:
                _model.Count--;
                break;
            case CapacityChanged cc:
                _model.Capacity = cc.Capacity;
                break;
        }
    }
}


var events = new Event[]{
    new CapacityChanged{Capacity=5},
    new Enqueued<int>{Value=1},
    new Enqueued<int>{Value=2},
    new Dequeued(),
    new Enqueued<int>{Value=3},
    new CapacityChanged{Capacity=3}
};

var sut = new StatsModelManager<int>();
sut.Update(events);
var model = sut.Load(null);

display($"count: {model.Count}, capacity: {model.Capacity}");

count: 2, capacity: 3

Der zweite hingegen baut das Modell bei jedem Load aus dem Kontext auf:

In [16]:
class HeadModelManager<T> : IMessageModelManagement<Head,HeadModel<T>> {
    private IEventStore _es;
    
    public HeadModelManager(IEventStore es) => _es = es;

    
    public HeadModel<T> Load(Head _) {
        var q = new Queue<T>();
        _es.Replay().ToList()
                    .ForEach(e => Apply(q, e));
        return new HeadModel<T>{ Head = q.Peek() };
    }
        
    void Apply(Queue<T> q, Event e) {
        switch(e) {
            case Enqueued<T> eq:
                q.Enqueue(eq.Value);
                break;
            case Dequeued _:
                q.Dequeue();
                break;
        }
    }
    
    
    public void Update(IEnumerable<Event> events) {}
}


var events = new Event[]{
    new CapacityChanged{Capacity=5},
    new Enqueued<int>{Value=1},
    new Enqueued<int>{Value=2},
    new Dequeued(),
    new Enqueued<int>{Value=3},
    new CapacityChanged{Capacity=3}
};

var es = new InMemEventStore(events);
var sut = new HeadModelManager<int>(es);
sut.Update(events);
var model = sut.Load(new Head());

display($"head: {model.Head}")

head: 2

### Message Processing

Das eigentliche Message Processing basiert auf dem Message Model und der Message. Die allgemeine Form ist wie folgt leicht unterschiedlich für Commands und Queries:

In [17]:
public interface ICommandProcessing<TCommand,TMessageModel> {
    (CommandStatus Status, IEnumerable<Event> Events) Execute(TCommand command, TMessageModel model);
}

public interface IQueryProcessing<TQuery,TMessageModel,TQueryResult> {
    TQueryResult Answer(TQuery query, TMessageModel model);
}

Für jede Message wird damit ein spezieller Processor aufgesetzt, der sich ganz auf eine Aufgabe konzentrieren kann:

In [18]:
class EnqueueProcessor<T> : ICommandProcessing<Enqueue<T>,StatsModel> {
    public (CommandStatus Status, IEnumerable<Event> Events) Execute(Enqueue<T> command, StatsModel model) {
        if (model.Count >= model.Capacity) return (new Failure{ErrorMessage="Queue full!"}, new Event[0]);
        return (new Success(), new Event[]{new Enqueued<T>{Value=command.Value}});
    }
}

In [19]:
class DequeueProcessor : ICommandProcessing<Dequeue,StatsModel> {
    public (CommandStatus Status, IEnumerable<Event> Events) Execute(Dequeue command, StatsModel model) {
        if (model.Count <= 0) return (new Failure{ErrorMessage="Queue empty!"}, new Event[0]);
        return (new Success(), new Event[]{new Dequeued()});
    }
}

In [20]:
class HeadProcessor<T> : IQueryProcessing<Head,HeadModel<T>,HeadResult<T>> {
    public HeadResult<T> Answer(Head query, HeadModel<T> model) {
        return new HeadResult<T>{Value=model.Head};
    }
}

In [21]:
class CountProcessor : IQueryProcessing<Count,StatsModel,CountResult> {
    public CountResult Answer(Count query, StatsModel model) {
        return new CountResult{Count=model.Count};
    }
}

In [22]:
class CapacityProcessor : IQueryProcessing<Capacity,StatsModel,CapacityResult> {
    public CapacityResult Answer(Capacity query, StatsModel model) {
        return new CapacityResult{Capacity=model.Capacity};
    }
}

### Message Pipelines

In einer Message Pipeline werden Message Processing und Message Model Management zusammengesteckt. Beispiel:

In [23]:
// Arrange
var statsModelManager = new StatsModelManager<int>();
var enqueueProcessor = new EnqueueProcessor<int>();
var countProcessor = new CountProcessor();

var events = new Event[]{
    new CapacityChanged{Capacity=5},
    new Enqueued<int>{Value=1},
    new Enqueued<int>{Value=2},
    new Dequeued(),
    new Enqueued<int>{Value=3},
    new CapacityChanged{Capacity=3}
};
statsModelManager.Update(events);

// Act
// Enqueue pipeline
var cmd = new Enqueue<int>{Value=42};
var model = statsModelManager.Load(cmd);
var cmdresult = enqueueProcessor.Execute(cmd,model);
statsModelManager.Update(cmdresult.Events);
display($"1. enqueue successful? {cmdresult.Status is Success}");

// Count pipeline
var qry = new Count();
model = statsModelManager.Load(qry);
var qryresult = countProcessor.Answer(qry,model);

// Enqueue pipeline
cmd = new Enqueue<int>{Value=99};
model = statsModelManager.Load(cmd);
cmdresult = enqueueProcessor.Execute(cmd,model);
statsModelManager.Update(cmdresult.Events);
display($"2. enqueue successful? {cmdresult.Status is Success}");

// Assert
display($"count after 1. enqueue: {qryresult.Count}");

1. enqueue successful? True

2. enqueue successful? False

count after 1. enqueue: 3

Message Pipelines werden im Message Handling zusammengesetzt für alle Messages.

In [24]:
class BoundedQueueMessageHandling<T> {
    private IEventStore _es;
    
    private StatsModelManager<T> _smm;
    private HeadModelManager<T> _hmm;
    
    private EnqueueProcessor<T> _ep;
    private DequeueProcessor _dp;
    private HeadProcessor<T> _hp;
    private CountProcessor _cop;
    private CapacityProcessor _cap;
    
    
    public BoundedQueueMessageHandling(int capacity) {
        // construction
        _es = new InMemEventStore();
        
        _smm = new StatsModelManager<T>();
        _hmm = new HeadModelManager<T>(_es);
        
        _ep = new EnqueueProcessor<T>();
        _dp = new DequeueProcessor();
        _hp = new HeadProcessor<T>();
        _cop = new CountProcessor();
        _cap = new CapacityProcessor();
        
        // initialization
        _es.Record(new CapacityChanged{Capacity=capacity});
        _smm.Update(_es.Replay());
        _hmm.Update(_es.Replay());
        
        // model subscription
        _es.OnRecorded += _smm.Update;
        _es.OnRecorded += _hmm.Update;
    }
    
    
    public CommandStatus Handle(Enqueue<T> command) {
        var model = _smm.Load(command);
        var result = _ep.Execute(command,model);
        _es.Record(result.Events.ToArray());
        return result.Status;
    }
    
    public CommandStatus Handle(Dequeue command) {
        var model = _smm.Load(command);
        var result = _dp.Execute(command,model);
        _es.Record(result.Events.ToArray());
        return result.Status;
    }
    
    
    public HeadResult<T> Handle(Head query) {
        var model = _hmm.Load(query);
        return _hp.Answer(query,model);
    }
    
    public CountResult Handle(Count query) {
        var model = _smm.Load(query);
        return _cop.Answer(query,model);
    }
    
    public CapacityResult Handle(Capacity query) {
        var model = _smm.Load(query);
        return _cap.Answer(query,model);
    }
}


var sut = new BoundedQueueMessageHandling<int>(3);

display($"capacity: {sut.Handle(new Capacity()).Capacity}");

sut.Handle(new Enqueue<int>{Value=1});
sut.Handle(new Enqueue<int>{Value=2});
sut.Handle(new Dequeue());
sut.Handle(new Enqueue<int>{Value=3});

display($"count: {sut.Handle(new Count()).Count}");

capacity: 3

count: 2

### Die Fassade

Und schließlich kann das Message Handling noch hinter der Interface-Fassade versteckt werden, so dass Nutzer einer Bounded Queue nichts bemerken von EO.

In [25]:
class BoundedQueue<T> {
    private BoundedQueueMessageHandling<T> _bqmh;
    
    public BoundedQueue(int capacity) {
        _bqmh = new BoundedQueueMessageHandling<T>(capacity);
    }
    
    public bool TryEnqueue(T value) {
        var response = _bqmh.Handle(new Enqueue<T>{Value=value});
        return response is Success;
    }
    
    public T Dequeue() {
        if (this.Count > 0) {
            var head = _bqmh.Handle(new Head()).Value;
            _bqmh.Handle(new Dequeue());
            return head;
        }
        else
            throw new InvalidOperationException("Queue empty!");
    }
    
    public int Capacity => _bqmh.Handle(new Capacity()).Capacity;
    public int Count => _bqmh.Handle(new Count()).Count;
}


var sut = new BoundedQueue<int>(3);

display(sut.TryEnqueue(1));
display(sut.TryEnqueue(2));
display(sut.TryEnqueue(3));
display(sut.TryEnqueue(4));

display(sut.Dequeue());
display(sut.Dequeue());
display($"count: {sut.Count}, capacity: {sut.Capacity}");

display(sut.TryEnqueue(5));
display($"count: {sut.Count}, capacity: {sut.Capacity}");

count: 1, capacity: 3

count: 2, capacity: 3

### Zusammenfassung

Natürlich bläht der Einsatz von EO die Lösung für das Szenario unnötig auf. Andererseits ist es aber so einfach, damit die Organisation des Codes durch EO klar hervorsticht. Sie wird nicht durch umfängliche Domänenlogik vernebelt.

Die Klasse `BoundedQueue<T>` ist in der Lösung nur eine einfach nutzbare Fassade um das EO Message Handling.

Das EO Message Handling in `BoundedQueueMessageHandling<T>` definiert für jede Message eine Pipeline, die zuerst das zugehörige Message Model beschafft, in dessen Lichte die Message verarbeitet und schließlich ggf. das Message Model aktualisiert.

Das Schichtenmodell definiert für Software eine Grundstruktur bestehend aus drei Aspekten: Darstellung (Presentation Layer), Persistenz (Data Access Layer) und Domäne (Business Logic Layer). Der Zweck ist die Trennung grundsätzlich verschiedener Verantwortlichkeiten im Sinne des Single Responsibility Principle (SRP) und die saubere Ausrichtung der funktionalen Abhängigkeiten zwischen diesen Abhängigkeiten.

Ebenso definiert EO eine Grundstruktur für Software. Für EO besteht Software nicht aus von einander abhängigen Schichten, sondern aus von einander _unabhängigen_ Prozessen. Diese Prozesse stellen das Verhalten her, das Anwender im Rahmen einer Interaktion (z.B. Auswahl eines Menüpunktes, Klick auf einen Button) von einer Software erwarten.

Die Prozesse werden getriggert durch Messages (Request), die ihnen Input liefern, und antworten mit Messages (Response), die ihren Output enthalten.

Gemeinsam ist den Prozessen lediglich ein Eventstrom zur Fortschreibung des Zustandes der Software.

Die Prozesse wiederum gliedert EO in drei Schritte. Sie verarbeiten Messages, indem sie sie durch eine kleine Pipeline fließen lassen. Im Kern steht dabei der Message Processor. Er verarbeitet die Message im Lichte eines Kontextes, den ein Message Model Manager zu einem Message Model aggregiert hat.

Zweck der Trennung ist eine weitere Fokussierung der Verantwortlichkeiten und ihre Entkopplung durch funktionale Unabhängigkeit im Rahmen des Pipeline-Datenflusses.

Bei der Entwicklung kann ich mich Schritt für Schritt voranbewegen:

1. Definition von Messages
2. Definition von Events
3. Definition der Message Kontexte
4. Definition der Message Models
5. Implementation der Message Model Managers
  * Implementation von `Update` (optional)
  * Implementation von `Load`
6. Implementation der Message Processors
7. Zusammenstecken der Message Handling Pipelines

Diese Schritte kann ich in mehreren Iterationen durchlaufen. Ich muss nicht am Anfang alle Messages und alle Events zuerst festlegen. Im Grunde kann ich mit einer Interaktion und ihren Messages beginnen, dazu die Events definieren und dann bis zur Pipeline durchgehen. Anschließend die nächste Interaktion usw. usw.

In der Weise arbeite ich mich "scheibchenweise" durch eine Anwendung: ein Durchstich nach dem anderen. Gemeinsam haben die Durchstiche zunächst nur die Events und den Eventstore.