Skip to content

Commit

Permalink
#12 - Unification EventStore usage (#14)
Browse files Browse the repository at this point in the history
* Upgraded to .NET core 2.1

* Downgraded version of EntityFramework, because it uses newer version of Npgsql because it collides with Marten

* Unified EventStore usage

* Ignored Pending events for Client entity framework entity
  • Loading branch information
oskardudycz committed Sep 5, 2018
1 parent cdc0263 commit a579edb
Show file tree
Hide file tree
Showing 16 changed files with 271 additions and 137 deletions.
13 changes: 8 additions & 5 deletions Domain/Events/EventBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using MediatR;
using System.Threading.Tasks;
using System.Threading.Tasks;
using MediatR;

namespace Domain.Events
{
Expand All @@ -12,9 +12,12 @@ public EventBus(IMediator mediator)
_mediator = mediator;
}

public Task Publish<TEvent>(TEvent @event) where TEvent : IEvent
public async Task Publish<TEvent>(params TEvent[] events) where TEvent : IEvent
{
return _mediator.Publish(@event);
foreach (var @event in events)
{
await _mediator.Publish(@event);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using Domain.Events;
using System;
using System;
using System.Collections.Generic;
using Domain.Events;

namespace Domain.Aggregates
{
public abstract class EventSource : IAggregate
public abstract class EventSourcedAggregate : IEventSourcedAggregate
{
public Guid Id { get; protected set; }

public Queue<IEvent> PendingEvents { get; private set; }
protected EventSource()

protected EventSourcedAggregate()
{
PendingEvents = new Queue<IEvent>();
}
Expand All @@ -20,4 +20,4 @@ protected void Append(IEvent @event)
PendingEvents.Enqueue(@event);
}
}
}
}
4 changes: 2 additions & 2 deletions Domain/Events/IEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace Domain.Events
{
public interface IEventBus
{
Task Publish<TEvent>(TEvent @event) where TEvent : IEvent;
Task Publish<TEvent>(params TEvent[] events) where TEvent : IEvent;
}
}
}
10 changes: 10 additions & 0 deletions Domain/Events/IEventSourcedAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;
using Domain.Events;

