Skip to content

Commit

Permalink
other: v2 actor 子 Actor、查询
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 15, 2024
1 parent d39d3e9 commit bcf8d66
Show file tree
Hide file tree
Showing 18 changed files with 539 additions and 351 deletions.
3 changes: 3 additions & 0 deletions vivid/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ type Actor interface {

// OnReceived 当 Actor 接收到消息时执行的逻辑
OnReceived(ctx MessageContext) error

// OnDestroy 当 Actor 被要求销毁时将会调用该函数,需要在该函数中释放 Actor 的资源
OnDestroy(ctx ActorContext) error
}
72 changes: 72 additions & 0 deletions vivid/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vivid

import (
"fmt"
"path"
"reflect"
)

Expand All @@ -19,15 +20,82 @@ type ActorContext interface {
// MatchBehavior 匹配 Actor 的行为,当匹配不到时将返回 nil,否则返回匹配到的行为执行器
// - 推荐使用 MatchBehavior 函数来匹配 Actor 的行为,这样可以保证行为的类型安全
MatchBehavior(messageType reflect.Type, ctx MessageContext) ActorBehaviorExecutor

// NotifyTerminated 当 Actor 主动销毁时,务必调用该函数,以便在整个 Actor 系统中得到完整的释放
NotifyTerminated()

// ActorOf 创建一个 Actor,该 Actor 是当前 Actor 的子 Actor
ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error)

// GetActor 获取 Actor 的引用
GetActor() Query
}

type actorContextState = uint8 // actorContext 的状态

// actorContext 是 Actor 的上下文
type actorContext struct {
system *ActorSystem // Actor 所属的 Actor 系统
id ActorId // Actor 的 ID
core *actorCore // Actor 的核心
state actorContextState // 上下文的状态
behaviors map[reflect.Type]reflect.Value // Actor 的行为,由消息类型到行为的映射
children map[ActorName]*actorCore // 子 Actor
isEnd bool // 是否是末级 Actor
}

func (c *actorContext) ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) {
// 检查类型是否实现了 Actor 接口
if !typ.Implements(actorType) {
return nil, fmt.Errorf("%w: %s", ErrActorNotImplementActorRef, typ.String())
}
typ = typ.Elem()

// 应用可选项
opt := NewActorOptions().Apply(opts...)
if opt.Name == "" {
opt.Name = fmt.Sprintf("%s-%d", c.id.Name(), c.system.guid.Add(1))
}
actorPath := path.Join(c.id.Name(), opt.Name)
actorName := opt.Name

// 创建 Actor ID
actorId := NewActorId(c.id.Network(), c.id.Cluster(), c.id.Host(), c.id.Port(), c.id.System(), actorPath)

// 检查 Actor 是否已经存在
c.system.actorsRW.Lock()
defer c.system.actorsRW.Unlock()
actor, exist := c.system.actors[actorId]
if exist {
return nil, fmt.Errorf("%w: %s", ErrActorAlreadyExists, actorId.Name())
}

// 创建 Actor
actor = newActorCore(c.system, actorId, reflect.New(typ).Interface().(Actor), opt)

// 分发器
dispatcher := c.system.getActorDispatcher(actor)
if err := dispatcher.Attach(actor); err != nil {
return nil, err
}

// 启动 Actor
if err := actor.onPreStart(); err != nil {
return nil, err
}

// 绑定 Actor
c.system.actors[actorId] = actor
c.children[actorName] = actor

if opt.hookActorOf != nil {
opt.hookActorOf(actor)
}
return actor, nil
}

func (c *actorContext) GetActor() Query {
return newQuery(c.system, c.core)
}

// RegisterBehavior 注册 Actor 的行为
Expand All @@ -37,6 +105,10 @@ func RegisterBehavior[T any](ctx ActorContext, behavior ActorBehavior[T]) {
ctx.RegisterBehavior(messageType, behavior)
}

func (c *actorContext) NotifyTerminated() {
c.system.unregisterActor(c.id)
}

