Skip to content

Commit

Permalink
other: v2 actor 体验优化
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 16, 2024
1 parent 4da7377 commit a08d445
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 60 deletions.
1 change: 1 addition & 0 deletions vivid/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Actor interface {
OnReceived(ctx MessageContext) (err error)

// OnDestroy 当 Actor 被要求销毁时将会调用该函数,需要在该函数中释放 Actor 的资源
// - 该函数可能会在重启前被调用,被用于重置 Actor 的状态
OnDestroy(ctx ActorContext) (err error)

// OnChildTerminated 当 Actor 的子 Actor 被销毁时将会调用该函数
Expand Down
22 changes: 9 additions & 13 deletions vivid/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ type ActorContext interface {
// NotifyTerminated 当 Actor 主动销毁时,务必调用该函数,以便在整个 Actor 系统中得到完整的释放
NotifyTerminated(v ...Message)

// Spawn 创建一个 Actor,该 Actor 是当前 Actor 的子 Actor,该函数是 ActorOf 的简化版
Spawn(actor Actor, opts ...*ActorOptions) (ActorRef, error)

// ActorOf 创建一个 Actor,该 Actor 是当前 Actor 的子 Actor
ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error)
ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error)

// GetActor 获取 Actor 的引用
GetActor() Query
Expand All @@ -51,7 +48,6 @@ type actorContext struct {
parent *actorCore // 父 Actor
children map[ActorName]*actorCore // 子 Actor
isEnd bool // 是否是末级 Actor
tof reflect.Type // Actor 的类型
}

// restart 重启上下文
Expand Down Expand Up @@ -80,9 +76,12 @@ func (c *actorContext) restart(reEntry bool) (recovery func(), err error) {
c.core.state = backupState
c.behaviors = backupBehaviors
}
c.core.Actor = reflect.New(c.core.tof).Interface().(Actor)
c.core.state = actorContextStatePreStart
c.core.actorContext.behaviors = make(map[reflect.Type]reflect.Value)
if err = c.core.Actor.OnDestroy(c.core); err != nil {
recoveryFunc()
return nil, err
}
if err = c.core.onPreStart(); err != nil {
recoveryFunc()
return nil, err
Expand All @@ -94,20 +93,17 @@ func (c *actorContext) bindChildren(core *actorCore) {
c.children[core.GetOptions().Name] = core
}

func (c *actorContext) ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) {
func (c *actorContext) ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error) {
var opt *ActorOptions
if len(opts) > 0 {
opt = opts[0]
} else {
}
if opt == nil {
opt = NewActorOptions()
}
opt = opt.WithParent(c)

return c.system.generateActor(typ, opt)
}

func (c *actorContext) Spawn(actor Actor, opts ...*ActorOptions) (ActorRef, error) {
return c.ActorOf(reflect.TypeOf(actor), opts...)
return c.system.generateActor(actor, opt)
}

func (c *actorContext) GetActor() Query {
Expand Down
4 changes: 1 addition & 3 deletions vivid/actor_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ActorCoreExpose interface {
GetOptions() *ActorOptions
}

func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, typ reflect.Type, opts *ActorOptions) *actorCore {
func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, opts *ActorOptions) *actorCore {
core := &actorCore{
Actor: actor,
opts: opts,
Expand All @@ -31,9 +31,7 @@ func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, typ reflect
state: actorContextStatePreStart,
behaviors: make(map[reflect.Type]reflect.Value),
children: map[ActorName]*actorCore{},
tof: typ,
}

return core
}

Expand Down
7 changes: 0 additions & 7 deletions vivid/actor_generator.go

This file was deleted.

11 changes: 11 additions & 0 deletions vivid/actor_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func TestActorId_System(t *testing.T) {
}
}

func TestActorId_Path(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

path := actorId.Path()
t.Log(path)

if path != "my-localActorRef" {
t.Fail()
}
}

