# Architektura CQRS



W klasycznych aplikacjach monolitycznych największym problemem jest skalowalność. Początkowo można dodać więcej pamięci *RAM* lub szybszy procesor *CPU*, jednak do czasu osiągniecia granicy opłacalności lub wyczerpania możliwości technicznych (skalowanie wzdłuż). Istnieje wiele koncepcji, które pozwalają rozwiązać problem skalowalności. Można przepisać aplikację na architekturę microserwisową. Polega ona na dekompozycji monolitycznej aplikacji na bardzo małe serwisy, które spełniają pojedynczą funkcję biznesową. Przykładowo jednym serwisem może być obsługa stanów magazynowych. Przy bardzo dużej liczbie zapytań do konkretnego serwisu można go skalować (przy zastosowaniu *load balancera*, który będzie kierował ruch do serwisu z najmniejszą liczbą żądań). Takie podejście daje dużą swobodę co do tego, które serwisy mogę ze sobą wymieniać dane, jak i ich liczby. Przykładowo w przeciętnej firmie liczba żądań dla usługi *stany magazynowe*, będzie większa, niż *raportowanie*. Dodatkowo łatwo wraz z mikroserwisami przekroczyć granicę jednej maszyny. Jedna z najpopularniejszych platform do obsługi mikroserwisów *Kubernetes* umożliwia skalowanie do bardzo dużej liczby *workerów*, na którym fizycznie pracują mikroserwisy. Aktualizacja jednego serwisu jest bardzo prosta, wystarczy podmienić jeden serwis bez konieczności wyłączenia całego systemu. Jednak same microserwisy gwarantują skalowalność do poziomu szybkości bazy danych. Architektura *CQRS* powstała z myślą o przesunięciu tej granicy. 

CQRS to akronim od *Command and Query Responsibility Segregation*. Głównym założeniem jest podzielenie operacji zmiany danych od ich odczytywania. W transakcyjnych bazach danych, każda aktualizacja wiersza powoduje zablokowanie  go, co znacznie spowalnia operacje odczytu, która wykonywana jest w tym samym czasie co aktualizacja. Często również stosuje się podejście, w którym każda instancja mikroserwisu ma swoja własną bazę danych. Ten właśnie problem rozwiązuje wzorzec architektoniczny *CQRS*. Sposób jego działania wraz z innymi wzorcami projektowymi prezentuje poniższy rysunek.

<center>


Jako `Front-end` należy rozumieć aplikację webową, desktopową, ale może to być również inny serwis. W zależności od operacji może zostać wybrana jedna z dwóch opcji, wykonanie `Command` modyfikującej dane lub wykonanie `Query`. Wzorzec *CQRS* nie narzuca technologii wykonania żądania. Można użyć żądania *HTTP* / *GRPC* lub innej.
1. W momencie wykonania `Command` uruchamiana jest walidacja. W zależności od jej wyniku, żądanie zwraca kod błędu lub jest procesowane dalej, poprzez wysłania zdarzenia do kolejki zdarzeń. W zależności od jego typu następuje odebranie zdarzenia przez `Event handler` i aktualizacja rekordu w bazie danych. Brak bezpośredniego połączenia między `Event handler`, a `Validation` jest spowodowane tym, że `Order DB` może być w wielu instancjach (jedna instancja mikroserwisu to jedna instancja bazy danych). Często spotykaną praktyką jest zastosowanie *Event Sourcing* omawianego w poprzednim laboratorium. Komponent ten umieszczany jest zazwyczaj za walidacją, która tworzy zdarzenie i dodaje go do kolejki oraz tworzy wpis w logu *Event souring*. 
2. Wykonanie `Query` powoduje odczyt z bazy danych odpowiednich danych związanych z zamówieniem.

Co więcej zarówno `Query` jak i `Command` mogą zawierać inne modele, co znacznie upraszcza zarządzaniem kodem. Przykładowo wiersz w bazie danych może zawierać różne kolumny jak data ostatniej modyfikacji, które z punktu widzenia biznesowego mogą nie mieć znaczenia. Kolejny przykład prezentuje przykładową implementację.

W tym miejscu należy zaznaczyć, że przydział instancji bazy danych dla konkretnego mikroserwisu jest implementacją wzorca *baza danych per mikroserwis* i w ogólnym przypadku nie jest wymagany.

Niekiedy w diagramie opisującym *CQRS* można znaleźć dodatkową bazę danych operacji `Command`. Stanowi to punkt zbierania danych i aktualizacja zbiorcza, co jakiś czas, co ma wydłużać czas bez blokady bazy danych. Poniższy kod zawiera przykładową implementację wzorca.

In [1]:
public class Order
{
    public Guid OrderId { get; init; }
    public OrderStatus Status { get; set; } = OrderStatus.Created;
    public DateTime LastStatusChanged { get; set; } = DateTime.UtcNow;
    public override string ToString() => $"{OrderId}: {Status} ({LastStatusChanged})";
}

