Skip to content

Commit

Permalink
other: v2 actor 优化
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 15, 2024
1 parent d83738b commit a246abd
Show file tree
Hide file tree
Showing 24 changed files with 555 additions and 380 deletions.
56 changes: 2 additions & 54 deletions vivid/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,8 @@ var actorType = reflect.TypeOf((*Actor)(nil)).Elem()
// Actor 是 Actor 模型的接口,该接口用于定义一个 Actor
type Actor interface {
// OnPreStart 在 Actor 启动之前执行的逻辑,适用于对 Actor 状态的初始化
OnPreStart(ctx *ActorContext) error
OnPreStart(ctx ActorContext) error

// OnReceived 当 Actor 接收到消息时执行的逻辑
OnReceived(msg Message) error
}

// localActor 实现 Actor 模型的核心逻辑
type localActor struct {
opts *ActorOptions
actor Actor
ctx *ActorContext
mailbox *Mailbox
}

func (a *localActor) init(opts *ActorOptions, id ActorId, actor Actor, systemGetter actorSystemGetter) *localActor {
a.opts = opts
a.actor = actor
a.ctx = new(ActorContext).init(id, a, systemGetter)
a.mailbox = opts.Mailbox()

go a.mailbox.Start()
return a
}

func (a *localActor) GetId() ActorId {
return a.ctx.id
}

func (a *localActor) Tell(msg Message, opts ...MessageOption) error {
system := a.ctx.GetSystem()
return system.tell(a, msg, opts...)
}

func (a *localActor) Stop() error {
a.mailbox.Stop()
return nil
}

// remoteActor 实现 Actor 模型的远程调用逻辑
type remoteActor struct {
id ActorId
system *ActorSystem // 仅用于调用 tell 方法,并非 Actor 真正的所属系统
}

func (a *remoteActor) init(system *ActorSystem, id ActorId) *remoteActor {
a.id = id
a.system = system
return a
}

func (a *remoteActor) GetId() ActorId {
return a.id
}

func (a *remoteActor) Tell(msg Message, opts ...MessageOption) error {
return a.system.tell(a, msg, opts...)
OnReceived(ctx MessageContext) error
}
2 changes: 1 addition & 1 deletion vivid/actor_behavior.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package vivid

// ActorBehavior 是 Actor 的行为,用于处理 Actor 接收到的消息
type ActorBehavior[T any] func(ctx *ActorContext, msg T) error
type ActorBehavior[T any] func(ctx MessageContext) error

// ActorBehaviorExecutor 是 Actor 的行为执行器
type ActorBehaviorExecutor func() error
Expand Down
61 changes: 25 additions & 36 deletions vivid/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,36 @@ const (
actorContextStateStarted // 已启动
)

type actorContextState = uint8 // ActorContext 的状态

