一个高性能、轻量级的本地事件总线库,专为 .NET 8+ 设计。
- 🚀 高性能 - 使用 Expression 树编译委托,比反射快 15 倍
- 🔒 线程安全 - 使用
ConcurrentDictionary+ImmutableArray无锁设计 - 📦 强类型 - 支持泛型事件,编译时类型检查
- 🎯 Topic 路由 - 支持基于 Topic 的事件路由和灵活匹配
- 🔄 重试机制 - 支持固定/线性/指数退避重试策略
- 🎯 事件过滤 - 按条件过滤事件
- 🔌 拦截器 - 支持前置、后置、失败拦截,用于监控、日志和异常处理
- ⚡ 异步优先 - 基于
Channel<T>的异步事件队列 - 🔀 哈希分片 - 支持固定数量的分片通道,保证顺序性的同时提升并发性能
- 📍 直接调用 - 支持
InvokeAsync直接调用订阅者,无需通过事件队列 - 🔧 依赖注入 - 原生支持 Microsoft.Extensions.DependencyInjection
- 🎨 灵活匹配 - 支持精确匹配、通配符(*)、正则表达式匹配
dotnet add package LocalEventBususing LocalEventBus;
using LocalEventBus.Abstractions;
// 使用默认配置
var eventBus = EventBusFactory.Create();
// 或使用自定义配置
var eventBus = EventBusFactory.Create(options =>
{
options.ChannelCapacity = 1000;
options.DefaultTimeout = TimeSpan.FromSeconds(30);
options.PartitionCount = 4; // 配置分片数量
options.RetryOptions.MaxRetryAttempts = 3;
options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});using LocalEventBus;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
// 添加事件总线服务
services.AddLocalEventBus(options =>
{
options.ChannelCapacity = 1000;
options.DefaultTimeout = TimeSpan.FromSeconds(30);
options.PartitionCount = 4; // 配置分片数量
options.RetryOptions.MaxRetryAttempts = 3;
options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});
// 可选:添加通配符匹配器
services.AddLocalEventBus()
.AddWildcardMatcher()
.AddRegexMatcher();
var serviceProvider = services.BuildServiceProvider();
var eventBus = serviceProvider.GetRequiredService<IEventBus>();// 使用 record 定义事件(推荐)
public record OrderCreatedEvent(int OrderId, string CustomerName, decimal Amount);
public record PaymentReceivedEvent(int OrderId, decimal Amount, DateTime PaidAt);// 异步发布(推荐)
await eventBus.PublishAsync(new OrderCreatedEvent(123, "张三", 99.99m));
// 同步发布(入队不等待处理)
eventBus.Publish(new OrderCreatedEvent(456, "李四", 199.99m));
// 批量发布
var orders = new[]
{
new OrderCreatedEvent(1, "Customer1", 100m),
new OrderCreatedEvent(2, "Customer2", 200m),
new OrderCreatedEvent(3, "Customer3", 300m)
};
await eventBus.PublishBatchAsync(orders);
// 直接调用(不经过队列,同步等待处理完成)
await eventBus.InvokeAsync(new OrderCreatedEvent(100, "DirectCall", 500m));
// 通过 Topic 直接调用
await eventBus.InvokeByTopicAsync("system/shutdown");// 异步处理
var subscription = eventBus.Subscribe<OrderCreatedEvent>(async (order, ct) =>
{
Console.WriteLine($"收到订单: {order.OrderId}, 客户: {order.CustomerName}");
await ProcessOrderAsync(order, ct);
});
// 同步处理
eventBus.Subscribe<OrderCreatedEvent>(order =>
{
Console.WriteLine($"订单 {order.OrderId} 已创建");
});
// 取消订阅
subscription.Dispose();public class OrderHandler
{
// 带事件参数的订阅
[Subscribe]
public async ValueTask HandleOrderCreated(OrderCreatedEvent e, CancellationToken ct)
{
Console.WriteLine($"处理订单: {e.OrderId}");
await Task.Delay(100, ct);
}
[Subscribe]
public void HandlePayment(PaymentReceivedEvent e)
{
Console.WriteLine($"收到支付: {e.Amount}");
}
// 无参订阅者(必须指定 Topic)
[Subscribe("system/shutdown")]
public void OnSystemShutdown()
{
Console.WriteLine("系统即将关闭...");
}
// 无参订阅者 + CancellationToken
[Subscribe("system/refresh")]
public async ValueTask OnRefreshAsync(CancellationToken ct)
{
await RefreshCacheAsync(ct);
}
}
// 注册订阅者
var handler = new OrderHandler();
var subscription = eventBus.Subscribe(handler);
// 取消所有订阅
subscription.Dispose();var eventBus = EventBusFactory.Create(options =>
{
// 通道容量(0 = 无界通道)
options.ChannelCapacity = 10000;
// 通道满时的行为
options.ChannelFullMode = BoundedChannelFullMode.Wait;
// 默认处理超时
options.DefaultTimeout = TimeSpan.FromSeconds(30);
// 分片数量(用于哈希分片,默认1)
// - 单线程模式(全局顺序):1(默认)
// - 低负载(< 1K 事件/秒):4-8
// - 中等负载(1K-10K 事件/秒):16
// - 高负载(> 10K 事件/秒):32-64
options.PartitionCount = 16;
// 重试配置
options.RetryOptions.MaxRetryAttempts = 3;
options.RetryOptions.InitialDelay = TimeSpan.FromSeconds(1);
options.RetryOptions.MaxDelay = TimeSpan.FromSeconds(30);
options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});eventBus.Subscribe<OrderCreatedEvent>(handler, new SubscribeOptions
{
// 主题名称(可选,不指定则使用事件类型全名)
Topic = "orders/created",
// 优先级(0-10,越大越先执行,默认5)
Priority = 10,
// 是否允许并发处理(默认true)
AllowConcurrency = true,
// 处理超时(默认使用 EventBusOptions.DefaultTimeout)
Timeout = TimeSpan.FromSeconds(10)
});await eventBus.PublishAsync(
new OrderCreatedEvent(123, "Alice", 100m),
new PublishOptions
{
// 主题名称(可选,不指定则使用事件类型全名)
Topic = "orders/created",
// 分区键(相同分区键的事件保证有序处理)
PartitionKey = "user:alice",
// 优先级(0-10,默认5)
Priority = 8
});public class MyHandler
{
// 使用主题过滤
[Subscribe("orders/vip")]
public void HandleVipOrder(OrderCreatedEvent e) { }
// 设置优先级和超时
[Subscribe(Priority = 10, Timeout = 5000, AllowConcurrency = false)]
public void HandleHighPriority(OrderCreatedEvent e) { }
}public class VipOnlyFilter : IEventFilter
{
public int Order => 1;
public ValueTask<bool> ShouldProcessAsync<TEvent>(TEvent @event, CancellationToken ct)
where TEvent : notnull
{
if (@event is OrderCreatedEvent order)
{
return ValueTask.FromResult(order.Amount > 1000);
}
return ValueTask.FromResult(true);
}
}
// 添加过滤器
eventBus.AddFilter(new VipOnlyFilter());public class LoggingInterceptor : IEventInterceptor
{
public int Order => 0;
public ValueTask OnHandlingAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, CancellationToken ct)
where TEvent : notnull
{
Console.WriteLine($"[开始] 处理事件: {typeof(TEvent).Name}");
return ValueTask.CompletedTask;
}
public ValueTask OnHandledAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, TimeSpan elapsed, CancellationToken ct)
where TEvent : notnull
{
Console.WriteLine($"[完成] 处理事件: {typeof(TEvent).Name}, 耗时: {elapsed.TotalMilliseconds}ms");
return ValueTask.CompletedTask;
}
public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
where TEvent : notnull
{
Console.WriteLine($"[失败] 处理事件: {typeof(TEvent).Name}, 错误: {ex.Message}");
return ValueTask.CompletedTask;
}
}
// 添加拦截器
eventBus.AddInterceptor(new LoggingInterceptor());LocalEventBus 使用哈希分片架构来提升并发性能,同时保证同一分区键的事件顺序性。
// 配置分片数量
var eventBus = EventBusFactory.Create(options =>
{
options.PartitionCount = 32 // 默认1,推荐设置为CPU核心数的2-4倍
});
// 使用分区键发布事件
await eventBus.PublishAsync(
new OrderCreatedEvent(123, "Alice", 100m),
new PublishOptions { PartitionKey = "user:alice" }
);
// 相同分区键的事件保证按顺序处理
await eventBus.PublishAsync(event1, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event2, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event3, new PublishOptions { PartitionKey = "user:123" });
// ✅ event1 -> event2 -> event3 保证顺序
// 不同分区键的事件可以并发处理
await eventBus.PublishAsync(event1, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event2, new PublishOptions { PartitionKey = "user:456" });
// ⚡ event1 和 event2 可能并发处理分片特性:
- ✅ 固定数量的分片通道,在初始化时创建
- ✅ 使用哈希算法将分区键映射到分片
- ✅ 同一分区键的事件保证顺序处理
- ✅ 不同分片的事件可以并发处理
- ✅ 减少锁竞争,提升吞吐量
详见:哈希分片架构文档
除了通过事件队列异步处理,还可以直接同步调用所有订阅者:
// 直接调用订阅者(不经过事件队列)
await eventBus.InvokeAsync(new OrderCreatedEvent(123, "Alice", 100m));
// 通过 Topic 直接调用
await eventBus.InvokeByTopicAsync("system/shutdown");
// 带事件数据的 Topic 调用
await eventBus.InvokeByTopicAsync("user/notification",
new NotificationEvent("Welcome!"));适用场景:
- 需要立即执行的操作(不希望异步延迟)
- 需要等待所有订阅者完成
- 测试场景下验证订阅者行为
注意事项:
InvokeAsync会同步调用所有订阅者(按优先级顺序)- ✅ 会应用过滤器(可以过滤不需要处理的事件)
- ❌ 不会触发拦截器(OnHandlingAsync、OnHandledAsync、OnHandlerFailedAsync 都不会被调用)
- ✅ 支持超时机制(会应用 SubscribeOptions.Timeout 或 EventBusOptions.DefaultTimeout)
- ❌ 不支持重试机制(异常会直接抛出,需要调用方自行处理)
与 PublishAsync 的区别:
| 特性 | PublishAsync | InvokeAsync |
|---|---|---|
| 执行方式 | 异步队列 | 同步直接调用 |
| 过滤器 | ✅ 支持 | ✅ 支持 |
| 拦截器 | ✅ 支持 | ❌ 不支持 |
| 超时 | ✅ 支持 | ✅ 支持 |
| 重试 | ✅ 支持 | ❌ 不支持 |
| 异常处理 | 拦截器处理 | 调用方处理 |
| 顺序保证 | ✅ 同分区键保证 | ✅ 按优先级顺序 |
LocalEventBus 支持灵活的 Topic 匹配机制,可以实现精确匹配、通配符匹配和正则表达式匹配。
// 发布事件
await eventBus.PublishAsync("orders/created", new OrderCreatedEvent(...));
// 订阅事件(精确匹配)
[Subscribe(Topic = "orders/created")]
public void HandleOrderCreated(OrderCreatedEvent e) { }// 注册通配符匹配器
services.AddLocalEventBus()
.AddWildcardMatcher();
// 订阅示例(订阅方使用字面量 Topic)
[Subscribe(Topic = "orders/created")]
public void HandleOrderCreated(object e) { }
[Subscribe(Topic = "orders/updated")]
public void HandleOrderUpdated(object e) { }
[Subscribe(Topic = "users/created")]
public void HandleUserCreated(object e) { }
// 发布侧使用通配符匹配多个订阅者
await eventBus.PublishAsync("orders/*"); // 命中 orders/created、orders/updated
await eventBus.PublishAsync("*/created"); // 命中 orders/created、users/created
await eventBus.PublishAsync("**"); // 命中所有订阅仅发布方 Topic 支持通配符匹配订阅方:发布时可使用
*/?将同一消息广播到符合模式的订阅者;订阅方 Topic 默认按字面量匹配(若需更灵活可使用正则匹配器)。
// 发布侧使用通配符广播
await eventBus.PublishAsync("orders/*"); // 触发 orders/created、orders/updated 等订阅
await eventBus.PublishAsync("user-?/login"); // 触发 user-a/login、user-b/login 等订阅// 注册正则匹配器
services.AddLocalEventBus()
.AddRegexMatcher();
// 订阅示例(订阅方使用字面量 Topic)
[Subscribe(Topic = "orders/created")]
public void HandleOrderCreated(object e) { }
[Subscribe(Topic = "orders/updated")]
public void HandleOrderUpdated(object e) { }
// 发布侧使用正则模式匹配多个订阅
await eventBus.PublishAsync(@"^orders/(created|updated)$");
await eventBus.PublishAsync(@"^user-\d+/login$");// 实现自定义匹配器
public class CustomMatcher : IEventMatcher
{
public int Order => 100; // 执行顺序(越小越先执行)
public bool IsMatch(string publishedTopic, string subscribedPattern)
{
// 自定义匹配逻辑
return publishedTopic.Contains(subscribedPattern);
}
}
// 注册自定义匹配器
services.AddLocalEventBus()
.AddMatcher<CustomMatcher>();// 获取诊断信息
if (eventBus is IEventBusDiagnostics diagnostics)
{
Console.WriteLine($"订阅者数量: {diagnostics.GetSubscriberCount()}");
Console.WriteLine($"待处理事件: {diagnostics.GetPendingEventCount()}");
foreach (var subscriber in diagnostics.GetSubscribers<OrderCreatedEvent>())
{
Console.WriteLine($" - {subscriber}");
}
}// 根据负载和CPU核心数选择合适的分片数量
var eventBus = EventBusFactory.Create(options =>
{
// 单线程模式(保证全局顺序)
options.PartitionCount = 1;
// 多核并发(推荐:CPU核心数的2-4倍)
// 例如:8核CPU可以设置为16-32
options.PartitionCount = Environment.ProcessorCount * 2;
});// 同一用户的事件使用相同的分区键,保证按顺序处理
await eventBus.PublishAsync(
new UserLoginEvent(userId),
new PublishOptions { PartitionKey = $"user:{userId}" });
await eventBus.PublishAsync(
new UserLogoutEvent(userId),
new PublishOptions { PartitionKey = $"user:{userId}" });
// ✅ 保证先处理登录,再处理登出public class MonitoringInterceptor : IEventInterceptor
{
private readonly ILogger _logger;
private readonly IMetrics _metrics;
public int Order => 0;
public ValueTask OnHandlingAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, CancellationToken ct)
where TEvent : notnull
{
_logger.LogInformation("开始处理事件: {EventType}", typeof(TEvent).Name);
_metrics.IncrementCounter("event_handling_started");
return ValueTask.CompletedTask;
}
public ValueTask OnHandledAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, TimeSpan elapsed, CancellationToken ct)
where TEvent : notnull
{
_logger.LogInformation("事件处理完成: {EventType}, 耗时: {Elapsed}ms",
typeof(TEvent).Name, elapsed.TotalMilliseconds);
_metrics.RecordHistogram("event_handling_duration", elapsed.TotalMilliseconds);
return ValueTask.CompletedTask;
}
public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
where TEvent : notnull
{
_logger.LogError(ex, "事件处理失败: {EventType}", typeof(TEvent).Name);
_metrics.IncrementCounter("event_handling_failed");
return ValueTask.CompletedTask;
}
}
// 注册拦截器
services.AddLocalEventBus()
.AddInterceptor<MonitoringInterceptor>();// 在应用关闭时,确保所有事件都被处理完毕
public async Task ShutdownAsync(IEventBus eventBus, CancellationToken ct)
{
// 1. 停止接收新事件(如果有入口控制)
// ...
// 2. 等待当前队列中的事件处理完成
if (eventBus is IEventBusDiagnostics diagnostics)
{
while (diagnostics.GetPendingEventCount() > 0)
{
await Task.Delay(100, ct);
}
}
// 3. 释放资源
await eventBus.DisposeAsync();
}// 方式1:使用拦截器处理异常
public class ErrorHandlingInterceptor : IEventInterceptor
{
public int Order => -100; // 优先执行
public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
where TEvent : notnull
{
// 根据异常类型决定处理策略
if (ex is TimeoutException)
{
// 超时异常,可能需要报警
AlertSystem.SendAlert($"Event handler timeout: {typeof(TEvent).Name}");
}
else if (ex is InvalidOperationException)
{
// 业务异常,记录日志即可
_logger.LogWarning(ex, "Business exception in event handler");
}
return ValueTask.CompletedTask;
}
}
// 方式2:在订阅者中使用 try-catch
[Subscribe]
public async ValueTask HandleOrderCreated(OrderCreatedEvent e, CancellationToken ct)
{
try
{
await ProcessOrderAsync(e, ct);
}
catch (Exception ex)
{
// 处理异常,避免影响其他订阅者
_logger.LogError(ex, "Failed to process order: {OrderId}", e.OrderId);
}
}| 优化项 | 技术方案 | 性能提升 |
|---|---|---|
| 订阅者调用 | Expression 树编译委托 | 比反射快 15 倍 |
| 并发控制 | ConcurrentDictionary + ImmutableArray | 无锁设计,零拷贝 |
| 事件分发 | Channel 异步队列 | 高吞吐量,低延迟 |
| 分片处理 | 哈希分片(固定通道数) | 并发性能提升 2-5 倍 |
| 内存分配 | 不可变集合 + 对象池 | 极低 GC 压力 |
| 场景 | 吞吐量 | 平均延迟 | P99 延迟 |
|---|---|---|---|
| 单订阅者 | ~100K ops/s | ~10μs | ~50μs |
| 10 订阅者 | ~50K ops/s | ~20μs | ~100μs |
| 100 订阅者 | ~10K ops/s | ~100μs | ~500μs |
| 分片模式(16分片) | ~200K ops/s | ~5μs | ~30μs |
测试环境: .NET 8.0, Intel i7-12700, 16GB RAM, Windows 11
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
[MemoryDiagnoser]
public class EventBusBenchmark
{
private IEventBus _eventBus = null!;
[GlobalSetup]
public void Setup()
{
_eventBus = EventBusFactory.Create(options =>
{
options.PartitionCount = 16;
options.ChannelCapacity = 10000;
});
// 注册订阅者
_eventBus.Subscribe<TestEvent>(e => { /* 处理逻辑 */ });
}
[Benchmark]
public async Task PublishAsync_1K_Events()
{
for (int i = 0; i < 1000; i++)
{
await _eventBus.PublishAsync(new TestEvent(i));
}
}
[Benchmark]
public void Publish_1K_Events()
{
for (int i = 0; i < 1000; i++)
{
_eventBus.Publish(new TestEvent(i));
}
}
}
// 运行基准测试
// BenchmarkRunner.Run<EventBusBenchmark>();┌─────────────────────────────────────────────────────────────┐
│ IEventBus │
│ ┌──────────────────┐ ┌─────────────────────┐ │
│ │ IEventPublisher │ │ IEventSubscriber │ │
│ └────────┬─────────┘ └──────────┬──────────┘ │
└───────────┼────────────────────────────────┼────────────────┘
│ │
▼ ▼
┌───────────────────────────────────────────────────────────────┐
│ DefaultEventBus │
│ ┌──────────────────────────────────────────────────────────┐│
│ │ EventSubscriberRegistry (ConcurrentDict + ImmutableArray)││
│ └──────────────────────────────────────────────────────────┘│
│ ┌──────────────────────────────────────────────────────────┐│
│ │ EventChannelManager (Channel<EventEnvelope>) ││
│ └──────────────────────────────────────────────────────────┘│
│ ┌──────────────────────────────────────────────────────────┐│
│ │ Pipeline: Filter → Interceptor → Invoker → Retry ││
│ └──────────────────────────────────────────────────────────┘│
└───────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ 异常由拦截器处理 (Interceptor) │
└─────────────────────────────────┘
- .NET 8.0 或更高版本
- Microsoft.Extensions.DependencyInjection.Abstractions 8.0+
- System.Collections.Immutable 8.0+
- System.Threading.Channels 8.0+
A: PublishAsync 将事件放入队列异步处理,支持拦截器和重试;InvokeAsync 直接同步调用订阅者,不触发拦截器,适用于需要立即执行的场景。
A: 使用相同的 PartitionKey 发布事件,相同分区键的事件会被路由到同一个分片通道,保证顺序处理。
A:
- 单线程模式(全局顺序):1(默认)
- 低负载(< 1K 事件/秒):4-8
- 中等负载(1K-10K 事件/秒):16
- 高负载(> 10K 事件/秒):32-64
- 推荐值:CPU核心数的 2-4 倍
A: 注册 WildcardEventMatcher,然后使用通配符 Topic:
services.AddLocalEventBus().AddWildcardMatcher();
[Subscribe(Topic = "orders/*")]
public void HandleAllOrderEvents(object e) { }A: 不会。每个订阅者独立处理,一个订阅者抛出异常不会影响其他订阅者。异常会被拦截器捕获,或者在启用重试时进行重试。
A: 使用 IEventBusDiagnostics 接口获取诊断信息,或者通过拦截器记录日志和指标。
A: LocalEventBus 是进程内事件总线,不支持跨进程通信。如需分布式场景,请考虑 MassTransit、NServiceBus 等分布式消息框架。
A: Subscribe 方法返回 IDisposable,调用 Dispose() 即可取消订阅:
var subscription = eventBus.Subscribe<MyEvent>(handler);
// ...
subscription.Dispose(); // 取消订阅MIT License - 详见 LICENSE 文件
- CHANGELOG.md - 版本更新日志
- 示例代码 - 更多使用示例
- NuGet Package - NuGet 包下载
欢迎提交 Issue 和 Pull Request!
贡献指南:
- Fork 本仓库
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
如有问题或建议,请提交 Issue。
⭐ 如果这个项目对你有帮助,请给个 Star!