func (c *actorContext) RegisterBehavior(messageType reflect.Type, behavior any) {
if c.state != actorContextStatePreStart {
return
Expand Down
11 changes: 7 additions & 4 deletions vivid/actor_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, opts *Actor
}
core.ActorRef = newLocalActorRef(system, actorId)
core.actorContext = &actorContext{
system: system,
id: actorId,
core: core,
state: actorContextStatePreStart,
behaviors: make(map[reflect.Type]reflect.Value),
children: map[ActorName]*actorCore{},
}

return core
}

type actorCore struct {
Actor
ActorRef
*actorContext
opts *ActorOptions
Actor // 外部 Actor 实现
ActorRef // Actor 的引用
*actorContext // Actor 的上下文
opts *ActorOptions // Actor 的配置项
}

// onPreStart 在 Actor 启动之前执行的逻辑
Expand Down
61 changes: 46 additions & 15 deletions vivid/actor_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"encoding/binary"
"fmt"
"github.com/kercylan98/minotaur/toolkit/convert"
"path/filepath"
"regexp"
"strconv"
"strings"
)

const (
actorIdPrefix = "minotaur"
actorIdPrefix = "minotaur"
actorIdParseLocal = `^` + actorIdPrefix + `://([^/@]+@)?([^/:]+):(\d+)/([^/]+)/([^/]+)$`
actorIdParseTcp = `^` + actorIdPrefix + `\.tcp://([^/:]+):(\d+)/([^/]+)/([^/]+)$`
actorIdParseTcpCluster = `^` + actorIdPrefix + `\.tcp://([^/:]+):(\d+)/([^/]+)/([^/]+)/([^/]+)$`
actorIdMinLength = 12
)

