Skip to content

Commit

Permalink
other: v2 vivid 结合 server
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 27, 2024
1 parent 2e0c52e commit 6531767
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 41 deletions.
16 changes: 15 additions & 1 deletion server3/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,22 @@ type ConnWriter func(packet Packet) error
type conn struct {
conn net.Conn
writer ConnWriter
actor vivid.ActorRef
}

func (c *conn) OnReceive(ctx vivid.MessageContext) {

switch m := ctx.GetMessage().(type) {
case vivid.OnPreStart:
c.actor = ctx.GetReceiver()
case onConnectionReceivedMessage:
c.onConnectionReceivedMessage(ctx, m)
}
}

func (c *conn) onConnectionReceivedMessage(ctx vivid.MessageContext, m onConnectionReceivedMessage) {
packet := m.Packet
if err := c.writer(packet); err != nil {
c.conn.Close()
c.actor.Tell(onConnectionClosedTellMessage{conn: c.conn})
}
}
30 changes: 26 additions & 4 deletions server3/core.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package server

import "net"
import (
"github.com/kercylan98/minotaur/vivid"
"net"
)

// Core 是对于可自行实现功能提供的对外接口
type Core interface {
BindConn(conn net.Conn, writer ConnWriter)
// BindConn 用于绑定一个连接到服务器
BindConn(conn net.Conn, writer ConnWriter) vivid.ActorRef
// UnbindConn 用于解绑一个连接
UnbindConn(conn net.Conn)
// ProcessPacket 用于处理一个数据包
ProcessPacket(conn vivid.ActorRef, packet Packet)
}

