Skip to content

Commit

Permalink
other: v2 vivid 实现优先级邮箱、 pulse 采用优先级邮箱
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 25, 2024
1 parent 9f92fd4 commit 96d6da9
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 43 deletions.
2 changes: 2 additions & 0 deletions pulse/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type EventBus struct {

func (e *EventBus) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case vivid.OnOptionApply[*EventBus]:
m.Options.WithMailboxFactory(vivid.PriorityMailboxFactoryId)
case vivid.OnPreStart:
e.onStart()
case eventSubscribeMessage:
Expand Down
28 changes: 23 additions & 5 deletions pulse/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,40 @@ import (
"github.com/kercylan98/minotaur/pulse"
"github.com/kercylan98/minotaur/vivid"
"testing"
"time"
)

var tester *testing.T
var receiveEventCount int
var produceEventCount int

type TestActor struct {
eventBus vivid.ActorRef
producer vivid.ActorRef
}

func (t *TestActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case TestActorBindEventBus:
t.eventBus = m.EventBus
case vivid.ActorRef:
t.producer = m
pulse.Subscribe[string](t.eventBus, m, ctx.GetReceiver(), "event-string")
m.Tell(true)
case string:
fmt.Println("receive event:", m)
receiveEventCount++
tester.Log("receive event:", m)
pulse.Unsubscribe[string](t.eventBus, t.producer, ctx.GetReceiver(), "event-string")
tester.Log("unsubscribe event")
case int:
// producer message
produceEventCount++
pulse.Publish(t.eventBus, ctx.GetReceiver(), fmt.Sprintf("event-%d", m))
case bool:
for i := 0; i < 10; i++ {
ctx.GetReceiver().Tell(i)
tester.Log("produce event:", i)
}
}
}

Expand All @@ -30,6 +47,7 @@ type TestActorBindEventBus struct {
}

func TestNewEventBus(t *testing.T) {
tester = t
system := vivid.NewActorSystem("test")

eventBus := vivid.ActorOf[*pulse.EventBus](&system, vivid.NewActorOptions[*pulse.EventBus]().WithName("event_bus"))
Expand All @@ -40,9 +58,9 @@ func TestNewEventBus(t *testing.T) {
subscriber.Tell(TestActorBindEventBus{EventBus: eventBus})
subscriber.Tell(producer)

for i := 0; i < 100; i++ {
producer.Tell(i)
}

time.Sleep(time.Second)
system.Shutdown()

t.Log("produce event count:", produceEventCount)
t.Log("receive event count:", receiveEventCount)
}
16 changes: 13 additions & 3 deletions pulse/pulse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,41 @@ package pulse

import (
"github.com/kercylan98/minotaur/vivid"
"math"
"reflect"
"sync/atomic"
)

var eventSeq atomic.Int64 // 全局事件序号,仅标记事件产生顺序

// Subscribe 订阅消息总线中来自特定生产者的特定事件,该事件可在 SubscribeId 不同的情况下重复订阅
// - 由于订阅的过程是异步的,订阅不会立即生效,而是在下一个事件循环中生效
func Subscribe[T Event](eventBus vivid.ActorRef, producer Producer, subscriber Subscriber, subscribeId SubscribeId) {
eventBus.Tell(eventSubscribeMessage{
producer: producer,
subscriber: subscriber,
event: reflect.TypeOf((*T)(nil)).Elem(),
subscribeId: subscribeId,
priority: 0,
})
}, vivid.WithPriority(math.MinInt64))
}

// Unsubscribe 取消订阅消息总线中来自特定生产者的特定事件
// - 由于取消订阅的过程是异步的,取消订阅不会立即生效,而是在下一个事件循环中生效,例如可能期望在收到第一个事件后取消订阅,但实际上可能会收到多个事件后才取消订阅。这是由于在取消订阅的过程中已经产生了多个事件并已经投递到了订阅者的邮箱中。
// - 如要确保取消订阅的实时性,可在订阅者中实现过滤器。
func Unsubscribe[T Event](eventBus vivid.ActorRef, producer Producer, subscriber Subscriber, subscribeId SubscribeId) {
eventBus.Tell(eventUnsubscribeMessage{
producer: producer,
subscriber: subscriber,
event: reflect.TypeOf((*T)(nil)).Elem(),
subscribeId: subscribeId,
})
}, vivid.WithPriority(math.MinInt64))
}

