Skip to content

High-performance yet easy to use mediator pattern and in-process message bus implementation in .NET

Notifications You must be signed in to change notification settings

roeibajayo/MediatorCore

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

91 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

MediatorCore

NuGet NuGet

High-performance yet easy to use mediator pattern and in-process message bus implementation in .NET.

πŸš€ Features

βœ” Fast and low memory usage. πŸ”₯

βœ” Meditor pattern implementation.

βœ” Use many queues types as handlers, so you don't need to use external message bus libraries.

βœ” Developers friendly ❀️ Easy to use.

Supported handlers:

  • Request without response IRequestHandler<>
  • Request with response IResponseHandler<,>
  • Notification (parallel execution) INotificationHandler<>
  • Bubbling notification IBubblingNotificationHandler<,>
  • Queue IQueueHandler<> or with options IQueueHandler<,>
  • Stack IStackHandler<> or with options IStackHandler<,>
  • Debounce queue IDebounceQueueHandler<,>
  • Throttling queue IThrottlingQueueHandler<,>
  • Accumulator queue IAccumulatorQueueHandler<,>

Benchmarks MediatorCore (1.3.0) vs MediatR (12.0.1):

Method Mean Error StdDev Allocated
Response_MediatorCore 151.4 ns 68.35 ns 3.75 ns 336 B
Response_MediatR 212.0 ns 926.96 ns 50.81 ns 408 B
ParallelNotification_Simple_MediatorCore 1,680.1 ns 2,093.40 ns 114.75 ns 872 B
ParallelNotification_Simple_MediatR 2,497.9 ns 2,837.47 ns 155.53 ns 872 B
ParallelNotification_LongRunning_MediatorCore 2,884.0 ns 1,366.19 ns 74.89 ns 1136 B
ParallelNotification_LongRunning_MediatR 3,202.3 ns 921.75 ns 50.52 ns 1160 B
InsertToQueue 119.7 ns 99.76 ns 5.47 ns 48 B
InsertToStack 281.0 ns 1,199.47 ns 65.75 ns 64 B

Install & Registering:

Install MediatorCore with NuGet:

Install-Package MediatorCore

Or via the .NET Core command line interface:

dotnet add package MediatorCore

then register the required services easly:

services.AddMediatorCore(); // register all handlers from the calling assembly
// or:
// services.AddMediatorCore<TMarker>(); -> can used multiple times
// services.AddMediatorCore(new Assembly { .. });

Example of creating a Request/Response:

// the response:
public record SimpleResponse(bool Success);

// the request (message):
public record SimpleRequest(int Id) : IResponseMessage<SimpleResponse>;

// the handler:
public class SimpleResponseMessageHandler : IResponseHandler<SimpleRequest, SimpleResponse>
{
    public async Task<SimpleResponse> HandleAsync(SimpleRequest message, CancellationToken cancellationToken)
    {
        var response = new SimpleResponse(true);
        await Task.Delay(200, cancellationToken); // simulate some work
        return response;
    }
}

then call the request:

public class Example
{
    private readonly IPublisher publisher;

    public Example(IPublisher publisher)
    {
        this.publisher = publisher;
    }

    public async Task GetResponseFromHandlerAsync()
    {
        var request = new SimpleRequest(1);
        var response = await publisher.GetResponseAsync(request);
        // do something with the response ...
    }
}

Example of creating a Request without response:

// the message:
public record SimpleRequest(int Id) : IRequestMessage;

// the handler:
public class SimpleRequestMessageHandler : IRequestHandler<SimpleRequest>
{
    public async Task HandleAsync(SimpleRequest message, CancellationToken cancellationToken)
    {
        await Task.Delay(300, cancellationToken); // simulate some work
        throw new Exception("hello world");
    }
 
    public async Task HandleException(SimpleRequest message,
        Exception exception,
        int reties, Func<Task> retry,
        CancellationToken cancellationToken)
    {
        // you can just ignore the exception and continue:
        // return default;

        // handle the exception..

        if (reties == 3)
            return;

        await retry();
    }
}

then call the request:

public class Example
{
    private readonly IPublisher publisher;