namespace Domain.Aggregates
{
public interface IEventSourcedAggregate : IAggregate
{
Queue<IEvent> PendingEvents { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Marten.Integration.Tests.TestsInfrasructure;
using SharpTestsEx;
using Xunit;

namespace Marten.Integration.Tests.EventStore.Projections
{
public class OutOfOrderProjectionsTest : MartenTest
{
private interface ITaskEvent
{
Guid TaskId { get; set; }

int TaskVersion { get; set; }
}

private class TaskCreated : ITaskEvent
{
public Guid TaskId { get; set; }
public string Description { get; set; }

public int TaskVersion { get; set; }
}

private class TaskUpdated : ITaskEvent
{
public Guid TaskId { get; set; }
public string Description { get; set; }

public int TaskVersion { get; set; }
}

private class Task
{
public Guid TaskId { get; set; }

public string Description { get; set; }
}

private class TaskList
{
public Guid Id { get; set; }
public List<Task> List { get; private set; }

public TaskList()
{
List = new List<Task>();
}

public void Apply(TaskCreated @event)
{
List.Add(new Task { TaskId = @event.TaskId, Description = @event.Description });
}

public void Apply(TaskUpdated @event)
{
var task = List.SingleOrDefault(t => t.TaskId == @event.TaskId);

if (task == null)
{
return;
}

task.Description = @event.Description;
}
}

protected override IDocumentSession CreateSession(Action<StoreOptions> setStoreOptions)
{
var store = DocumentStore.For(options =>
{
options.Connection(Settings.ConnectionString);
options.AutoCreateSchemaObjects = AutoCreate.All;
options.DatabaseSchemaName = SchemaName;
options.Events.DatabaseSchemaName = SchemaName;
//It's needed to manualy set that inline aggegation should be applied
options.Events.InlineProjections.AggregateStreamsWith<TaskList>();
});

return store.OpenSession();
}

[Fact]
public void GivenOutOfOrderEvents_WhenPublishedWithSetVersion_ThenLiveAggregationWorksFine()
{
var firstTaskId = Guid.NewGuid();
var secondTaskId = Guid.NewGuid();

var events = new ITaskEvent[]
{
new TaskUpdated {TaskId = firstTaskId, Description = "Final First Task Description", TaskVersion = 4 },
new TaskCreated {TaskId = firstTaskId, Description = "First Task", TaskVersion = 1 },
new TaskCreated {TaskId = secondTaskId, Description = "Second Task 2", TaskVersion = 2 },
new TaskUpdated {TaskId = firstTaskId, Description = "Intermediate First Task Description", TaskVersion = 3},
new TaskUpdated {TaskId = secondTaskId, Description = "Final Second Task Description", TaskVersion = 5},
};

//1. Create events
var streamId = EventStore.StartStream<TaskList>(events).Id;

Session.SaveChanges();

//2. Get live agregation
var taskListFromLiveAggregation = EventStore.AggregateStream<TaskList>(streamId);

//3. Get inline aggregation
var taskListFromInlineAggregation = Session.Load<TaskList>(streamId);

taskListFromLiveAggregation.Should().Not.Be.Null();
taskListFromInlineAggregation.Should().Not.Be.Null();

taskListFromLiveAggregation.List.Count.Should().Be.EqualTo(2);
taskListFromLiveAggregation.List.Count.Should().Be.EqualTo(taskListFromInlineAggregation.List.Count);
}
}
}
20 changes: 13 additions & 7 deletions Sample/EventSourcing.Sample.Clients/Domain/Clients/Client.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
using Domain.Aggregates;
using System;
using Domain.Aggregates;
using EventSourcing.Sample.Clients.Contracts.Clients.DTOs;
using System;
using EventSourcing.Sample.Clients.Contracts.Clients.Events;

namespace EventSourcing.Sample.Clients.Domain.Clients
{
public class Client : IAggregate
public class Client : EventSourcedAggregate
{
public Guid Id { get; private set; }

public string Name { get; private set; }

public string Email { get; private set; }

public Client()
{

}

public Client(Guid id, string name, string email)
{
Id = id;
Name = name;
Email = email;

Append(new ClientCreated(id, new ClientInfo
{
Email = email,
Name = name
}));
}

public void Update(ClientInfo clientInfo)
{
Name = clientInfo.Name;
Email = clientInfo.Email;

Append(new ClientUpdated(Id, clientInfo));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Domain.Commands;
using Domain.Events;
using EventSourcing.Sample.Clients.Contracts.Clients.Commands;
using EventSourcing.Sample.Clients.Contracts.Clients.Events;
using EventSourcing.Sample.Clients.Storage;
using MediatR;
using Microsoft.EntityFrameworkCore;
Expand All @@ -20,7 +19,9 @@ public class ClientsCommandHandler :

private DbSet<Client> Clients;

public ClientsCommandHandler(ClientsDbContext dbContext, IEventBus eventBus)
public ClientsCommandHandler(
ClientsDbContext dbContext,
IEventBus eventBus)
{
this.dbContext = dbContext;
Clients = dbContext.Clients;
Expand All @@ -29,15 +30,15 @@ public ClientsCommandHandler(ClientsDbContext dbContext, IEventBus eventBus)

public async Task<Unit> Handle(CreateClient command, CancellationToken cancellationToken = default(CancellationToken))
{
await Clients.AddAsync(new Client(
var client = new Client(
command.Id.Value,
command.Data.Name,
command.Data.Email
));
);

await dbContext.SaveChangesAsync(cancellationToken);
await Clients.AddAsync(client);

await eventBus.Publish(new ClientCreated(command.Id.Value, command.Data));
await SaveAndPublish(client, cancellationToken);

return Unit.Value;
}
Expand All @@ -50,9 +51,7 @@ public async Task<Unit> Handle(UpdateClient command, CancellationToken cancellat

dbContext.Update(client);

await dbContext.SaveChangesAsync(cancellationToken);

await eventBus.Publish(new ClientUpdated(command.Id, command.Data));
await SaveAndPublish(client, cancellationToken);

return Unit.Value;
}
Expand All @@ -63,11 +62,16 @@ public async Task<Unit> Handle(DeleteClient command, CancellationToken cancellat

dbContext.Remove(client);

await dbContext.SaveChangesAsync(cancellationToken);

await eventBus.Publish(new ClientDeleted(command.Id));
await SaveAndPublish(client, cancellationToken);

return Unit.Value;
}

private async Task SaveAndPublish(Client client, CancellationToken cancellationToken = default(CancellationToken))
{
await dbContext.SaveChangesAsync(cancellationToken);

await eventBus.Publish(client.PendingEvents.ToArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ public ClientsDbContext(DbContextOptions<ClientsDbContext> options)
{
}

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Client>().Ignore(c => c.PendingEvents);
}

public DbSet<Client> Clients { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using Domain.Aggregates;
using System;
using Domain.Aggregates;
using EventSourcing.Sample.Tasks.Contracts.Accounts.Events;
using EventSourcing.Sample.Tasks.Contracts.Transactions;
using EventSourcing.Sample.Tasks.Contracts.Transactions.Events;
using EventSourcing.Sample.Transactions.Domain.Accounts;
using System;

namespace EventSourcing.Sample.Tasks.Domain.Accounts
{
public class Account : EventSource
public class Account : EventSourcedAggregate
{
public Guid ClientId { get; private set; }

Expand Down Expand Up @@ -63,4 +63,4 @@ public void Apply(NewOutflowRecorded @event)
Balance -= @event.Outflow.Ammount;
}
}
}
}
Loading

0 comments on commit a579edb

Please sign in to comment.