Skip to content

Commit

Permalink
Merge pull request #41 from marcwittke/hotfix/3.2.11
Browse files Browse the repository at this point in the history
Hotfix/3.2.11
  • Loading branch information
marcwittke committed Jul 20, 2018
2 parents 7c56ce1 + 867fb39 commit 40c7fcd
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 32 deletions.
31 changes: 27 additions & 4 deletions src/Backend.Fx/BuildingBlocks/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Environment.MultiTenancy;
using Exceptions;
using Extensions;
using JetBrains.Annotations;
using Logging;
using Patterns.Authorization;
using Patterns.DependencyInjection;
Expand All @@ -18,6 +19,7 @@ public abstract class Repository<TAggregateRoot> : IRepository<TAggregateRoot> w

protected Repository(ICurrentTHolder<TenantId> tenantIdHolder, IAggregateAuthorization<TAggregateRoot> aggregateAuthorization)
{
Logger.Trace($"Instantiating a new Repository<{AggregateTypeName}> for tenant [{(tenantIdHolder.Current.HasValue ? tenantIdHolder.Current.Value.ToString() : "null")}]");
this.tenantIdHolder = tenantIdHolder;
this.aggregateAuthorization = aggregateAuthorization;
}
Expand Down Expand Up @@ -45,7 +47,7 @@ public IQueryable<TAggregateRoot> AggregateQueryable

