diff --git a/README.md b/README.md index 801d25b5..bdd7d258 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # Minotaur +Minotaur 是一个通过 Actor 思想来设计的天然支持分布式的服务端开发支持库,主要目的是提供简单 +的并发及分布式实现。其中不仅包含了对 Actor 的抽象,同时也包含了大量常用的工具函数,尽管主要是 +面向于游戏开发,但是其中的大量功能也同样适用于 WEB 开发。 + Minotaur 是一个用于服务端开发的支持库,其中采用了大量泛型设计,主要被用于游戏服务器开发,但由于拥有大量通用的功能,也常被用于 WEB 开发。 *** diff --git a/example/internal/login/main.go b/examples/internal/login/main.go similarity index 100% rename from example/internal/login/main.go rename to examples/internal/login/main.go diff --git a/example/internal/login/mods/account/account.go b/examples/internal/login/mods/account/account.go similarity index 100% rename from example/internal/login/mods/account/account.go rename to examples/internal/login/mods/account/account.go diff --git a/example/internal/login/mods/router/router.go b/examples/internal/login/mods/router/router.go similarity index 100% rename from example/internal/login/mods/router/router.go rename to examples/internal/login/mods/router/router.go diff --git a/example/internal/login/types/account.go b/examples/internal/login/types/account.go similarity index 100% rename from example/internal/login/types/account.go rename to examples/internal/login/types/account.go diff --git a/minotaur/application.go b/minotaur/application.go index 0d63f0e0..1c1c6a96 100644 --- a/minotaur/application.go +++ b/minotaur/application.go @@ -28,17 +28,17 @@ func NewApplication(options ...Option) *Application { // Application 基于 Minotaur 的应用程序结构 type Application struct { vivid.ActorRef - options *Options - ctx context.Context - cancel context.CancelFunc - closed chan struct{} - actorSystem *vivid.ActorSystem - server transport.ServerActorTyped - handlers []func(app *Application, ctx vivid.MessageContext) + options *Options + ctx context.Context + cancel context.CancelFunc + closed chan struct{} + actorSystem *vivid.ActorSystem + server transport.ServerActorTyped + onReceiveHandler []func(app *Application, ctx vivid.MessageContext) } func (a *Application) onReceive(ctx vivid.MessageContext) { - for _, handler := range a.handlers { + for _, handler := range a.onReceiveHandler { handler(a, ctx) } } @@ -72,11 +72,11 @@ func (a *Application) GetContext() vivid.ActorContext { } // Launch 启动应用程序 -func (a *Application) Launch(handlers ...func(app *Application, ctx vivid.MessageContext)) { +func (a *Application) Launch(onReceive ...func(app *Application, ctx vivid.MessageContext)) { defer func(a *Application) { close(a.closed) }(a) - a.handlers = handlers + a.onReceiveHandler = onReceive if a.options.Network != nil { a.server = transport.NewServerActor(a.actorSystem, vivid.NewActorOptions[*transport.ServerActor]().WithName("server")) diff --git a/minotaur/gateway/endpoint.go b/minotaur/gateway/endpoint.go new file mode 100644 index 00000000..016c8a4b --- /dev/null +++ b/minotaur/gateway/endpoint.go @@ -0,0 +1,27 @@ +package gateway + +import ( + "net" + "strconv" +) + +type EndpointId = int64 + +type Endpoint struct { + Id EndpointId `json:"id"` // 端点标识 + Host Host `json:"addr"` // 端点地址 + Port Port `json:"port"` // 端点端口 + Weight uint64 `json:"weight"` // 端点权重 +} + +func (e *Endpoint) GetId() EndpointId { + return e.Id +} + +func (e *Endpoint) GetWeight() int { + return int(e.Weight) +} + +func (e *Endpoint) GetAddress() Address { + return net.JoinHostPort(e.Host, strconv.Itoa(int(e.Port))) +} diff --git a/minotaur/gateway/gateway.go b/minotaur/gateway/gateway.go new file mode 100644 index 00000000..09bd211d --- /dev/null +++ b/minotaur/gateway/gateway.go @@ -0,0 +1,85 @@ +package gateway + +import ( + "context" + "sync" + "sync/atomic" +) + +const ( + gatewayStatusNone = status(iota) + gatewayStatusRunning + gatewayStatusStopped +) + +type ( + status = int32 // 状态 + Address = string // 完整的网络地址 + Host = string // 主机地址 + Port = uint16 // 端口号 +) + +func New() *Gateway { + gateway := &Gateway{ + listeners: make(map[Address]Listener), + wg: &sync.WaitGroup{}, + status: gatewayStatusNone, + events: make(chan any, 1024), + } + + gateway.ctx, gateway.cancel = context.WithCancel(context.Background()) + + return gateway +} + +type Gateway struct { + ctx context.Context // 上下文 + cancel context.CancelFunc // 取消函数 + status status // 状态 + events chan any // 事件 + wg *sync.WaitGroup // 等待组 + listeners map[Address]Listener // 监听器 +} + +func (g *Gateway) BindListener(listener Listener, cb ...func(err error)) { + g.events <- listenerBindEvent{ + listener: listener, + callback: cb, + } +} + +func (g *Gateway) Run() { + if !atomic.CompareAndSwapInt32(&g.status, gatewayStatusNone, gatewayStatusRunning) { + return + } + for { + select { + case <-g.ctx.Done(): + return + case event := <-g.events: + switch e := event.(type) { + case listenerBindEvent: + g.onListenerBind(e) + + } + } + } +} + +func (g *Gateway) Stop() { + if !atomic.CompareAndSwapInt32(&g.status, gatewayStatusRunning, gatewayStatusStopped) { + return + } + g.cancel() + g.wg.Wait() +} + +func (g *Gateway) onListenerBind(e listenerBindEvent) { + if err := e.listener.Start(g.ctx); err != nil { + for _, cb := range e.callback { + cb(err) + } + } else { + g.listeners[e.listener.Address()] = e.listener + } +} diff --git a/minotaur/gateway/gateway_test.go b/minotaur/gateway/gateway_test.go new file mode 100644 index 00000000..c92cb2fa --- /dev/null +++ b/minotaur/gateway/gateway_test.go @@ -0,0 +1,23 @@ +package gateway_test + +import ( + "github.com/kercylan98/minotaur/minotaur/gateway" + "github.com/kercylan98/minotaur/minotaur/vivid" + "github.com/kercylan98/minotaur/toolkit/balancer" + "testing" +) + +func TestGateway_BindListener(t *testing.T) { + system := vivid.NewActorSystem("gateway") + ref := system.ActorOf(vivid.OfO[*gateway.L4TCPListenerActor]()) + ref.Tell(gateway.ListenerActorBindAddressMessage{Address: ":8080"}) + ref.Tell(gateway.ListenerActorBindBalancerMessage{Balancer: balancer.NewConsistentHashWeight[gateway.EndpointId, *gateway.Endpoint](1)}) + ref.Tell(gateway.ListenerActorBindEndpointMessage{Endpoint: &gateway.Endpoint{ + Id: 1, + Host: "192.168.2.112", + Port: 10000, + Weight: 10, + }}) + + system.AwaitShutdown() +} diff --git a/minotaur/gateway/l4.go b/minotaur/gateway/l4.go new file mode 100644 index 00000000..ff69cf3c --- /dev/null +++ b/minotaur/gateway/l4.go @@ -0,0 +1,96 @@ +package gateway + +import ( + "errors" + "github.com/kercylan98/minotaur/minotaur/transport" + "github.com/kercylan98/minotaur/minotaur/transport/network" + "github.com/kercylan98/minotaur/minotaur/vivid" + "github.com/kercylan98/minotaur/toolkit/balancer" + "github.com/kercylan98/minotaur/toolkit/log" + "net" +) + +type L4TCPListenerActor struct { + address Address + srv vivid.TypedActorRef[transport.ServerActorTyped] + endpoints balancer.Balancer[EndpointId, *Endpoint] +} + +func (l *L4TCPListenerActor) OnReceive(ctx vivid.MessageContext) { + switch ctx.GetMessage().(type) { + case vivid.OnBoot: + ctx.Become(vivid.BehaviorOf(l.onBindAddress)) + ctx.Become(vivid.BehaviorOf(l.onBindBalancer)) + ctx.Become(vivid.BehaviorOf(l.onBindEndpoint)) + ctx.Become(vivid.BehaviorOf(l.onConnectionOpened)) + } +} + +func (l *L4TCPListenerActor) onBindAddress(ctx vivid.MessageContext, message ListenerActorBindAddressMessage) { + if l.address == message.Address { + return + } + l.address = message.Address + + if l.srv != nil { + l.srv.Stop() + l.srv = nil + } + + l.srv = transport.NewServerActor(ctx.GetSystem(), vivid.NewActorOptions[*transport.ServerActor](). + WithSupervisor(func(message, reason vivid.Message) vivid.Directive { + return vivid.DirectiveRestart + }), + ) + + l.srv.Tell(transport.ServerLaunchMessage{ + Network: network.Tcp(message.Address), + }) + + l.srv.Api().SubscribeConnOpenedEvent(ctx) +} + +func (l *L4TCPListenerActor) onConnectionOpened(ctx vivid.MessageContext, message transport.ServerConnectionOpenedEvent) { + // 选择端点 + endpoint, err := l.endpoints.Select() + if err != nil { + ctx.GetSystem().GetLogger().Error("SelectEndpoint", log.Err(err)) + message.Conn.Stop() + return + } + + // 连接到端点 + tcpAddr, err := net.ResolveTCPAddr("tcp", endpoint.GetAddress()) + if err != nil { + ctx.GetSystem().GetLogger().Error("ResolveTcpAddr", log.Err(err)) + message.Conn.Stop() + return + } + + endpointConn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + ctx.GetSystem().GetLogger().Error("DialTcp", log.Err(err)) + message.Conn.Stop() + return + } + + message.Conn.Api().SetTerminateHandler(func(ctx vivid.MessageContext, conn transport.Conn, message vivid.OnTerminate) { + endpointConn.Close() + }) + message.Conn.Api().SetPacketHandler(func(ctx vivid.MessageContext, conn transport.Conn, packet transport.ConnectionReactPacketMessage) { + endpointConn.Write(packet.Packet.GetBytes()) + }) +} + +func (l *L4TCPListenerActor) onBindBalancer(ctx vivid.MessageContext, message ListenerActorBindBalancerMessage) { + l.endpoints = message.Balancer +} + +func (l *L4TCPListenerActor) onBindEndpoint(ctx vivid.MessageContext, message ListenerActorBindEndpointMessage) { + if l.endpoints == nil { + ctx.GetSystem().GetLogger().Error("BindEndpoint", log.Err(errors.New("endpoints balancer is nil"))) + return + } + + l.endpoints.Add(message.Endpoint) +} diff --git a/minotaur/gateway/listener.go b/minotaur/gateway/listener.go new file mode 100644 index 00000000..9b2bc5a2 --- /dev/null +++ b/minotaur/gateway/listener.go @@ -0,0 +1,43 @@ +package gateway + +import ( + "context" + "github.com/kercylan98/minotaur/toolkit/balancer" +) + +type ListenerActorBindAddressMessage struct { + Address Address +} + +type ListenerActorBindBalancerMessage struct { + Balancer balancer.Balancer[EndpointId, *Endpoint] +} + +type ListenerActorBindEndpointMessage struct { + Endpoint *Endpoint +} + +type Listener interface { + // Start 启动监听器开始监听端口活动 + Start(ctx context.Context) error + + // Stop 停止监听器 + Stop() error + + // SetAddress 设置监听器的地址 + SetAddress(addr Address) + + // Address 获取监听器的地址 + Address() Address + + // AddEndpoint 添加一个端点 + AddEndpoint(endpoint *Endpoint) + + // RemoveEndpoint 移除一个端点 + RemoveEndpoint(id EndpointId) +} + +type listenerBindEvent struct { + listener Listener + callback []func(err error) +}