Skip to content

Commit

Permalink
Refactored EventStoreDBRepository to be wrapped by optimistic concurr…
Browse files Browse the repository at this point in the history
…ency scope instead of having classes injected
  • Loading branch information
oskardudycz committed Mar 2, 2022
1 parent 5c6cd88 commit 63d98d3
Show file tree
Hide file tree
Showing 22 changed files with 188 additions and 217 deletions.
2 changes: 1 addition & 1 deletion Core.EventStoreDB/Config.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using Core.BackgroundWorkers;
using Core.Events.NoMediator;
using Core.EventStoreDB.OptimisticConcurrency;
Expand Down Expand Up @@ -32,6 +31,7 @@ public static IServiceCollection AddEventStoreDB(this IServiceCollection service
.AddSingleton(new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)))
.AddScoped<EventStoreDBExpectedStreamRevisionProvider, EventStoreDBExpectedStreamRevisionProvider>()
.AddScoped<EventStoreDBNextStreamRevisionProvider, EventStoreDBNextStreamRevisionProvider>()
.AddScoped<EventStoreDBOptimisticConcurrencyScope, EventStoreDBOptimisticConcurrencyScope>()
.AddTransient<EventStoreDBSubscriptionToAll, EventStoreDBSubscriptionToAll>();

if (options?.UseInternalCheckpointing != false)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
namespace Core.EventStoreDB.OptimisticConcurrency;
using System;
using System.Threading.Tasks;

namespace Core.EventStoreDB.OptimisticConcurrency;

public class EventStoreDBOptimisticConcurrencyScope
{
private readonly EventStoreDBExpectedStreamRevisionProvider expectedStreamVersionProvider;
private readonly EventStoreDBNextStreamRevisionProvider nextStreamVersionProvider;

public EventStoreDBOptimisticConcurrencyScope(
EventStoreDBExpectedStreamRevisionProvider expectedStreamVersionProvider,
EventStoreDBNextStreamRevisionProvider nextStreamVersionProvider
)
{
this.expectedStreamVersionProvider = expectedStreamVersionProvider;
this.nextStreamVersionProvider = nextStreamVersionProvider;
}

public async Task Do(Func<ulong?, Task<ulong>> handler)
{
var expectedVersion = expectedStreamVersionProvider.Value;

var nextVersion = await handler(expectedVersion);

nextStreamVersionProvider.Set(nextVersion);
}
}

public class EventStoreDBExpectedStreamRevisionProvider
{
Expand Down
38 changes: 13 additions & 25 deletions Core.EventStoreDB/Repository/EventStoreDBRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Core.Aggregates;
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.OptimisticConcurrency;
using Core.EventStoreDB.Serialization;
using EventStore.Client;

Expand All @@ -15,26 +14,18 @@ namespace Core.EventStoreDB.Repository;
public interface IEventStoreDBRepository<T> where T : class, IAggregate
{
Task<T?> Find(Guid id, CancellationToken cancellationToken);
Task Add(T aggregate, CancellationToken cancellationToken);
Task Update(T aggregate, CancellationToken cancellationToken);
Task Delete(T aggregate, CancellationToken cancellationToken);
Task<ulong> Add(T aggregate, CancellationToken cancellationToken);
Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken cancellationToken = default);
Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken cancellationToken = default);
}

public class EventStoreDBRepository<T>: IEventStoreDBRepository<T> where T : class, IAggregate
{
private readonly EventStoreClient eventStore;
private readonly EventStoreDBExpectedStreamRevisionProvider expectedStreamRevisionProvider;
private readonly EventStoreDBNextStreamRevisionProvider nextStreamRevisionProvider;

public EventStoreDBRepository(
EventStoreClient eventStore,
EventStoreDBExpectedStreamRevisionProvider expectedStreamRevisionProvider,
EventStoreDBNextStreamRevisionProvider nextStreamRevisionProvider
)
public EventStoreDBRepository(EventStoreClient eventStore)
{
this.eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
this.expectedStreamRevisionProvider = expectedStreamRevisionProvider;
this.nextStreamRevisionProvider = nextStreamRevisionProvider;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
Expand All @@ -43,35 +34,32 @@ EventStoreDBNextStreamRevisionProvider nextStreamRevisionProvider
cancellationToken
);

public async Task Add(T aggregate, CancellationToken cancellationToken)
public async Task<ulong> Add(T aggregate, CancellationToken cancellationToken = default)
{
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
StreamState.NoStream,
GetEventsToStore(aggregate),
cancellationToken: cancellationToken
);
nextStreamRevisionProvider.Set(result.NextExpectedStreamRevision);
return result.NextExpectedStreamRevision;
}

