Skip to content

Commit

Permalink
[Host.RabbitMQ] RabbitMQ Implementation #67
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed May 14, 2023
1 parent f2974a3 commit e38499d
Show file tree
Hide file tree
Showing 69 changed files with 2,271 additions and 141 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
kafka_password: ${{ secrets.kafka_password }}
redis_password: ${{ secrets.redis_password }}
mqtt_password: ${{ secrets.mqtt_password }}
rabbitmq_password: ${{ secrets.rabbitmq_password }}
- name: Upload test results
uses: actions/upload-artifact@v3
with:
Expand Down
21 changes: 12 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i

- [Introduction](docs/intro.md)
- Providers:
- [Apache Kafka](docs/provider_kafka.md)
- [Azure ServiceBus](docs/provider_azure_servicebus.md)
- [Azure EventHubs](docs/provider_azure_eventhubs.md)
- [Redis](docs/provider_redis.md)
- [Memory](docs/provider_memory.md)
- [Mqtt](docs/provider_mqtt.md)
- [Azure ServiceBus](docs/provider_azure_servicebus.md)
- [Apache Kafka](docs/provider_kafka.md)
- [Hybrid](docs/provider_hybrid.md)
- [Memory](docs/provider_memory.md)
- [MQTT](docs/provider_mqtt.md)
- [RabbitMQ](docs/provider_rabbitmq.md)
- [Redis](docs/provider_redis.md)
- Plugins:
- [Serialization](docs/serialization.md)
- [Transactional Outbox](docs/plugin_outbox.md)
Expand All @@ -61,12 +62,13 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `SlimMessageBus` | The core API for SlimMessageBus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.svg)](https://www.nuget.org/packages/SlimMessageBus) |
| **Transport providers** | | |
| `.Host.Kafka` | Transport provider for Apache Kafka | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Kafka.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Kafka) |
| `.Host.AzureServiceBus` | Transport provider for Azure Service Bus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureServiceBus.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureServiceBus) |
| `.Host.AzureEventHub` | Transport provider for Azure Event Hubs | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureEventHub.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureEventHub) |
| `.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) |
| `.Host.AzureServiceBus` | Transport provider for Azure Service Bus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureServiceBus.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureServiceBus) |
| `.Host.Kafka` | Transport provider for Apache Kafka | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Kafka.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Kafka) |
| `.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) |
| `.Host.Mqtt` | Transport provider for MQTT | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Mqtt.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Mqtt) |
| `.Host.MQTT` | Transport provider for MQTT | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.MQTT.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.MQTT) |
| `.Host.RabbitMQ` | Transport provider for RabbitMQ | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.RabbitMQ.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) |
| `.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) |
| **Serialization** | | |
| `.Host.Serialization.Json` | Serialization plugin for JSON (Newtonsoft.Json library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Json.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Json) |
| `.Host.Serialization.SystemTextJson` | Serialization plugin for JSON (System.Text.Json library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.SystemTextJson.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.SystemTextJson) |
Expand Down Expand Up @@ -369,5 +371,6 @@ Thanks to the following service cloud providers for providing free instances for
- Redis - [Redis Labs](https://redislabs.com/)
- Kafka - [CloudKarafka](https://www.cloudkarafka.com/)
- MQTT - [HiveMQ](https://www.hivemq.com/)
- RabbitMQ - [CloudAMQP](https://www.cloudamqp.com/)

If you want to help and sponsor, please write to me.
1 change: 1 addition & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ $projects = @(
"SlimMessageBus.Host.Memory",
"SlimMessageBus.Host.Redis",
"SlimMessageBus.Host.Mqtt",
"SlimMessageBus.Host.RabbitMQ",

"SlimMessageBus.Host.FluentValidation",

Expand Down
1 change: 1 addition & 0 deletions docs/NuGet.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Transports:
- Azure Event Hub
- Redis
- MQTT / Azure IoT Hub
- RabbitMQ
- In-Memory transport (domain events, mediator)
- Hybrid (composition of the bus out of many transports)

Expand Down
272 changes: 272 additions & 0 deletions docs/provider_rabbitmq.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
# RabbitMQ transport provider for SlimMessageBus <!-- omit in toc -->

Please read the [Introduction](intro.md) before reading this provider documentation.

- [Underlying client](#underlying-client)
- [Concepts](#concepts)
- [Configuration](#configuration)
- [Producers](#producers)
- [Consumers](#consumers)
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
- [Custom Consumer Error Handler](#custom-consumer-error-handler)
- [Message Headers](#message-headers)
- [Topology Provisioning](#topology-provisioning)
- [Not Supported](#not-supported)

## Underlying client

The [`RabbitMQ`](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) transport provider uses [RabbitMQ.Client](https://www.nuget.org/packages/RabbitMQ.Client/) client to connect to the RabbitMQ cluster via the AMQP protocol.

## Concepts

The RabbitMQ and AMQP protocol introduce couple of concepts:

- Exchange - entities to which producers send messages,
- Queue - mailboxes which consumers read messages from,
- Binding - are rules that exchanges use to route messages to queues.

[AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html) provides a brilliant overview.

## Configuration

The RabbitMQ transport configuration is arranged via the `.WithProviderRabbitMQ(cfg => {})` method on the message bus builder.

```cs
using SlimMessageBus.Host.RabbitMQ;

