From bd92df019d28288fed86c3e066fb442ac8f8433c Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 12 Jun 2024 20:13:58 +0800 Subject: [PATCH] =?UTF-8?q?other:=20v2=20vivid=20=E4=BD=93=E9=AA=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- minotaur/application.go | 4 ++ minotaur/application_test.go | 18 ++++++-- minotaur/options.go | 8 ++++ minotaur/transport/conn_actor.go | 15 +++--- minotaur/transport/server_actor.go | 23 +++++++++- minotaur/transport/server_actor_typed.go | 12 ++--- minotaur/transport/server_test.go | 58 ------------------------ 7 files changed, 62 insertions(+), 76 deletions(-) delete mode 100644 minotaur/transport/server_test.go diff --git a/minotaur/application.go b/minotaur/application.go index 4f24b76..8b694ca 100644 --- a/minotaur/application.go +++ b/minotaur/application.go @@ -58,6 +58,10 @@ func (a *Application) Launch() { a.server.Protocol().Launch(a.options.Network) } + for _, hook := range a.options.LaunchedHooks { + hook(a) + } + var systemSignal = make(chan os.Signal, 1) signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) select { diff --git a/minotaur/application_test.go b/minotaur/application_test.go index 8e91ee4..9aa3467 100644 --- a/minotaur/application_test.go +++ b/minotaur/application_test.go @@ -4,13 +4,23 @@ import ( "github.com/kercylan98/minotaur/minotaur" "github.com/kercylan98/minotaur/minotaur/transport" "github.com/kercylan98/minotaur/minotaur/transport/network" + "github.com/kercylan98/minotaur/minotaur/vivid" "testing" ) func TestNewApplication(t *testing.T) { - app := minotaur.NewApplication(minotaur.WithNetwork(network.WebSocket(":9988"))) - app.GetServer().Protocol().SubscribeConnOpenedEvent("conn_opened", app.ActorSystem(), func(event transport.ServerConnOpenedEvent) { - t.Log("conn opened") - }) + app := minotaur.NewApplication( + minotaur.WithNetwork(network.WebSocket(":9988")), + minotaur.WithLaunchedHook(func(app *minotaur.Application) { + app.GetServer(). + Protocol(). + SubscribeConnOpenedEvent("conn_opened", + func(ctx vivid.MessageContext, event transport.ServerConnOpenedEvent) { + t.Log("conn opened") + }, + ) + }), + ) + app.Launch() } diff --git a/minotaur/options.go b/minotaur/options.go index 40866a1..0a10877 100644 --- a/minotaur/options.go +++ b/minotaur/options.go @@ -12,6 +12,8 @@ type Options struct { ActorSystemName string // Actor 系统名称 EventBusActorName string // 事件总线 Actor 名称 Network transport.Network // 网络 + + LaunchedHooks []func(app *Application) // 启动钩子 } // defaultApply 设置缺省值 @@ -35,6 +37,12 @@ func (o *Options) apply(options ...Option) *Options { return o.defaultApply() } +func WithLaunchedHook(hooks ...func(app *Application)) Option { + return func(o *Options) { + o.LaunchedHooks = append(o.LaunchedHooks, hooks...) + } +} + func WithLogger(logger *log.Logger) Option { return func(o *Options) { o.Logger = logger diff --git a/minotaur/transport/conn_actor.go b/minotaur/transport/conn_actor.go index 5fcf847..fe06d60 100644 --- a/minotaur/transport/conn_actor.go +++ b/minotaur/transport/conn_actor.go @@ -1,6 +1,7 @@ package transport import ( + "fmt" "github.com/kercylan98/minotaur/minotaur/vivid" "net" ) @@ -44,12 +45,14 @@ func (c *ConnActor) OnReceive(ctx vivid.MessageContext) { switch ctx.GetMessage().(type) { case vivid.OnBoot: c.reader = ctx.GetRef() - c.writer = vivid.ActorOf[*ConnWriteActor](c.server, vivid.NewActorOptions[*ConnWriteActor]().WithConstruct(func() *ConnWriteActor { - return &ConnWriteActor{ - conn: c.conn, - writer: c.connWriter, - } - }())) + c.writer = vivid.ActorOf[*ConnWriteActor](c.server, vivid.NewActorOptions[*ConnWriteActor](). + WithName(fmt.Sprintf("conn-write-%s", c.conn.RemoteAddr().String())). + WithConstruct(func() *ConnWriteActor { + return &ConnWriteActor{ + conn: c.conn, + writer: c.connWriter, + } + }())) case ConnReceivePacketMessage: } diff --git a/minotaur/transport/server_actor.go b/minotaur/transport/server_actor.go index 4086358..c9b83ed 100644 --- a/minotaur/transport/server_actor.go +++ b/minotaur/transport/server_actor.go @@ -26,6 +26,18 @@ type ( ServerConnClosedMessage struct { conn net.Conn } + + ServerSubscribeConnOpenedMessage struct { + SubscribeId vivid.SubscribeId + Handler func(ctx vivid.MessageContext, event ServerConnOpenedEvent) + Options []vivid.SubscribeOption + } + + ServerSubscribeConnClosedMessage struct { + SubscribeId vivid.SubscribeId + Handler func(ctx vivid.MessageContext, event ServerConnClosedEvent) + Options []vivid.SubscribeOption + } ) type ( @@ -47,7 +59,6 @@ func NewServerActor(system *vivid.ActorSystem, options ...*vivid.ActorOptions[*S type ServerActor struct { ServerActorTyped - system *vivid.ActorSystem network Network core *serverCore connections map[net.Conn]*ConnActor @@ -67,11 +78,16 @@ func (s *ServerActor) OnReceive(ctx vivid.MessageContext) { s.onServerConnOpened(ctx, m) case ServerConnClosedMessage: s.onServerConnClosed(ctx, m) + case ServerSubscribeConnOpenedMessage: + ctx.Become(vivid.BehaviorOf[ServerConnOpenedEvent](m.Handler)) + ctx.GetSystem().Subscribe(m.SubscribeId, ctx, ServerConnOpenedEvent{}) + case ServerSubscribeConnClosedMessage: + ctx.Become(vivid.BehaviorOf[ServerConnClosedEvent](m.Handler)) + ctx.GetSystem().Subscribe(m.SubscribeId, ctx, ServerConnClosedEvent{}) } } func (s *ServerActor) onBoot(ctx vivid.MessageContext) { - s.system = ctx.GetSystem() s.connections = make(map[net.Conn]*ConnActor) s.core = new(serverCore).init(ctx.GetRef()) } @@ -101,6 +117,9 @@ func (s *ServerActor) onServerShutdown(ctx vivid.MessageContext, m ServerShutdow func (s *ServerActor) onServerConnOpened(ctx vivid.MessageContext, m ServerConnOpenedMessage) { conn := newConn(ctx.GetContext(), m.conn, m.writer) + vivid.ActorOfI(ctx, conn, func(options *vivid.ActorOptions[*ConnActor]) { + options.WithName(fmt.Sprintf("conn-%s", m.conn.RemoteAddr().String())) + }) s.connections[m.conn] = conn ctx.Reply(ConnCore(conn)) ctx.GetSystem().Publish(ctx, ServerConnOpenedEvent{ConnActor: conn}) diff --git a/minotaur/transport/server_actor_typed.go b/minotaur/transport/server_actor_typed.go index 547361c..f2463a5 100644 --- a/minotaur/transport/server_actor_typed.go +++ b/minotaur/transport/server_actor_typed.go @@ -12,10 +12,10 @@ type ServerActorTyped interface { Shutdown() // SubscribeConnOpenedEvent 订阅连接打开事件 - SubscribeConnOpenedEvent(subscribeId vivid.SubscribeId, subscriber vivid.Subscriber, handler func(ServerConnOpenedEvent), options ...vivid.SubscribeOption) + SubscribeConnOpenedEvent(subscribeId vivid.SubscribeId, handler func(ctx vivid.MessageContext, event ServerConnOpenedEvent), options ...vivid.SubscribeOption) // SubscribeConnClosedEvent 订阅连接关闭事件 - SubscribeConnClosedEvent(subscribeId vivid.SubscribeId, subscriber vivid.Subscriber, handler func(ServerConnClosedEvent), options ...vivid.SubscribeOption) + SubscribeConnClosedEvent(subscribeId vivid.SubscribeId, handler func(ctx vivid.MessageContext, event ServerConnClosedEvent), options ...vivid.SubscribeOption) } type ServerActorTypedImpl struct { @@ -30,10 +30,10 @@ func (s *ServerActorTypedImpl) Shutdown() { s.ref.Tell(ServerShutdownMessage{}) } -func (s *ServerActorTypedImpl) SubscribeConnOpenedEvent(subscribeId vivid.SubscribeId, subscriber vivid.Subscriber, handler func(ServerConnOpenedEvent), options ...vivid.SubscribeOption) { - s.ref.GetSystem().Subscribe(subscribeId, subscriber, handler, options...) +func (s *ServerActorTypedImpl) SubscribeConnOpenedEvent(subscribeId vivid.SubscribeId, handler func(ctx vivid.MessageContext, event ServerConnOpenedEvent), options ...vivid.SubscribeOption) { + s.ref.Tell(ServerSubscribeConnOpenedMessage{SubscribeId: subscribeId, Handler: handler, Options: options}) } -func (s *ServerActorTypedImpl) SubscribeConnClosedEvent(subscribeId vivid.SubscribeId, subscriber vivid.Subscriber, handler func(ServerConnClosedEvent), options ...vivid.SubscribeOption) { - s.ref.GetSystem().Subscribe(subscribeId, subscriber, handler, options...) +func (s *ServerActorTypedImpl) SubscribeConnClosedEvent(subscribeId vivid.SubscribeId, handler func(ctx vivid.MessageContext, event ServerConnClosedEvent), options ...vivid.SubscribeOption) { + s.ref.Tell(ServerSubscribeConnClosedMessage{SubscribeId: subscribeId, Handler: handler, Options: options}) } diff --git a/minotaur/transport/server_test.go b/minotaur/transport/server_test.go deleted file mode 100644 index daeb0b0..0000000 --- a/minotaur/transport/server_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package transport_test - -import ( - "github.com/kercylan98/minotaur/minotaur/transport" - "github.com/kercylan98/minotaur/minotaur/transport/network" - "github.com/kercylan98/minotaur/minotaur/vivid" - "os" - "os/signal" - "syscall" - "testing" -) - -type Conn struct { - *transport.ConnActor -} - -func (c *Conn) OnReceive(ctx vivid.MessageContext) { - switch m := ctx.GetMessage().(type) { - case vivid.OnBoot: - c.ConnActor.OnReceive(ctx) - case transport.ConnReceivePacketMessage: - c.Write(m.Packet) - } -} - -type AccountManager struct { - eventBus *vivid.Pulse -} - -func (e *AccountManager) OnReceive(ctx vivid.MessageContext) { - switch m := ctx.GetMessage().(type) { - case vivid.OnBoot: - e.eventBus.Subscribe(vivid.SubscribeId(ctx.GetRef().Id()), ctx.GetRef(), transport.ServerConnOpenedEvent{}) - case transport.ServerConnOpenedEvent: - vivid.ActorOf[*Conn](ctx, vivid.NewActorOptions[*Conn]().WithInit(func(conn *Conn) { - conn.ConnActor = m.ConnActor - })) - - } -} - -func TestNewServer(t *testing.T) { - system := vivid.NewActorSystem("test-srv") - eventBus := vivid.NewPulse(&system) - srv := transport.NewServer(&system) - srv.Launch(&eventBus, network.WebSocket(":8899")) - - vivid.ActorOf[*AccountManager](&system, vivid.NewActorOptions[*AccountManager]().WithInit(func(manager *AccountManager) { - manager.eventBus = &eventBus - })) - - var s = make(chan os.Signal, 1) - signal.Notify(s, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) - <-s - - system.Shutdown() - t.Log("Server shutdown") -}