public async Task Update(T aggregate, CancellationToken cancellationToken)
public async Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken cancellationToken = default)
{
var nextVersion = expectedRevision ?? (ulong)aggregate.Version;

var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
GetExpectedStreamRevision(),
nextVersion,
GetEventsToStore(aggregate),
cancellationToken: cancellationToken
);
nextStreamRevisionProvider.Set(result.NextExpectedStreamRevision);
return result.NextExpectedStreamRevision;
}

public Task Delete(T aggregate, CancellationToken cancellationToken) =>
Update(aggregate, cancellationToken);

private StreamRevision GetExpectedStreamRevision() =>
expectedStreamRevisionProvider.Value ??
throw new ArgumentNullException(nameof(expectedStreamRevisionProvider.Value),
"Stream revision was not provided.");
public Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken cancellationToken = default) =>
Update(aggregate, expectedRevision, cancellationToken);

private static IEnumerable<EventData> GetEventsToStore(T aggregate)
{
Expand Down
9 changes: 4 additions & 5 deletions Core.EventStoreDB/Repository/RepositoryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ CancellationToken cancellationToken
return entity ?? throw AggregateNotFoundException.For<T>(id);
}

public static async Task<Unit> GetAndUpdate<T>(
public static async Task<ulong> GetAndUpdate<T>(
this IEventStoreDBRepository<T> repository,
Guid id,
Action<T> action,
CancellationToken cancellationToken
ulong? expectedVersion = null,
CancellationToken cancellationToken = default
) where T : class, IAggregate
{
var entity = await repository.Get(id, cancellationToken);

action(entity);

await repository.Update(entity, cancellationToken);

return Unit.Value;
return await repository.Update(entity, expectedVersion, cancellationToken);
}
}
3 changes: 0 additions & 3 deletions Core.Marten/ExternalProjections/MartenExternalProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
using Core.Events;
using Core.Events.NoMediator;
using Core.Projections;
using Core.Serialization.Newtonsoft;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;

namespace Core.Marten.ExternalProjections;