public enum OrderStatus { Created, Confirmed, Shipped, ReadyForDelivery, Cancelled, Refunded }

W powyższym kodzie, klasa `Order` została zdefiniowana wraz z podstawowymi polami. Identyfikatorem zamówienia jest `OrderId` jako typ *GUID* jako typ *GUID* postaci `7c9e6679-7433-42fa-944b-e07fc1f90ae7`. Jest to globalny identyfikator często używany w praktyce ze względu na to, że może być utworzony bezpośrednio w kodzie bez konieczności sprawdzenia czy już istnieje w systemie (liczba to ciąg losowy, a liczba kombinacji wynosi $16^{32}$).  Klasa `OrderStatus` odpowiada za bieżący status zamówienia.

Kolejny kod zawiera sposób implementacji `Handlers`. Jako kolejki nie zostanie użyty *RabbitMQ*, który został omówiony w poprzednim laboratorium, a jedynie zwykłe zdarzenia w języku C#, dla przejrzystości przykładów. Poniżej znajduje się implementacja `EventBrokera` działającego w oparciu o zdarzenia w języku C#.

In [2]:
public interface Event {}

public abstract class Query: Event    
{
    public object Result;        
}

public abstract class Command: Event
{

}

public class EventBroker
{
    public IList<Event> AllEvents { get; private set; } = new List<Event>();
    
    public event EventHandler<Query> OnNewQuery;
    
    public event EventHandler<Command> OnNewCommand;    
    
    public T Query<T>(Query query) where T: class {        
        OnNewQuery?.Invoke(this, query);        
        return (T)query.Result;
    }
    
    public void Command(Command command) {
    
        OnNewCommand?.Invoke(this, command);
        
        AllEvents.Add(command);
    }
}





Dwie pierwsze klasy definiują abstrakcje dla zdarzeń wraz z ich podziałem na `Query` i `Command`. Pierwsza `Query` jako klasa bazowa dla `Query` oraz klasa bazowa `Command`. Interfejs `Event` oznacza klasę bazową dla zdarzeń w systemie. Otrzymanie wyniku z *query* następuje po wywołaniu funkcji `Query` typu generycznego. Wyzwala zdarzenie a następnie zwraca rzutowaną wartość `Query.Result` na odpowiedni typ podany w argumencie funkcji generycznej. Implementacja komend jest łatwiejsza ze względu na to, że komendy zgodnie z definicją mają wykonywać operacje bez wyniku (podejście *fire and forget*). 

Następnym etapem jest zdefiniowane repozytorium, które przechowuje dane. Poniższa implementacja wykorzystuje technikę *repository pattern*, która umożliwia w łatwy sposób podmianę ficznej reprezentacji danych.

In [3]:
public interface IOrderQueryRepository
{
    Order GetOrder(Guid orderId);
}

public interface IOrderCommandRepository
{
    void UpdateStatus(Guid orderId, OrderStatus newOrderStatus);
}

public class OrderRepository: IOrderQueryRepository, IOrderCommandRepository
{
    private IList<Order> Orders = new List<Order>();
    
    public OrderRepository()
    {
        Orders.Add(new Order() { 
            OrderId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
            Status = OrderStatus.Created
        });
    }
    
    public Order GetOrder(Guid orderId) => 
            Orders
                .Where(x => x.OrderId == orderId)                
                .FirstOrDefault();
    
    public void UpdateStatus(Guid orderId, OrderStatus newOrderStatus) {
       var order = Orders.Where(x=> x.OrderId == orderId).FirstOrDefault();
       
       if (order != null) {
           order.Status = newOrderStatus;
           order.LastStatusChanged = DateTime.UtcNow;
        }
    }
}

 W konstrukturze klasy `OrderRepository` została dodana encja, która w kolejnych listingach będzie przeglądana i modyfikowana.
 Kolejny kod zawiera przykładową implementację klasy dziedziczącej po klasie bazowej `Query` i `OrderQueryHandler`.

In [4]:
public class OrderQuery: Query
{
    public Guid OrderId { get; init; }    
}

public class OrderQueryHandler
{    
    protected IOrderQueryRepository RepDB { get; private set; }
    
    public OrderQueryHandler(IOrderQueryRepository repository, EventBroker broker) {
        this.RepDB = repository;
        broker.OnNewQuery += QueryHandler;
    }
    
    public void QueryHandler(object sender, Query query) {
        switch (query)
        {
            case OrderQuery orderQuery:                
                query.Result = RepDB.GetOrder(orderQuery.OrderId);
                break;
        }
    }
}

Klasa `OrderQuery` implementuje obiekt `Query` i stanowi interface pośredniczący między kolejką, a klasą `OrderQueryHandler`, która obsługuje ten typ komunikatu. Kolejny kod przedstawia implementację dwóch komend oraz dwóch klas do ich obsługi.

