Skip to content

Commit

Permalink
Merge pull request #42 from ojdev/dev
Browse files Browse the repository at this point in the history
增加死信队列的支持
  • Loading branch information
ojdev committed Dec 11, 2019
2 parents 7e94107 + acc2151 commit ba2de73
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
<TargetFramework>netcoreapp3.1</TargetFramework>
<AspNetCoreHostingModel>InProcess</AspNetCoreHostingModel>
<IncludeOpenAPIAnalyzers>true</IncludeOpenAPIAnalyzers>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions RabbitMQ.EventBus.AspNetCore.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public void ConfigureServices(IServiceCollection services)
eventBusOption.ClientProvidedAssembly(assemblyName);
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
eventBusOption.MessageTTL(2000);
eventBusOption.DeadLetterExchangeConfig(config =>
{
config.Enabled = true;
config.ExchangeNameSuffix = "-test";
});
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace RabbitMQ.EventBus.AspNetCore.Configurations
{
/// <summary>
/// 死信交换机
/// </summary>
public class DeadLetterExchangeConfig
{
/// <summary>
/// 是否开启(默认开启)
/// </summary>
public bool Enabled { set; get; }
/// <summary>
/// 交换机名前缀(默认为"dead-")
/// </summary>
public string ExchangeNamePrefix { set; get; }
/// <summary>
/// 交换机名后缀
/// </summary>
public string ExchangeNameSuffix { set; get; }
/// <summary>
/// 自定义交换机名(留空则使用原有的交换机名)
/// </summary>
public string CustomizeExchangeName { set; get; }
/// <summary>
///
/// </summary>
/// <param name="enabled">是否开启(默认开启)</param>
/// <param name="exchangeNamePrefix">交换机名前缀(默认为"dead-")</param>
/// <param name="exchangeNameSuffix">交换机名后缀</param>
/// <param name="customizeExchangeName">自定义交换机名(留空则使用原有的交换机名)</param>
public DeadLetterExchangeConfig(bool enabled = true, string exchangeNamePrefix = "dead-", string exchangeNameSuffix = null, string customizeExchangeName = null)
{
Enabled = enabled;
ExchangeNameSuffix = exchangeNameSuffix;
ExchangeNamePrefix = exchangeNamePrefix;
CustomizeExchangeName = customizeExchangeName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace RabbitMQ.EventBus.AspNetCore.Configurations
/// </summary>
public sealed class RabbitMQEventBusConnectionConfiguration
{
private int? messageTTL;

/// <summary>
///
/// </summary>
Expand Down Expand Up @@ -37,6 +39,25 @@ public sealed class RabbitMQEventBusConnectionConfiguration
/// </summary>
public QueuePrefixType Prefix { get; set; }
/// <summary>
/// 死信交换机设置
/// </summary>
public DeadLetterExchangeConfig DeadLetterExchange { set; get; }
/// <summary>
/// 消息驻留时长(毫秒),超过此市场的则判断为死信
/// </summary>
public int? MessageTTL
{
get => messageTTL;
set
{
if (value < 0)
{
throw new ArgumentOutOfRangeException($"{nameof(MessageTTL)}必须大于0");
}
messageTTL = value;
}
}
/// <summary>
///
/// </summary>
public RabbitMQEventBusConnectionConfiguration()
Expand All @@ -46,6 +67,7 @@ public RabbitMQEventBusConnectionConfiguration()
NetworkRecoveryInterval = TimeSpan.FromSeconds(5);
AutomaticRecoveryEnabled = true;
ConsumerFailRetryInterval = TimeSpan.FromSeconds(1);
DeadLetterExchange = new DeadLetterExchangeConfig();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,26 @@ public void QueuePrefix(QueuePrefixType queuePrefix = QueuePrefixType.ClientProv
{
Configuration.Prefix = queuePrefix;
}
/// <summary>
/// 设置消息的驻留时常(毫秒)
/// 如果开启了死信队列设置则默认为60000毫秒
/// </summary>
/// <param name="millisecond"></param>
public void MessageTTL(int millisecond)
{
Configuration.MessageTTL = millisecond;
}
/// <summary>
/// 死信队列设置
/// </summary>
/// <param name="config"></param>
public void DeadLetterExchangeConfig(Action<DeadLetterExchangeConfig> config)
{
config?.Invoke(Configuration.DeadLetterExchange);
if (Configuration.DeadLetterExchange.Enabled && Configuration.MessageTTL == null)
{
Configuration.MessageTTL = 60000;
}
}
}
}
34 changes: 33 additions & 1 deletion src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using RabbitMQ.EventBus.AspNetCore.Factories;
using RabbitMQ.EventBus.AspNetCore.Modules;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

Expand Down Expand Up @@ -85,6 +86,33 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
}
IModel channel;
#region snippet
var arguments = new Dictionary<string, object>();

#region 死信队列设置
if (_persistentConnection.Configuration.DeadLetterExchange.Enabled)
{
string deadExchangeName = $"{_persistentConnection.Configuration.DeadLetterExchange.ExchangeNamePrefix}{_persistentConnection.Configuration.DeadLetterExchange.CustomizeExchangeName ?? attr.Exchange}{_persistentConnection.Configuration.DeadLetterExchange.ExchangeNameSuffix}";
string deadQueueName = $"{_persistentConnection.Configuration.DeadLetterExchange.ExchangeNamePrefix}{queue}{_persistentConnection.Configuration.DeadLetterExchange.ExchangeNameSuffix}";
IModel dlxChannel;
try
{
dlxChannel = _persistentConnection.ExchangeDeclare(exchange: deadExchangeName, type: type);
dlxChannel.QueueDeclarePassive(deadQueueName);
}
catch
{
dlxChannel = _persistentConnection.ExchangeDeclare(exchange: deadExchangeName, type: type);
dlxChannel.QueueDeclare(queue: deadQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
}
dlxChannel.QueueBind(deadQueueName, deadExchangeName, attr.RoutingKey, null);
arguments.Add("x-dead-letter-exchange", deadExchangeName);
}
#endregion

try
{
channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
Expand All @@ -93,11 +121,15 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
catch
{
channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
if (_persistentConnection.Configuration.MessageTTL != null && _persistentConnection.Configuration.MessageTTL > 0)
{
arguments.Add("x-message-ttl", _persistentConnection.Configuration.MessageTTL);
}
channel.QueueDeclare(queue: queue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
arguments: arguments);
}
#endregion
channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void Dispose()

public bool TryConnect()
{
_logger.WriteLog(Configuration.Level, "RabbitMQ Client is trying to connect");
_logger.WriteLog(LogLevel.Information, "RabbitMQ Client is trying to connect");
lock (sync_root)
{
RetryPolicy policy = RetryPolicy.Handle<SocketException>()
Expand All @@ -78,7 +78,7 @@ public bool TryConnect()
policy.Execute(() =>
{
string connectionString = _connectionAction.Invoke();
_logger.WriteLog(Configuration.Level, $"[ConnectionString]:\t{connectionString}");
_logger.WriteLog(LogLevel.Information, $"[ConnectionString]:\t{connectionString}");
_connectionFactory.Uri = new Uri(connectionString);
_connection = _connectionFactory.CreateConnection(clientProvidedName: Configuration.ClientProvidedName);
});
Expand All @@ -88,7 +88,7 @@ public bool TryConnect()
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_logger.WriteLog(Configuration.Level, $"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
_logger.WriteLog(LogLevel.Information, $"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.0.0" />
<PackageReference Include="Polly" Version="7.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Polly" Version="7.2.0" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
<PackageReference Include="System.Text.Json" Version="4.6.0" />
<PackageReference Include="System.Text.Json" Version="4.7.0" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit ba2de73

Please sign in to comment.