potapich1978/EventFlowChannel

Event Dispatcher Architecture with Channel-Based Processing

Overview

A high-performance event-driven processing system built on .NET Standard 2.0 that provides thread-safe message dispatching with configurable channel options.
The architecture combines the flexibility of event-driven programming with the robustness of channel-based communication patterns.

Key Features

  • Thread-Safe Event Dispatching - Safe concurrent event publishing and consumption
  • Configurable Channel Types - Support for both bounded and unbounded channels
  • Fluent Configuration API - Intuitive builder pattern for channel configuration
  • Multiple Reader Support - Configurable number of concurrent consumers
  • .NET Standard 2.0 Compatibility - Works with .NET Framework 4.6.1+ and .NET Core 2.0+
  • Dependency Injection Ready - Seamless integration with Microsoft DI container

Thread Safety

  • Thread-Safe Publishing - Multiple writers supported through channel synchronization
  • Concurrent Consumption - Multiple readers process events simultaneously
  • Lifecycle Management - Atomic start/stop operations with proper cancellation support
  • Exception Handling - Isolated error handling per event with continuous processing
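The concurrent-consumption and per-event error isolation described above can be sketched directly on System.Threading.Channels. This is a minimal illustration under assumptions, not the library's implementation: `MultiReaderSketch`, `RunAsync`, and the failure counter are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MultiReaderSketch
{
    // Hypothetical helper: drains a channel with `readerCount` concurrent loops
    // and returns how many events made the handler throw.
    public static async Task<int> RunAsync(
        int readerCount, IEnumerable<int> events, Func<int, Task> handler)
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
        {
            SingleReader = readerCount == 1, // optimization hint only
            SingleWriter = true
        });

        int failures = 0;

        async Task ReaderLoop()
        {
            // WaitToReadAsync + TryRead works on .NET Standard 2.0.
            while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
            {
                while (channel.Reader.TryRead(out var item))
                {
                    try
                    {
                        await handler(item).ConfigureAwait(false);
                    }
                    catch (Exception)
                    {
                        // Isolate the failure: count it and keep reading.
                        Interlocked.Increment(ref failures);
                    }
                }
            }
        }

        var readers = Enumerable.Range(0, readerCount)
            .Select(_ => Task.Run(() => ReaderLoop()))
            .ToArray();

        foreach (var e in events)
        {
            channel.Writer.TryWrite(e); // always succeeds on an unbounded channel
        }
        channel.Writer.Complete();      // lets the reader loops finish

        await Task.WhenAll(readers).ConfigureAwait(false);
        return failures;
    }
}
```

Feeding the values 1 to 100 through four readers with a handler that throws on multiples of 10 reports 10 failures, while every other event is still handled.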

Best Practices

Handler Design

  • Keep handlers focused and single-purpose
  • Implement proper error handling
  • Use async operations for I/O-bound work
Channel Configuration

  • Use bounded channels for predictable memory usage
  • Match reader count to available cores for CPU-bound work
  • Enable multiple writers for high-throughput scenarios
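One way to pick a reader count along these lines is shown below. The helper name `ReaderCountSketch.Suggest` and the default I/O multiplier of 4 are assumptions made for illustration; the right multiplier depends on how long handlers spend awaiting.

```csharp
using System;

public static class ReaderCountSketch
{
    // Hypothetical heuristic: one reader per core for CPU-bound handlers,
    // a caller-chosen multiple of the core count for I/O-bound handlers.
    public static int Suggest(bool cpuBound, int ioMultiplier = 4)
    {
        int cores = Math.Max(1, Environment.ProcessorCount);
        return cpuBound ? cores : cores * Math.Max(1, ioMultiplier);
    }
}
```

The result could then be passed to `WithReadersCount(...)` when building a channel.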

Monitoring

  • Implement comprehensive logging using all three log levels (Message, Warning, Error)

Performance Considerations

  • Bounded Channels - Cap memory usage with a configurable capacity
  • Reader Count - Optimize based on workload (I/O vs CPU bound)
  • Channel Options - SingleWriter/SingleReader optimizations available
  • Async/Await - Non-blocking operations throughout the pipeline
  • Cancellation Support - Use timeouts to prevent uncontrolled memory growth
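To make the bounded-capacity and cancellation points concrete, here is a minimal sketch using System.Threading.Channels directly. It assumes a channel in `BoundedChannelFullMode.Wait`, where a write to a full channel waits until space frees up or the token is cancelled. `BackpressureSketch`, `CreateWaitChannel`, and `TryEnqueueAsync` are hypothetical names, not part of this library.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureSketch
{
    // Bounded channel whose writers wait when the buffer is full.
    public static Channel<int> CreateWaitChannel(int capacity) =>
        Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = true, // optimization hints; set false when several
            SingleReader = true  // producers or consumers share the channel
        });

    // Returns true if the item was accepted before the timeout elapsed,
    // false if the channel stayed full for the whole timeout.
    public static async Task<bool> TryEnqueueAsync<T>(
        ChannelWriter<T> writer, T item, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                await writer.WriteAsync(item, cts.Token).ConfigureAwait(false);
                return true;
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }
    }
}
```

With a capacity of 1, the first write is accepted, a second write with a short timeout is rejected while the buffer is still full, and a write after one item has been read is accepted again. Without the timeout, the second producer would wait for as long as the consumer stays behind.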

Example Use Cases

  • Microservices Communication - Event-based service coordination
  • Data Processing Pipelines - Parallel event processing
  • Real-time Applications - WebSocket/gRPC message processing
  • Background Processing - Async task execution
  • Event Sourcing - Reliable event publishing with configurable backpressure

Core Components

Event Interfaces

Channel Infrastructure

Builder Pattern

Exception Types

Installation

<PackageReference Include="Potapich.EventFlowChannel" Version="1.3.0" />

Quick Start

1. Define Events and Handlers

using System.Threading;
using System.Threading.Tasks;

public enum EventType { UserCreated, OrderProcessed }

public class UserCreatedEvent : IGenericEvent<EventType>
{
    public EventType EventType => EventType.UserCreated;
    public string UserName { get; set; }
}

public class UserEventHandler : IGenericEventHandler<EventType>
{
    public EventType EventType => EventType.UserCreated;
    
    public Task HandleAsync(IGenericEvent<EventType> @event, CancellationToken token)
    {
        var userEvent = (UserCreatedEvent)@event;
        // Process userEvent here; make the method async once it awaits real work
        return Task.CompletedTask;
    }
}
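Both the event and the handler expose an `EventType` property, which suggests that handlers are matched to events by that key. The sketch below shows one way such a lookup could work. It uses hypothetical stand-in types (`ISketchEvent`, `ISketchHandler`, `DelegateHandler`, `SketchDispatcher`) rather than the library's interfaces, and throwing on an unregistered key is an assumption of this sketch, not documented library behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins, NOT the library's IGenericEvent / IGenericEventHandler.
public interface ISketchEvent<TKey> { TKey EventType { get; } }

public interface ISketchHandler<TKey>
{
    TKey EventType { get; }
    Task HandleAsync(ISketchEvent<TKey> evt, CancellationToken token);
}

// Small adapter so a handler can be built from a delegate.
public sealed class DelegateHandler<TKey> : ISketchHandler<TKey>
{
    private readonly Func<ISketchEvent<TKey>, CancellationToken, Task> _body;

    public DelegateHandler(TKey eventType, Func<ISketchEvent<TKey>, CancellationToken, Task> body)
    {
        EventType = eventType;
        _body = body;
    }

    public TKey EventType { get; }

    public Task HandleAsync(ISketchEvent<TKey> evt, CancellationToken token) => _body(evt, token);
}

public sealed class SketchDispatcher<TKey>
{
    private readonly Dictionary<TKey, ISketchHandler<TKey>> _handlers;

    public SketchDispatcher(IEnumerable<ISketchHandler<TKey>> handlers)
    {
        // ToDictionary throws if two handlers claim the same key.
        _handlers = handlers.ToDictionary(h => h.EventType);
    }

    public Task DispatchAsync(ISketchEvent<TKey> evt, CancellationToken token = default)
    {
        if (!_handlers.TryGetValue(evt.EventType, out var handler))
        {
            // Assumption of this sketch: an unknown key is an error.
            throw new InvalidOperationException("No handler registered for " + evt.EventType);
        }
        return handler.HandleAsync(evt, token);
    }
}
```

Keying the dictionary on an enum keeps the lookup O(1) and makes a duplicate handler registration fail at construction time instead of at dispatch time.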

2. Configure Services

services.AddEventChannelBuilder<EventType>(typeof(Startup).Assembly)
        .AddSingleton<IGenericEventDispatcherLogger, MyLogger>();

3. Create and Use Channel

// GetRequiredService throws a descriptive exception instead of returning null
var channel = provider.GetRequiredService<IChannelBuilder<EventType>>()
    .Bounded()
    .WithCapacity(1000)
    .WithReadersCount(4)
    .WithMultipleWriters()
    .Build();

channel.Start();

// Modern async pattern with cancellation support
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    await channel.EnqueueAsync(
        new UserCreatedEvent { UserName = "John" }, 
        cts.Token
    );
}
catch (CancelEnqueueMessageException ex)
{
    // Handle user cancellation or channel stop
    Console.WriteLine($"Enqueue cancelled: {ex.Message}");
}
catch (ChannelClosedException ex)
{
    // Handle channel not ready
    Console.WriteLine($"Channel closed: {ex.Message}");
}

// Legacy pattern (deprecated - avoid in new code)
await channel.Enqueue(new UserCreatedEvent { UserName = "John" });

Advanced Configuration

Bounded Channel with Custom Options

var channel = builder.Bounded()
    .WithCapacity(5000)
    .WithReadersCount(8)
    .WithFullMode(BoundedChannelFullMode.DropWrite)
    .WithMultipleWriters(false)
    .Build();

channel.Start();

// Enqueue with a timeout so the caller never waits indefinitely
// (most relevant with BoundedChannelFullMode.Wait, where writers wait on a full channel)
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await channel.EnqueueAsync(new UserCreatedEvent { UserName = "Jane" }, timeoutCts.Token);

// Legacy pattern (deprecated - avoid in new code)
await channel.Enqueue(new UserCreatedEvent { UserName = "John" });

Unbounded Channel

var channel = builder.Unbounded()
    .WithReadersCount(2)
    .WithMultipleWriters(true)
    .Build();

channel.Start();

// Enqueue with a caller-supplied cancellation token
using var userCts = new CancellationTokenSource();
await channel.EnqueueAsync(new UserCreatedEvent { UserName = "Bob" }, userCts.Token);

// Legacy pattern (deprecated - avoid in new code)
await channel.Enqueue(new UserCreatedEvent { UserName = "John" });

Logging Integration

Microsoft.Extensions.Logging

public class MicrosoftLoggerAdapter : IGenericEventDispatcherLogger
{
    private readonly ILogger _logger;

    public MicrosoftLoggerAdapter(ILoggerFactory factory)
    {
        _logger = factory.CreateLogger("EventDispatcher");
    }
    
    public void LogMessage(string message) => _logger.LogInformation(message);
    public void LogWarning(string message) => _logger.LogWarning(message);
    public void LogError(string message) => _logger.LogError(message);
    public void LogError(string message, Exception ex) => _logger.LogError(ex, message);
}

Serilog

public class SerilogAdapter : IGenericEventDispatcherLogger
{
    public void LogMessage(string message) => Serilog.Log.Information(message);
    public void LogWarning(string message) => Serilog.Log.Warning(message);
    public void LogError(string message) => Serilog.Log.Error(message);
    public void LogError(string message, Exception ex) => Serilog.Log.Error(ex, message);
}

Using IEventDispatcher Without a Queue

Since version 1.1.0, you can register the event dispatcher and its handlers on their own, without registering a message channel.

using ChannelReader;

services.AddEventDispatcher<EventType>(typeof(Startup).Assembly)
        .AddSingleton<IGenericEventDispatcherLogger, MyLogger>();

Support

  • GitHub Issues
  • API Documentation: See XML documentation in source code