In [5]:
public class OrderStatusChangedCommand: Command
{
    public Guid OrderId { get; init; }
    public OrderStatus OldStatus { get; init; }
    public OrderStatus NewStatus { get; init; }
}

public class OrderStatusUpdateCommand: Command
{
    public Guid OrderId { get; init; }
    public OrderStatus OldStatus { get; init; }
    public OrderStatus NewStatus { get; init; }
}

public class OrderCommandHandler
{    
    protected IOrderCommandRepository RepDB { get; private set; }
    
    protected EventBroker Broker { get; private set; }
    
    public OrderCommandHandler(IOrderCommandRepository repository, EventBroker broker) 
    {
        this.RepDB = repository;
        this.Broker = broker;    
    }
    public void CommandHandler(Command command) {
        switch (command)
        {
            case OrderStatusChangedCommand orderCommand when command is OrderStatusChangedCommand:
                //here validation happens
                Broker.Command(new OrderStatusUpdateCommand() {
                    OrderId = orderCommand.OrderId,
                    OldStatus = orderCommand.OldStatus,
                    NewStatus = orderCommand.NewStatus
                });
                
                break;
        }
    }
}

public class OrderEventHandler
{    
    protected IOrderCommandRepository RepDB { get; private set; }
    
    public OrderEventHandler(IOrderCommandRepository repository, 
        EventBroker broker) 
    {
        this.RepDB = repository;
        broker.OnNewCommand += EventHandler;
    }
    
    public void EventHandler(object sender, Command command) {
        switch (command)
        {
            case OrderStatusUpdateCommand orderCommand:                
                RepDB.UpdateStatus(orderCommand.OrderId, orderCommand.NewStatus);
                break;
        }
    }
}

W implementacji zostały użyte dwie klasy obsługujące komendy dla bazy danych `Order`. Pierwsza `OrderCommandHandler` nie obsługuje kolejki ze względu na to, że ma za zadanie walidować dane przed ich wysłaniem do kolejki (`CommandHandler`) i zwrócenie danych w sposób deterministyczny. W przypadku walidacji zakończonej sukcesem funkcja `CommandHandle` dodaje do kolejki komendę `OrderStatusUpdateCommand`. Klasa `OrderEventHandler` ma za zadanie obsługiwać wszystkie zdarzenia związane z wykonywaniem fizycznej aktualizacji danych w bazie danych. W rzeczywistej aplikacji może się zdarzyć, że będzie więcej, niż jedna instancja bazy danych. W takim przypadku zdarzenie powinno być rozesłane do wszystkich klas obsługujących to zdarzenie (w *RabbitMQ* `Fanout`).

Klasa `OrderStatusChangedCommand` zawiera wymagane pole `OrderId` będący identyfikatorem zamówienia oraz dwa pola, pierwotny status oraz nowy na jaki został zmieniony. Oba pole są wymagane w przypadku implementacji `Event Sourcing`, co jest przydatne przy cofaniu komend. Co warto zauważyć `OrderStatusChangedCommand` oraz `OrderQuery` mogą przyjść zupełnie z innych części systemu, np. od kolejno od klienta oraz z widoku dla pracowników magazynu. Przykład użycia poniższych klas przedstawia poniższy kod.

In [6]:
using System.Threading;

var broker = new EventBroker();
var repDB = new OrderRepository();

var queryHandler = new OrderQueryHandler(repDB, broker);
var orderEventHandler = new OrderEventHandler(repDB, broker);
var commandHandler = new OrderCommandHandler(repDB, broker);

var orderId = Guid.Parse("00000000-0000-0000-0000-000000000001");

var queryCommand = new OrderQuery() { OrderId =  orderId }; 
Console.WriteLine(broker.Query<Order>(queryCommand));

Thread.Sleep(1000);

var statusChangedCommand = new OrderStatusChangedCommand() {
    OrderId = orderId,
    OldStatus = OrderStatus.Created,
    NewStatus = OrderStatus.Confirmed
};

commandHandler.CommandHandler(statusChangedCommand);

Console.WriteLine(broker.Query<Order>(queryCommand));

00000000-0000-0000-0000-000000000001: Created (12.05.2021 14:08:32)
00000000-0000-0000-0000-000000000001: Confirmed (12.05.2021 14:08:33)


Dla przejrzystości nie zostały użyte dwie biblioteki i serwisy, które mogłyby znacznie uprościć kod. Kontener `AutoFac` (*Inversion of control*), który ułatwiłby tworzenie zdarzeń, *RabbitMQ* co uprościłoby routowanie zdarzeń w funkcja obsługi oraz `AutoMapper` do kopiowania danych między klasami.

## Zadanie do wykonania

1. Dodaj nowe pole `version` (**`int`**) do klasy `OrderStatusUpdateCommand`. Dodaj klasę `OrderUndoCommand`, która cofa wszystkie modyfikacje aż do zadanej w parametrze wersji.

2. Zaimplementuj repozytorium do obsługi *SQLite* poznanego w poprzednich laboratoriach.