func TestActorId_Name(t *testing.T) {
actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef")

Expand Down
33 changes: 14 additions & 19 deletions vivid/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/kercylan98/minotaur/toolkit/log"
"golang.org/x/net/context"
"path"
"reflect"
"sync"
"sync/atomic"
)
Expand Down Expand Up @@ -40,7 +39,7 @@ type ActorSystem struct {

// Run 非阻塞的运行 ActorSystem
func (s *ActorSystem) Run() (err error) {
s.user, err = s.generateActor(reflect.TypeOf((*userGuardianActor)(nil)), NewActorOptions().WithName("user"))
s.user, err = s.generateActor(new(userGuardianActor), NewActorOptions().WithName("user"))
if err != nil {
return err
}
Expand All @@ -62,20 +61,18 @@ func (s *ActorSystem) Shutdown() error {
s.unregisterActor(s.user, true)
delete(s.actors, s.user.GetId())

for _, d := range s.opts.Dispatchers {
d.Stop()
}

s.cancel()
return nil
}

// ActorOf 创建一个 Actor
func ActorOf[T Actor](generator ActorGenerator, opts ...*ActorOptions) (ActorRef, error) {
typ := reflect.TypeOf((*T)(nil)).Elem()
return generator.ActorOf(typ, opts...)
}

// ActorOf 创建一个 Actor
// - 推荐使用 ActorOf 函数来创建 Actor,这样可以保证 Actor 的类型安全
func (s *ActorSystem) ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) {
return s.user.ActorOf(typ, opts...)
func (s *ActorSystem) ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error) {
return s.user.ActorOf(actor, opts...)
}

// GetActor 获取 ActorRef
Expand Down Expand Up @@ -213,8 +210,8 @@ func (s *ActorSystem) handleRemoteMessage(ctx context.Context, c <-chan []byte)
func (s *ActorSystem) unregisterActor(core *actorCore, reEnter bool) {
if !reEnter {
s.actorsRW.RLock()
defer s.actorsRW.RUnlock()
}

for key, child := range core.children {
if err := child.OnDestroy(child.core); err != nil {
log.Error(fmt.Sprintf("unregister actor destroy error: %s", err.Error()))
Expand All @@ -229,20 +226,18 @@ func (s *ActorSystem) unregisterActor(core *actorCore, reEnter bool) {
delete(core.parent.children, core.GetOptions().Name)
}

if !reEnter {
s.actorsRW.RUnlock()
}

dispatcher := s.getActorDispatcher(core)
if err := dispatcher.Detach(core); err != nil {
log.Error(fmt.Sprintf("unregister actor detach error: %s", err.Error()))
return
}
}

func (s *ActorSystem) generateActor(typ reflect.Type, opts ...*ActorOptions) (*actorCore, error) {
// 检查类型是否实现了 Actor 接口
if !typ.Implements(actorType) {
return nil, fmt.Errorf("%w: %s", ErrActorNotImplementActorRef, typ.String())
}
typ = typ.Elem()

func (s *ActorSystem) generateActor(actorImpl Actor, opts ...*ActorOptions) (*actorCore, error) {
// 应用可选项
opt := NewActorOptions().Apply(opts...)

Expand Down Expand Up @@ -275,7 +270,7 @@ func (s *ActorSystem) generateActor(typ reflect.Type, opts ...*ActorOptions) (*a
}

// 创建 Actor
actor = newActorCore(s, actorId, reflect.New(typ).Interface().(Actor), typ, opt)
actor = newActorCore(s, actorId, actorImpl, opt)

// 分发器
dispatcher := s.getActorDispatcher(actor)
Expand Down
30 changes: 25 additions & 5 deletions vivid/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@ type Dispatcher interface {

// Detach 用于将一个 Actor 从调度器中移除
Detach(actor ActorCore) error

// Stop 用于停止调度器
Stop()
}

// newDispatcher 创建一个新的消息调度器
func newDispatcher() Dispatcher {
d := &dispatcher{
mailboxes: make(map[ActorId]*Mailbox),
mailboxes: make(map[ActorId]*Mailbox),
mailboxWait: map[ActorId]chan struct{}{},
}

return d
}

type dispatcher struct {
mailboxes map[ActorId]*Mailbox // ActorId -> Mailbox
mailboxesRW sync.RWMutex // 保护 mailboxes 的读写锁
mailboxes map[ActorId]*Mailbox // ActorId -> Mailbox
mailboxWait map[ActorId]chan struct{} // ActorId -> chan struct{}
mailboxesRW sync.RWMutex // 保护 mailboxes 的读写锁
wait sync.WaitGroup // 等待所有 Actor 关闭
}

func (d *dispatcher) Send(receiver ActorCore, msg MessageContext) error {
Expand All @@ -47,12 +53,14 @@ func (d *dispatcher) Attach(actor ActorCore) error {
// 为 Actor 创建一个邮箱
mailbox := opts.Mailbox()

wait := make(chan struct{})
d.mailboxesRW.Lock()
d.mailboxes[actor.GetId()] = mailbox
d.mailboxWait[actor.GetId()] = wait
d.mailboxesRW.Unlock()

go mailbox.Start()
go d.watchReceive(actor, mailbox.Dequeue())
go d.watchReceive(actor, wait, mailbox.Dequeue())
return nil
}

Expand All @@ -64,15 +72,27 @@ func (d *dispatcher) Detach(actor ActorCore) error {
d.mailboxesRW.Unlock()
return nil
}
wait := d.mailboxWait[actorId]
delete(d.mailboxes, actorId)
delete(d.mailboxWait, actorId)
d.mailboxesRW.Unlock()

d.wait.Add(1)
go func() { // 异步等待邮箱关闭
defer d.wait.Done()
<-wait
}()
mailbox.Stop()

return nil
}

func (d *dispatcher) watchReceive(actor ActorCore, dequeue <-chan MessageContext) {
func (d *dispatcher) Stop() {
d.wait.Wait()
}

func (d *dispatcher) watchReceive(actor ActorCore, wait chan struct{}, dequeue <-chan MessageContext) {
defer close(wait)
for ctx := range dequeue {
ctx.(*messageContext).ActorContext = actor
actor.OnReceived(ctx)
Expand Down
42 changes: 29 additions & 13 deletions vivid/test/local/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,42 @@ package main

import (
"fmt"
"github.com/kercylan98/minotaur/toolkit/chrono"
"github.com/kercylan98/minotaur/toolkit/random"
"github.com/kercylan98/minotaur/vivid"
"time"
)

type LocalTestActorChild struct {
vivid.BasicActor
state int
}

func (l *LocalTestActorChild) OnPreStart(ctx vivid.ActorContext) error {

return nil
}

func (l *LocalTestActorChild) OnReceived(ctx vivid.MessageContext) error {
l.state++
return nil
}

func (l *LocalTestActorChild) OnDestroy(ctx vivid.ActorContext) error {
l.state = 0
return nil
}

type LocalTestActor struct {
vivid.BasicActor
vm map[int]int
}

func (l *LocalTestActor) OnPreStart(ctx vivid.ActorContext) error {
l.vm = make(map[int]int)

return nil
}

func (l *LocalTestActor) OnReceived(ctx vivid.MessageContext) error {
l.vm[ctx.GetMessage().(int)] += ctx.GetMessage().(int)
fmt.Println(l.vm[ctx.GetMessage().(int)])
time.Sleep(time.Second * 3)
fmt.Println(ctx.GetMessage())
return nil
}

Expand All @@ -29,18 +47,16 @@ func main() {
panic(err)
}

for i := 0; i < 10; i++ {
ref, _ := vivid.ActorOf[*LocalTestActor](system)
for j := 0; j < 1000; j++ {
_ = ref.Tell(random.Int(0, 100))
}
ref, err := system.ActorOf(&LocalTestActor{}, nil)
if err != nil {
panic(err)
}

time.Sleep(time.Second * 5)
ref.Tell(123)

if err := system.Shutdown(); err != nil {
panic(err)
}

fmt.Println("Shutdown")
time.Sleep(chrono.Week)
}

0 comments on commit a08d445

Please sign in to comment.