Skip to content

Commit

Permalink
other: v2 server 增加 tcp\udp\unix 网络支持
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 2, 2024
1 parent db31f5a commit 102a9ec
Show file tree
Hide file tree
Showing 17 changed files with 353 additions and 173 deletions.
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *conn) initWriteQueue() {
if atomic.LoadInt32(&c.state) == ConnStatusClosed {
break // 连接已关闭,退出写入循环,交给下一次循环处理关闭
}
c.server.events.onConnectionAsyncWriteError(c, p, err)
c.server.events.OnConnectionAsyncWriteError(c, p, err)
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type Controller interface {
// GetServer 获取服务器
GetServer() Server
// RegisterConnection 注册连接
RegisterConnection(conn net.Conn, writer ConnWriter)
RegisterConnection(conn net.Conn, writer ConnWriter, callback func(conn Conn))
// EliminateConnection 消除连接
EliminateConnection(conn net.Conn, err error)
// ReactPacket 反应连接数据包
ReactPacket(conn net.Conn, packet Packet)
// GetAnts 获取服务器异步池
GetAnts() *ants.Pool
// OnConnectionAsyncWriteError 注册连接异步写入数据错误事件
OnConnectionAsyncWriteError(conn Conn, packet Packet, err error)
}

type controller struct {
Expand All @@ -39,10 +41,13 @@ func (s *controller) GetAnts() *ants.Pool {
return s.server.ants
}

func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter, callback func(conn Conn)) {
s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context) {
c := newConn(s.server, conn, writer)
s.server.connections[conn] = c
if callback != nil {
callback(c)
}
s.events.onConnectionOpened(c)
})
}
Expand Down
2 changes: 1 addition & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *events) RegisterConnectionAsyncWriteErrorEvent(handler ConnectionAsyncW
s.connectionAsyncWriteErrorEventHandlers.AppendByOptionalPriority(handler, priority...)
}

