Skip to content

Simple wrapper for RabbitMQ Client for use in DotNetCore projects.

License

Notifications You must be signed in to change notification settings

blckmn/SimpleRabbit

Repository files navigation

SimpleRabbit

An easy wrapper for the RabbitMQ client that allows inclusion in DotNetCore projects. Can be used with the standard dependency injection provided in DotNetCore.

Getting started

Package Install

There are two packages on Nuget.

  1. SimpleRabbit.Netcore <- has the basics to be able to publish - and listen to queues where not hosting.
  2. SimpleRabbit.Netcore.Publish <- has extensions to allow for easier injection in DotNetCore and a factory allowing for multi-cluster rabbit publishing
  3. SimpleRabbit.Netcore.Service <- has the IHostedService option for running as a host to listen to queues.

Installing is as easy as: dotnet add package SimpleRabbit.NetCore or Install-Package SimpleRabbit.NetCore depending on your setup.

Component Diagram

Below is a C4 Component diagram of the Simple Rabbit library:

C4Component
title Component diagram for Simple Rabbit

Container(rabbitmq, "RabbitMQ", "RabbitMQ Server", "Message broker system")

Container_Boundary(simpleRabbit, "SimpleRabbit.NetCore") {
    Component(publishService, "IPublishService", "Interface", "Enables message publishing to exchanges")
    Component(basicRabbitService, "IBasicRabbitService", "Interface", "Wrapper over the Rabbit connection factory.<br /> Provides the basics to interact with a rabbit instance")
    Component(queueService, "IQueueService", "Interface", "Enables queue subscriptions and<br /> the handling of messages")
    Component(iMessageHandler, "IMessageHandler", "Interface", "Contract for handlers to process<br /> messages based on consumer tag")
    Component(asyncMessageHandler, "AsyncMessageHandler", "Class", "Enables async message processing<br /> via a queue of process tasks")
    
    Rel(publishService, basicRabbitService, "Extends")
    Rel(queueService, basicRabbitService, "Extends")
    Rel(asyncMessageHandler, iMessageHandler, "Implements")
}

Container_Boundary(simpleRabbitPublish, "SimpleRabbit.NetCore.Publish", "Provides a Publishing factory that can be used to have multiple rabbit configurations") {
    Component(publisherFactory, "PublisherFactory", "Class", "Factory to handle IPublishService instances<br /> for each rabbit configuration")

    Rel(publisherFactory, publishService, "Uses")
}

Container_Boundary(simpleRabbitService, "SimpleRabbit.NetCore.Service", "Provides a Publishing factory that can be used to have multiple rabbit configurations") {
    Component(queueFactory, "QueueFactory", "Class", "Manages IMessageHandlers created by clients")

    Rel(queueFactory, iMessageHandler, "Uses")
    Rel(queueFactory, queueService, "Uses")
}

Rel(queueService, rabbitmq, "Uses")
Rel(publishService, rabbitmq, "Uses")
Rel(basicRabbitService, rabbitmq, "Uses")

UpdateLayoutConfig($c4ShapeInRow="3", $c4BoundaryInRow="1")

Contributing

Please fork this repo then create a PR from the fork into the original one.

Usage

Publishing

sample project

private static void Main()
{
    var configuration = new ConfigurationBuilder()
        .AddJsonFile("appsettings.json", true)
        .Build();

    var services = new ServiceCollection();
    services
        .AddPublisherServices()
        .AddRabbitConfiguration(configuration.GetSection("RabbitConfiguration"))

    var provider = services.BuildServiceProvider();

    var publisher = provider.GetService<IPublishService>();
    publisher.Publish(exchange: "ExchangeName", body: "Test body");
}

The corresponding appsettings.json file (to provide connectivity to rabbit):

{
    "RabbitConfiguration": {
        "Username": "username",
        "Password": "password",
        "VirtualHost": "/",
        "Hostnames": [
            "host1",
            "host2"
        ]
    }
}

Subscribing

sample project