public class MartenExternalProjection<TEvent, TView>: INoMediatorEventHandler<StreamEvent<TEvent>>
where TView : IVersionedProjection
where TEvent : notnull
{
private readonly string projectionName = typeof(TView).Name;
private readonly IDocumentSession session;
private readonly Func<TEvent, Guid> getId;

Expand Down
1 change: 0 additions & 1 deletion Core.WebApi/Tracing/Correlation/CorrelationIdMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Core.WebApi.OptimisticConcurrency;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Carts.ShoppingCarts.InitializingCart;
using Carts.Tests.Extensions.Reservations;
using Carts.Tests.Stubs.Repositories;
using Core.EventStoreDB.OptimisticConcurrency;
using FluentAssertions;
using Xunit;

Expand All @@ -18,9 +19,14 @@ public async Task ForInitCardCommand_ShouldAddNewCart()
{
// Given
var repository = new FakeRepository<ShoppingCart>();
var scope = new EventStoreDBOptimisticConcurrencyScope(
new EventStoreDBExpectedStreamRevisionProvider(),
new EventStoreDBNextStreamRevisionProvider()
);

var commandHandler = new HandleInitializeCart(
repository
repository,
scope
);

var command = InitializeShoppingCart.Create(Guid.NewGuid(), Guid.NewGuid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ public FakeRepository(params T[] aggregates)
return Task.FromResult(Aggregates.GetValueOrDefault(id));
}

public Task Add(T aggregate, CancellationToken cancellationToken)
public async Task<ulong> Add(T aggregate, CancellationToken cancellationToken = default)
{
Aggregates.Add(aggregate.Id, aggregate);
return Task.CompletedTask;
return await Task.FromResult((ulong)aggregate.Version);
}

public Task Update(T aggregate, CancellationToken cancellationToken)
public async Task<ulong> Update(T aggregate, ulong? expectedVersion = null, CancellationToken cancellationToken = default)
{
Aggregates[aggregate.Id] = aggregate;
return Task.CompletedTask;
return await Task.FromResult((ulong)aggregate.Version);
}

public Task Delete(T aggregate, CancellationToken cancellationToken)
public async Task<ulong> Delete(T aggregate, ulong? expectedVersion = null, CancellationToken cancellationToken = default)
{
Aggregates.Remove(aggregate.Id);
return Task.CompletedTask;
return await Task.FromResult((ulong)aggregate.Version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@
using Carts.Pricing;
using Carts.ShoppingCarts.Products;
using Core.Commands;
using Core.EventStoreDB.OptimisticConcurrency;
using Core.EventStoreDB.Repository;
using MediatR;

namespace Carts.ShoppingCarts.AddingProduct;

public class AddProduct: ICommand
public record AddProduct(
Guid CartId,
ProductItem ProductItem
): ICommand
{

public Guid CartId { get; }

public ProductItem ProductItem { get; }

private AddProduct(Guid cartId, ProductItem productItem)
{
CartId = cartId;
ProductItem = productItem;
}
public static AddProduct Create(Guid? cartId, ProductItem? productItem)
{
if (cartId == null || cartId == Guid.Empty)
Expand All @@ -37,21 +31,30 @@ internal class HandleAddProduct:
{
private readonly IEventStoreDBRepository<ShoppingCart> cartRepository;
private readonly IProductPriceCalculator productPriceCalculator;
private readonly EventStoreDBOptimisticConcurrencyScope scope;

public HandleAddProduct(
IEventStoreDBRepository<ShoppingCart> cartRepository,
IProductPriceCalculator productPriceCalculator
IProductPriceCalculator productPriceCalculator,
EventStoreDBOptimisticConcurrencyScope scope
)
{
this.cartRepository = cartRepository;
this.productPriceCalculator = productPriceCalculator;
this.scope = scope;
}

public Task<Unit> Handle(AddProduct command, CancellationToken cancellationToken)
public async Task<Unit> Handle(AddProduct command, CancellationToken cancellationToken)
{
return cartRepository.GetAndUpdate(
command.CartId,
cart => cart.AddProduct(productPriceCalculator, command.ProductItem),
cancellationToken);
await scope.Do(expectedRevision =>
cartRepository.GetAndUpdate(
command.CartId,
cart => cart.AddProduct(productPriceCalculator, command.ProductItem),
expectedRevision,
cancellationToken
)
);

return Unit.Value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,16 @@

namespace Carts.ShoppingCarts.AddingProduct;

public class ProductAdded: IEvent
public record ProductAdded(
Guid CartId,
PricedProductItem ProductItem
): IEvent
{
public Guid CartId { get; }

public PricedProductItem ProductItem { get; }

private ProductAdded(Guid cartId, PricedProductItem productItem)
{
CartId = cartId;
ProductItem = productItem;
}

public static ProductAdded Create(Guid cartId, PricedProductItem productItem)
{
if (cartId == Guid.Empty)
throw new ArgumentOutOfRangeException(nameof(cartId));

return new ProductAdded(cartId, productItem);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@
using System.Threading;
using System.Threading.Tasks;
using Core.Commands;
using Core.EventStoreDB.OptimisticConcurrency;
using Core.EventStoreDB.Repository;
using MediatR;

namespace Carts.ShoppingCarts.ConfirmingCart;

public class ConfirmShoppingCart: ICommand
public record ConfirmShoppingCart(
Guid CartId
): ICommand
{
public Guid CartId { get; }

private ConfirmShoppingCart(Guid cartId)
{
CartId = cartId;
}

public static ConfirmShoppingCart Create(Guid? cartId)
{
if (cartId == null || cartId == Guid.Empty)
Expand All @@ -29,19 +25,28 @@ internal class HandleConfirmCart:
ICommandHandler<ConfirmShoppingCart>
{
private readonly IEventStoreDBRepository<ShoppingCart> cartRepository;
private readonly EventStoreDBOptimisticConcurrencyScope scope;

public HandleConfirmCart(
IEventStoreDBRepository<ShoppingCart> cartRepository
IEventStoreDBRepository<ShoppingCart> cartRepository,
EventStoreDBOptimisticConcurrencyScope scope
)
{
this.cartRepository = cartRepository;
this.scope = scope;
}

public Task<Unit> Handle(ConfirmShoppingCart command, CancellationToken cancellationToken)
public async Task<Unit> Handle(ConfirmShoppingCart command, CancellationToken cancellationToken)
{
return cartRepository.GetAndUpdate(
command.CartId,
cart => cart.Confirm(),
cancellationToken);
await scope.Do(expectedRevision =>
cartRepository.GetAndUpdate(
command.CartId,
cart => cart.Confirm(),
expectedRevision,
cancellationToken
)
);

return Unit.Value;
}
}
Loading

0 comments on commit 63d98d3

Please sign in to comment.