Skip to content

Commit

Permalink
Merge pull request #86 from marcwittke/hotfix/5.1.16
Browse files Browse the repository at this point in the history
InMemoryEventBus async Handling korrigiert
  • Loading branch information
marcwittke committed Oct 15, 2019
2 parents 262ec8e + 263e9f5 commit a5e4f3e
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace Backend.Fx.Environment.Persistence
public interface IDatabaseUtil
{
bool WaitUntilAvailable(int retries, Func<int, TimeSpan> sleepDurationProvider);
Task<bool> WaitUntilAvailableAsync(int retries, Func<int, TimeSpan> sleepDurationProvider, CancellationToken cancellationToken = default);
Task<bool> WaitUntilAvailableAsync(int retries, Func<int, TimeSpan> sleepDurationProvider, CancellationToken cancellationToken = default(CancellationToken));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected BackendFxApplication(ICompositionRoot compositionRoot, ITenantIdServic
public ITenantIdService TenantIdService { get; }

/// <inheritdoc />
public async Task Boot(CancellationToken cancellationToken = default)
public async Task Boot(CancellationToken cancellationToken = default(CancellationToken))
{
Logger.Info("Booting application");
await OnBoot(cancellationToken);
Expand All @@ -52,7 +52,7 @@ public async Task Boot(CancellationToken cancellationToken = default)
_isBooted.Set();
}

public bool WaitForBoot(int timeoutMilliSeconds = int.MaxValue, CancellationToken cancellationToken = default)
public bool WaitForBoot(int timeoutMilliSeconds = int.MaxValue, CancellationToken cancellationToken = default(CancellationToken))
{
return _isBooted.Wait(timeoutMilliSeconds, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public interface IBackendFxApplication : IDisposable
/// <summary>
/// allows synchronously awaiting application startup
/// </summary>
bool WaitForBoot(int timeoutMilliSeconds = int.MaxValue, CancellationToken cancellationToken = default);
bool WaitForBoot(int timeoutMilliSeconds = int.MaxValue, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Initializes ans starts the application (async)
/// </summary>
/// <returns></returns>
Task Boot(CancellationToken cancellationToken = default);
Task Boot(CancellationToken cancellationToken = default(CancellationToken));

IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ public void Subscribe<TEvent>(IIntegrationEventHandler<TEvent> handler)
protected abstract void Subscribe(string eventName);
protected abstract void Unsubscribe(string eventName);

protected virtual async Task ProcessAsync(string eventName, EventProcessingContext context)
protected virtual void Process(string eventName, EventProcessingContext context)
{
Logger.Info($"Processing a {eventName} event");
if (_subscriptions.TryGetValue(eventName, out List<ISubscription> subscriptions))
{
foreach (var subscription in subscriptions)
{
await _application.InvokeAsync(
() => Task.Factory.StartNew(() => subscription.Process(eventName, context)),
_application.Invoke(
() => subscription.Process(eventName, context),
new SystemIdentity(),
context.TenantId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ public override void Connect()

public override Task Publish(IIntegrationEvent integrationEvent)
{
#pragma warning disable 4014
// Processing is done on the thread pool and not being awaited. This emulates best the behavior of a real
// event bus, that incorporates network transfer and another system handling the event
ProcessAsync(integrationEvent.GetType().FullName, new InMemoryProcessingContext(integrationEvent));
#pragma warning restore 4014
Task.Run(() => Process(integrationEvent.GetType().FullName, new InMemoryProcessingContext(integrationEvent)));

// the returning Task is about publishing the event, not processing!
return Task.CompletedTask;
}

Expand Down
4 changes: 2 additions & 2 deletions src/implementations/Backend.Fx.RabbitMq/RabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public override void Connect()
}
}

private async void ChannelOnMessageReceived(object sender, BasicDeliverEventArgs args)
private void ChannelOnMessageReceived(object sender, BasicDeliverEventArgs args)
{
Logger.Debug($"RabbitMQ message with routing key {args.RoutingKey} received");
await ProcessAsync(args.RoutingKey, new RabbitMqEventProcessingContext(args.Body));
Process(args.RoutingKey, new RabbitMqEventProcessingContext(args.Body));
}

public override Task Publish(IIntegrationEvent integrationEvent)
Expand Down
4 changes: 2 additions & 2 deletions src/implementations/Backend.Fx.SqlServer/MsSqlServerUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public bool WaitUntilAvailable(int retries, Func<int, TimeSpan> sleepDurationPro
});
}

public async Task<bool> WaitUntilAvailableAsync(int retries, Func<int, TimeSpan> sleepDurationProvider, CancellationToken cancellationToken = default)
public async Task<bool> WaitUntilAvailableAsync(int retries, Func<int, TimeSpan> sleepDurationProvider, CancellationToken cancellationToken = default(CancellationToken))
{
Logger.Info($"Probing for SQL instance with {retries} retries.");
SqlConnectionStringBuilder sb = new SqlConnectionStringBuilder(ConnectionString) { InitialCatalog = "master" };
var sb = new SqlConnectionStringBuilder(ConnectionString) { InitialCatalog = "master" };
return await Policy
.HandleResult<bool>(result => result == false)
.WaitAndRetryAsync(retries, sleepDurationProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public SerializingEventBus(IBackendFxApplication application)
public override void Connect()
{ }

public override async Task Publish(IIntegrationEvent integrationEvent)
public override Task Publish(IIntegrationEvent integrationEvent)
{
var jsonString = JsonConvert.SerializeObject(integrationEvent);
await ProcessAsync(integrationEvent.GetType().FullName, new SerializingProcessingContext(jsonString));
return Task.Run(()=>Process(integrationEvent.GetType().FullName, new SerializingProcessingContext(jsonString)));
}

protected override void Subscribe(string eventName)
Expand Down

0 comments on commit a5e4f3e

Please sign in to comment.