// Publish 发布事件到消息总线,消息总线会将事件投递给所有订阅者
func Publish(eventBus vivid.ActorRef, producer Producer, event Event) {
eventBus.Tell(eventPublishMessage{
producer: producer,
event: event,
})
}, vivid.WithPriority(eventSeq.Add(1)))
}
2 changes: 1 addition & 1 deletion vivid/actor_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func parseActorOptions[T Actor](options ...*ActorOptions[T]) *ActorOptions[T] {
opts.DispatcherId = DefaultDispatcherId
}
if opts.MailboxFactoryId == 0 {
opts.MailboxFactoryId = DefaultMailboxFactoryId
opts.MailboxFactoryId = FIFOMailboxFactoryId
}
return opts
}
47 changes: 27 additions & 20 deletions vivid/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,8 @@ func NewActorSystem(name string) ActorSystem {
}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.BindDispatcher(new(_Dispatcher)) // default dispatcher
s.BindMailboxFactory(NewFIFOFactory(func(message MessageContext) {
// received message
core := message.GetReceiver().(*_LocalActorRef).core
defer func() {
core.messageGroup.Done()
if r := recover(); r != nil {
s.deadLetters.DeadLetter(NewDeadLetterEvent(DeadLetterEventTypeMessage, DeadLetterEventMessage{
Error: fmt.Errorf("%w: %v", ErrActorPanic, r),
To: core.GetId(),
Message: message,
}))
}
}()
if core.messageHook != nil && !core.messageHook(message) {
return
}
core.OnReceive(message)
}))
s.BindMailboxFactory(NewFIFOFactory(s.onProcessMailboxMessage))
s.BindMailboxFactory(NewPriorityFactory(s.onProcessMailboxMessage))
var err error
s.userGuard, err = generateActor(&s, new(UserGuardActor), parseActorOptions(NewActorOptions[*UserGuardActor]().WithName("user")))
if err != nil {
Expand Down Expand Up @@ -115,7 +99,7 @@ func (s *ActorSystem) BindMailboxFactory(f MailboxFactory) MailboxFactoryId {
}

func (s *ActorSystem) UnbindMailboxFactory(id MailboxFactoryId) {
if id == DefaultMailboxFactoryId {
if id == FIFOMailboxFactoryId {
return
}
s.mailboxFactorRW.Lock()
Expand Down Expand Up @@ -194,7 +178,7 @@ func (s *ActorSystem) getContext() *_ActorCore {
func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ...MessageOption) Message {
var opts = new(MessageOptions).apply(options)

ctx := newMessageContext(s, message)
ctx := newMessageContext(s, message, opts.Priority)
switch ref := receiver.(type) {
case *_LocalActorRef:
ctx = ctx.withLocal(ref.core, opts.Sender)
Expand Down Expand Up @@ -297,11 +281,34 @@ func (s *ActorSystem) onProcessServerMessage(bytes []byte) {
}
}

func (s *ActorSystem) onProcessMailboxMessage(message MessageContext) {
// received message
core := message.GetReceiver().(*_LocalActorRef).core
defer func() {
core.messageGroup.Done()
if r := recover(); r != nil {
s.deadLetters.DeadLetter(NewDeadLetterEvent(DeadLetterEventTypeMessage, DeadLetterEventMessage{
Error: fmt.Errorf("%w: %v", ErrActorPanic, r),
To: core.GetId(),
Message: message,
}))
}
}()
if core.messageHook != nil && !core.messageHook(message) {
return
}
core.OnReceive(message)
}

func generateActor[T Actor](system *ActorSystem, actor T, options *ActorOptions[T]) (*_ActorCore, error) {
if options.Name == charproc.None {
options.Name = uuid.NewString()
}

optionsNum := len(options.options)
actor.OnReceive(newMessageContext(system, OnOptionApply[T]{Options: options}, 0).withLocal(nil, nil))
options.applyOption(options.options[optionsNum:]...)

var actorPath = options.Name
if options.Parent != nil {
actorPath = path.Join(options.Parent.GetId().Path(), options.Name)
Expand Down
3 changes: 2 additions & 1 deletion vivid/mailbox.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vivid

const DefaultMailboxFactoryId MailboxFactoryId = 1
const FIFOMailboxFactoryId MailboxFactoryId = 1
const PriorityMailboxFactoryId MailboxFactoryId = 2

type MailboxFactoryId = uint64

Expand Down
114 changes: 114 additions & 0 deletions vivid/mailbox_priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package vivid

import (
"container/heap"
"sync"
)

const (
priorityStateNone = priorityState(iota) // 未启动状态
priorityStateRunning // 运行中状态
priorityStateStopping // 停止中状态
priorityStateStopped // 已停止状态
)

const (
PriorityStopModeInstantly = PriorityStopMode(iota) // 立刻停止消息队列,新消息将不再接收,缓冲区内未处理的消息将被丢弃
PriorityStopModeGraceful // 优雅停止消息队列,新消息将不再接收,等待未处理的消息处理完毕后再停止
PriorityStopModeDrain // 新消息将继续被接收,等待消息队列处理完毕且没有新消息后再停止
)

type (
priorityState = int32 // 状态
PriorityStopMode = int8 // Priority 消息队列的停止模式,目前支持 PriorityStopModeInstantly、 PriorityStopModeGraceful、 PriorityStopModeDrain
)

func NewPriority(handler func(message MessageContext), opts ...*PriorityOptions) *Priority {
p := &Priority{
opts: NewPriorityOptions().Apply(opts...),
status: priorityStateNone,
cond: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
handler: handler,
}
p.buffer = make(priorityHeap, 0, p.opts.BufferSize)
return p
}

type Priority struct {
opts *PriorityOptions // 配置
status priorityState // 队列状态
cond *sync.Cond // 消息队列条件变量
buffer priorityHeap // 消息缓冲区
closed chan struct{} // 关闭信号
handler func(message MessageContext) // 消息处理函数
}

func (p *Priority) Start() {
p.cond.L.Lock()
if p.status != priorityStateNone {
p.cond.L.Unlock()
return
}
p.status = priorityStateRunning
p.cond.L.Unlock()

p.closed = make(chan struct{})
go func(p *Priority) {
defer func(p *Priority) {
close(p.closed)
if err := recover(); err != nil {
panic(err)
}
}(p)

for {
p.cond.L.Lock()
if p.buffer.Len() == 0 {
if p.status == priorityStateStopping {
p.status = priorityStateStopped
p.cond.L.Unlock()
break
}
p.cond.Wait()
if p.buffer.Len() == 0 {
p.cond.L.Unlock()
continue
}
}
msg := heap.Pop(&p.buffer).(MessageContext)
p.cond.L.Unlock()

p.handler(msg)
}
}(p)
}

func (p *Priority) Stop() {
p.cond.L.Lock()
if p.status != priorityStateRunning {
p.cond.L.Unlock()
return
}
p.status = priorityStateStopping
p.cond.L.Unlock()
p.cond.Signal()
}

func (p *Priority) Enqueue(message MessageContext) bool {
p.cond.L.Lock()
if p.status != priorityStateRunning {
p.cond.L.Unlock()
return false
}
heap.Push(&p.buffer, message)
p.cond.L.Unlock()
p.cond.Signal()
return true
}

func (p *Priority) reset() {
p.cond.L.Lock()
p.buffer = p.buffer[:0]
p.cond.L.Unlock()
}
31 changes: 31 additions & 0 deletions vivid/mailbox_priority_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package vivid

import (
"github.com/kercylan98/minotaur/toolkit/pools"
)

func NewPriorityFactory(handler func(message MessageContext), opts ...*PriorityOptions) MailboxFactory {
var pool = pools.NewObjectPool[Priority](func() *Priority {
return NewPriority(handler, opts...)
}, func(data *Priority) {
data.reset()
})

return &PriorityFactory{pool: pool}
}

type PriorityFactory struct {
pool *pools.ObjectPool[*Priority]
}

func (P *PriorityFactory) Get() Mailbox {
return P.pool.Get()
}

func (P *PriorityFactory) Put(mailbox Mailbox) {
priority, ok := mailbox.(*Priority)
if !ok {
return
}
P.pool.Release(priority)
}
27 changes: 27 additions & 0 deletions vivid/mailbox_priority_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package vivid

type priorityHeap []MessageContext

func (h *priorityHeap) Len() int {
return len(*h)
}

func (h *priorityHeap) Less(i, j int) bool {
return (*h)[i].GetPriority() < (*h)[j].GetPriority()
}

func (h *priorityHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *priorityHeap) Push(x any) {
*h = append(*h, x.(MessageContext))
}

func (h *priorityHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
Loading

0 comments on commit 96d6da9

Please sign in to comment.