Skip to content

Commit

Permalink
other: v2 vivid 增加消息重定向 FlowOf
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 28, 2024
1 parent 804810e commit 363ea4a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 15 deletions.
2 changes: 1 addition & 1 deletion minotaur/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type AccountManager struct {
func (e *AccountManager) OnReceive(ctx vivid.MessageContext) {
switch ctx.GetMessage().(type) {
case vivid.OnPreStart:
ctx.GetContext().BindBehavior(vivid.BehaviorOf[transport.ConnOpenedEvent](e.onConnOpened))
ctx.BindBehavior(vivid.BehaviorOf[transport.ConnOpenedEvent](e.onConnOpened))
e.app.EventBus().Subscribe(pulse.SubscribeId(ctx.GetReceiver().Id()), ctx.GetReceiver(), transport.ConnOpenedEvent{})
}
}
Expand Down
6 changes: 4 additions & 2 deletions minotaur/pulse/event_bus_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ func (e *EventBusActor) onPublish(ctx vivid.MessageContext, m PublishMessage) {
})

// 处理优先级订阅者
vivid.ActorOf(ctx, vivid.NewActorOptions[*priorityEventActor]().WithConstruct(func() *priorityEventActor {
eventActor := vivid.ActorOf(ctx, vivid.NewActorOptions[*priorityEventActor]().WithConstruct(func() *priorityEventActor {
return &priorityEventActor{
subscribes: subscribePriorityList,
}
}())).Tell(priorityEventMessage{event: m.Event})
}()))
eventActor.Tell(priorityEventMessage{event: m.Event})
eventActor.Tell(vivid.OnDestroy{})
}
}
13 changes: 5 additions & 8 deletions minotaur/pulse/priority_event_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@ type priorityEventActor struct {
}

