diff --git a/pulse/event_bus.go b/pulse/event_bus.go index 5ead2864..e4e3f9ef 100644 --- a/pulse/event_bus.go +++ b/pulse/event_bus.go @@ -12,6 +12,8 @@ type EventBus struct { func (e *EventBus) OnReceive(ctx vivid.MessageContext) { switch m := ctx.GetMessage().(type) { + case vivid.OnOptionApply[*EventBus]: + m.Options.WithMailboxFactory(vivid.PriorityMailboxFactoryId) case vivid.OnPreStart: e.onStart() case eventSubscribeMessage: diff --git a/pulse/event_bus_test.go b/pulse/event_bus_test.go index 0bf76e02..a9dd1dba 100644 --- a/pulse/event_bus_test.go +++ b/pulse/event_bus_test.go @@ -5,10 +5,16 @@ import ( "github.com/kercylan98/minotaur/pulse" "github.com/kercylan98/minotaur/vivid" "testing" + "time" ) +var tester *testing.T +var receiveEventCount int +var produceEventCount int + type TestActor struct { eventBus vivid.ActorRef + producer vivid.ActorRef } func (t *TestActor) OnReceive(ctx vivid.MessageContext) { @@ -16,12 +22,23 @@ func (t *TestActor) OnReceive(ctx vivid.MessageContext) { case TestActorBindEventBus: t.eventBus = m.EventBus case vivid.ActorRef: + t.producer = m pulse.Subscribe[string](t.eventBus, m, ctx.GetReceiver(), "event-string") + m.Tell(true) case string: - fmt.Println("receive event:", m) + receiveEventCount++ + tester.Log("receive event:", m) + pulse.Unsubscribe[string](t.eventBus, t.producer, ctx.GetReceiver(), "event-string") + tester.Log("unsubscribe event") case int: // producer message + produceEventCount++ pulse.Publish(t.eventBus, ctx.GetReceiver(), fmt.Sprintf("event-%d", m)) + case bool: + for i := 0; i < 10; i++ { + ctx.GetReceiver().Tell(i) + tester.Log("produce event:", i) + } } } @@ -30,6 +47,7 @@ type TestActorBindEventBus struct { } func TestNewEventBus(t *testing.T) { + tester = t system := vivid.NewActorSystem("test") eventBus := vivid.ActorOf[*pulse.EventBus](&system, vivid.NewActorOptions[*pulse.EventBus]().WithName("event_bus")) @@ -40,9 +58,9 @@ func TestNewEventBus(t *testing.T) { subscriber.Tell(TestActorBindEventBus{EventBus: eventBus}) subscriber.Tell(producer) - for i := 0; i < 100; i++ { - producer.Tell(i) - } - + time.Sleep(time.Second) system.Shutdown() + + t.Log("produce event count:", produceEventCount) + t.Log("receive event count:", receiveEventCount) } diff --git a/pulse/pulse.go b/pulse/pulse.go index 08c1b151..860fc331 100644 --- a/pulse/pulse.go +++ b/pulse/pulse.go @@ -2,9 +2,15 @@ package pulse import ( "github.com/kercylan98/minotaur/vivid" + "math" "reflect" + "sync/atomic" ) +var eventSeq atomic.Int64 // 全局事件序号,仅标记事件产生顺序 + +// Subscribe 订阅消息总线中来自特定生产者的特定事件,该事件可在 SubscribeId 不同的情况下重复订阅 +// - 由于订阅的过程是异步的,订阅不会立即生效,而是在下一个事件循环中生效 func Subscribe[T Event](eventBus vivid.ActorRef, producer Producer, subscriber Subscriber, subscribeId SubscribeId) { eventBus.Tell(eventSubscribeMessage{ producer: producer, @@ -12,21 +18,25 @@ func Subscribe[T Event](eventBus vivid.ActorRef, producer Producer, subscriber S event: reflect.TypeOf((*T)(nil)).Elem(), subscribeId: subscribeId, priority: 0, - }) + }, vivid.WithPriority(math.MinInt64)) } +// Unsubscribe 取消订阅消息总线中来自特定生产者的特定事件 +// - 由于取消订阅的过程是异步的,取消订阅不会立即生效,而是在下一个事件循环中生效,例如可能期望在收到第一个事件后取消订阅,但实际上可能会收到多个事件后才取消订阅。这是由于在取消订阅的过程中已经产生了多个事件并已经投递到了订阅者的邮箱中。 +// - 如要确保取消订阅的实时性,可在订阅者中实现过滤器。 func Unsubscribe[T Event](eventBus vivid.ActorRef, producer Producer, subscriber Subscriber, subscribeId SubscribeId) { eventBus.Tell(eventUnsubscribeMessage{ producer: producer, subscriber: subscriber, event: reflect.TypeOf((*T)(nil)).Elem(), subscribeId: subscribeId, - }) + }, vivid.WithPriority(math.MinInt64)) } +// Publish 发布事件到消息总线,消息总线会将事件投递给所有订阅者 func Publish(eventBus vivid.ActorRef, producer Producer, event Event) { eventBus.Tell(eventPublishMessage{ producer: producer, event: event, - }) + }, vivid.WithPriority(eventSeq.Add(1))) } diff --git a/vivid/actor_option.go b/vivid/actor_option.go index 2f9c0e24..84c0e947 100644 --- a/vivid/actor_option.go +++ b/vivid/actor_option.go @@ -108,7 +108,7 @@ func parseActorOptions[T Actor](options ...*ActorOptions[T]) *ActorOptions[T] { opts.DispatcherId = DefaultDispatcherId } if opts.MailboxFactoryId == 0 { - opts.MailboxFactoryId = DefaultMailboxFactoryId + opts.MailboxFactoryId = FIFOMailboxFactoryId } return opts } diff --git a/vivid/actor_system.go b/vivid/actor_system.go index 5c2a8efb..c923a370 100644 --- a/vivid/actor_system.go +++ b/vivid/actor_system.go @@ -30,24 +30,8 @@ func NewActorSystem(name string) ActorSystem { } s.ctx, s.cancel = context.WithCancel(context.Background()) s.BindDispatcher(new(_Dispatcher)) // default dispatcher - s.BindMailboxFactory(NewFIFOFactory(func(message MessageContext) { - // received message - core := message.GetReceiver().(*_LocalActorRef).core - defer func() { - core.messageGroup.Done() - if r := recover(); r != nil { - s.deadLetters.DeadLetter(NewDeadLetterEvent(DeadLetterEventTypeMessage, DeadLetterEventMessage{ - Error: fmt.Errorf("%w: %v", ErrActorPanic, r), - To: core.GetId(), - Message: message, - })) - } - }() - if core.messageHook != nil && !core.messageHook(message) { - return - } - core.OnReceive(message) - })) + s.BindMailboxFactory(NewFIFOFactory(s.onProcessMailboxMessage)) + s.BindMailboxFactory(NewPriorityFactory(s.onProcessMailboxMessage)) var err error s.userGuard, err = generateActor(&s, new(UserGuardActor), parseActorOptions(NewActorOptions[*UserGuardActor]().WithName("user"))) if err != nil { @@ -115,7 +99,7 @@ func (s *ActorSystem) BindMailboxFactory(f MailboxFactory) MailboxFactoryId { } func (s *ActorSystem) UnbindMailboxFactory(id MailboxFactoryId) { - if id == DefaultMailboxFactoryId { + if id == FIFOMailboxFactoryId { return } s.mailboxFactorRW.Lock() @@ -194,7 +178,7 @@ func (s *ActorSystem) getContext() *_ActorCore { func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ...MessageOption) Message { var opts = new(MessageOptions).apply(options) - ctx := newMessageContext(s, message) + ctx := newMessageContext(s, message, opts.Priority) switch ref := receiver.(type) { case *_LocalActorRef: ctx = ctx.withLocal(ref.core, opts.Sender) @@ -297,11 +281,34 @@ func (s *ActorSystem) onProcessServerMessage(bytes []byte) { } } +func (s *ActorSystem) onProcessMailboxMessage(message MessageContext) { + // received message + core := message.GetReceiver().(*_LocalActorRef).core + defer func() { + core.messageGroup.Done() + if r := recover(); r != nil { + s.deadLetters.DeadLetter(NewDeadLetterEvent(DeadLetterEventTypeMessage, DeadLetterEventMessage{ + Error: fmt.Errorf("%w: %v", ErrActorPanic, r), + To: core.GetId(), + Message: message, + })) + } + }() + if core.messageHook != nil && !core.messageHook(message) { + return + } + core.OnReceive(message) +} + func generateActor[T Actor](system *ActorSystem, actor T, options *ActorOptions[T]) (*_ActorCore, error) { if options.Name == charproc.None { options.Name = uuid.NewString() } + optionsNum := len(options.options) + actor.OnReceive(newMessageContext(system, OnOptionApply[T]{Options: options}, 0).withLocal(nil, nil)) + options.applyOption(options.options[optionsNum:]...) + var actorPath = options.Name if options.Parent != nil { actorPath = path.Join(options.Parent.GetId().Path(), options.Name) diff --git a/vivid/mailbox.go b/vivid/mailbox.go index 67bcc0fb..670e8792 100644 --- a/vivid/mailbox.go +++ b/vivid/mailbox.go @@ -1,6 +1,7 @@ package vivid -const DefaultMailboxFactoryId MailboxFactoryId = 1 +const FIFOMailboxFactoryId MailboxFactoryId = 1 +const PriorityMailboxFactoryId MailboxFactoryId = 2 type MailboxFactoryId = uint64 diff --git a/vivid/mailbox_priority.go b/vivid/mailbox_priority.go new file mode 100644 index 00000000..30fc2bb4 --- /dev/null +++ b/vivid/mailbox_priority.go @@ -0,0 +1,114 @@ +package vivid + +import ( + "container/heap" + "sync" +) + +const ( + priorityStateNone = priorityState(iota) // 未启动状态 + priorityStateRunning // 运行中状态 + priorityStateStopping // 停止中状态 + priorityStateStopped // 已停止状态 +) + +const ( + PriorityStopModeInstantly = PriorityStopMode(iota) // 立刻停止消息队列,新消息将不再接收,缓冲区内未处理的消息将被丢弃 + PriorityStopModeGraceful // 优雅停止消息队列,新消息将不再接收,等待未处理的消息处理完毕后再停止 + PriorityStopModeDrain // 新消息将继续被接收,等待消息队列处理完毕且没有新消息后再停止 +) + +type ( + priorityState = int32 // 状态 + PriorityStopMode = int8 // Priority 消息队列的停止模式,目前支持 PriorityStopModeInstantly、 PriorityStopModeGraceful、 PriorityStopModeDrain +) + +func NewPriority(handler func(message MessageContext), opts ...*PriorityOptions) *Priority { + p := &Priority{ + opts: NewPriorityOptions().Apply(opts...), + status: priorityStateNone, + cond: sync.NewCond(&sync.Mutex{}), + closed: make(chan struct{}), + handler: handler, + } + p.buffer = make(priorityHeap, 0, p.opts.BufferSize) + return p +} + +type Priority struct { + opts *PriorityOptions // 配置 + status priorityState // 队列状态 + cond *sync.Cond // 消息队列条件变量 + buffer priorityHeap // 消息缓冲区 + closed chan struct{} // 关闭信号 + handler func(message MessageContext) // 消息处理函数 +} + +func (p *Priority) Start() { + p.cond.L.Lock() + if p.status != priorityStateNone { + p.cond.L.Unlock() + return + } + p.status = priorityStateRunning + p.cond.L.Unlock() + + p.closed = make(chan struct{}) + go func(p *Priority) { + defer func(p *Priority) { + close(p.closed) + if err := recover(); err != nil { + panic(err) + } + }(p) + + for { + p.cond.L.Lock() + if p.buffer.Len() == 0 { + if p.status == priorityStateStopping { + p.status = priorityStateStopped + p.cond.L.Unlock() + break + } + p.cond.Wait() + if p.buffer.Len() == 0 { + p.cond.L.Unlock() + continue + } + } + msg := heap.Pop(&p.buffer).(MessageContext) + p.cond.L.Unlock() + + p.handler(msg) + } + }(p) +} + +func (p *Priority) Stop() { + p.cond.L.Lock() + if p.status != priorityStateRunning { + p.cond.L.Unlock() + return + } + p.status = priorityStateStopping + p.cond.L.Unlock() + p.cond.Signal() +} + +func (p *Priority) Enqueue(message MessageContext) bool { + p.cond.L.Lock() + if p.status != priorityStateRunning { + p.cond.L.Unlock() + return false + } + heap.Push(&p.buffer, message) + p.cond.L.Unlock() + p.cond.Signal() + return true +} + +func (p *Priority) reset() { + p.cond.L.Lock() + p.buffer = p.buffer[:0] + p.cond.L.Unlock() +} diff --git a/vivid/mailbox_priority_factory.go b/vivid/mailbox_priority_factory.go new file mode 100644 index 00000000..d6a4cedc --- /dev/null +++ b/vivid/mailbox_priority_factory.go @@ -0,0 +1,31 @@ +package vivid + +import ( + "github.com/kercylan98/minotaur/toolkit/pools" +) + +func NewPriorityFactory(handler func(message MessageContext), opts ...*PriorityOptions) MailboxFactory { + var pool = pools.NewObjectPool[Priority](func() *Priority { + return NewPriority(handler, opts...) + }, func(data *Priority) { + data.reset() + }) + + return &PriorityFactory{pool: pool} +} + +type PriorityFactory struct { + pool *pools.ObjectPool[*Priority] +} + +func (P *PriorityFactory) Get() Mailbox { + return P.pool.Get() +} + +func (P *PriorityFactory) Put(mailbox Mailbox) { + priority, ok := mailbox.(*Priority) + if !ok { + return + } + P.pool.Release(priority) +} diff --git a/vivid/mailbox_priority_heap.go b/vivid/mailbox_priority_heap.go new file mode 100644 index 00000000..a5bf4f23 --- /dev/null +++ b/vivid/mailbox_priority_heap.go @@ -0,0 +1,27 @@ +package vivid + +type priorityHeap []MessageContext + +func (h *priorityHeap) Len() int { + return len(*h) +} + +func (h *priorityHeap) Less(i, j int) bool { + return (*h)[i].GetPriority() < (*h)[j].GetPriority() +} + +func (h *priorityHeap) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} + +func (h *priorityHeap) Push(x any) { + *h = append(*h, x.(MessageContext)) +} + +func (h *priorityHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/vivid/mailbox_priority_options.go b/vivid/mailbox_priority_options.go new file mode 100644 index 00000000..17fb60fb --- /dev/null +++ b/vivid/mailbox_priority_options.go @@ -0,0 +1,42 @@ +package vivid + +func NewPriorityOptions() *PriorityOptions { + return &PriorityOptions{ + BufferSize: 1024, + StopMode: PriorityStopModeGraceful, + } +} + +type PriorityOption func(*PriorityOptions) + +type PriorityOptions struct { + options []PriorityOption + BufferSize uint // 消息队列的缓冲区大小 + StopMode PriorityStopMode // 消息队列的停止模式 +} + +// Apply 用于应用 Priority 配置 +func (o *PriorityOptions) Apply(opts ...*PriorityOptions) *PriorityOptions { + for _, opt := range opts { + for _, option := range opt.options { + option(o) + } + } + return o +} + +// WithBufferSize 用于设置消息队列的缓冲区大小 +func (o *PriorityOptions) WithBufferSize(size uint) *PriorityOptions { + o.options = append(o.options, func(o *PriorityOptions) { + o.BufferSize = size + }) + return o +} + +// WithStopMode 用于设置消息队列的停止模式 +func (o *PriorityOptions) WithStopMode(mode PriorityStopMode) *PriorityOptions { + o.options = append(o.options, func(o *PriorityOptions) { + o.StopMode = mode + }) + return o +} diff --git a/vivid/message.go b/vivid/message.go index a15737a9..3ea89e08 100644 --- a/vivid/message.go +++ b/vivid/message.go @@ -2,6 +2,12 @@ package vivid type Message = any +// OnOptionApply 该消息将在 Actor 可选项应用前发送,可用于 Actor 对可选项的检查要求 +// - 该阶段 Actor 尚未初始化,不要期待在该阶段能够处理任何 ActorOptions 以外的内容 +type OnOptionApply[T Actor] struct { + Options *ActorOptions[T] +} + // OnPreStart 是 Actor 生命周期的开始阶段,通常用于初始化 Actor 的状态 type OnPreStart struct { } diff --git a/vivid/message_context.go b/vivid/message_context.go index 59fc0beb..c131934b 100644 --- a/vivid/message_context.go +++ b/vivid/message_context.go @@ -29,28 +29,33 @@ type MessageContext interface { // GetActor 获取 Actor 对象,该函数是 ActorContext.GetActor 的快捷方式 GetActor() Actor + + // GetPriority 获取消息的优先级 + GetPriority() int64 } -func newMessageContext(system *ActorSystem, message Message) *_MessageContext { +func newMessageContext(system *ActorSystem, message Message, priority int64) *_MessageContext { return &_MessageContext{ - system: system, - Seq: system.messageSeq.Add(1), - Network: system.network, - Host: system.host, - Port: system.port, - Message: message, + system: system, + Seq: system.messageSeq.Add(1), + Network: system.network, + Host: system.host, + Port: system.port, + Message: message, + Priority: priority, } } // _MessageContext 消息上下文,消息上下文实现了兼容本地及远程消息的上下文 // - 该结构体中,除开公共信息外,内部字段被用于本地消息,公开字段被用于远程消息,需要保证公共及公开字段的可序列化 type _MessageContext struct { - system *ActorSystem // 创建上下文的 Actor 系统 - Seq uint64 // 消息序号 - Network string // 产生消息的网络 - Host string // 产生消息的主机 - Port uint16 // 产生消息的端口 - Message Message // 消息内容 + system *ActorSystem // 创建上下文的 Actor 系统 + Seq uint64 // 消息序号 + Network string // 产生消息的网络 + Host string // 产生消息的主机 + Port uint16 // 产生消息的端口 + Message Message // 消息内容 + Priority int64 // 消息优先级 // 本地消息是直接根据实现了 ActorRef 的 _ActorCore 来投递的,所以可以直接将消息投递到 ActorCore 绑定的 Dispatcher 中 actorContext ActorContext // 本地接收者的上下文 @@ -190,3 +195,7 @@ func (c *_MessageContext) Reply(message Message) { func (c *_MessageContext) GetActor() Actor { return c.GetContext().GetActor() } + +func (c *_MessageContext) GetPriority() int64 { + return c.Priority +} diff --git a/vivid/message_options.go b/vivid/message_options.go index 2b001e84..93e16d48 100644 --- a/vivid/message_options.go +++ b/vivid/message_options.go @@ -10,6 +10,7 @@ type MessageOptions struct { Sender ActorRef ReplyTimeout time.Duration ContextHook func(MessageContext) + Priority int64 } func (o *MessageOptions) apply(options []MessageOption) *MessageOptions { @@ -19,6 +20,15 @@ func (o *MessageOptions) apply(options []MessageOption) *MessageOptions { return o } +// WithPriority 设置消息优先级,优先级越高的消息将会被优先处理 +// - 当 priority 的数值越小时,优先级越高 +// - 当邮箱类型为非优先级邮箱 PriorityMailboxFactoryId 时,该可选项会被忽略 +func WithPriority(priority int64) MessageOption { + return func(options *MessageOptions) { + options.Priority = priority + } +} + // WithSender 设置消息发送者,发送者可以有利于对消息流向的追踪 func WithSender(sender ActorRef) MessageOption { return func(options *MessageOptions) {