public TAggregateRoot Single(int id)
{
Logger.Debug($"Removing {AggregateTypeName}[{id}]");
Logger.Debug($"Getting single {AggregateTypeName}[{id}]");
var aggregateRoot = AggregateQueryable.FirstOrDefault(aggr => aggr.Id.Equals(id));
if (aggregateRoot == null)
{
Expand All @@ -57,6 +59,7 @@ public TAggregateRoot Single(int id)

public TAggregateRoot SingleOrDefault(int id)
{
Logger.Debug($"Getting single or default {AggregateTypeName}[{id}]");
return AggregateQueryable.FirstOrDefault(aggr => aggr.Id.Equals(id));
}

Expand All @@ -65,8 +68,14 @@ public TAggregateRoot[] GetAll()
return AggregateQueryable.ToArray();
}

public void Delete(TAggregateRoot aggregateRoot)
public void Delete([NotNull] TAggregateRoot aggregateRoot)
{
if (aggregateRoot == null)
{
throw new ArgumentNullException(nameof(aggregateRoot));
}

Logger.Debug($"Deleting {AggregateTypeName}[{aggregateRoot.Id}]");
if (aggregateRoot.TenantId != tenantIdHolder.Current.Value || !aggregateAuthorization.CanDelete(aggregateRoot))
{
throw new System.Security.SecurityException($"You are not allowed to delete {typeof(TAggregateRoot).Name}[{aggregateRoot.Id}]");
Expand All @@ -75,10 +84,16 @@ public void Delete(TAggregateRoot aggregateRoot)
DeletePersistent(aggregateRoot);
}

public void Add(TAggregateRoot aggregateRoot)
public void Add([NotNull] TAggregateRoot aggregateRoot)
{
if (aggregateRoot == null)
{
throw new ArgumentNullException(nameof(aggregateRoot));
}

if (aggregateAuthorization.CanCreate(aggregateRoot))
{
Logger.Debug($"Adding {AggregateTypeName}[{aggregateRoot.Id}]");
aggregateRoot.TenantId = tenantIdHolder.Current.Value;
AddPersistent(aggregateRoot);
}
Expand All @@ -88,15 +103,23 @@ public void Add(TAggregateRoot aggregateRoot)
}
}

public void AddRange(TAggregateRoot[] aggregateRoots)
public void AddRange([NotNull] TAggregateRoot[] aggregateRoots)
{
if (aggregateRoots == null)
{
throw new ArgumentNullException(nameof(aggregateRoots));
}

aggregateRoots.ForAll(agg =>
{
if (!aggregateAuthorization.CanCreate(agg))
{
throw new System.Security.SecurityException($"You are not allowed to create records of type {typeof(TAggregateRoot).Name}");
}
});

Logger.Debug($"Adding {aggregateRoots.Length} items of type {AggregateTypeName}");

aggregateRoots.ForAll(agg => agg.TenantId = tenantIdHolder.Current.Value);

AddRangePersistent(aggregateRoots);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
namespace Backend.Fx.Patterns.EventAggregation.Domain
{
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using Logging;

public class DomainEventAggregator : IDomainEventAggregator
{
private static readonly ILogger Logger = LogManager.Create<DomainEventAggregator>();
private readonly IDomainEventHandlerProvider domainEventHandlerProvider;
private readonly List<(string, string, Action)> handleActions = new List<(string, string, Action)>();
private readonly ConcurrentQueue<(string, string, Action)> handleActions = new ConcurrentQueue<(string, string, Action)>();

public DomainEventAggregator(IDomainEventHandlerProvider domainEventHandlerProvider)
{
Expand All @@ -30,14 +30,14 @@ public DomainEventAggregator(IDomainEventHandlerProvider domainEventHandlerProvi
injectedHandler.GetType().Name,
() => injectedHandler.Handle(domainEvent));

handleActions.Add(handleAction);
handleActions.Enqueue(handleAction);
Logger.Debug($"Invocation of {injectedHandler.GetType().Name} for domain event {typeof(TDomainEvent).Name} registered. It will be executed on completion of unit of work");
}
}

public void RaiseEvents()
{
foreach (var handleAction in handleActions)
while (handleActions.TryDequeue(out var handleAction))
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ public interface IEventBus : IDisposable
{
void Connect();

/// <summary>
/// Directly publishes an event on the event bus without delay.
/// In most cases you want to publish an event when the cause is considered as safely done, e.g. when the
/// wrapping transaction is committed. Use <see cref="IEventBusScope"/> to let the framework raise all events
/// after committing the unit of work.
/// </summary>
/// <param name="integrationEvent"></param>
/// <returns></returns>
Task Publish(IIntegrationEvent integrationEvent);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
namespace Backend.Fx.Patterns.EventAggregation.Integration
{
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using BuildingBlocks;

public interface IEventBusScope : IDomainService
{
/// <summary>
/// Enqueues an event to be raised later.
/// Intention is to let events bubble up after an operation has terminated, e.g. when a wrapping
/// unit of work has completed.
/// </summary>
/// <param name="integrationEvent"></param>
void Publish(IIntegrationEvent integrationEvent);
Task RaiseEvents();
}

public class EventBusScope : IEventBusScope
{
private readonly List<IIntegrationEvent> integrationEvents = new List<IIntegrationEvent>();
private readonly ConcurrentQueue<IIntegrationEvent> integrationEvents = new ConcurrentQueue<IIntegrationEvent>();
private readonly IEventBus eventBus;

public EventBusScope(IEventBus eventBus)
Expand All @@ -22,12 +28,12 @@ public EventBusScope(IEventBus eventBus)

void IEventBusScope.Publish(IIntegrationEvent integrationEvent)
{
integrationEvents.Add(integrationEvent);
integrationEvents.Enqueue(integrationEvent);
}

public async Task RaiseEvents()
{
foreach (var integrationEvent in integrationEvents)
while (integrationEvents.TryDequeue(out var integrationEvent))
{
await eventBus.Publish(integrationEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,42 @@

public class InMemoryEventBus : EventBus
{
private readonly IExceptionLogger exceptionLogger;

public InMemoryEventBus(IScopeManager scopeManager, IExceptionLogger exceptionLogger)
: base(scopeManager, exceptionLogger)
{ }
{
this.exceptionLogger = exceptionLogger;
}

public override void Connect()
{ }

public override Task Publish(IIntegrationEvent integrationEvent)
{
return Task.Run(
() => Process(integrationEvent.GetType().FullName, new InMemoryProcessingContext(integrationEvent)));
// Processing is done on the thread pool and not being awaited. This emulates best the behavior of a real
// event bus, that incorporates network transfer and another system handling the event
Task.Run(() =>
{
try
{
Process(integrationEvent.GetType().FullName, new InMemoryProcessingContext(integrationEvent));
}
catch (Exception ex)
{
exceptionLogger.LogException(ex);
}
});
return Task.CompletedTask;
}

protected override void Subscribe(string eventName)
{}
{ }

protected override void Unsubscribe(string eventName)
{}
{ }

private class InMemoryProcessingContext : EventProcessingContext
private class InMemoryProcessingContext : EventProcessingContext
{
private readonly IIntegrationEvent integrationEvent;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Backend.Fx.Tests.Patterns.EventAggregation.Integration
{
using System.Threading;
using Fx.Patterns.EventAggregation.Integration;

public class LongRunningEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
{
public void Handle(TestIntegrationEvent eventData)
{
Thread.Sleep(1000);
eventData.Processed.Set();
}
}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Backend.Fx.Tests.Patterns.EventAggregation.Integration
{
using System.Threading;
using Fx.Patterns.EventAggregation.Integration;

public class TestIntegrationEvent : IntegrationEvent
Expand All @@ -8,6 +9,8 @@ public class TestIntegrationEvent : IntegrationEvent

public string StringParam { get; }

public ManualResetEventSlim Processed = new ManualResetEventSlim(false);

public TestIntegrationEvent(int intParam, string stringParam) : base(55)
{
IntParam = intParam;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Backend.Fx.Tests.Patterns.EventAggregation.Integration
{
using System;
using System.Diagnostics;
using System.Security.Principal;
using System.Threading.Tasks;
using FakeItEasy;
Expand All @@ -12,10 +13,24 @@

public sealed class TheInMemoryEventBus : TheEventBus
{

protected override IEventBus Create(IScopeManager scopeManager, IExceptionLogger exceptionLogger)
{
return new InMemoryEventBus(scopeManager, exceptionLogger);
}

[Fact]
public async Task HandlesEventsAsynchronously()
{
Sut.Subscribe<LongRunningEventHandler, TestIntegrationEvent>();
Stopwatch sw = new Stopwatch();
sw.Start();
var integrationEvent = new TestIntegrationEvent(1,"a");
await Sut.Publish(integrationEvent);
Assert.True(sw.ElapsedMilliseconds < 100);
integrationEvent.Processed.Wait(1500);
Assert.True(sw.ElapsedMilliseconds > 1000);
}
}

public sealed class TheSerializingEventBus : TheEventBus
Expand All @@ -29,22 +44,23 @@ protected override IEventBus Create(IScopeManager scopeManager, IExceptionLogger
public abstract class TheEventBus
{
private readonly EventBusFakeInjection inj = new EventBusFakeInjection();
private readonly IEventBus sut;
public IEventBus Sut { get; }

protected TheEventBus()
{
// ReSharper disable once VirtualMemberCallInConstructor
sut = Create(inj.ScopeManager, inj.ExceptionLogger);
Sut = Create(inj.ScopeManager, inj.ExceptionLogger);
}

protected abstract IEventBus Create(IScopeManager scopeManager, IExceptionLogger exceptionLogger);

[Fact]
public async Task CallsTypedEventHandler()
{
sut.Subscribe<TypedEventHandler, TestIntegrationEvent>();
await sut.Publish(new TestIntegrationEvent(34, "gaga"));

Sut.Subscribe<TypedEventHandler, TestIntegrationEvent>();
var integrationEvent = new TestIntegrationEvent(34, "gaga");
await Sut.Publish(integrationEvent);
integrationEvent.Processed.Wait(1500);
A.CallTo(() => inj.TypedHandler.Handle(A<TestIntegrationEvent>
.That
.Matches(evt => evt.IntParam == 34 && evt.StringParam == "gaga")))
Expand All @@ -56,8 +72,10 @@ public async Task CallsTypedEventHandler()
[Fact]
public async void HandlesExceptionFromTypedEventHandler()
{
sut.Subscribe<ThrowingTypedEventHandler, TestIntegrationEvent>();
await sut.Publish(new TestIntegrationEvent(34, "gaga"));
Sut.Subscribe<ThrowingTypedEventHandler, TestIntegrationEvent>();
var integrationEvent = new TestIntegrationEvent(34, "gaga");
await Sut.Publish(integrationEvent);
integrationEvent.Processed.Wait(1500);

A.CallTo(() => inj.ExceptionLogger.LogException(A<InvalidOperationException>
.That
Expand All @@ -68,17 +86,22 @@ public async void HandlesExceptionFromTypedEventHandler()
[Fact]
public async void CallsDynamicEventHandler()
{
sut.Subscribe<DynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
await sut.Publish(new TestIntegrationEvent(34, "gaga"));
Sut.Subscribe<DynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
var integrationEvent = new TestIntegrationEvent(34, "gaga");
await Sut.Publish(integrationEvent);
integrationEvent.Processed.Wait(1500);

A.CallTo(() => inj.TypedHandler.Handle(A<TestIntegrationEvent>._)).MustNotHaveHappened();
A.CallTo(() => inj.DynamicHandler.Handle(A<object>._)).MustHaveHappenedOnceExactly();
}

[Fact]
public async void HandlesExceptionFromDynamicEventHandler()
{
sut.Subscribe<ThrowingDynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
await sut.Publish(new TestIntegrationEvent(34, "gaga"));
Sut.Subscribe<ThrowingDynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
var integrationEvent = new TestIntegrationEvent(34, "gaga");
await Sut.Publish(integrationEvent);
integrationEvent.Processed.Wait(1500);

A.CallTo(() => inj.ExceptionLogger.LogException(A<InvalidOperationException>
.That
Expand All @@ -89,9 +112,12 @@ public async void HandlesExceptionFromDynamicEventHandler()
[Fact]
public async void CallsMixedEventHandlers()
{
sut.Subscribe<DynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
sut.Subscribe<TypedEventHandler, TestIntegrationEvent>();
await sut.Publish(new TestIntegrationEvent(34, "gaga"));
Sut.Subscribe<DynamicEventHandler>(typeof(TestIntegrationEvent).FullName);
Sut.Subscribe<TypedEventHandler, TestIntegrationEvent>();
var integrationEvent = new TestIntegrationEvent(34, "gaga");
await Sut.Publish(integrationEvent);
integrationEvent.Processed.Wait(1500);

A.CallTo(() => inj.TypedHandler.Handle(A<TestIntegrationEvent>
.That
.Matches(evt => evt.IntParam == 34 && evt.StringParam == "gaga")))
Expand All @@ -116,6 +142,9 @@ public EventBusFakeInjection()
A.CallTo(() => Scope.GetInstance(A<Type>.That.IsEqualTo(typeof(TypedEventHandler))))
.Returns(new TypedEventHandler(TypedHandler));

A.CallTo(() => Scope.GetInstance(A<Type>.That.IsEqualTo(typeof(LongRunningEventHandler))))
.Returns(new LongRunningEventHandler());

A.CallTo(() => Scope.GetInstance(A<Type>.That.IsEqualTo(typeof(ThrowingTypedEventHandler))))
.Returns(new ThrowingTypedEventHandler());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public TypedEventHandler(IIntegrationEventHandler<TestIntegrationEvent> integrat
public void Handle(TestIntegrationEvent eventData)
{
integrationEventHandlerImplementation.Handle(eventData);
eventData.Processed.Set();
}
}
}

0 comments on commit 40c7fcd

Please sign in to comment.