func (s *events) onConnectionAsyncWriteError(conn Conn, packet Packet, err error) {
func (s *events) OnConnectionAsyncWriteError(conn Conn, packet Packet, err error) {
s.PublishSyncMessage(conn.GetQueue(), func(ctx context.Context) {
s.connectionAsyncWriteErrorEventHandlers.RangeValue(func(index int, value ConnectionAsyncWriteErrorEventHandler) bool {
value(s, conn, packet, err)
Expand Down
97 changes: 97 additions & 0 deletions server/network/gnet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package network

import (
"context"
"fmt"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/toolkit/collection"
"github.com/panjf2000/gnet/v2"
"time"
)

var (
schemaWebSocket = "ws"
schemaTcp = "tcp"
schemaTcp4 = "tcp4"
schemaTcp6 = "tcp6"
schemaUdp = "udp"
schemaUdp4 = "udp4"
schemaUdp6 = "udp6"
schemaUnix = "unix"
)

type gNetHandler interface {
OnInit(core *gNetCore)
gnet.EventHandler

GetEngine() *gnet.Engine
}

func newGNetCore(handler gNetHandler, schema, addr string, pattern ...string) server.Network {
ws := &gNetCore{
handler: handler,
addr: addr,
schema: schema,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),
}
return ws
}

type gNetCore struct {
ctx context.Context
controller server.Controller
handler gNetHandler
addr string
schema string
pattern string
}

func (w *gNetCore) OnSetup(ctx context.Context, controller server.Controller) (err error) {
w.ctx = ctx
w.controller = controller
return
}

func (w *gNetCore) OnRun() (err error) {
var addr string
switch w.schema {
case schemaTcp, schemaWebSocket:
addr = fmt.Sprintf("tcp://%s", w.addr)
case schemaTcp4:
addr = fmt.Sprintf("tcp4://%s", w.addr)
case schemaTcp6:
addr = fmt.Sprintf("tcp6://%s", w.addr)
case schemaUdp:
addr = fmt.Sprintf("udp://%s", w.addr)
case schemaUdp4:
addr = fmt.Sprintf("udp4://%s", w.addr)
case schemaUdp6:
addr = fmt.Sprintf("udp6://%s", w.addr)
case schemaUnix:
addr = fmt.Sprintf("unix://%s", w.addr)
default:
return fmt.Errorf("unsupported schema: %s", w.schema)
}
err = gnet.Run(w.handler, addr)
return
}

func (w *gNetCore) OnShutdown() error {
if w.handler.GetEngine() != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return w.handler.GetEngine().Stop(ctx)
}
return nil
}

func (w *gNetCore) Schema() string {
return w.schema
}

func (w *gNetCore) Address() string {
if w.pattern == "/" {
return w.addr
}
return fmt.Sprintf("%s:%s", w.addr, w.pattern)
}
64 changes: 64 additions & 0 deletions server/network/gnet_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package network

import (
"github.com/kercylan98/minotaur/server"
"github.com/panjf2000/gnet/v2"
"time"
)

type gNetGenericHandler struct {
engine *gnet.Engine
*gNetCore
}

func (t *gNetGenericHandler) OnInit(core *gNetCore) {
t.gNetCore = core
}

func (t *gNetGenericHandler) OnBoot(eng gnet.Engine) (action gnet.Action) {
t.engine = &eng
return
}

func (t *gNetGenericHandler) OnShutdown(eng gnet.Engine) {

}

func (t *gNetGenericHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
t.controller.RegisterConnection(c,
func(packet server.Packet) error {
return c.AsyncWrite(packet.GetBytes(), func(c gnet.Conn, err error) error {
t.controller.OnConnectionAsyncWriteError(c.Context().(server.Conn), packet, err)
return nil
})
}, func(conn server.Conn) {
c.SetContext(conn)
})
return
}

func (t *gNetGenericHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) {
t.controller.EliminateConnection(c, err)
return
}

func (t *gNetGenericHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
buf, err := c.Next(-1)
if err != nil {
return gnet.Close
}

var clone = make([]byte, len(buf))
copy(clone, buf)

t.controller.ReactPacket(c, server.NewPacket(clone))
return
}

func (t *gNetGenericHandler) OnTick() (delay time.Duration, action gnet.Action) {
return
}

func (t *gNetGenericHandler) GetEngine() *gnet.Engine {
return t.engine
}
18 changes: 1 addition & 17 deletions server/network/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,13 @@ package network

import (
"context"
"github.com/kercylan98/minotaur/server"
"github.com/pkg/errors"
"net"
"net/http"
"time"
)

func Http(addr string) server.Network {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}

func HttpWithHandler[H http.Handler](addr string, handler H) server.Network {
c := &httpCore[H]{
addr: addr,
handler: handler,
srv: &http.Server{
Addr: addr,
Handler: handler,
DisableGeneralOptionsHandler: false,
},
}
return c
}

type httpCore[H http.Handler] struct {
addr string
handler H
Expand Down
74 changes: 74 additions & 0 deletions server/network/networks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package network

import (
"github.com/kercylan98/minotaur/server"
"net/http"
)

// Http 创建一个基于 http.ServeMux 的 HTTP 的网络
func Http(addr string) server.Network {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}

// HttpWithHandler 创建一个基于 http.Handler 的 HTTP 的网络
func HttpWithHandler[H http.Handler](addr string, handler H) server.Network {
c := &httpCore[H]{
addr: addr,
handler: handler,
srv: &http.Server{
Addr: addr,
Handler: handler,
DisableGeneralOptionsHandler: false,
},
}
return c
}

// WebSocket 创建一个基于 TCP 的 WebSocket 网络
// - addr 期望为类似于 127.0.0.1:1234 或 :1234 的地址
// - pattern 期望为 WebSocket 的路径,如果为空则默认为 /
func WebSocket(addr string, pattern ...string) server.Network {
return newGNetCore(new(websocketHandler), schemaWebSocket, addr, pattern...)
}

// Tcp 创建一个 TCP 网络
// - addr 期望为类似于 127.0.0.1:1234 或 :1234 的地址
func Tcp(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaTcp, addr)
}

// Tcp4 创建一个 IPv4 TCP 网络
// - addr 期望为类似于 127.0.0.1:1234 或 :1234 的地址
func Tcp4(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaTcp4, addr)
}

// Tcp6 创建一个 IPv6 TCP 网络
// - addr 期望为类似于 [::1]:1234 的地址
func Tcp6(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaTcp6, addr)
}

// Udp 创建一个 UDP 网络
// - addr 期望为类似于 127.0.0.1:1234 或 :1234 的地址
func Udp(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaUdp, addr)
}

// Udp4 创建一个 IPv4 UDP 网络
// - addr 期望为类似于 127.0.0.1:1234 或 :1234 的地址
func Udp4(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaUdp4, addr)
}

// Udp6 创建一个 IPv6 UDP 网络
// - addr 期望为类似于 [::1]:1234 的地址
func Udp6(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaUdp6, addr)
}

// Unix 创建一个 Unix Domain Socket 网络
// - addr 期望为类似于 /tmp/xxx.sock 的文件地址
func Unix(addr string) server.Network {
return newGNetCore(new(gNetGenericHandler), schemaUnix, addr)
}
Loading

0 comments on commit 102a9ec

Please sign in to comment.