    public Example(IPublisher publisher)
    {
        this.publisher = publisher;
    }

    public async Task ExecuteRemoteCodeAsync()
    {
        var message = new SimpleRequest(1);
        await publisher.PublishAsync(message);

        // you can also use this as fire and forget request:
        // publisher.Publish(message);
    }
}

Example of creating a Bubbling notification:

// the message:
public record SharedBubblingNotificationMessage(string Id, bool Bubble) : IBubblingNotificationMessage;

// first handler options:
public class BubblingNotification1Options
    : IBubblingNotificationOptions
{
    public int Sort => 1;
}

// first handler:
public class BubblingNotification1Handler : IBubblingNotificationHandler<SharedBubblingNotificationMessage, BubblingNotification1Options>
{
    public readonly ILogger logger;

    public BubblingNotification1Handler(ILogger logger)
    {
        this.logger = logger;
    }

    public Task<bool> HandleAsync(SharedBubblingNotificationMessage message, CancellationToken cancellationToken)
    {
        logger.LogDebug("BubblingNotification1Message: " + message.Id);
        return Task.FromResult(message.Bubble);
    }
}

// second handler options:
public class BubblingNotification2Options
    : IBubblingNotificationOptions
{
    public int Sort => 2;
}

// second handler:
public class BubblingNotification2Handler : IBubblingNotificationHandler<SharedBubblingNotificationMessage, BubblingNotification2Options>
{
    public readonly ILogger logger;

    public BubblingNotification2Handler(ILogger logger)
    {
        this.logger = logger;
    }

    public Task<bool> HandleAsync(SharedBubblingNotificationMessage message, CancellationToken cancellationToken)
    {
        logger.LogDebug("BubblingNotification2Message: " + message.Id);
        return Task.FromResult(true);
    }
}

then call the event:

public class Example
{
    private readonly IPublisher publisher;

    public Example(IPublisher publisher)
    {
        this.publisher = publisher;
    }

    public async Task SomeAction()
    {
        var message = new SharedBubblingNotificationMessage(1, true);
        await publisher.PublishAsync(message);

        // you can also use this as fire and forget notification:
        // publisher.Publish(message);
    }
}

Example of creating a Accumulator queue:

// the options of the queue:
public class LogsHandlerOptions :
    IAccumulatorQueueOptions
{
    public int MsInterval => 60 * 1000;
    public int? MaxMessagesOnDequeue => 100;
    public int? MaxMessagesStored => 1000;
    public MaxMessagesStoredBehaviors? MaxMessagesBehavior => MaxMessagesStoredBehaviors.ThrowExceptionOnAdd;
}

// the message:
public record LogMessage(DateTimeOffest Date, string Message) : IAccumulatorQueueMessage;

// the handler:
public class LogsHandler :
    IAccumulatorQueueHandler<LogMessage, LogsHandlerOptions>
{
    public readonly ILogger logger;

    public LogsHandler(ILogger logger)
    {
        this.logger = logger;
    }

    public Task HandleAsync(IEnumerable<LogMessage> messages)
    {
        foreach (var message in messages)
        {
            logger.LogDebug("Log message: " + message.Message);
        }
        return Task.CompletedTask;
    }

    public Task? HandleException(IEnumerable<LogMessage> items, 
        Exception exception, 
        int reties, 
        Func<Task> retry)
    {
        return default;
    }
}

then enqueue a log message:

public class Example
{
    private readonly IPublisher publisher;

    public Example(IPublisher publisher)
    {
        this.publisher = publisher;
    }
    
    public async Task SomeAction()
    {
        var log = new LogMessage(DateTimeOffest.UtcNow, "hello world from SomeAction");
        publisher.Publish(log);
    }
    
    public async Task AnotherAction()
    {
        var log = new LogMessage(DateTimeOffest.UtcNow, "hello world from AnotherAction");
        publisher.Publish(log);
    }
}

Roadmap:

  • More examples of use (check out the Unitests for now)
  • More handlers types
  • More unitests

Contribute

Please feel free to PR. I highly appreciate any contribution!

About

High-performance yet easy to use mediator pattern and in-process message bus implementation in .NET

Resources

Stars

Watchers

Forks

Languages