diff --git a/vivid/actor.go b/vivid/actor.go index c9bd062..07cb553 100644 --- a/vivid/actor.go +++ b/vivid/actor.go @@ -15,6 +15,7 @@ type Actor interface { OnReceived(ctx MessageContext) (err error) // OnDestroy 当 Actor 被要求销毁时将会调用该函数,需要在该函数中释放 Actor 的资源 + // - 该函数可能会在重启前被调用,被用于重置 Actor 的状态 OnDestroy(ctx ActorContext) (err error) // OnChildTerminated 当 Actor 的子 Actor 被销毁时将会调用该函数 diff --git a/vivid/actor_context.go b/vivid/actor_context.go index f67ddbc..ce5b728 100644 --- a/vivid/actor_context.go +++ b/vivid/actor_context.go @@ -23,11 +23,8 @@ type ActorContext interface { // NotifyTerminated 当 Actor 主动销毁时,务必调用该函数,以便在整个 Actor 系统中得到完整的释放 NotifyTerminated(v ...Message) - // Spawn 创建一个 Actor,该 Actor 是当前 Actor 的子 Actor,该函数是 ActorOf 的简化版 - Spawn(actor Actor, opts ...*ActorOptions) (ActorRef, error) - // ActorOf 创建一个 Actor,该 Actor 是当前 Actor 的子 Actor - ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) + ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error) // GetActor 获取 Actor 的引用 GetActor() Query @@ -51,7 +48,6 @@ type actorContext struct { parent *actorCore // 父 Actor children map[ActorName]*actorCore // 子 Actor isEnd bool // 是否是末级 Actor - tof reflect.Type // Actor 的类型 } // restart 重启上下文 @@ -80,9 +76,12 @@ func (c *actorContext) restart(reEntry bool) (recovery func(), err error) { c.core.state = backupState c.behaviors = backupBehaviors } - c.core.Actor = reflect.New(c.core.tof).Interface().(Actor) c.core.state = actorContextStatePreStart c.core.actorContext.behaviors = make(map[reflect.Type]reflect.Value) + if err = c.core.Actor.OnDestroy(c.core); err != nil { + recoveryFunc() + return nil, err + } if err = c.core.onPreStart(); err != nil { recoveryFunc() return nil, err @@ -94,20 +93,17 @@ func (c *actorContext) bindChildren(core *actorCore) { c.children[core.GetOptions().Name] = core } -func (c *actorContext) ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) { +func (c *actorContext) ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error) { var opt *ActorOptions if len(opts) > 0 { opt = opts[0] - } else { + } + if opt == nil { opt = NewActorOptions() } opt = opt.WithParent(c) - return c.system.generateActor(typ, opt) -} - -func (c *actorContext) Spawn(actor Actor, opts ...*ActorOptions) (ActorRef, error) { - return c.ActorOf(reflect.TypeOf(actor), opts...) + return c.system.generateActor(actor, opt) } func (c *actorContext) GetActor() Query { diff --git a/vivid/actor_core.go b/vivid/actor_core.go index de884ba..9975d55 100644 --- a/vivid/actor_core.go +++ b/vivid/actor_core.go @@ -18,7 +18,7 @@ type ActorCoreExpose interface { GetOptions() *ActorOptions } -func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, typ reflect.Type, opts *ActorOptions) *actorCore { +func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, opts *ActorOptions) *actorCore { core := &actorCore{ Actor: actor, opts: opts, @@ -31,9 +31,7 @@ func newActorCore(system *ActorSystem, actorId ActorId, actor Actor, typ reflect state: actorContextStatePreStart, behaviors: make(map[reflect.Type]reflect.Value), children: map[ActorName]*actorCore{}, - tof: typ, } - return core } diff --git a/vivid/actor_generator.go b/vivid/actor_generator.go deleted file mode 100644 index 7332fd8..0000000 --- a/vivid/actor_generator.go +++ /dev/null @@ -1,7 +0,0 @@ -package vivid - -import "reflect" - -type ActorGenerator interface { - ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) -} diff --git a/vivid/actor_id_test.go b/vivid/actor_id_test.go index bc0e6fc..6b4601f 100644 --- a/vivid/actor_id_test.go +++ b/vivid/actor_id_test.go @@ -60,6 +60,17 @@ func TestActorId_System(t *testing.T) { } } +func TestActorId_Path(t *testing.T) { + actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef") + + path := actorId.Path() + t.Log(path) + + if path != "my-localActorRef" { + t.Fail() + } +} + func TestActorId_Name(t *testing.T) { actorId := vivid.NewActorId("tcp", "my-cluster", "localhost", 1234, "my-system", "my-localActorRef") diff --git a/vivid/actor_system.go b/vivid/actor_system.go index 4ba58d0..24f2635 100644 --- a/vivid/actor_system.go +++ b/vivid/actor_system.go @@ -5,7 +5,6 @@ import ( "github.com/kercylan98/minotaur/toolkit/log" "golang.org/x/net/context" "path" - "reflect" "sync" "sync/atomic" ) @@ -40,7 +39,7 @@ type ActorSystem struct { // Run 非阻塞的运行 ActorSystem func (s *ActorSystem) Run() (err error) { - s.user, err = s.generateActor(reflect.TypeOf((*userGuardianActor)(nil)), NewActorOptions().WithName("user")) + s.user, err = s.generateActor(new(userGuardianActor), NewActorOptions().WithName("user")) if err != nil { return err } @@ -62,20 +61,18 @@ func (s *ActorSystem) Shutdown() error { s.unregisterActor(s.user, true) delete(s.actors, s.user.GetId()) + for _, d := range s.opts.Dispatchers { + d.Stop() + } + s.cancel() return nil } -// ActorOf 创建一个 Actor -func ActorOf[T Actor](generator ActorGenerator, opts ...*ActorOptions) (ActorRef, error) { - typ := reflect.TypeOf((*T)(nil)).Elem() - return generator.ActorOf(typ, opts...) -} - // ActorOf 创建一个 Actor // - 推荐使用 ActorOf 函数来创建 Actor,这样可以保证 Actor 的类型安全 -func (s *ActorSystem) ActorOf(typ reflect.Type, opts ...*ActorOptions) (ActorRef, error) { - return s.user.ActorOf(typ, opts...) +func (s *ActorSystem) ActorOf(actor Actor, opts ...*ActorOptions) (ActorRef, error) { + return s.user.ActorOf(actor, opts...) } // GetActor 获取 ActorRef @@ -213,8 +210,8 @@ func (s *ActorSystem) handleRemoteMessage(ctx context.Context, c <-chan []byte) func (s *ActorSystem) unregisterActor(core *actorCore, reEnter bool) { if !reEnter { s.actorsRW.RLock() - defer s.actorsRW.RUnlock() } + for key, child := range core.children { if err := child.OnDestroy(child.core); err != nil { log.Error(fmt.Sprintf("unregister actor destroy error: %s", err.Error())) @@ -229,6 +226,10 @@ func (s *ActorSystem) unregisterActor(core *actorCore, reEnter bool) { delete(core.parent.children, core.GetOptions().Name) } + if !reEnter { + s.actorsRW.RUnlock() + } + dispatcher := s.getActorDispatcher(core) if err := dispatcher.Detach(core); err != nil { log.Error(fmt.Sprintf("unregister actor detach error: %s", err.Error())) @@ -236,13 +237,7 @@ func (s *ActorSystem) unregisterActor(core *actorCore, reEnter bool) { } } -func (s *ActorSystem) generateActor(typ reflect.Type, opts ...*ActorOptions) (*actorCore, error) { - // 检查类型是否实现了 Actor 接口 - if !typ.Implements(actorType) { - return nil, fmt.Errorf("%w: %s", ErrActorNotImplementActorRef, typ.String()) - } - typ = typ.Elem() - +func (s *ActorSystem) generateActor(actorImpl Actor, opts ...*ActorOptions) (*actorCore, error) { // 应用可选项 opt := NewActorOptions().Apply(opts...) @@ -275,7 +270,7 @@ func (s *ActorSystem) generateActor(typ reflect.Type, opts ...*ActorOptions) (*a } // 创建 Actor - actor = newActorCore(s, actorId, reflect.New(typ).Interface().(Actor), typ, opt) + actor = newActorCore(s, actorId, actorImpl, opt) // 分发器 dispatcher := s.getActorDispatcher(actor) diff --git a/vivid/dispatcher.go b/vivid/dispatcher.go index 3274f22..e8b34ca 100644 --- a/vivid/dispatcher.go +++ b/vivid/dispatcher.go @@ -14,20 +14,26 @@ type Dispatcher interface { // Detach 用于将一个 Actor 从调度器中移除 Detach(actor ActorCore) error + + // Stop 用于停止调度器 + Stop() } // newDispatcher 创建一个新的消息调度器 func newDispatcher() Dispatcher { d := &dispatcher{ - mailboxes: make(map[ActorId]*Mailbox), + mailboxes: make(map[ActorId]*Mailbox), + mailboxWait: map[ActorId]chan struct{}{}, } return d } type dispatcher struct { - mailboxes map[ActorId]*Mailbox // ActorId -> Mailbox - mailboxesRW sync.RWMutex // 保护 mailboxes 的读写锁 + mailboxes map[ActorId]*Mailbox // ActorId -> Mailbox + mailboxWait map[ActorId]chan struct{} // ActorId -> chan struct{} + mailboxesRW sync.RWMutex // 保护 mailboxes 的读写锁 + wait sync.WaitGroup // 等待所有 Actor 关闭 } func (d *dispatcher) Send(receiver ActorCore, msg MessageContext) error { @@ -47,12 +53,14 @@ func (d *dispatcher) Attach(actor ActorCore) error { // 为 Actor 创建一个邮箱 mailbox := opts.Mailbox() + wait := make(chan struct{}) d.mailboxesRW.Lock() d.mailboxes[actor.GetId()] = mailbox + d.mailboxWait[actor.GetId()] = wait d.mailboxesRW.Unlock() go mailbox.Start() - go d.watchReceive(actor, mailbox.Dequeue()) + go d.watchReceive(actor, wait, mailbox.Dequeue()) return nil } @@ -64,15 +72,27 @@ func (d *dispatcher) Detach(actor ActorCore) error { d.mailboxesRW.Unlock() return nil } + wait := d.mailboxWait[actorId] delete(d.mailboxes, actorId) + delete(d.mailboxWait, actorId) d.mailboxesRW.Unlock() + d.wait.Add(1) + go func() { // 异步等待邮箱关闭 + defer d.wait.Done() + <-wait + }() mailbox.Stop() return nil } -func (d *dispatcher) watchReceive(actor ActorCore, dequeue <-chan MessageContext) { +func (d *dispatcher) Stop() { + d.wait.Wait() +} + +func (d *dispatcher) watchReceive(actor ActorCore, wait chan struct{}, dequeue <-chan MessageContext) { + defer close(wait) for ctx := range dequeue { ctx.(*messageContext).ActorContext = actor actor.OnReceived(ctx) diff --git a/vivid/test/local/main.go b/vivid/test/local/main.go index 8228d21..d4b94be 100644 --- a/vivid/test/local/main.go +++ b/vivid/test/local/main.go @@ -2,24 +2,42 @@ package main import ( "fmt" - "github.com/kercylan98/minotaur/toolkit/chrono" - "github.com/kercylan98/minotaur/toolkit/random" "github.com/kercylan98/minotaur/vivid" "time" ) +type LocalTestActorChild struct { + vivid.BasicActor + state int +} + +func (l *LocalTestActorChild) OnPreStart(ctx vivid.ActorContext) error { + + return nil +} + +func (l *LocalTestActorChild) OnReceived(ctx vivid.MessageContext) error { + l.state++ + return nil +} + +func (l *LocalTestActorChild) OnDestroy(ctx vivid.ActorContext) error { + l.state = 0 + return nil +} + type LocalTestActor struct { vivid.BasicActor - vm map[int]int } func (l *LocalTestActor) OnPreStart(ctx vivid.ActorContext) error { - l.vm = make(map[int]int) + return nil } + func (l *LocalTestActor) OnReceived(ctx vivid.MessageContext) error { - l.vm[ctx.GetMessage().(int)] += ctx.GetMessage().(int) - fmt.Println(l.vm[ctx.GetMessage().(int)]) + time.Sleep(time.Second * 3) + fmt.Println(ctx.GetMessage()) return nil } @@ -29,18 +47,16 @@ func main() { panic(err) } - for i := 0; i < 10; i++ { - ref, _ := vivid.ActorOf[*LocalTestActor](system) - for j := 0; j < 1000; j++ { - _ = ref.Tell(random.Int(0, 100)) - } + ref, err := system.ActorOf(&LocalTestActor{}, nil) + if err != nil { + panic(err) } - time.Sleep(time.Second * 5) + ref.Tell(123) + if err := system.Shutdown(); err != nil { panic(err) } fmt.Println("Shutdown") - time.Sleep(chrono.Week) }