Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Middleware for Handlers #809

Merged
merged 44 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7c1eed3
- Add handler context and dispatcher middleware base
gkinsman Sep 24, 2020
a228797
- Most of the pipeline done, just need to work out dynamically callin…
gkinsman Oct 17, 2020
4f3ae1c
- Implement new middleware pipeline for handling messages
gkinsman Oct 19, 2020
e4c29f6
- Implements a middleware layer between the dispatcher and message ha…
gkinsman Oct 20, 2020
7ee01f4
- Fix exactly once tests
gkinsman Oct 20, 2020
0204f4f
- Undo move refactor
gkinsman Oct 20, 2020
c85fe9c
- Add middleware test to show usage in various scenarios
gkinsman Oct 20, 2020
5a2d391
- Remove generic lookup and use existing type parameter
gkinsman Oct 20, 2020
623cb35
- Fix build
gkinsman Oct 20, 2020
a9b73ed
Update src/JustSaying/AwsTools/MessageHandling/Dispatch/MiddlewareMap.cs
gkinsman Oct 22, 2020
025d9ea
Update src/JustSaying/Fluent/QueueSubscriptionBuilder`1.cs
gkinsman Oct 22, 2020
866ba1d
Update src/JustSaying/Fluent/SqsReadConfigurationBuilder.cs
gkinsman Oct 22, 2020
6a641c0
Update src/JustSaying/JustSayingBus.cs
gkinsman Oct 22, 2020
cabe1d7
Update src/JustSaying/Messaging/Channels/Dispatch/MultiplexerSubscrib…
gkinsman Oct 22, 2020
45996fd
Update src/JustSaying/Messaging/Middleware/ExactlyOnce/ExactlyOnceMid…
gkinsman Oct 22, 2020
d22002f
Update src/JustSaying/Messaging/Middleware/ExactlyOnce/ExactlyOnceMid…
gkinsman Oct 22, 2020
2c23cb2
Update src/JustSaying/Messaging/Middleware/ExactlyOnce/ExactlyOnceMid…
gkinsman Oct 22, 2020
cbddbab
Update src/JustSaying/Messaging/Middleware/Handle/DelegateHandleMiddl…
gkinsman Oct 22, 2020
fa0a53f
Update src/JustSaying/Messaging/Middleware/Handle/HandlerMiddlewareBu…
gkinsman Oct 22, 2020
2b9f086
- Move MessageAs to an extension method to keep the mesage context clean
gkinsman Oct 22, 2020
440c816
Update src/JustSaying/Messaging/Middleware/Handle/HandlerMiddlewareBu…
gkinsman Oct 22, 2020
1a5f2e7
- Dispatcher shouldn't catch OperationCanceledExceptions thrown by mi…
gkinsman Oct 22, 2020
ab8cb98
- Removes the MessageLock API from ServicesBuilder in favour of injec…
gkinsman Oct 22, 2020
45b4891
- Remove usage of ServicesBuilder from ISubscriptionBuilder
gkinsman Oct 22, 2020
4ad9f3f
- Remove MessageHandlerWrapper, and implement stopwatch handler as mi…
gkinsman Nov 12, 2020
fe83b00
- Remove some extra spaces
gkinsman Nov 12, 2020
d8ca59a
- Log in stopwatch tests so output is visible in test output
gkinsman Nov 12, 2020
6ba06f3
- Update log message to use middleware terminology
gkinsman Nov 12, 2020
c3b1a73
- Rename variable to make more sense
gkinsman Nov 12, 2020
d72faa0
- Remove duplicated middleware configuration API. All config is done …
gkinsman Nov 12, 2020
69dcdb4
- Remove unused type
gkinsman Nov 12, 2020
76bbf9e
- Remove remaining ExactlyOnce bits
gkinsman Nov 13, 2020
ce39495
- Implement custom interrogation for middlewares
gkinsman Nov 13, 2020
9183d18
- Remove unnecessary test handler
gkinsman Nov 13, 2020
5fdf08f
- Remove .received file
gkinsman Nov 13, 2020
6b9cab2
- Internalise some things we dont need to be public
gkinsman Nov 17, 2020
c2b098a
- ToList a projection to ensure its materialised
gkinsman Nov 18, 2020
2a73fd8
- Move middlewares into common JustSaying.Messaging.Middleware namesp…
gkinsman Nov 18, 2020
1188922
- Swap order of startup so publisher is initialised first
gkinsman Dec 7, 2020
0eed837
- Add more xml docs
gkinsman Dec 7, 2020
629105a
Update src/JustSaying/Messaging/Middleware/Handle/HandlerMiddlewareBu…
gkinsman Dec 7, 2020
634e4ed
- Remove unnecessary explicit folders from csproj
gkinsman Dec 7, 2020
20f9b19
Update src/JustSaying/Messaging/Middleware/ExactlyOnce/ExactlyOnceHan…
gkinsman Dec 8, 2020
2ad8bb0
- Switch to explicit resharper rule exceptions instead of dotsettings…
gkinsman Dec 8, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Ordering API subscriber running");

await _bus.StartAsync(stoppingToken);
await _publisher.StartAsync(stoppingToken);
await _bus.StartAsync(stoppingToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public IHandlerAsync<T> ResolveHandler<T>(HandlerResolutionContext context)
}

/// <inheritdoc />
public T ResolveService<T>()
=> ServiceProvider.GetRequiredService<T>();
public T ResolveService<T>() => ServiceProvider.GetRequiredService<T>();
}
}
122 changes: 0 additions & 122 deletions src/JustSaying/AwsTools/MessageHandling/Dispatch/HandlerMap.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Threading;
using System.Threading.Tasks;
using JustSaying.Messaging.Channels.Context;
using JustSaying.Messaging.Interrogation;

namespace JustSaying.AwsTools.MessageHandling.Dispatch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageProcessingStrategies;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware;
using JustSaying.Messaging.Monitoring;
using Microsoft.Extensions.Logging;
using Message = JustSaying.Models.Message;
Expand All @@ -21,7 +22,7 @@ public class MessageDispatcher : IMessageDispatcher
{
private readonly IMessageSerializationRegister _serializationRegister;
private readonly IMessageMonitor _messagingMonitor;
private readonly HandlerMap _handlerMap;
private readonly MiddlewareMap _middlewareMap;
private readonly IMessageBackoffStrategy _messageBackoffStrategy;
private readonly IMessageContextAccessor _messageContextAccessor;

Expand All @@ -30,14 +31,14 @@ public class MessageDispatcher : IMessageDispatcher
public MessageDispatcher(
IMessageSerializationRegister serializationRegister,
IMessageMonitor messagingMonitor,
HandlerMap handlerMap,
MiddlewareMap middlewareMap,
ILoggerFactory loggerFactory,
IMessageBackoffStrategy messageBackoffStrategy,
IMessageContextAccessor messageContextAccessor)
{
_serializationRegister = serializationRegister;
_messagingMonitor = messagingMonitor;
_handlerMap = handlerMap;
_middlewareMap = middlewareMap;
_logger = loggerFactory.CreateLogger("JustSaying");
_messageBackoffStrategy = messageBackoffStrategy;
_messageContextAccessor = messageContextAccessor;
Expand Down Expand Up @@ -70,7 +71,7 @@ public class MessageDispatcher : IMessageDispatcher
_messageContextAccessor.MessageContext =
new MessageContext(messageContext.Message, messageContext.QueueUri, attributes);

handlingSucceeded = await CallMessageHandler(messageContext.QueueName, typedMessage)
handlingSucceeded = await CallMessageHandler(messageContext.QueueName, typedMessage, cancellationToken)
.ConfigureAwait(false);
}

Expand All @@ -79,8 +80,9 @@ public class MessageDispatcher : IMessageDispatcher
await messageContext.DeleteMessageFromQueueAsync(cancellationToken).ConfigureAwait(false);
}
}

#pragma warning disable CA1031
catch (Exception ex)
catch (Exception ex) when(!(ex is OperationCanceledException))
#pragma warning restore CA1031
{
_logger.LogError(
Expand Down Expand Up @@ -157,36 +159,38 @@ public class MessageDispatcher : IMessageDispatcher
}
}

private async Task<bool> CallMessageHandler(string queueName, Message message)
private async Task<bool> CallMessageHandler(string queueName, Message message, CancellationToken cancellationToken)
{
var messageType = message.GetType();

var handler = _handlerMap.Get(queueName, messageType);
var middleware = _middlewareMap.Get(queueName, messageType);

if (handler == null)
if (middleware == null)
{
_logger.LogError(
"Failed to dispatch. Handler for message of type '{MessageTypeName}' not found in handler map.",
"Failed to dispatch. Middleware for message of type '{MessageTypeName}' not found in middleware map.",
message.GetType().FullName);
return false;
}

var watch = System.Diagnostics.Stopwatch.StartNew();

using (_messagingMonitor.MeasureHandler())
using (_messagingMonitor.MeasureDispatch())
{
bool handlerSucceeded = false;
bool dispatchSuccessful = false;
try
{
handlerSucceeded = await handler(message).ConfigureAwait(false);
var context = new HandleMessageContext(message, messageType, queueName);
dispatchSuccessful = await middleware.RunAsync(context, null, cancellationToken)
.ConfigureAwait(false);
}
finally
{
watch.Stop();

var logMessage =
"{Status} handling message with Id '{MessageId}' of type {MessageType} in {TimeToHandle}ms.";
if (handlerSucceeded)
if (dispatchSuccessful)
{
_logger.LogInformation(logMessage,
"Succeeded",
Expand All @@ -204,7 +208,7 @@ private async Task<bool> CallMessageHandler(string queueName, Message message)
}
}

return handlerSucceeded;
return dispatchSuccessful;
}
}

Expand Down

This file was deleted.

Loading