type _Core struct {
Expand All @@ -15,10 +24,23 @@ func (c *_Core) init(srv *server) *_Core {
return c
}

func (c *_Core) BindConn(netConn net.Conn, writer ConnWriter) {
func (c *_Core) BindConn(netConn net.Conn, writer ConnWriter) vivid.ActorRef {
connection := &conn{
conn: netConn,
writer: writer,
}
c.srv.actor.Tell(onConnectionOpenedMessage{connection})
reply := c.srv.actor.Ask(onConnectionOpenedAskMessage{connection})
switch r := reply.(type) {
case vivid.ActorRef:
return r
}
return nil
}

func (c *_Core) UnbindConn(netConn net.Conn) {
c.srv.actor.Tell(onConnectionClosedTellMessage{netConn})
}

func (c *_Core) ProcessPacket(conn vivid.ActorRef, packet Packet) {
conn.Tell(onConnectionReceivedMessage{Packet: packet})
}
14 changes: 12 additions & 2 deletions server3/messages.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package server

import "net"

type (
onLaunchServerAskMessage struct {
onLaunchServerTellMessage struct {
Network Network // 网络接口
}

// onShutdownServerAskMessage 用于关闭服务器的消息,该消息回复一个 error 类型的值
onShutdownServerAskMessage struct {
}

onConnectionOpenedMessage struct {
onConnectionOpenedAskMessage struct {
conn *conn
}

onConnectionClosedTellMessage struct {
conn net.Conn
}

onConnectionReceivedMessage struct {
Packet Packet
}
)
183 changes: 183 additions & 0 deletions server3/network/gnet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package network

import (
"context"
"errors"
"fmt"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
server "github.com/kercylan98/minotaur/server3"
"github.com/kercylan98/minotaur/toolkit/collection"
"github.com/kercylan98/minotaur/vivid"
"github.com/panjf2000/gnet/v2"
"time"
)

var (
schemaWebSocket = "ws"
schemaTcp = "tcp"
schemaTcp4 = "tcp4"
schemaTcp6 = "tcp6"
schemaUdp = "udp"
schemaUdp4 = "udp4"
schemaUdp6 = "udp6"
schemaUnix = "unix"
)

func newGnetEngine(schema, addr string, pattern ...string) server.Network {
g := &gnetEngine{
addr: addr,
schema: schema,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),
}
return g
}

type gnetEngine struct {
addr string
schema string
pattern string
eng gnet.Engine
upgrader ws.Upgrader
srv server.Core
}

func (g *gnetEngine) Launch(ctx context.Context, srv server.Core) error {
g.srv = srv

var addr string
switch g.schema {
case schemaTcp, schemaWebSocket:
addr = fmt.Sprintf("tcp://%s", g.addr)
if g.schema == schemaWebSocket {
g.initWebSocketUpgrader()
}
case schemaTcp4:
addr = fmt.Sprintf("tcp4://%s", g.addr)
case schemaTcp6:
addr = fmt.Sprintf("tcp6://%s", g.addr)
case schemaUdp:
addr = fmt.Sprintf("udp://%s", g.addr)
case schemaUdp4:
addr = fmt.Sprintf("udp4://%s", g.addr)
case schemaUdp6:
addr = fmt.Sprintf("udp6://%s", g.addr)
case schemaUnix:
addr = fmt.Sprintf("unix://%s", g.addr)
default:
return fmt.Errorf("unsupported schema: %s", g.schema)
}
return gnet.Run(g, addr)
}

func (g *gnetEngine) Shutdown() error {
return g.eng.Stop(context.TODO())
}

func (g *gnetEngine) Schema() string {
return g.schema
}

func (g *gnetEngine) Address() string {
return g.addr
}

func (g *gnetEngine) OnBoot(eng gnet.Engine) (action gnet.Action) {
g.eng = eng
return
}

func (g *gnetEngine) OnShutdown(eng gnet.Engine) {

}

func (g *gnetEngine) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
if g.schema == schemaWebSocket {
c.SetContext(newWebsocketWrapper(c))
} else {
connActor := g.srv.BindConn(c, func(packet server.Packet) error {
return c.AsyncWrite(packet.GetBytes(), func(c gnet.Conn, err error) error {
return nil
})
})

if connActor == nil {
action = gnet.Close
return
}

c.SetContext(connActor)
}
return
}

func (g *gnetEngine) OnClose(c gnet.Conn, err error) (action gnet.Action) {
g.srv.UnbindConn(c)
return
}

func (g *gnetEngine) OnTraffic(c gnet.Conn) (action gnet.Action) {
if g.schema == schemaWebSocket {
wrapper := c.Context().(*websocketWrapper)

if err := wrapper.readToBuffer(); err != nil {
return gnet.Close
}

if err := wrapper.upgrade(g.upgrader, func() {
// 协议升级成功后视为连接建立
conn := g.srv.BindConn(c, func(packet server.Packet) error {
return wsutil.WriteServerMessage(c, ws.OpText, packet.GetBytes())
})

if conn == nil {
action = gnet.Close
return
}

wrapper.ref = conn

}); err != nil {
return gnet.Close
}
wrapper.active = time.Now()

// decode
messages, err := wrapper.decode()
if err != nil {
return gnet.Close
}

for _, message := range messages {
packet := server.NewPacket(message.Payload)
packet.SetContext(message.OpCode)
g.srv.ProcessPacket(wrapper.ref, packet)
}
} else {
buf, err := c.Next(-1)
if err != nil {
return gnet.Close
}

var clone = make([]byte, len(buf))
copy(clone, buf)

g.srv.ProcessPacket(c.Context().(vivid.ActorRef), server.NewPacket(clone))
}
return
}

func (g *gnetEngine) OnTick() (delay time.Duration, action gnet.Action) {
return
}

func (g *gnetEngine) initWebSocketUpgrader() {
g.upgrader = ws.Upgrader{
OnRequest: func(uri []byte) (err error) {
if string(uri) != g.pattern {
err = errors.New("bad request")
}
return
},
}
}
24 changes: 24 additions & 0 deletions server3/network/gnet_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package network

type gNetLogger struct {
}

func (l *gNetLogger) Debugf(format string, args ...interface{}) {

}

func (l *gNetLogger) Infof(format string, args ...interface{}) {

}

func (l *gNetLogger) Warnf(format string, args ...interface{}) {

}

func (l *gNetLogger) Errorf(format string, args ...interface{}) {

}

func (l *gNetLogger) Fatalf(format string, args ...interface{}) {

}
35 changes: 35 additions & 0 deletions server3/network/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package network

import (
"context"
"github.com/kercylan98/minotaur/server"
server2 "github.com/kercylan98/minotaur/server3"
"net/http"
)

type HttpServe struct {
*http.ServeMux
}

type httpCore[H http.Handler] struct {
addr string
handler H
srv *http.Server
controller server.Controller
}

func (h *httpCore[H]) Launch(ctx context.Context, srv server2.Core) error {
return h.srv.ListenAndServe()
}

func (h *httpCore[H]) Shutdown() error {
return h.srv.Shutdown(context.TODO())
}

func (h *httpCore[H]) Schema() string {
return "http(s)"
}

func (h *httpCore[H]) Address() string {
return h.addr
}
Loading

0 comments on commit 6531767

Please sign in to comment.