# Chapter 16 - Mediator and CQS Patterns

## Vertical Slice Architecture

- Each vertical slice implements one specific feature, cutting across all layers
- Goal is to encapsulate each feature's code together
- CQS (Command-Query Separation) and Mediator patterns work well for this purpose
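
As a rough illustration of the bullets above, a slice can be sketched as one class that holds everything a feature needs: its request, its result, and its handler. The names `AddStockFeature`, `Command`, `Result`, and `Handler` are hypothetical, and the dictionary is only a stand-in for a real data store.

```csharp
using System;
using System.Collections.Generic;

// Usage: the caller only touches types that belong to this one feature.
var stock = new Dictionary<int, int> { [3] = 10 };
var sliceHandler = new AddStockFeature.Handler(stock);
AddStockFeature.Result sliceResult =
    sliceHandler.Handle(new AddStockFeature.Command(ProductId: 3, Amount: 5));
Console.WriteLine(sliceResult.QuantityInStock); // prints "15"

// One slice = one feature: request, result, and handler live side by side.
public static class AddStockFeature
{
    public record Command(int ProductId, int Amount);
    public record Result(int QuantityInStock);

    public class Handler
    {
        // Stand-in for a repository; maps product id to quantity in stock.
        private readonly IDictionary<int, int> _stock;
        public Handler(IDictionary<int, int> stock) => _stock = stock;

        public Result Handle(Command command)
        {
            _stock[command.ProductId] += command.Amount;
            return new Result(_stock[command.ProductId]);
        }
    }
}
```

Changing or removing the feature then means touching a single file rather than one class per layer.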



## The Mediator Pattern

- Behavioral pattern that controls how objects interact with one another
    * Breaks tight coupling between components
- Objects ("colleagues") communicate through a mediator (middleman) instead of referencing each other directly

In [1]:
classDiagram
    direction LR
    class Consumer
    class IMediator {
        <<Interface>>
        + Send(Message) void
    }
    class IColleague {
        <<Interface>>
        + Name string
        + ReceiveMessage(Message) void
    }
    Consumer-->IMediator
    IMediator-->IColleague

### Mediator Namespace

In [2]:
public interface IMediator
{
    void Send(Message message);
}

public interface IColleague
{
    string Name { get; }
    void ReceiveMessage(Message message);
}

public record class Message(IColleague Sender, string Content);

In [3]:
public class ConcreteMediator : IMediator
{
    private readonly List<IColleague> _colleagues;
    public ConcreteMediator(params IColleague[] colleagues)
    {
        ArgumentNullException.ThrowIfNull(colleagues);
        _colleagues = new List<IColleague>(colleagues);
    }

    public void Send(Message message)
    {
        foreach (var colleague in _colleagues)
        {
            colleague.ReceiveMessage(message);
        }
    }
}

public class ConcreteColleague : IColleague
{
    private readonly IMessageWriter<Message> _messageWriter;
    public ConcreteColleague(string name, IMessageWriter<Message> messageWriter)
    {
        Name = name ?? throw new ArgumentNullException(nameof(name));
        _messageWriter = messageWriter ?? throw new ArgumentNullException(nameof(messageWriter));
    }

    public string Name { get; }

    public void ReceiveMessage(Message message)
    {
        _messageWriter.Write(message);
    }
}

public interface IMessageWriter<TMessage>
{
    void Write(TMessage message);
}


In [4]:
public class MessageWriter : IMessageWriter<Message>
{
    public StringBuilder Output { get; } = new StringBuilder();

    public void Write(Message message)
    {
        Output.AppendLine($"[{message.Sender.Name}]: {message.Content}");
    }
}

var messageWriter1 = new MessageWriter();
var colleague1 = new ConcreteColleague("Colleague 1", messageWriter1);
var messageWriter2 = new MessageWriter();
var colleague2 = new ConcreteColleague("Colleague 2", messageWriter2);

var mediator = new ConcreteMediator(colleague1, colleague2);

mediator.Send(new Message(colleague1, "Hello from Colleague 1!"));
mediator.Send(new Message(colleague2, "Hello from Colleague 2!"));
messageWriter1.Output.ToString().Display();
messageWriter2.Output.ToString().Display();

[Colleague 1]: Hello from Colleague 1!
[Colleague 2]: Hello from Colleague 2!


[Colleague 1]: Hello from Colleague 1!
[Colleague 2]: Hello from Colleague 2!


See [C16\Mediator\test\Mediator.Tests\ChatRoomTest.cs](..\C16\Mediator\test\Mediator.Tests\ChatRoomTest.cs) for a more realistic example.

## The CQS Pattern

- Command-Query Separation
- Commands change the state of an object or system
- Queries return values without changing the state
- CQRS (Command Query Responsibility Segregation)
    * Extension of CQS that separates the read model from the write model - Discussed more in microservices
    * Often paired with event-driven architectures
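
To make the command/query distinction concrete, the sketch below contrasts a method that violates CQS with a version that splits the two responsibilities. `MixedCounter`, `Counter`, `IncrementAndGet`, `Increment`, and `Current` are hypothetical names chosen for illustration.

```csharp
using System;

// CQS-compliant: the command mutates, the query only reads.
var counter = new Counter();
counter.Increment();             // Command: changes state, returns nothing
counter.Increment();             // Command
int first = counter.Current;     // Query: reads state
int second = counter.Current;    // Query again: same answer, no side effect
Console.WriteLine($"{first}, {second}"); // prints "2, 2"

// Violates CQS: one call both changes state and returns a value,
// so asking twice gives two different answers.
var mixed = new MixedCounter();
Console.WriteLine($"{mixed.IncrementAndGet()}, {mixed.IncrementAndGet()}"); // prints "1, 2"

public class MixedCounter
{
    private int _value;
    public int IncrementAndGet() => ++_value; // command and query in one method
}

public class Counter
{
    private int _value;
    public void Increment() => _value++;      // Command
    public int Current => _value;             // Query
}
```

Because `Current` has no side effects, it can be called any number of times, in any order, without changing the outcome.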



### Simple Example: CQS Pattern in C#

Below is a minimal example demonstrating the Command-Query Separation (CQS) pattern.

- **Command**: Changes state (e.g., adds an item).
- **Query**: Returns data without changing state.

In [5]:
public class ItemRepository
{
    private readonly List<string> _items = new();

    // Command: changes state
    public void AddItem(string item)
    {
        _items.Add(item);
    }

    // Query: returns data, does not change state
    public IReadOnlyList<string> GetItems() => _items.AsReadOnly();
}

// Usage example:
var repo = new ItemRepository();
repo.AddItem("Apple"); // Command
repo.AddItem("Banana"); // Command
var items = repo.GetItems(); // Query
string.Join(", ", items).Display(); // Output: Apple, Banana

Apple, Banana

### Combining CQS with the Mediator Pattern

The Mediator pattern can be used to decouple the sender of a command or query from its handler. In a typical CQS+Mediator setup:
- **Commands** and **Queries** are represented as separate request objects.
- A **Mediator** receives these requests and dispatches them to the appropriate handler.
- This approach is common in modern .NET applications using libraries like MediatR.

Below is a simple example combining both patterns.

In [6]:
// Command
public record AddItemCommand(string Item);

// Query
public record GetItemsQuery();

// Handler interfaces
public interface ICommandHandler<TCommand>
{
    void Handle(TCommand command);
}

public interface IQueryHandler<TQuery, TResult>
{
    TResult Handle(TQuery query);
}

// Repository (same idea as before; renamed so it does not clash with ItemRepository above)
public class ItemRepository2
{
    private readonly List<string> _items = new();
    public void Add(string item) => _items.Add(item);
    public IReadOnlyList<string> GetAll() => _items.AsReadOnly();
}

// Command Handler
public class AddItemHandler : ICommandHandler<AddItemCommand>
{
    private readonly ItemRepository2 _repo;
    public AddItemHandler(ItemRepository2 repo) => _repo = repo;
    public void Handle(AddItemCommand command) => _repo.Add(command.Item);
}

// Query Handler
public class GetItemsHandler : IQueryHandler<GetItemsQuery, IReadOnlyList<string>>
{
    private readonly ItemRepository2 _repo;
    public GetItemsHandler(ItemRepository2 repo) => _repo = repo;
    public IReadOnlyList<string> Handle(GetItemsQuery query) => _repo.GetAll();
}

// Simple Mediator
public class SimpleMediator
{
    private readonly AddItemHandler _addHandler;
    private readonly GetItemsHandler _getHandler;
    public SimpleMediator(AddItemHandler addHandler, GetItemsHandler getHandler)
    {
        _addHandler = addHandler;
        _getHandler = getHandler;
    }
    public void Send(AddItemCommand command) => _addHandler.Handle(command);
    public IReadOnlyList<string> Send(GetItemsQuery query) => _getHandler.Handle(query);
}

// Usage:
var repo2 = new ItemRepository2();
var mediator2 = new SimpleMediator(new AddItemHandler(repo2), new GetItemsHandler(repo2));
mediator2.Send(new AddItemCommand("Orange"));
mediator2.Send(new AddItemCommand("Pear"));
var result = mediator2.Send(new GetItemsQuery());
string.Join(", ", result).Display(); // Output: Orange, Pear

Orange, Pear
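
`SimpleMediator` above needs a new field and a new `Send` overload for every request type. One way around that is a registry keyed by request type. The following is a minimal sketch under that assumption, not a description of how MediatR works internally; `RegistryMediator`, `IQuery<TResult>`, `RegisterCommand`, `RegisterQuery`, `Ask`, `AddItem`, and `ListItems` are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;

var store = new List<string>();
var registryMediator = new RegistryMediator();

// One registration per request type; the mediator class itself never changes.
registryMediator.RegisterCommand<AddItem>(command => store.Add(command.Item));
registryMediator.RegisterQuery<ListItems, IReadOnlyList<string>>(_ => store.AsReadOnly());

registryMediator.Send(new AddItem("Orange"));   // Command: changes state
registryMediator.Send(new AddItem("Pear"));     // Command
IReadOnlyList<string> listed = registryMediator.Ask(new ListItems()); // Query
Console.WriteLine(string.Join(", ", listed));   // prints "Orange, Pear"

// Marker interface so Ask can infer the result type from the query.
public interface IQuery<TResult> { }

public record AddItem(string Item);
public record ListItems() : IQuery<IReadOnlyList<string>>;

public class RegistryMediator
{
    private readonly Dictionary<Type, Action<object>> _commandHandlers = new();
    private readonly Dictionary<Type, Func<object, object>> _queryHandlers = new();

    public void RegisterCommand<TCommand>(Action<TCommand> handler)
        where TCommand : notnull
        => _commandHandlers[typeof(TCommand)] = command => handler((TCommand)command);

    public void RegisterQuery<TQuery, TResult>(Func<TQuery, TResult> handler)
        where TQuery : IQuery<TResult>
        where TResult : notnull
        => _queryHandlers[typeof(TQuery)] = query => handler((TQuery)query);

    // Commands: look up the handler by the runtime type of the request.
    public void Send(object command)
    {
        if (!_commandHandlers.TryGetValue(command.GetType(), out var handler))
            throw new InvalidOperationException($"No handler registered for {command.GetType().Name}.");
        handler(command);
    }

    // Queries: same lookup, but the handler's result is returned to the caller.
    public TResult Ask<TResult>(IQuery<TResult> query)
    {
        if (!_queryHandlers.TryGetValue(query.GetType(), out var handler))
            throw new InvalidOperationException($"No handler registered for {query.GetType().Name}.");
        return (TResult)handler(query);
    }
}
```

The trade-off is that a missing registration surfaces at run time as an exception instead of at compile time as a missing overload.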

See [C16/CQS/src/CQS/](../C16/CQS/src/CQS/) for a more realistic example.

## Using MediatR

https://github.com/LuckyPennySoftware/MediatR

```
dotnet add package MediatR
```

MediatR supports in-process communication through messaging. It supports request/response flows (commands and queries) as well as notifications (events).
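
The request/response side is demonstrated by the `AddStocks` and `RemoveStocks` use cases in the Core.UseCases section. For the notification side, a minimal sketch of the shapes involved might look like the following. `StockChanged` and `StockChangedLogger` are hypothetical names. The handler is invoked directly here to keep the sketch free of dependency-injection setup, whereas in an application `IMediator.Publish` would deliver the notification to every registered `INotificationHandler<StockChanged>`.

```csharp
// Assumes the MediatR package is referenced (dotnet add package MediatR).
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Direct call for illustration only; IMediator.Publish normally does the dispatching.
var logger = new StockChangedLogger();
await logger.Handle(new StockChanged(ProductId: 3, QuantityInStock: 20), CancellationToken.None);
Console.WriteLine(logger.Entries[0]); // prints "Product 3 now has 20 in stock."

// A notification carries no response; zero or more handlers may react to it.
public record StockChanged(int ProductId, int QuantityInStock) : INotification;

public class StockChangedLogger : INotificationHandler<StockChanged>
{
    public List<string> Entries { get; } = new();

    public Task Handle(StockChanged notification, CancellationToken cancellationToken)
    {
        Entries.Add($"Product {notification.ProductId} now has {notification.QuantityInStock} in stock.");
        return Task.CompletedTask;
    }
}
```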

### Packages / Dependencies

In [1]:
#r "nuget: Microsoft.EntityFrameworkCore, 9.0.2"
#r "nuget: Microsoft.EntityFrameworkCore.InMemory, 9.0.2"

In [2]:
#r "nuget: Microsoft.Extensions.DependencyInjection, 9.0.2"
#r "nuget: Microsoft.Extensions.DependencyInjection.Abstractions, 9.0.2"

In [3]:
#r "nuget: MediatR"

### Core

In [4]:
using System.Threading;

public class NegativeValueException : Exception
{
    public NegativeValueException(int amountToAddOrRemove)
        : base($"The amount to add or remove can't be negative. Provided: {amountToAddOrRemove}.")
    {
    }
}

public class NotEnoughStockException : Exception
{
    public NotEnoughStockException(int quantityInStock, int amountToRemove)
        : base($"You cannot remove {amountToRemove} item(s) when there is only {quantityInStock} item(s) left.")
    {
        QuantityInStock = quantityInStock;
        AmountToRemove = amountToRemove;
    }

    public int QuantityInStock { get; }
    public int AmountToRemove { get; }
}

public class ProductNotFoundException : Exception
{
    public ProductNotFoundException(int productId)
        : base($"The product '{productId}' was not found.")
    {
        ProductId = productId;
    }

    public int ProductId { get; }
}



### Core.Entities

In [5]:


// Core.Entities namespace
public class Product
{
    public Product(string name, int quantityInStock, int? id = null)
    {
        Name = name ?? throw new ArgumentNullException(nameof(name));
        QuantityInStock = quantityInStock;
        Id = id;
    }

    public int? Id { get; init; }
    public string Name { get; init; }
    public int QuantityInStock { get; private set; }

    public void AddStock(int amount)
    {
        if (amount == 0) { return; }
        if (amount < 0) { throw new NegativeValueException(amount); }
        QuantityInStock += amount;
    }

    public void RemoveStock(int amount)
    {
        if (amount == 0) { return; }
        if (amount < 0) { throw new NegativeValueException(amount); }
        if (amount > QuantityInStock) { throw new NotEnoughStockException(QuantityInStock, amount); }
        QuantityInStock -= amount;
    }
}


### Core.Repositories

In [6]:

// using Core.Entities

public interface IProductRepository
{
    Task<IEnumerable<Product>> AllAsync(CancellationToken cancellationToken);
    Task<Product?> FindByIdAsync(int productId, CancellationToken cancellationToken);
    Task CreateAsync(Product product, CancellationToken cancellationToken);
    Task UpdateAsync(Product product, CancellationToken cancellationToken);
    Task DeleteAsync(int productId, CancellationToken cancellationToken);
}

### Core.UseCases

In [7]:

//using Core.Repositories;
using MediatR;

// Core/UseCases/AddStocks.cs
public class AddStocks
{
    public class Command : IRequest<int>
    {
        public int ProductId { get; set; }
        public int Amount { get; set; }
    }

    public class Handler : IRequestHandler<Command, int>
    {
        private readonly IProductRepository _productRepository;
        public Handler(IProductRepository productRepository)
        {
            _productRepository = productRepository ?? throw new ArgumentNullException(nameof(productRepository));
        }

        public async Task<int> Handle(Command request, CancellationToken cancellationToken)
        {
            var product = await _productRepository.FindByIdAsync(request.ProductId, cancellationToken);
            if (product == null)
            {
                throw new ProductNotFoundException(request.ProductId);
            }
            product.AddStock(request.Amount);
            await _productRepository.UpdateAsync(product, cancellationToken);
            return product.QuantityInStock;
        }
    }
}

// Core/UseCases/RemoveStocks.cs
public class RemoveStocks
{
    public class Command : IRequest<int>
    {
        public int ProductId { get; set; }
        public int Amount { get; set; }
    }

    public class Handler : IRequestHandler<Command, int>
    {
        private readonly IProductRepository _productRepository;
        public Handler(IProductRepository productRepository)
        {
            _productRepository = productRepository ?? throw new ArgumentNullException(nameof(productRepository));
        }

        public async Task<int> Handle(Command request, CancellationToken cancellationToken)
        {
            var product = await _productRepository.FindByIdAsync(request.ProductId, cancellationToken);
            if (product == null)
            {
                throw new ProductNotFoundException(request.ProductId);
            }
            product.RemoveStock(request.Amount);
            await _productRepository.UpdateAsync(product, cancellationToken);
            return product.QuantityInStock;
        }
    }
}

### Infrastructure.Data.EF

In [8]:

//using Core.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.EntityFrameworkCore;


public class ProductContext : DbContext
{
    public ProductContext(DbContextOptions options)
        : base(options)
    {
    }

    public DbSet<Product> Products => Set<Product>();
}

// using Core;
// using Core.Entities;
// using Core.Repositories;
// using Microsoft.EntityFrameworkCore;

public class ProductRepository : IProductRepository
{
    private readonly ProductContext _db;
    public ProductRepository(ProductContext db)
    {
        _db = db ?? throw new ArgumentNullException(nameof(db));
    }

    public async Task<IEnumerable<Product>> AllAsync(CancellationToken cancellationToken)
    {
        return (await _db.Products.ToArrayAsync(cancellationToken));
    }

    public async Task CreateAsync(Product product, CancellationToken cancellationToken)
    {
        _db.Products.Add(product);
        await _db.SaveChangesAsync(cancellationToken);
    }

    public async Task DeleteAsync(int productId, CancellationToken cancellationToken)
    {
        var product = await FindByIdAsync(productId, cancellationToken);
        if (product == null)
        {
            throw new ProductNotFoundException(productId);
        }
        _db.Products.Remove(product);
        await _db.SaveChangesAsync(cancellationToken);
    }

    public async Task<Product?> FindByIdAsync(int productId, CancellationToken cancellationToken)
    {
        var product = await _db.Products.FindAsync(new object[] { productId }, cancellationToken);
        return product;
    }

    public async Task UpdateAsync(Product product, CancellationToken cancellationToken)
    {
        _db.Entry(product).State = EntityState.Modified;
        await _db.SaveChangesAsync(cancellationToken);
    }
}



### Web

In [9]:
string[] args = {"--urls","http://localhost:7000"};

In [26]:
// using Core;
// using Core.Entities;
// using Core.Repositories;
// using Core.UseCases;
// using Infrastructure.Data.EF;


#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.AspNetCore.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.Extensions.Hosting.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.AspNetCore.Mvc.ViewFeatures.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.AspNetCore.Diagnostics.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.AspNetCore.Http.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.AspNetCore.Http.Results.dll"
// #r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.Extensions.DependencyInjection.dll"
// #r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.Extensions.DependencyInjection.Abstractions.dll"
#r "C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App\9.0.2\Microsoft.Extensions.Logging.Abstractions.dll"
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.HttpResults;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using System.IO;

try{
    if (app != null)
    {
        await app.StopAsync();
    }
}
catch (Exception ex)
{
    Console.WriteLine($"Error Stopping application: {ex.Message}");
}

var builder = WebApplication.CreateBuilder(args);


builder.Services
    // Core Layer
    .AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining<NotEnoughStockException>())

    // Manual registration of MediatR handlers - only needed for workbooks
    .AddTransient<MediatR.IRequestHandler<AddStocks.Command, int>, AddStocks.Handler>()
    .AddTransient<MediatR.IRequestHandler<RemoveStocks.Command, int>, RemoveStocks.Handler>()

    // Infrastructure Layer (mapping Core to Infrastructure.Data.EF)
    .AddScoped<IProductRepository, ProductRepository>()
    // Register the concrete context; AddDbContext adds ProductContext as a scoped service
    .AddDbContext<ProductContext>(options => options
        .UseInMemoryDatabase("ProductContextMemoryDB")
        // Lambda parameter renamed so it does not shadow the outer `builder` variable
        .ConfigureWarnings(warnings => warnings.Ignore(InMemoryEventId.TransactionIgnoredWarning))
    )
;

var app = builder.Build();
app.MapGet("/products", async (IProductRepository productRepository, CancellationToken cancellationToken) =>
{
    var products = await productRepository.AllAsync(cancellationToken);
    return products.Select(p => new
    {
        p.Id,
        p.Name,
        p.QuantityInStock
    });
});
app.MapPost("/products/{productId:int}/add-stocks", async (int productId, AddStocks.Command command, IMediator mediator, CancellationToken cancellationToken) =>
{
    try
    {
        command.ProductId = productId;
        var quantityInStock = await mediator.Send(command, cancellationToken);
        var stockLevel = new StockLevel(quantityInStock);
        return Results.Ok(stockLevel);
    }
    catch (ProductNotFoundException ex)
    {
        return Results.NotFound(new
        {
            ex.Message,
            productId,
        });
    }
    catch (Exception ex) // Catch all other exceptions
    {
        return Results.InternalServerError(new
        {
            ex.Message
        });
    }
});
app.MapPost("/products/{productId:int}/remove-stocks", async (int productId, RemoveStocks.Command command, IMediator mediator, CancellationToken cancellationToken) =>
{
    try
    {
        command.ProductId = productId;
        var quantityInStock = await mediator.Send(command, cancellationToken);
        var stockLevel = new StockLevel(quantityInStock);
        return Results.Ok(stockLevel);
    }
    catch (NotEnoughStockException ex)
    {
        return Results.Conflict(new
        {
            ex.Message,
            ex.AmountToRemove,
            ex.QuantityInStock
        });
    }
    catch (ProductNotFoundException ex)
    {
        return Results.NotFound(new
        {
            ex.Message,
            productId,
        });
    }
});

using (var seedScope = app.Services.CreateScope())
{
    var db = seedScope.ServiceProvider.GetRequiredService<ProductContext>();
    await ProductSeeder.SeedAsync(db);
}

var startTask = app.RunAsync();

internal static class ProductSeeder
{
    public static Task SeedAsync(ProductContext db)
    {
        db.Database.EnsureCreated();
        if (db.Products.Any())  
        {
            return Task.CompletedTask; // No need to seed if products already exist
        }
        
        db.Products.Add(new Product(
            id: 1,
            name: "Banana",
            quantityInStock: 50
        ));
        db.Products.Add(new Product(
            id: 2,
            name: "Apple",
            quantityInStock: 20
        ));
        db.Products.Add(new Product(
            id: 3,
            name: "Habanero Pepper",
            quantityInStock: 10
        ));
        return db.SaveChangesAsync();
    }
}

public record class StockLevel(int QuantityInStock);


info: Microsoft.Hosting.Lifetime[14]
      Now listening on: http://localhost:7000
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: c:\Users\jason\training\dotnet\Architecting-ASP.NET-Core-Applications-3E\MyNotes


In [22]:
using System.Net.Http;

var httpClient = new HttpClient();
var payload = new { Amount = 10 }; // Amount is an int on AddStocks.Command
var json = System.Text.Json.JsonSerializer.Serialize(payload);
var content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
var response = await httpClient.PostAsync("http://localhost:7000/products/3/add-stocks", content);
//response.EnsureSuccessStatusCode();
display(response);
var responseData = await response.Content.ReadAsStringAsync();
responseData.DisplayAs("application/json")

In [27]:
using System.Net.Http;

var httpClient = new HttpClient();
var payload = new { Amount = 10 }; // Amount is an int on RemoveStocks.Command
var json = System.Text.Json.JsonSerializer.Serialize(payload);
var content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
var response = await httpClient.PostAsync("http://localhost:7000/products/3/remove-stocks", content);
//response.EnsureSuccessStatusCode();
display(response);
var responseData = await response.Content.ReadAsStringAsync();
responseData.DisplayAs("application/json")

In [28]:
using System.Net.Http;

var httpClient = new HttpClient();

var response = await httpClient.GetAsync("http://localhost:7000/products");
response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync();
content.DisplayAs("application/json")

In [25]:

await app.StopAsync();