// ActorId 是一个 Actor 的唯一标识符,该标识符是由紧凑的不可读字符串组成,其中包含了 Actor 完整的资源定位信息
Expand All @@ -19,15 +24,18 @@ const (
// - minotaur.tcp://my-cluster@localhost:1234/user/my-localActorRef
type ActorId string

func NewActorId(network, cluster, host string, port uint16, system, name string) ActorId {
type ActorName = string
type ActorPath = string

func NewActorId(network, cluster, host string, port uint16, system, path ActorPath) ActorId {
networkLen := uint16(len(network))
clusterLen := uint16(len(cluster))
hostLen := uint16(len(host))
systemLen := uint16(len(system))
nameLen := uint16(len(name))
pathLen := uint16(len(path))

// 计算需要的字节数
size := networkLen + clusterLen + hostLen + systemLen + nameLen + 12 // 添加端口号和长度信息
size := networkLen + clusterLen + hostLen + systemLen + pathLen + 12 // 添加端口号和长度信息

// 分配内存
actorId := make([]byte, size)
Expand All @@ -42,7 +50,7 @@ func NewActorId(network, cluster, host string, port uint16, system, name string)
offset += 2
binary.BigEndian.PutUint16(actorId[offset:], systemLen)
offset += 2
binary.BigEndian.PutUint16(actorId[offset:], nameLen)
binary.BigEndian.PutUint16(actorId[offset:], pathLen)
offset += 2

// 写入网络信息
Expand All @@ -61,9 +69,9 @@ func NewActorId(network, cluster, host string, port uint16, system, name string)
copy(actorId[offset:], system)
offset += systemLen

// 写入名称信息
copy(actorId[offset:], name)
offset += nameLen
// 写入路径信息
copy(actorId[offset:], path)
offset += pathLen

// 写入端口信息
binary.BigEndian.PutUint16(actorId[offset:], port)
Expand All @@ -82,9 +90,9 @@ func ParseActorId(actorId string) (ActorId, error) {
var portStr string

// 定义正则表达式来匹配不同格式的 ActorId
re1 := regexp.MustCompile(`^` + actorIdPrefix + `://([^/@]+@)?([^/:]+):(\d+)/([^/]+)/([^/]+)$`)
re2 := regexp.MustCompile(`^` + actorIdPrefix + `\.tcp://([^/:]+):(\d+)/([^/]+)/([^/]+)$`)
re3 := regexp.MustCompile(`^` + actorIdPrefix + `://([^/]+)/([^/]+)/([^/]+)$`)
re1 := regexp.MustCompile(actorIdParseLocal)
re2 := regexp.MustCompile(actorIdParseTcp)
re3 := regexp.MustCompile(actorIdParseTcpCluster)

if matches := re1.FindStringSubmatch(actorId); matches != nil {
cluster = matches[1]
Expand Down Expand Up @@ -119,6 +127,24 @@ func ParseActorId(actorId string) (ActorId, error) {
return NewActorId(network, cluster, host, uint16(port), system, name), nil
}

// Invalid 检查 ActorId 是否无效
func (a ActorId) Invalid() bool {
if len(a) < actorIdMinLength {
return true
}
networkLen := binary.BigEndian.Uint16([]byte(a[:2]))
clusterLen := binary.BigEndian.Uint16([]byte(a[2:4]))
hostLen := binary.BigEndian.Uint16([]byte(a[4:6]))
systemLen := binary.BigEndian.Uint16([]byte(a[6:8]))
nameLen := binary.BigEndian.Uint16([]byte(a[8:10]))
totalLen := actorIdMinLength + networkLen + clusterLen + hostLen + systemLen + nameLen
if uint16(len(a)) < totalLen {
return true
}

return networkLen == 0 || hostLen == 0 || systemLen == 0 || nameLen == 0
}

// Network 获取 ActorId 的网络信息
func (a ActorId) Network() string {
length := binary.BigEndian.Uint16([]byte(a[:2]))
Expand Down Expand Up @@ -159,15 +185,20 @@ func (a ActorId) System() string {
return string(v)
}

// Name 获取 ActorId 的名称信息
func (a ActorId) Name() string {
// Path 获取 ActorId 的路径信息
func (a ActorId) Path() ActorPath {
networkLen := binary.BigEndian.Uint16([]byte(a[:2]))
clusterLen := binary.BigEndian.Uint16([]byte(a[2:4]))
hostLen := binary.BigEndian.Uint16([]byte(a[4:6]))
systemLen := binary.BigEndian.Uint16([]byte(a[6:8]))
nameLen := binary.BigEndian.Uint16([]byte(a[8:10]))
v := a[10+networkLen+clusterLen+hostLen+systemLen : 10+networkLen+clusterLen+hostLen+systemLen+nameLen]
return string(v)
return ActorPath(v)
}

// Name 获取 ActorId 的名称信息
func (a ActorId) Name() ActorName {
return filepath.Base(a.Path())
}

// String 获取 ActorId 的字符串表示
Expand All @@ -194,6 +225,6 @@ func (a ActorId) String() string {
builder.WriteString("/")
builder.WriteString(a.System())
builder.WriteString("/")
builder.WriteString(a.Name())
builder.WriteString(a.Path())
return builder.String()
}
8 changes: 8 additions & 0 deletions vivid/actor_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ActorOptions struct {
Name string // Actor 名称
Mailbox func() *Mailbox // Actor 使用的邮箱
DispatcherName string // Actor 使用的调度器名称,如果为空则使用默认调度器

hookActorOf func(core *actorCore) // 内部 ActorOf 钩子
}

// Apply 应用配置项
Expand Down Expand Up @@ -55,3 +57,9 @@ func (o *ActorOptions) WithDispatcherName(name string) *ActorOptions {
o.DispatcherName = name
return o
}

// withHookActorOf 设置内部 ActorOf 钩子
func (o *ActorOptions) withHookActorOf(hook func(core *actorCore)) *ActorOptions {
o.hookActorOf = hook
return o
}
6 changes: 6 additions & 0 deletions vivid/actor_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package vivid

// actorStorage 是内部使用的 Actor 的存储器,用来存储当前系统中的所有 Actor
// - 存储器需要按照资源
type actorStorage struct {
}
Loading

0 comments on commit bcf8d66

Please sign in to comment.