/
NSBConfigure.cs
172 lines (136 loc) · 8 KB
/
NSBConfigure.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
using Aggregates.Contracts;
using Aggregates.Exceptions;
using Aggregates.Extensions;
using Aggregates.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using NServiceBus;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Extensions.Logging;
using System;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Threading.Tasks;
namespace Aggregates
{
[ExcludeFromCodeCoverage]
public static class NSBConfigure
{
public static Settings NServiceBus(this Settings config, EndpointConfiguration endpointConfig)
{
IStartableEndpointWithExternallyManagedContainer startableEndpoint = null;
IEndpointInstance instance = null;
{
var settings = endpointConfig.GetSettings();
var conventions = endpointConfig.Conventions();
settings.Set(NSBDefaults.AggregatesSettings, config);
settings.Set(NSBDefaults.AggregatesConfiguration, config.Configuration);
// set the configured endpoint name to the one NSB config was constructed with
config.SetEndpointName(settings.Get<string>("NServiceBus.Routing.EndpointName"));
conventions.DefiningCommandsAs(type => typeof(Messages.ICommand).IsAssignableFrom(type));
conventions.DefiningEventsAs(type => typeof(Messages.IEvent).IsAssignableFrom(type));
conventions.DefiningMessagesAs(type => typeof(Messages.IMessage).IsAssignableFrom(type));
endpointConfig.AssemblyScanner().ScanAppDomainAssemblies = true;
endpointConfig.AssemblyScanner().ThrowExceptions = false;
endpointConfig.EnableCallbacks();
endpointConfig.EnableInstallers();
endpointConfig.UseSerialization<Internal.AggregatesSerializer>();
endpointConfig.EnableFeature<Feature>();
}
Settings.RegistrationTasks.Add((container, settings) =>
{
container.AddTransient<IEventMapper, EventMapper>();
// Replacing the possible existing Aggregates.UnitOfWork
container.RemoveAll<UnitOfWork.IDomainUnitOfWork>();
container.Add(ServiceDescriptor.Scoped<UnitOfWork.IDomainUnitOfWork, NSBUnitOfWork>());
// Carry over important NSB headers
// The headers this sets should be set via other means
//container.AddTransient<Func<IMutate>>(_ => () => new NSBMutator());
container.AddTransient<IEventFactory, EventFactory>();
container.AddTransient<Contracts.IMessageDispatcher, Dispatcher>();
container.AddTransient<IMessaging, NServiceBusMessaging>();
container.AddTransient<IMessageSession>((_) =>
{
// If the app doesn't wait for NSB to start before asking for IMessageSession
// session will be null, they should wait
// see examples (Hello World/Client/Program.cs) for how to wait
if (instance == null)
throw new Exception("NServiceBus has not started yet");
return instance;
});
var nsbSettings = endpointConfig.GetSettings();
nsbSettings.Set("SlowAlertThreshold", config.SlowAlertThreshold);
nsbSettings.Set("CommandDestination", config.CommandDestination);
endpointConfig.MakeInstanceUniquelyAddressable(settings.UniqueAddress);
// Callbacks need 1 slot so minimum is 2
//endpointConfig.LimitMessageProcessingConcurrencyTo(2);
// NSB doesn't have an endpoint name setter other than the constructor, hack it in
nsbSettings.Set("NServiceBus.Routing.EndpointName", settings.Endpoint);
var recoverability = endpointConfig.Recoverability();
// Callbacks that are too late - discard instead of moving to error queue
recoverability.CustomPolicy((config, context) =>
{
if (context.Exception is InvalidOperationException invalidOperationException &&
invalidOperationException.Message.StartsWith("No handlers could be found", StringComparison.OrdinalIgnoreCase))
{
return RecoverabilityAction.Discard("Callback no longer active");
}
return DefaultRecoverabilityPolicy.Invoke(config, context);
});
recoverability.Failed(recovery =>
{
recovery.OnMessageSentToErrorQueue((message, token) =>
{
var loggerFactory = settings.Configuration.ServiceProvider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger("Recoverability");
var ex = message.Exception;
var messageText = Encoding.UTF8.GetString(message.Body.Span).MaxLines(10);
logger.ErrorEvent("Fault", ex, "[{MessageId:l}] has failed and being sent to error queue [{ErrorQueue}]: {ExceptionType} - {ExceptionMessage}\n{@Body}",
message.MessageId, message.ErrorQueue, ex.GetType().Name, ex.Message, messageText);
return Task.CompletedTask;
});
});
// business exceptions are permentant and shouldnt be retried
recoverability.AddUnrecoverableException<BusinessException>();
recoverability.AddUnrecoverableException<SagaWasAborted>();
recoverability.AddUnrecoverableException<SagaAbortionFailureException>();
// we dont need immediate retries
recoverability.Immediate(recovery => recovery.NumberOfRetries(0));
recoverability.Delayed(recovery =>
{
recovery.TimeIncrease(TimeSpan.FromSeconds(2));
recovery.NumberOfRetries(config.Retries);
recovery.OnMessageBeingRetried((message, token) =>
{
var loggerFactory = settings.Configuration.ServiceProvider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger("Recoverability");
var level = LogLevel.Information;
if (message.RetryAttempt > (config.Retries / 2))
level = LogLevel.Warning;
var ex = message.Exception;
var messageText = Encoding.UTF8.GetString(message.Body.Span).MaxLines(10);
logger.LogEvent(level, "Catch", ex, "[{MessageId:l}] has failed and will retry {Attempts} more times: {ExceptionType} - {ExceptionMessage}\n{@Body}", message.MessageId,
config.Retries - message.RetryAttempt, ex?.GetType().Name, ex?.Message, messageText);
return Task.CompletedTask;
});
});
// todo: not sure this is needed anymore since NSb uses microsoft too now
startableEndpoint = EndpointWithExternallyManagedContainer.Create(endpointConfig, container);
return Task.CompletedTask;
});
Settings.BusTasks.Add(async (container, settings) =>
{
var logFactory = container.GetService<ILoggerFactory>();
if (logFactory != null)
global::NServiceBus.Logging.LogManager.UseFactory(new ExtensionsLoggerFactory(logFactory));
instance = await startableEndpoint.Start(container).ConfigureAwait(false);
});
Settings.ShutdownTasks.Add((container, settings) =>
{
return instance.Stop();
});
return config;
}
}
}