func (p *priorityEventActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case Event:
for _, info := range p.subscribes {
var timeout time.Duration
if info.priorityTimeout != nil {
timeout = *info.priorityTimeout
}
info.subscriber.Ask(m, vivid.WithReplyTimeout(timeout))
for _, info := range p.subscribes {
var timeout time.Duration
if info.priorityTimeout != nil {
timeout = *info.priorityTimeout
}
info.subscriber.Ask(ctx.GetMessage(), vivid.WithReplyTimeout(timeout))
}
}
3 changes: 2 additions & 1 deletion minotaur/transport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func newConn(eventBus *pulse.Pulse, server vivid.ActorContext, c net.Conn, write
conn := &conn{}
conn.reader = vivid.ActorOf[*ConnReadActor](server, vivid.NewActorOptions[*ConnReadActor]().WithConstruct(func() *ConnReadActor {
return &ConnReadActor{
conn: c,
conn: conn,
eventBus: eventBus,
}
}()))
Expand All @@ -24,6 +24,7 @@ func newConn(eventBus *pulse.Pulse, server vivid.ActorContext, c net.Conn, write
}

type Conn interface {
// Write 向连接内写入数据包
Write(packet Packet)
}

Expand Down
6 changes: 3 additions & 3 deletions minotaur/transport/conn_read_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transport
import (
"github.com/kercylan98/minotaur/minotaur/pulse"
"github.com/kercylan98/minotaur/minotaur/vivid"
"net"
)

type (
Expand All @@ -14,13 +13,14 @@ type (

type (
ConnReceiveEvent struct {
Conn Conn
Packet Packet
}
)

type ConnReadActor struct {
actor vivid.ActorRef
conn net.Conn
conn Conn
eventBus *pulse.Pulse
}

Expand All @@ -34,5 +34,5 @@ func (c *ConnReadActor) OnReceive(ctx vivid.MessageContext) {
}

func (c *ConnReadActor) onConnReceiveMessage(ctx vivid.MessageContext, m connReceivePacketMessage) {
c.eventBus.Publish(c.actor, ConnReceiveEvent{Packet: m.Packet})
c.eventBus.Publish(c.actor, ConnReceiveEvent{Conn: c.conn, Packet: m.Packet})
}
18 changes: 18 additions & 0 deletions minotaur/vivid/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/kercylan98/minotaur/toolkit"
"github.com/kercylan98/minotaur/toolkit/charproc"
"path"
"reflect"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,6 +28,8 @@ func NewActorSystem(name string) ActorSystem {
messageSeq: new(atomic.Uint64),
waitGroup: toolkit.NewDynamicWaitGroup(),
name: name,
flows: map[ActorId]map[reflect.Type]Flow{},
flowRW: new(sync.RWMutex),
}
s.core = new(_ActorSystemCore).init(&s)
s.ctx, s.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -62,6 +65,8 @@ type ActorSystem struct {
askWaits map[uint64]chan<- Message
askWaitsLock *sync.RWMutex
waitGroup *toolkit.DynamicWaitGroup
flows map[ActorId]map[reflect.Type]Flow // actor id -> message type -> flows
flowRW *sync.RWMutex

name string // ActorSystem 名称
network string // 网络类型
Expand Down Expand Up @@ -187,6 +192,7 @@ func (s *ActorSystem) sendToDispatcher(dispatcher Dispatcher, actor *_ActorCore,
if !dispatcher.Send(s.core, actor, message) {
actor.messageGroup.Done()
}

switch m := message.GetMessage().(type) {
case OnDestroy:
if !m.internal {
Expand All @@ -196,6 +202,18 @@ func (s *ActorSystem) sendToDispatcher(dispatcher Dispatcher, actor *_ActorCore,
}

func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ...MessageOption) Message {
// 转发处理
messageType := reflect.TypeOf(message)
s.flowRW.RLock()
flows, exist := s.flows[receiver.Id()]
s.flowRW.RUnlock()
if exist {
flow, exist := flows[messageType]
if exist && flow.forward(message) {
return s.sendMessage(flow.dest(), message, options...)
}
}

var opts = new(MessageOptions).apply(options)

ctx := newMessageContext(s, message, opts.Priority, opts.Instantly, opts.reply)
Expand Down
48 changes: 48 additions & 0 deletions minotaur/vivid/flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package vivid

import "reflect"

// Flow 消息流,可用于控制消息的过滤及流向
type Flow interface {
// forward 消息是否需要转发
forward(m Message) bool

// dest 获取目标 Actor
dest() ActorRef
}

func FlowOf[T Message](system *ActorSystem, source ActorRef, target ActorRef, filter ...func(T) bool) Flow {
f := &flow[T]{
source: source,
target: target,
}
if len(filter) > 0 {
f.filter = filter[0]
}

system.flowRW.Lock()
defer system.flowRW.Unlock()

flows, exist := system.flows[source.Id()]
if !exist {
flows = map[reflect.Type]Flow{}
system.flows[source.Id()] = flows
}

flows[reflect.TypeOf((*T)(nil)).Elem()] = f
return f
}

type flow[T Message] struct {
source ActorRef
target ActorRef
filter func(T) bool
}

func (f *flow[T]) forward(m Message) bool {
return f.filter == nil || f.filter(m.(T))
}

func (f *flow[T]) dest() ActorRef {
return f.target
}
7 changes: 7 additions & 0 deletions minotaur/vivid/message_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type MessageContext interface {

// Instantly 是否是立即执行的消息
Instantly() bool

// BindBehavior 该函数是 ActorContext.BindBehavior 的快捷方式
BindBehavior(behavior Behavior)
}

func newMessageContext(system *ActorSystem, message Message, priority int64, instantly, hasReply bool) *_MessageContext {
Expand Down Expand Up @@ -213,3 +216,7 @@ func (c *_MessageContext) GetPriority() int64 {
func (c *_MessageContext) Instantly() bool {
return c.InstantlyExec
}

func (c *_MessageContext) BindBehavior(behavior Behavior) {
c.GetContext().BindBehavior(behavior)
}

0 comments on commit 363ea4a

Please sign in to comment.