Skip to content

Commit

Permalink
其他更改
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Jun 19, 2024
1 parent 671ed64 commit 1d1cb49
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 10 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Minotaur

Minotaur 是一个通过 Actor 思想来设计的天然支持分布式的服务端开发支持库,主要目的是提供简单
的并发及分布式实现。其中不仅包含了对 Actor 的抽象,同时也包含了大量常用的工具函数,尽管主要是
面向于游戏开发,但是其中的大量功能也同样适用于 WEB 开发。

Minotaur 是一个用于服务端开发的支持库,其中采用了大量泛型设计,主要被用于游戏服务器开发,但由于拥有大量通用的功能,也常被用于 WEB 开发。
***

Expand Down
File renamed without changes.
File renamed without changes.
20 changes: 10 additions & 10 deletions minotaur/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ func NewApplication(options ...Option) *Application {
// Application 基于 Minotaur 的应用程序结构
type Application struct {
vivid.ActorRef
options *Options
ctx context.Context
cancel context.CancelFunc
closed chan struct{}
actorSystem *vivid.ActorSystem
server transport.ServerActorTyped
handlers []func(app *Application, ctx vivid.MessageContext)
options *Options
ctx context.Context
cancel context.CancelFunc
closed chan struct{}
actorSystem *vivid.ActorSystem
server transport.ServerActorTyped
onReceiveHandler []func(app *Application, ctx vivid.MessageContext)
}

func (a *Application) onReceive(ctx vivid.MessageContext) {
for _, handler := range a.handlers {
for _, handler := range a.onReceiveHandler {
handler(a, ctx)
}
}
Expand Down Expand Up @@ -72,11 +72,11 @@ func (a *Application) GetContext() vivid.ActorContext {
}

// Launch 启动应用程序
func (a *Application) Launch(handlers ...func(app *Application, ctx vivid.MessageContext)) {
func (a *Application) Launch(onReceive ...func(app *Application, ctx vivid.MessageContext)) {
defer func(a *Application) {
close(a.closed)
}(a)
a.handlers = handlers
a.onReceiveHandler = onReceive

if a.options.Network != nil {
a.server = transport.NewServerActor(a.actorSystem, vivid.NewActorOptions[*transport.ServerActor]().WithName("server"))
Expand Down
27 changes: 27 additions & 0 deletions minotaur/gateway/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package gateway

import (
"net"
"strconv"
)

type EndpointId = int64

type Endpoint struct {
Id EndpointId `json:"id"` // 端点标识
Host Host `json:"addr"` // 端点地址
Port Port `json:"port"` // 端点端口
Weight uint64 `json:"weight"` // 端点权重
}

func (e *Endpoint) GetId() EndpointId {
return e.Id
}

func (e *Endpoint) GetWeight() int {
return int(e.Weight)
}

func (e *Endpoint) GetAddress() Address {
return net.JoinHostPort(e.Host, strconv.Itoa(int(e.Port)))
}
85 changes: 85 additions & 0 deletions minotaur/gateway/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package gateway

import (
"context"
"sync"
"sync/atomic"
)

const (
gatewayStatusNone = status(iota)
gatewayStatusRunning
gatewayStatusStopped
)

type (
status = int32 // 状态
Address = string // 完整的网络地址
Host = string // 主机地址
Port = uint16 // 端口号
)

func New() *Gateway {
gateway := &Gateway{
listeners: make(map[Address]Listener),
wg: &sync.WaitGroup{},
status: gatewayStatusNone,
events: make(chan any, 1024),
}

gateway.ctx, gateway.cancel = context.WithCancel(context.Background())

return gateway
}

type Gateway struct {
ctx context.Context // 上下文
cancel context.CancelFunc // 取消函数
status status // 状态
events chan any // 事件
wg *sync.WaitGroup // 等待组
listeners map[Address]Listener // 监听器
}

func (g *Gateway) BindListener(listener Listener, cb ...func(err error)) {
g.events <- listenerBindEvent{
listener: listener,
callback: cb,
}
}

func (g *Gateway) Run() {
if !atomic.CompareAndSwapInt32(&g.status, gatewayStatusNone, gatewayStatusRunning) {
return
}
for {
select {
case <-g.ctx.Done():
return
case event := <-g.events:
switch e := event.(type) {
case listenerBindEvent:
g.onListenerBind(e)

}
}
}
}

func (g *Gateway) Stop() {
if !atomic.CompareAndSwapInt32(&g.status, gatewayStatusRunning, gatewayStatusStopped) {
return
}
g.cancel()
g.wg.Wait()
}

func (g *Gateway) onListenerBind(e listenerBindEvent) {
if err := e.listener.Start(g.ctx); err != nil {
for _, cb := range e.callback {
cb(err)
}
} else {
g.listeners[e.listener.Address()] = e.listener
}
}
23 changes: 23 additions & 0 deletions minotaur/gateway/gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gateway_test

import (
"github.com/kercylan98/minotaur/minotaur/gateway"
"github.com/kercylan98/minotaur/minotaur/vivid"
"github.com/kercylan98/minotaur/toolkit/balancer"
"testing"
)

func TestGateway_BindListener(t *testing.T) {
system := vivid.NewActorSystem("gateway")
ref := system.ActorOf(vivid.OfO[*gateway.L4TCPListenerActor]())
ref.Tell(gateway.ListenerActorBindAddressMessage{Address: ":8080"})
ref.Tell(gateway.ListenerActorBindBalancerMessage{Balancer: balancer.NewConsistentHashWeight[gateway.EndpointId, *gateway.Endpoint](1)})
ref.Tell(gateway.ListenerActorBindEndpointMessage{Endpoint: &gateway.Endpoint{
Id: 1,
Host: "192.168.2.112",
Port: 10000,
Weight: 10,
}})

system.AwaitShutdown()
}
96 changes: 96 additions & 0 deletions minotaur/gateway/l4.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package gateway

import (
"errors"
"github.com/kercylan98/minotaur/minotaur/transport"
"github.com/kercylan98/minotaur/minotaur/transport/network"
"github.com/kercylan98/minotaur/minotaur/vivid"
"github.com/kercylan98/minotaur/toolkit/balancer"
"github.com/kercylan98/minotaur/toolkit/log"
"net"
)

type L4TCPListenerActor struct {
address Address
srv vivid.TypedActorRef[transport.ServerActorTyped]
endpoints balancer.Balancer[EndpointId, *Endpoint]
}

func (l *L4TCPListenerActor) OnReceive(ctx vivid.MessageContext) {
switch ctx.GetMessage().(type) {
case vivid.OnBoot:
ctx.Become(vivid.BehaviorOf(l.onBindAddress))
ctx.Become(vivid.BehaviorOf(l.onBindBalancer))
ctx.Become(vivid.BehaviorOf(l.onBindEndpoint))
ctx.Become(vivid.BehaviorOf(l.onConnectionOpened))
}
}

func (l *L4TCPListenerActor) onBindAddress(ctx vivid.MessageContext, message ListenerActorBindAddressMessage) {
if l.address == message.Address {
return
}
l.address = message.Address

if l.srv != nil {
l.srv.Stop()
l.srv = nil
}

l.srv = transport.NewServerActor(ctx.GetSystem(), vivid.NewActorOptions[*transport.ServerActor]().
WithSupervisor(func(message, reason vivid.Message) vivid.Directive {
return vivid.DirectiveRestart
}),
)

l.srv.Tell(transport.ServerLaunchMessage{
Network: network.Tcp(message.Address),
})

l.srv.Api().SubscribeConnOpenedEvent(ctx)
}

func (l *L4TCPListenerActor) onConnectionOpened(ctx vivid.MessageContext, message transport.ServerConnectionOpenedEvent) {
// 选择端点
endpoint, err := l.endpoints.Select()
if err != nil {
ctx.GetSystem().GetLogger().Error("SelectEndpoint", log.Err(err))
message.Conn.Stop()
return
}

// 连接到端点
tcpAddr, err := net.ResolveTCPAddr("tcp", endpoint.GetAddress())
if err != nil {
ctx.GetSystem().GetLogger().Error("ResolveTcpAddr", log.Err(err))
message.Conn.Stop()
return
}

endpointConn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
ctx.GetSystem().GetLogger().Error("DialTcp", log.Err(err))
message.Conn.Stop()
return
}

message.Conn.Api().SetTerminateHandler(func(ctx vivid.MessageContext, conn transport.Conn, message vivid.OnTerminate) {
endpointConn.Close()
})
message.Conn.Api().SetPacketHandler(func(ctx vivid.MessageContext, conn transport.Conn, packet transport.ConnectionReactPacketMessage) {
endpointConn.Write(packet.Packet.GetBytes())
})
}

func (l *L4TCPListenerActor) onBindBalancer(ctx vivid.MessageContext, message ListenerActorBindBalancerMessage) {
l.endpoints = message.Balancer
}

func (l *L4TCPListenerActor) onBindEndpoint(ctx vivid.MessageContext, message ListenerActorBindEndpointMessage) {
if l.endpoints == nil {
ctx.GetSystem().GetLogger().Error("BindEndpoint", log.Err(errors.New("endpoints balancer is nil")))
return
}

l.endpoints.Add(message.Endpoint)
}
43 changes: 43 additions & 0 deletions minotaur/gateway/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gateway

import (
"context"
"github.com/kercylan98/minotaur/toolkit/balancer"
)

type ListenerActorBindAddressMessage struct {
Address Address
}

type ListenerActorBindBalancerMessage struct {
Balancer balancer.Balancer[EndpointId, *Endpoint]
}

type ListenerActorBindEndpointMessage struct {
Endpoint *Endpoint
}

type Listener interface {
// Start 启动监听器开始监听端口活动
Start(ctx context.Context) error

// Stop 停止监听器
Stop() error

// SetAddress 设置监听器的地址
SetAddress(addr Address)

// Address 获取监听器的地址
Address() Address

// AddEndpoint 添加一个端点
AddEndpoint(endpoint *Endpoint)

// RemoveEndpoint 移除一个端点
RemoveEndpoint(id EndpointId)
}

type listenerBindEvent struct {
listener Listener
callback []func(err error)
}

0 comments on commit 1d1cb49

Please sign in to comment.