// ActorContext 是 Actor 的上下文
type ActorContext struct {
id ActorId // Actor 的 ID
ref ActorRef // Actor 的引用
vof reflect.Value // 上下文的值类型
state actorContextState // 上下文的状态
behaviors map[reflect.Type]reflect.Value // Actor 的行为,由消息类型到行为的映射
systemGetter actorSystemGetter // 获取 ActorSystem 的函数
type ActorContext interface {
// RegisterBehavior 注册 Actor 的行为,行为是一个函数,用于处理 Actor 接收到的消息
// - 推荐使用 RegisterBehavior 函数来注册 Actor 的行为,这样可以保证行为的类型安全
// - 该函数仅在 Actor.OnPreStart 阶段有效
RegisterBehavior(messageType reflect.Type, behavior any)

// MatchBehavior 匹配 Actor 的行为,当匹配不到时将返回 nil,否则返回匹配到的行为执行器
// - 推荐使用 MatchBehavior 函数来匹配 Actor 的行为,这样可以保证行为的类型安全
MatchBehavior(messageType reflect.Type, message any) ActorBehaviorExecutor
}

func (ctx *ActorContext) init(id ActorId, ref ActorRef, systemGetter actorSystemGetter) *ActorContext {
ctx.id = id
ctx.ref = ref
ctx.vof = reflect.ValueOf(ctx)
ctx.state = actorContextStatePreStart
ctx.behaviors = make(map[reflect.Type]reflect.Value)
ctx.systemGetter = systemGetter
return ctx
}
type actorContextState = uint8 // actorContext 的状态

// GetSystem 获取 ActorSystem
func (ctx *ActorContext) GetSystem() *ActorSystem {
return ctx.systemGetter()
// actorContext 是 Actor 的上下文
type actorContext struct {
id ActorId // Actor 的 ID
vof reflect.Value // 上下文的值类型
state actorContextState // 上下文的状态
behaviors map[reflect.Type]reflect.Value // Actor 的行为,由消息类型到行为的映射
}

// RegisterBehavior 注册 Actor 的行为
// - 该函数仅在 Actor.OnPreStart 阶段有效
func RegisterBehavior[T any](ctx *ActorContext, behavior ActorBehavior[T]) {
func RegisterBehavior[T any](ctx ActorContext, behavior ActorBehavior[T]) {
messageType := reflect.TypeOf((*T)(nil)).Elem()
ctx.RegisterBehavior(messageType, behavior)
}

// RegisterBehavior 注册 Actor 的行为,行为是一个函数,用于处理 Actor 接收到的消息
// - 推荐使用 RegisterBehavior 函数来注册 Actor 的行为,这样可以保证行为的类型安全
// - 该函数仅在 Actor.OnPreStart 阶段有效
func (ctx *ActorContext) RegisterBehavior(messageType reflect.Type, behavior any) {
if ctx.state != actorContextStatePreStart {
func (c *actorContext) RegisterBehavior(messageType reflect.Type, behavior any) {
if c.state != actorContextStatePreStart {
return
}

Expand All @@ -61,7 +52,7 @@ func (ctx *ActorContext) RegisterBehavior(messageType reflect.Type, behavior any
}

// 检查行为参数是否符合要求
if behaviorType.NumIn() != 2 || behaviorType.In(0) != reflect.TypeOf(ctx) {
if behaviorType.NumIn() != 1 || behaviorType.In(0) != messageContextType {
panic(fmt.Errorf("%w: %s", ErrActorBehaviorInvalid, behaviorType))
}

Expand All @@ -70,26 +61,24 @@ func (ctx *ActorContext) RegisterBehavior(messageType reflect.Type, behavior any
panic(fmt.Errorf("%w: %s", ErrActorBehaviorInvalid, behaviorType))
}

ctx.behaviors[messageType] = behaviorValue
c.behaviors[messageType] = behaviorValue
}

// MatchBehavior 匹配 Actor 的行为,当匹配不到时将返回 nil,否则返回匹配到的行为执行器
func MatchBehavior[T any](ctx *ActorContext, message T) ActorBehaviorExecutor {
func MatchBehavior[T any](ctx ActorContext, message T) ActorBehaviorExecutor {
messageType := reflect.TypeOf(message)
return ctx.MatchBehavior(messageType, message)
}

// MatchBehavior 匹配 Actor 的行为,当匹配不到时将返回 nil,否则返回匹配到的行为执行器
// - 推荐使用 MatchBehavior 函数来匹配 Actor 的行为,这样可以保证行为的类型安全
func (ctx *ActorContext) MatchBehavior(messageType reflect.Type, message any) ActorBehaviorExecutor {
behaviorValue, ok := ctx.behaviors[messageType]
func (c *actorContext) MatchBehavior(messageType reflect.Type, message any) ActorBehaviorExecutor {
behaviorValue, ok := c.behaviors[messageType]

if !ok {
return nil
}

return func() error {
result := behaviorValue.Call([]reflect.Value{ctx.vof, reflect.ValueOf(message)})
result := behaviorValue.Call([]reflect.Value{c.vof, reflect.ValueOf(message)})
switch v := result[0].Interface().(type) {
case nil:
return nil
Expand Down
55 changes: 55 additions & 0 deletions vivid/actor_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package vivid

import "reflect"

// ActorCore 是 Actor 的核心接口定义,被用于高级功能的实现
type ActorCore interface {
Actor
ActorRef
ActorContext
ActorCoreExpose
}

// ActorCoreExpose 额外的暴露项
type ActorCoreExpose interface {
GetOptions() *ActorOptions
}

func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, opts *ActorOptions) *actorCore {
core := &actorCore{
Actor: actor,
opts: opts,
}
core.ActorRef = newLocalActorRef(system, actorId)
core.actorContext = &actorContext{
id: actorId,
vof: reflect.Value{},
state: actorContextStatePreStart,
behaviors: make(map[reflect.Type]reflect.Value),
}
core.actorContext.vof = reflect.ValueOf(core.actorContext)

return core
}

type actorCore struct {
Actor
ActorRef
*actorContext
opts *ActorOptions
}

// onPreStart 在 Actor 启动之前执行的逻辑
func (a *actorCore) onPreStart() error {
if err := a.Actor.OnPreStart(a); err != nil {
return err
}

a.state = actorContextStateStarted
return nil
}

// GetOptions 获取 Actor 的配置项
func (a *actorCore) GetOptions() *ActorOptions {
return a.opts
}
12 changes: 6 additions & 6 deletions vivid/actor_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ const (
)

// ActorId 是一个 Actor 的唯一标识符,该标识符是由紧凑的不可读字符串组成,其中包含了 Actor 完整的资源定位信息
// - minotaur://my-system/user/my-localActor
// - minotaur.tcp://localhost:1234/user/my-localActor
// - minotaur.tcp://my-cluster@localhost:1234/user/my-localActor
// - minotaur://my-system/user/my-localActorRef
// - minotaur.tcp://localhost:1234/user/my-localActorRef
// - minotaur.tcp://my-cluster@localhost:1234/user/my-localActorRef
type ActorId string

func NewActorId(network, cluster, host string, port uint16, system, name string) ActorId {
Expand Down Expand Up @@ -73,9 +73,9 @@ func NewActorId(network, cluster, host string, port uint16, system, name string)
}

// ParseActorId 用于解析可读的 ActorId 字符串为 ActorId 对象
// - minotaur://my-system/user/my-localActor
// - minotaur.tcp://localhost:1234/user/my-localActor
// - minotaur.tcp://my-cluster@localhost:1234/user/my-localActor
// - minotaur://my-system/user/my-localActorRef
// - minotaur.tcp://localhost:1234/user/my-localActorRef
// - minotaur.tcp://my-cluster@localhost:1234/user/my-localActorRef
func ParseActorId(actorId string) (ActorId, error) {
var network, cluster, host, system, name string
var port int
Expand Down
4 changes: 2 additions & 2 deletions vivid/actor_id_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

func BenchmarkNewActorId(b *testing.B) {
for i := 0; i < b.N; i++ {
vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")
}
}

func BenchmarkActorIdInfo(b *testing.B) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

b.Run("Network", func(b *testing.B) {
for i := 0; i < b.N; i++ {
Expand Down
20 changes: 10 additions & 10 deletions vivid/actor_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestActorId_Network(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

network := actorId.Network()
t.Log(network)
Expand All @@ -17,7 +17,7 @@ func TestActorId_Network(t *testing.T) {
}

func TestActorId_Cluster(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

cluster := actorId.Cluster()
t.Log(cluster)
Expand All @@ -28,7 +28,7 @@ func TestActorId_Cluster(t *testing.T) {
}

func TestActorId_Host(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

host := actorId.Host()
t.Log(host)
Expand All @@ -39,7 +39,7 @@ func TestActorId_Host(t *testing.T) {
}

func TestActorId_Port(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

port := actorId.Port()
t.Log(port)
Expand All @@ -50,7 +50,7 @@ func TestActorId_Port(t *testing.T) {
}

func TestActorId_System(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

system := actorId.System()
t.Log(system)
Expand All @@ -61,30 +61,30 @@ func TestActorId_System(t *testing.T) {
}

func TestActorId_Name(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

name := actorId.Name()
t.Log(name)

if name != "my-localActor" {
if name != "my-localActorRef" {
t.Fail()
}
}

func TestActorId_String(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActor")
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

str := actorId.String()
t.Log(str)

if str != "minotaur.tcp://my-cluster@localhost:1234/my-system/my-localActor" {
if str != "minotaur.tcp://my-cluster@localhost:1234/my-system/my-localActorRef" {
t.Fail()

}
}

func TestActorId_Parse(t *testing.T) {
actorId := "minotaur.tcp://my-cluster@localhost:1234/my-system/my-localActor"
actorId := "minotaur.tcp://my-cluster@localhost:1234/my-system/my-localActorRef"

parsed, err := vivid.ParseActorId(actorId)
if err != nil {
Expand Down
16 changes: 13 additions & 3 deletions vivid/actor_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ func NewActorOptions() *ActorOptions {

func defaultMailBox() func() *Mailbox {
return func() *Mailbox {
return NewMailbox(queues.NewFIFO[Message]())
return NewMailbox(queues.NewFIFO[MessageContext]())
}
}

// ActorOptions 是 Actor 的配置项
type ActorOptions struct {
Name string // Actor 名称
Mailbox func() *Mailbox // Actor 使用的邮箱
Name string // Actor 名称
Mailbox func() *Mailbox // Actor 使用的邮箱
DispatcherName string // Actor 使用的调度器名称,如果为空则使用默认调度器
}

// Apply 应用配置项
Expand All @@ -30,6 +31,9 @@ func (o *ActorOptions) Apply(opts ...*ActorOptions) *ActorOptions {
if opt.Mailbox != nil {
o.Mailbox = opt.Mailbox
}
if opt.DispatcherName != "" {
o.DispatcherName = opt.DispatcherName
}
}
return o
}
Expand All @@ -45,3 +49,9 @@ func (o *ActorOptions) WithMailbox(mailbox func() *Mailbox) *ActorOptions {
o.Mailbox = mailbox
return o
}

// WithDispatcherName 设置 Actor 使用的调度器名称
func (o *ActorOptions) WithDispatcherName(name string) *ActorOptions {
o.DispatcherName = name
return o
}
Loading

0 comments on commit a246abd

Please sign in to comment.