public static async Task Main(string[] args)
{
    var builder = new HostBuilder()
        .ConfigureAppConfiguration((hostingContext, config) =>
        {
            config.AddJsonFile("appsettings.json", true);
        })
        .ConfigureServices((context, services) =>
        {
            var config = context.Configuration;
            services
                .AddRabbitConfiguration(config.GetSection("RabbitConfiguration"))
                .AddSubscriberConfiguration(config.GetSection("Subscribers"))
                .AddSubscriberServices();
                .AddSingletonMessageHandler<MessageProcessor>()
        });

    await builder.RunConsoleAsync();
}

The message handler is chosen based on the CanProcess call. The consumer tag is passed in i.e. tags are matched not queues. This allows a handler to handle multiple messages from multiple queues.

internal class MessageProcessor : IMessageHandler
{
    public bool CanProcess(string tag)
    {
        /* validate whether this handler will handle this tag */
        return true;
    }

    public async Task<Acknowledgement> Process(BasicMessage message)
    {
        var body = message.Body;

        if (string.IsNullOrWhiteSpace(body))
            Console.WriteLine($"Message contents: {body}");
        else
            Console.WriteLine($"Empty message: {message.MessageId}")
           
        // See the Acknowledgement enum for more options
        return Acknowledgement.Ack;
    }
}

Subscribers are a list (of queues to consume), and they are auto wired up to the queue and are eventing based. The ConsumerTag is the tag passed to CanProcess(string tag)

{
    "RabbitConfiguration": {
        "Username": "username",
        "Password": "password",
        "VirtualHost": "/",
        "Hostnames": [
            "host1",
            "host2"
        ]
    },
    "Subscribers": [
        {
            "ConsumerTag": "TestTagName",
            "QueueName": "Test"
        }
    ]
}

Multiple Rabbit clusters

Multi-Publishing Multi-Subscribing

There is additional factories that can make use of multiple configurations. The setup for the publisher factory is as follows

public class Program
{
    public static void Main(string[] args)
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json", true)
            .Build();

        PublishService PublisherFactory(IServiceProvider service, string name)
        {
            var p = service.GetService<PublishService>();
            p.ConfigurationName = name;
            return p;
        }

        services
            .AddRabbitConfiguration("name1", configuration.GetSection("RabbitConfigurations:Configuration1"))
            .AddRabbitConfiguration("name2", configuration.GetSection("RabbitConfigurations:Configuration2"))
            .AddPublisherFactory();

        var provider = services.BuildServiceProvider();

        //Publishing is based on the configuration name provided
        var publisher = provider.GetService<List<IPublishService>>().FirstOrDefault(p => p.ConfigurationName.Equals("name1"));
        publisher?.Publish(exchange: "ExchangeName", body: "Test body");
    }
}

The appsettings.json file will look something like this:

{
  "RabbitConfiguration": {
    "Configuration1": {
        "Username": "username",
        "Password": "password",
        "VirtualHost": "/",
        "Hostnames": [
            "host1",
            "host2"
        ]
    },
    "Configuration2": {
        "Username": "username",
        "Password": "password",
        "VirtualHost": "/",
        "Hostnames": [
            "host1",
            "host2"
        ]
    }
  }
}

Note: the multiple configuration setup uses IOptionMonitor, which will listen to changes, invalidating a connection if the configuration is changed in runtime.

Multi-Subscriber

Multiple cluster subscribing only requires additional registrations of configurations. This will also invalidate and respawn connections on configuration update like publishing.

 services
    .AddRabbitConfiguration("name",config.GetSection("RabbitConfiguration"))
    .AddSubscriberConfiguration("name",config.GetSection("Subscribers"))

Note : The names of the configuration will be used to match the list of queues and rabbit configuration together.

Local Rabbit

Within the RabbitMQ server folder there contains a docker build of a pre-configured rabbit server, able to run the example projects.

Within that directory, run the following bash command to start the server:

docker build -t localrabbit . && 
docker run -p 5672:5672 -p 15672:15672 localrabbit

About

Simple wrapper for RabbitMQ Client for use in DotNetCore projects.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages