Skip to content

Commit

Permalink
other: v2 vivid 增加事件总线
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Jun 12, 2024
1 parent fa08dab commit ffd79f3
Show file tree
Hide file tree
Showing 27 changed files with 232 additions and 208 deletions.
22 changes: 10 additions & 12 deletions minotaur/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package minotaur

import (
"context"
"github.com/kercylan98/minotaur/minotaur/pulse"
"github.com/kercylan98/minotaur/minotaur/transport"
"github.com/kercylan98/minotaur/minotaur/vivid"
"os"
Expand All @@ -14,15 +13,13 @@ func NewApplication(options ...Option) *Application {
opts := new(Options).apply(options...)

actorSystem := vivid.NewActorSystem(opts.ActorSystemName)
eventBus := pulse.NewPulse(&actorSystem, vivid.NewActorOptions[*pulse.EventBusActor]().WithName(opts.EventBusActorName))
ctx, cancel := context.WithCancel(actorSystem.Context())
return &Application{
options: opts,
ctx: ctx,
cancel: cancel,
closed: make(chan struct{}),
actorSystem: &actorSystem,
eventBus: &eventBus,
}
}

Expand All @@ -33,14 +30,20 @@ type Application struct {
cancel context.CancelFunc
closed chan struct{}
actorSystem *vivid.ActorSystem
eventBus *pulse.Pulse
server *transport.Server
server vivid.TypedActorRef[transport.ServerActorTyped]
}

func (a *Application) GetSystem() *vivid.ActorSystem {
return a.actorSystem
}

func (a *Application) GetServer() vivid.TypedActorRef[transport.ServerActorTyped] {
if a.server == nil {
panic("server actor not initialized or not launched, please with WithNetwork option to initialize it")
}
return a.server
}

func (a *Application) GetContext() vivid.ActorContext {
return a.actorSystem.GetContext()
}
Expand All @@ -51,9 +54,8 @@ func (a *Application) Launch() {
}(a)

if a.options.Network != nil {
srv := transport.NewServer(a.actorSystem, vivid.NewActorOptions[*transport.ServerActor]().WithName("server"))
a.server = &srv
a.server.Launch(a.eventBus, a.options.Network)
a.server = transport.NewServerActor(a.actorSystem, vivid.NewActorOptions[*transport.ServerActor]().WithName("server"))
a.server.Protocol().Launch(a.options.Network)
}

var systemSignal = make(chan os.Signal, 1)
Expand All @@ -70,10 +72,6 @@ func (a *Application) Shutdown() {
<-a.closed
}

func (a *Application) EventBus() *pulse.Pulse {
return a.eventBus
}

func (a *Application) ActorSystem() *vivid.ActorSystem {
return a.actorSystem
}
Expand Down
40 changes: 3 additions & 37 deletions minotaur/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,13 @@ import (
"github.com/kercylan98/minotaur/minotaur"
"github.com/kercylan98/minotaur/minotaur/transport"
"github.com/kercylan98/minotaur/minotaur/transport/network"
"github.com/kercylan98/minotaur/minotaur/vivid"
"testing"
)

type AccountManager struct {
*minotaur.Application
}

func (e *AccountManager) OnReceive(ctx vivid.MessageContext) {
switch ctx.GetMessage().(type) {
case vivid.OnBoot:
ctx.Become(vivid.BehaviorOf(e.onConnOpened))
e.EventBus().Subscribe("conn-opened", ctx, transport.ServerConnOpenedEvent{})
}
}

func (e *AccountManager) onConnOpened(ctx vivid.MessageContext, message transport.ServerConnOpenedEvent) {
e.ActorSystem().ActorOf(vivid.OfO(func(actorOptions *vivid.ActorOptions[*Account]) {
actorOptions.WithInit(func(account *Account) {
account.ConnActor = message.ConnActor
})
}))
}

type Account struct {
*transport.ConnActor
}

func (c *Account) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case vivid.OnBoot:
c.ConnActor.OnReceive(ctx)
case transport.ConnReceivePacketMessage:
c.Write(m.Packet)
}
}

func TestNewApplication(t *testing.T) {
app := minotaur.NewApplication(minotaur.WithNetwork(network.WebSocket(":9988")))
vivid.ActorOf[*AccountManager](app.ActorSystem(), vivid.NewActorOptions[*AccountManager]().WithInit(func(manager *AccountManager) {
manager.Application = app
}))
app.GetServer().Protocol().SubscribeConnOpenedEvent("conn_opened", app.ActorSystem(), func(event transport.ServerConnOpenedEvent) {
t.Log("conn opened")
})
app.Launch()
}
8 changes: 0 additions & 8 deletions minotaur/pulse/producer.go

This file was deleted.

48 changes: 0 additions & 48 deletions minotaur/pulse/pulse.go

This file was deleted.

21 changes: 0 additions & 21 deletions minotaur/pulse/pulse_generic.go

This file was deleted.

7 changes: 0 additions & 7 deletions minotaur/pulse/subscriber.go

This file was deleted.

5 changes: 1 addition & 4 deletions minotaur/transport/conn_actor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package transport

import (
"github.com/kercylan98/minotaur/minotaur/pulse"
"github.com/kercylan98/minotaur/minotaur/vivid"
"net"
)
Expand All @@ -12,9 +11,8 @@ type (
}
)

func newConn(eventBus *pulse.Pulse, server vivid.ActorContext, c net.Conn, writer ConnWriter) *ConnActor {
func newConn(server vivid.ActorContext, c net.Conn, writer ConnWriter) *ConnActor {
conn := &ConnActor{
eventBus: eventBus,
server: server,
conn: c,
connWriter: writer,
Expand All @@ -35,7 +33,6 @@ type ConnCore interface {
}

type ConnActor struct {
eventBus *pulse.Pulse
server vivid.ActorContext
conn net.Conn
connWriter ConnWriter
Expand Down
7 changes: 7 additions & 0 deletions minotaur/transport/conn_typed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package transport

type ConnTyped interface {
Write(packet Packet)

Close()
}
3 changes: 3 additions & 0 deletions minotaur/transport/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ func NewPacket(data []byte) Packet {

// Packet 写入连接的数据包接口
type Packet interface {

// GetBytes 获取数据包字节流
GetBytes() []byte

// SetContext 设置数据包上下文,上下文通常是受特定 Network 实现所限制的
// - 在内置的 network.WebSocket 实现中,上下文被用于指定连接发送数据的操作码
SetContext(ctx any) Packet

// GetContext 获取数据包上下文
GetContext() any
}
Expand Down
26 changes: 0 additions & 26 deletions minotaur/transport/server.go

This file was deleted.

28 changes: 16 additions & 12 deletions minotaur/transport/server_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package transport

import (
"fmt"
"github.com/kercylan98/minotaur/minotaur/pulse"
"github.com/kercylan98/minotaur/minotaur/vivid"
"github.com/kercylan98/minotaur/toolkit/log"
"github.com/kercylan98/minotaur/toolkit/network"
Expand All @@ -13,8 +12,7 @@ import (
type (
// ServerLaunchMessage 服务器启动消息,服务器在收到该消息后将开始运行,运行过程是异步的
ServerLaunchMessage struct {
Network Network // 网络接口
EventBus *pulse.Pulse // 事件总线
Network Network // 网络接口
}

// ServerShutdownMessage 服务器关闭消息,服务器在收到该消息后将停止运行
Expand All @@ -40,18 +38,25 @@ type (
}
)

func NewServerActor(system *vivid.ActorSystem, options ...*vivid.ActorOptions[*ServerActor]) vivid.TypedActorRef[ServerActorTyped] {
ref := vivid.ActorOf[*ServerActor](system, options...)
return vivid.Typed[ServerActorTyped](ref, &ServerActorTypedImpl{
ref: ref,
})
}

type ServerActor struct {
ServerActorTyped
system *vivid.ActorSystem
network Network
pulse *pulse.Pulse
actor vivid.ActorRef
core *serverCore
connections map[net.Conn]*ConnActor
}

func (s *ServerActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case vivid.OnBoot:
s.onPreStart(ctx)
s.onBoot(ctx)
case vivid.OnTerminate:
s.onServerShutdown(ctx, ServerShutdownMessage{})
case ServerLaunchMessage:
Expand All @@ -65,15 +70,14 @@ func (s *ServerActor) OnReceive(ctx vivid.MessageContext) {
}
}

func (s *ServerActor) onPreStart(ctx vivid.MessageContext) {
func (s *ServerActor) onBoot(ctx vivid.MessageContext) {
s.system = ctx.GetSystem()
s.connections = make(map[net.Conn]*ConnActor)
s.actor = ctx.GetRef()
s.core = new(serverCore).init(ctx.GetRef())
}

func (s *ServerActor) onServerLaunch(ctx vivid.MessageContext, m ServerLaunchMessage) {
s.network = m.Network
s.pulse = m.EventBus
ip, err := network.IP()
if err != nil {
panic(err)
Expand All @@ -96,10 +100,10 @@ func (s *ServerActor) onServerShutdown(ctx vivid.MessageContext, m ServerShutdow
}

func (s *ServerActor) onServerConnOpened(ctx vivid.MessageContext, m ServerConnOpenedMessage) {
conn := newConn(s.pulse, ctx.GetContext(), m.conn, m.writer)
conn := newConn(ctx.GetContext(), m.conn, m.writer)
s.connections[m.conn] = conn
ctx.Reply(ConnCore(conn))
s.pulse.Publish(s.actor, ServerConnOpenedEvent{ConnActor: conn})
ctx.GetSystem().Publish(ctx, ServerConnOpenedEvent{ConnActor: conn})
}

func (s *ServerActor) onServerConnClosed(ctx vivid.MessageContext, m ServerConnClosedMessage) {
Expand All @@ -113,6 +117,6 @@ func (s *ServerActor) onServerConnClosed(ctx vivid.MessageContext, m ServerConnC
conn.writer.Tell(vivid.OnTerminate{})
}

s.pulse.Publish(s.actor, ServerConnClosedEvent{ConnActor: conn})
ctx.GetSystem().Publish(ctx, ServerConnClosedEvent{ConnActor: conn})
}
}
Loading

0 comments on commit ffd79f3

Please sign in to comment.