// Register SlimMessageBus in MSDI
services.AddSlimMessageBus((mbb) =>
{
// Use RabbitMQ transport provider
mbb.WithProviderRabbitMQ(cfg =>
{
// Connect using AMQP URI
cfg.ConnectionString = configuration["RabbitMQ:ConnectionString"];

// Alternatively, when not using AMQP URI:
// cfg.ConnectionFactory.HostName = "..."
// cfg.ConnectionFactory.VirtualHost = "..."
// cfg.ConnectionFactory.UserName = "..."
// cfg.ConnectionFactory.Password = "..."
// cfg.ConnectionFactory.Ssl.Enabled = true
// Fine tune the underlying RabbitMQ.Client:
// cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
});

mbb.AddServicesFromAssemblyContaining<PingConsumer>();
mbb.AddJsonSerializer();
});
```

The relevant elements of the `cfg`:

- The `ConnectionString` allows to set the AMQP URI.
This property is a convenience wrapper on top of `ConnectionFactory.Uri` from the underlying client library.
The URI has the following form: `amqps://<username>:<password>@<host>/<virtual-host>`.
- The `ConnectionFactory` allows to access other client settings. It can be used to setup other connection details in case the AMQP URI cannot be used or there is a need to fine tune the client.

### Producers

Producers need to declare the exchange name and type the message should be delivered to. SMB will provision the specified exchange.
Additionally, we can specify:

- the modifier that allows to assign message properties (`MessageId`, `ContentType`, and headers),
- the message key provider that is used in routing for relevant exchange types.

```cs
mbb.Produce<OrderEvent>(x => x
// Will declare an orders exchange of type Fanout
.Exchange("orders", exchangeType: ExchangeType.Fanout)
// Will use a routing key provider that for a given message will take it's Id field
.RoutingKeyProvider((m, p) => m.Id.ToString())
// Will use
.MessagePropertiesModifier((m, p) =>
{
p.MessageId = GetMessageId(m);
}));
```

We can also set defaults for all producers on the bus level:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
// All exchanges declared on producers will be durable by default
cfg.UseExchangeDefaults(durable: true);

// All messages will get the ContentType message property assigned
cfg.UseMessagePropertiesModifier((m, p) =>
{
p.ContentType = MediaTypeNames.Application.Json;
});
});

mbb.AddJsonSerializer();
});
```

### Consumers

Consumers need to specify the queue name from which the consumer should be reading from. SMB will provison the specified queue.
Additionally,

- when the exchange name binding is specified then SMB will provision that binding with the broker,
- when [dead letter exchange](#dead-letter-exchange) is specified then the queue will provisioned with the broker, and if the exchange type is specified it will also be provisioned.

```cs
mbb.Consume<PingMessage>(x => x
// Use the subscriber queue, do not auto delete
.Queue("subscriber", autoDelete: false)
//
.ExchangeBinding("ping")
// The queue declaration in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be created
.DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType: Direct)
.WithConsumer<PingConsumer>());
```

We can specify defaults for all consumers on the bus level:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
cfg.UseQueueDefaults(durable: false);
});
});
```

### Consumer Error Handling

By default the the transport implementation performs a negative ack (nack) in the AMQP protocol for any message that failed in the consumer. As a result the message will be marked as failed and routed to an dead letter exchange or discarded by the RabbitMQ broker.

The recomendation here is to either:

- configure a [dead letter exchange](#dead-letter-exchange) configured on the consumer queue,
- or provide a [custom error handler](#custom-consumer-error-handler) (retry the message couple of times, if failed send to a dead letter exchange).

#### Dead Letter Exchange

The [Dead Letter Exchanges](https://www.rabbitmq.com/dlx.html) is a feature of RabbitMQ that will forward failed messages from a particular queue to a special exchange.

In SMB on the consumer declaration we can specify which dead letter exchange should be used:

```cs
mbb.Consume<PingMessage>(x => x
.Queue("subscriber", autoDelete: false)
.ExchangeBinding(topic)
// The queue provisioned in RabbitMQ will have a reference to the dead letter exchange
.DeadLetterExchange("subscriber-dlq")
.WithConsumer<PingConsumer>());
```

However, the `subscriber-dlq` will not be created by SMB in the sample. For it to be created the `ExchangeType` has to be specified, so that SMB knows what exchange type should it apply.
It can be specified on the consumer:

```cs
mbb.Consume<PingMessage>(x => x
.Queue("subscriber", autoDelete: false)
.ExchangeBinding(topic)
// The queue provisioned in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be provisioned
.DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType: Direct)
.WithConsumer<PingConsumer>());
```

Alternatively, a bus wide default can be specified for all dead letter exchanges:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
// All the declared dead letter exchanges on the consumers will be of Direct type
cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
});
});
```

#### Custom Consumer Error Handler

Define a custom consumer error handler implementation of `RabbitMqConsumerErrorHandler<>`:

```cs
public class CustomRabbitMqConsumerErrorHandler<T> : RabbitMqConsumerErrorHandler<T>
{
// Inject needed dependencies via construction
public override async Task<bool> OnHandleError(T message, IConsumerContext consumerContext, Exception exception)
{
// Check if this is consumer context for RabbitMQ
var isRabbitMqContext = consumerContext.GetTransportMessage() != null;
if (isRabbitMqContext)
{
if (exception is TransientException)
{
// Send negative acknowledge but ask the broker to retry
consumerContext.NackWithRequeue();
}
else
{
// Send negative acknowledge (if dead letter setup it will be routed to it)
consumerContext.Nack();
}
// Mark that the errored message was handled
return true;
}
return false;
}
}
```

Then register the implementation in MSDI for all (or specified) message types.

```cs
// Register error handler in MSDI for any message type
services.AddTransient(typeof(RabbitMqConsumerErrorHandler<>), typeof(CustomRabbitMqConsumerErrorHandler<>));
```

> When error handler is not found in the DI or it returns `false` then default error handling will be applied.
## Message Headers

- UseDefaultMessageProperties
- Default behaviour
- Custom Serializer

## Topology Provisioning

SMB automatically creates exchanges from producers, queues, dead letter exchanges and bindings from consumers.

However, if you need to layer on other topology elements (or peform cleanup) this could be achieved with `UseTopologyInitalizer()`:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.UseTopologyInitalizer((channel, applyDefaultTopology) =>
{
// perform some cleanup if needed
channel.QueueDelete("subscriber-0", ifUnused: true, ifEmpty: false);
channel.QueueDelete("subscriber-1", ifUnused: true, ifEmpty: false);
channel.ExchangeDelete("test-ping", ifUnused: true);
channel.ExchangeDelete("subscriber-dlq", ifUnused: true);

// apply default SMB infered topology
applyDefaultTopology();
});
});
});
```

Avoiding the call `applyDefaultTopology()` will suppress the SMB inferred topology creation.
This might be useful in case the SMB inferred topology is not desired or there are other custom needs.

## Not Supported

- The request-response is not yet supported. It will be made available soon.
19 changes: 19 additions & 0 deletions src/Common.NuGet.Properties.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="utf-8"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReadmeFile>NuGet.md</PackageReadmeFile>
<PackageIcon>icon.png</PackageIcon>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\docs\NuGet.md" Pack="true" PackagePath="\"/>
<None Include="..\..\icon.png">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
</ItemGroup>

</Project>
11 changes: 0 additions & 11 deletions src/Common.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,5 @@
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<LangVersion>latest</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<PackageReadmeFile>NuGet.md</PackageReadmeFile>
<PackageIcon>icon.png</PackageIcon>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\docs\NuGet.md" Pack="true" PackagePath="\"/>
<None Include="..\..\icon.png">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
</ItemGroup>

</Project>
3 changes: 1 addition & 2 deletions src/Host.Serialization.Properties.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">

<Import Project="Common.Properties.xml" />
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Version>2.0.1</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
</Project>
Loading

0 comments on commit e38499d

Please sign in to comment.