Skip to content

Commit

Permalink
feat(vivid): add typed actor support and improve ask pattern
Browse files Browse the repository at this point in the history
Implement support for typed actors in the vivid actor system, allowing
for more type-safe interactions between actors and their references. This
update introduces a new message type for handling actor reference
type conversions and enhances the ask pattern to include typed responses.

Additionally, the changes improve the dead letter handling by adding
more detailed logging, and refactor the actor system shutdown process
to ensure all pending messages are properly processed before closure.
  • Loading branch information
kercylan98 committed Jun 18, 2024
1 parent 929d706 commit 4d12837
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 40 deletions.
3 changes: 3 additions & 0 deletions minotaur/vivid/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func onReceive(actor Actor, ctx MessageContext) {

// 空闲时间重置
switch ctx.GetMessage().(type) {
case onActorRefTyped:
ctx.Reply(ctx.GetActor())
return
case OnTerminate:
default:
core.refreshIdleTimeout()
Expand Down
4 changes: 3 additions & 1 deletion minotaur/vivid/actor_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package vivid

import "time"
import (
"time"
)

type ActorOption[T Actor] func(opts *ActorOptions[T])

Expand Down
11 changes: 9 additions & 2 deletions minotaur/vivid/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (s *ActorSystem) Shutdown() {
s.scheduler.Close()
}

// AwaitShutdown 等待 Actor 系统关闭
func (s *ActorSystem) AwaitShutdown() {
s.waitGroup.Wait()
}

// GetSystem 获取 Actor 系统
func (s *ActorSystem) GetSystem() *ActorSystem {
return s
Expand Down Expand Up @@ -373,7 +378,6 @@ func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ..
if opts.ContextHook != nil {
opts.ContextHook(ctx)
}
receiver.send(ctx)

// 等待回复
if opts.reply {
Expand All @@ -395,7 +399,7 @@ func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ..
opts.ReplyTimeout = time.Second
}

waiter := make(chan Message)
waiter := make(chan Message, 1)
timeoutCtx, cancel := context.WithTimeout(s.ctx, opts.ReplyTimeout)
defer func(seq uint64, waiter chan Message, cancel context.CancelFunc) {
cancel()
Expand All @@ -408,6 +412,7 @@ func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ..
s.askWaitsLock.Lock()
s.askWaits[ctx.GetSeq()] = waiter
s.askWaitsLock.Unlock()
receiver.send(ctx)

select {
case <-timeoutCtx.Done():
Expand All @@ -417,6 +422,8 @@ func (s *ActorSystem) sendMessage(receiver ActorRef, message Message, options ..
case reply := <-waiter:
return reply
}
} else {
receiver.send(ctx)
}

return nil
Expand Down
7 changes: 7 additions & 0 deletions minotaur/vivid/actor_typed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package vivid

// ActorTyped 是用于将 Actor 消息类型化的接口,该接口继承了 Actor 和 ActorRef
type ActorTyped interface {
Actor
ActorRef
}
2 changes: 2 additions & 0 deletions minotaur/vivid/dead_letter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vivid
import (
"errors"
"github.com/kercylan98/minotaur/toolkit/buffer"
"github.com/kercylan98/minotaur/toolkit/log"
"runtime/debug"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -52,6 +53,7 @@ func (s *_DeadLetterStream) getEvents(typ DeadLetterEventType) *deadLetterEvents
}

func (s *_DeadLetterStream) DeadLetter(deadLetter DeadLetterEvent) {
log.Error("DeadLetter", log.Any("deadLetter", deadLetter))
events := s.getEvents(deadLetter.Type)

deadLetter.Seq = s.seq.Add(1)
Expand Down
5 changes: 5 additions & 0 deletions minotaur/vivid/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ type OnRestart struct {
type OnTerminate struct {
restart bool // 是否重启
}

// onActorRefTyped 该消息将在 ActorOfT 时发送,用于 ActorRef 的类型转换
type onActorRefTyped struct {
ref ActorRef
}
37 changes: 0 additions & 37 deletions minotaur/vivid/typed.go

This file was deleted.

24 changes: 24 additions & 0 deletions minotaur/vivid/vivid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func ActorOfI[T Actor](actorOf ActorOwner, actor T, options ...func(options *Act
return ActorOf(actorOf, opts)
}

func ActorOfIT[A Actor, T ActorTyped](actorOf ActorOwner, actor A, options ...func(options *ActorOptions[A])) T {
ref := ActorOfI(actorOf, actor, options...)
ask := ref.Ask(onActorRefTyped{
ref: ref,
}, WithInstantly(true))
return ask.(T)
}

func ActorOfF[T Actor](actorOf ActorOwner, options ...func(options *ActorOptions[T])) ActorRef {
var opts = NewActorOptions[T]()
for _, opt := range options {
Expand All @@ -34,6 +42,22 @@ func ActorOfF[T Actor](actorOf ActorOwner, options ...func(options *ActorOptions
return ActorOf(actorOf, opts)
}

func ActorOfFT[A Actor, T ActorTyped](actorOf ActorOwner, actor A, options ...func(options *ActorOptions[A])) T {
ref := ActorOfI(actorOf, actor, options...)
ask := ref.Ask(onActorRefTyped{
ref: ref,
}, WithInstantly(true))
return ask.(T)
}

func ActorOfT[A Actor, T ActorTyped](actorOf ActorOwner, options ...*ActorOptions[A]) T {
ref := ActorOf(actorOf, options...)
ask := ref.Ask(onActorRefTyped{
ref: ref,
}, WithInstantly(true))
return ask.(T)
}

func ActorOf[T Actor](actorOf ActorOwner, options ...*ActorOptions[T]) ActorRef {
var opts = parseActorOptions(options...)
var ins = opts.Construct
Expand Down
38 changes: 38 additions & 0 deletions minotaur/vivid/vivid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package vivid_test

import (
"fmt"
"github.com/kercylan98/minotaur/minotaur/vivid"
"testing"
)

type TestActor struct {
vivid.ActorRef
}

type TestActorTyped interface {
vivid.ActorTyped
Println(string)
}

func (t *TestActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case vivid.OnBoot:
t.ActorRef = ctx
case string:
fmt.Println(m)
}
}

func (t *TestActor) Println(s string) {
t.Tell(s)
}

func TestActorOfT(t *testing.T) {
system := vivid.NewActorSystem("test")
ref := vivid.ActorOfT[*TestActor, TestActorTyped](&system)

ref.Println("Hello, World!")

system.Shutdown()
}

0 comments on commit 4d12837

Please sign in to comment.