From 7239a278ee7c0db55b701510c5ff477ba4f666b5 Mon Sep 17 00:00:00 2001 From: kercylan Date: Wed, 20 Mar 2024 00:13:31 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E6=96=B0=20server=20=E5=8C=85?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 4 +- go.sum | 19 +- server/v2/event_handler.go | 10 +- server/v2/options.go | 4 + server/v2/server.go | 2 +- server/v2/server_test.go | 29 +++ server/v2/traffickers/http.go | 28 ++- server/v2/traffickers/http_recorder.go | 211 ++++++++++++++++++ server/v2/traffickers/http_response_writer.go | 58 ----- server/v2/traffickers/websocket.go | 64 ++++++ server/v2/traffickers/websocket_conn.go | 16 ++ 11 files changed, 354 insertions(+), 91 deletions(-) create mode 100644 server/v2/options.go create mode 100644 server/v2/server_test.go create mode 100644 server/v2/traffickers/http_recorder.go delete mode 100644 server/v2/traffickers/http_response_writer.go create mode 100644 server/v2/traffickers/websocket.go create mode 100644 server/v2/traffickers/websocket_conn.go diff --git a/go.mod b/go.mod index 045868d..5719bdd 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/panjf2000/ants/v2 v2.9.0 github.com/panjf2000/gnet v1.6.7 + github.com/panjf2000/gnet/v2 v2.3.6 github.com/pkg/errors v0.9.1 github.com/smartystreets/goconvey v1.8.1 github.com/sony/sonyflake v1.2.0 @@ -46,11 +47,8 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/panjf2000/gnet/v2 v2.3.6 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/samber/do/v2 v2.0.0-beta.5 // indirect - github.com/samber/go-type-to-string v1.2.0 // indirect github.com/smarty/assertions v1.15.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/templexxx/cpu v0.1.0 // indirect diff --git a/go.sum b/go.sum index 11c264d..3155bcd 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= github.com/alphadose/haxmap v1.3.1 h1:KmZh75duO1tC8pt3LmUwoTYiZ9sh4K52FX8p7/yrlqU= github.com/alphadose/haxmap v1.3.1/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= @@ -129,10 +127,6 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/samber/do/v2 v2.0.0-beta.5 h1:KpQhVkkzDlsLSDC5WXgyCL8Q3SqOYoInFJIbvntPazM= -github.com/samber/do/v2 v2.0.0-beta.5/go.mod h1:FNMy1RSKMX11Ag8v4KW95n9k+ZkCXn8GuvDKufVKN9E= -github.com/samber/go-type-to-string v1.2.0 h1:Pvdqx3r/EHn9/DTKoW6RoHz/850s5yV1vA6MqKKG5Ys= -github.com/samber/go-type-to-string v1.2.0/go.mod h1:jpU77vIDoIxkahknKDoEx9C8bQ1ADnh2sotZ8I4QqBU= github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= @@ -189,14 +183,13 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= @@ -245,8 +238,6 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -266,8 +257,6 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -322,8 +311,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -332,14 +319,12 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/v2/event_handler.go b/server/v2/event_handler.go index 37cb6b8..b21fc14 100644 --- a/server/v2/event_handler.go +++ b/server/v2/event_handler.go @@ -1,18 +1,18 @@ package server import ( - "fmt" "github.com/panjf2000/ants/v2" "github.com/panjf2000/gnet/v2" "time" ) -func newEventHandler(trafficker Trafficker) (handler *eventHandler, err error) { +func newEventHandler(options *Options, trafficker Trafficker) (handler *eventHandler, err error) { var wp *ants.Pool if wp, err = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithNonblocking(true)); err != nil { return } handler = &eventHandler{ + options: options, trafficker: trafficker, workerPool: wp, } @@ -21,17 +21,18 @@ func newEventHandler(trafficker Trafficker) (handler *eventHandler, err error) { type ( Trafficker interface { - OnBoot() error + OnBoot(options *Options) error OnTraffic(c gnet.Conn, packet []byte) } eventHandler struct { + options *Options trafficker Trafficker workerPool *ants.Pool } ) func (e *eventHandler) OnBoot(eng gnet.Engine) (action gnet.Action) { - if err := e.trafficker.OnBoot(); err != nil { + if err := e.trafficker.OnBoot(e.options); err != nil { action = gnet.Shutdown } return @@ -46,7 +47,6 @@ func (e *eventHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { } func (e *eventHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) { - fmt.Println("断开") return } diff --git a/server/v2/options.go b/server/v2/options.go new file mode 100644 index 0000000..01b105d --- /dev/null +++ b/server/v2/options.go @@ -0,0 +1,4 @@ +package server + +type Options struct { +} diff --git a/server/v2/server.go b/server/v2/server.go index 02e50fd..6e0c722 100644 --- a/server/v2/server.go +++ b/server/v2/server.go @@ -15,6 +15,6 @@ type Server struct { func (s *Server) Run(protoAddr string) (err error) { var handler *eventHandler - handler, err = newEventHandler(s.trafficker) + handler, err = newEventHandler(new(Options), s.trafficker) return gnet.Run(handler, protoAddr) } diff --git a/server/v2/server_test.go b/server/v2/server_test.go new file mode 100644 index 0000000..14d989c --- /dev/null +++ b/server/v2/server_test.go @@ -0,0 +1,29 @@ +package server_test + +import ( + "github.com/gin-gonic/gin" + "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/v2/traffickers" + "net/http" + "testing" +) + +func TestNewServer(t *testing.T) { + r := gin.New() + r.GET("/", func(context *gin.Context) { + context.JSON(200, gin.H{ + "ping": "pong", + }) + }) + srv := server.NewServer(traffickers.WebSocket(r, func(handler *gin.Engine, upgradeHandler func(writer http.ResponseWriter, request *http.Request) error) { + handler.GET("/ws", func(context *gin.Context) { + if err := upgradeHandler(context.Writer, context.Request); err != nil { + context.AbortWithError(500, err) + } + }) + })) + + if err := srv.Run("tcp://:8080"); err != nil { + panic(err) + } +} diff --git a/server/v2/traffickers/http.go b/server/v2/traffickers/http.go index 179e294..d506eaf 100644 --- a/server/v2/traffickers/http.go +++ b/server/v2/traffickers/http.go @@ -9,18 +9,22 @@ import ( netHttp "net/http" ) -func Http(handler netHttp.Handler) server.Trafficker { - return &http{ +func Http[H netHttp.Handler](handler H) server.Trafficker { + return &http[H]{ handler: handler, + ncb: func(c gnet.Conn, err error) error { + return nil + }, } } -type http struct { - handler netHttp.Handler +type http[H netHttp.Handler] struct { + handler H rwp *hub.ObjectPool[*httpResponseWriter] + ncb func(c gnet.Conn, err error) error } -func (h *http) OnBoot() error { +func (h *http[H]) OnBoot(options *server.Options) error { h.rwp = hub.NewObjectPool[httpResponseWriter](func() *httpResponseWriter { return new(httpResponseWriter) }, func(data *httpResponseWriter) { @@ -29,14 +33,24 @@ func (h *http) OnBoot() error { return nil } -func (h *http) OnTraffic(c gnet.Conn, packet []byte) { +func (h *http[H]) OnTraffic(c gnet.Conn, packet []byte) { + var responseWriter *httpResponseWriter + defer func() { + if responseWriter == nil || !responseWriter.isHijack { + _ = c.Close() + } + }() httpRequest, err := netHttp.ReadRequest(bufio.NewReader(bytes.NewReader(packet))) if err != nil { return } - responseWriter := h.rwp.Get() + responseWriter = h.rwp.Get() responseWriter.init(c) h.handler.ServeHTTP(responseWriter, httpRequest) + if responseWriter.isHijack { + return + } + _ = responseWriter.Result().Write(c) } diff --git a/server/v2/traffickers/http_recorder.go b/server/v2/traffickers/http_recorder.go new file mode 100644 index 0000000..031a1b1 --- /dev/null +++ b/server/v2/traffickers/http_recorder.go @@ -0,0 +1,211 @@ +package traffickers + +import ( + "bufio" + "bytes" + "fmt" + "github.com/panjf2000/gnet/v2" + "io" + "net" + netHttp "net/http" + "net/textproto" + "strconv" + "strings" + + "golang.org/x/net/http/httpguts" +) + +type httpResponseWriter struct { + Code int + HeaderMap netHttp.Header + Body *bytes.Buffer + Flushed bool + + conn *websocketConn + result *netHttp.Response + snapHeader netHttp.Header + wroteHeader bool + isHijack bool +} + +func (rw *httpResponseWriter) init(c gnet.Conn) { + rw.conn = &websocketConn{Conn: c} + rw.Code = 200 + rw.Body = new(bytes.Buffer) + rw.HeaderMap = make(netHttp.Header) + rw.isHijack = false +} + +func (rw *httpResponseWriter) reset() { + rw.conn = nil + rw.Code = 200 + rw.Body = nil + rw.HeaderMap = nil + rw.isHijack = false +} + +func (rw *httpResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if !rw.isHijack { + return rw.conn, bufio.NewReadWriter(bufio.NewReader(rw.conn), bufio.NewWriter(rw.conn)), nil + } + return nil, nil, netHttp.ErrHijacked +} + +func (rw *httpResponseWriter) Header() netHttp.Header { + m := rw.HeaderMap + if m == nil { + m = make(netHttp.Header) + rw.HeaderMap = m + } + return m +} + +func (rw *httpResponseWriter) writeHeader(b []byte, str string) { + if rw.wroteHeader { + return + } + if len(str) > 512 { + str = str[:512] + } + + m := rw.Header() + + _, hasType := m["Content-Type"] + hasTE := m.Get("Transfer-Encoding") != "" + if !hasType && !hasTE { + if b == nil { + b = []byte(str) + } + m.Set("Content-Type", netHttp.DetectContentType(b)) + } + + rw.WriteHeader(200) +} + +func (rw *httpResponseWriter) Write(buf []byte) (n int, err error) { + if rw.isHijack { + n = len(buf) + var wait = make(chan error) + if err = rw.conn.AsyncWrite(buf, func(c gnet.Conn, err error) error { + if err != nil { + wait <- err + } + return nil + }); err != nil { + return + } + err = <-wait + return + } + rw.writeHeader(buf, "") + if rw.Body != nil { + rw.Body.Write(buf) + } + return len(buf), nil +} + +func (rw *httpResponseWriter) WriteString(str string) (int, error) { + rw.writeHeader(nil, str) + if rw.Body != nil { + rw.Body.WriteString(str) + } + return len(str), nil +} + +func checkWriteHeaderCode(code int) { + if code < 100 || code > 999 { + panic(fmt.Sprintf("invalid WriteHeader code %v", code)) + } +} + +func (rw *httpResponseWriter) WriteHeader(code int) { + if rw.wroteHeader { + return + } + + checkWriteHeaderCode(code) + rw.Code = code + rw.wroteHeader = true + if rw.HeaderMap == nil { + rw.HeaderMap = make(netHttp.Header) + } + rw.snapHeader = rw.HeaderMap.Clone() +} + +func (rw *httpResponseWriter) Flush() { + if !rw.wroteHeader { + rw.WriteHeader(200) + } + rw.Flushed = true +} + +func (rw *httpResponseWriter) Result() *netHttp.Response { + if rw.result != nil { + return rw.result + } + if rw.snapHeader == nil { + rw.snapHeader = rw.HeaderMap.Clone() + } + res := &netHttp.Response{ + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + StatusCode: rw.Code, + Header: rw.snapHeader, + } + rw.result = res + if res.StatusCode == 0 { + res.StatusCode = 200 + } + res.Status = fmt.Sprintf("%03d %s", res.StatusCode, netHttp.StatusText(res.StatusCode)) + if rw.Body != nil { + res.Body = io.NopCloser(bytes.NewReader(rw.Body.Bytes())) + } else { + res.Body = netHttp.NoBody + } + res.ContentLength = parseContentLength(res.Header.Get("Content-Length")) + + if trailers, ok := rw.snapHeader["Trailer"]; ok { + res.Trailer = make(netHttp.Header, len(trailers)) + for _, k := range trailers { + for _, k := range strings.Split(k, ",") { + k = netHttp.CanonicalHeaderKey(textproto.TrimString(k)) + if !httpguts.ValidTrailerHeader(k) { + // Ignore since forbidden by RFC 7230, section 4.1.2. + continue + } + vv, ok := rw.HeaderMap[k] + if !ok { + continue + } + vv2 := make([]string, len(vv)) + copy(vv2, vv) + res.Trailer[k] = vv2 + } + } + } + for k, vv := range rw.HeaderMap { + if !strings.HasPrefix(k, netHttp.TrailerPrefix) { + continue + } + if res.Trailer == nil { + res.Trailer = make(netHttp.Header) + } + for _, v := range vv { + res.Trailer.Add(strings.TrimPrefix(k, netHttp.TrailerPrefix), v) + } + } + return res +} + +func parseContentLength(cl string) int64 { + cl = textproto.TrimString(cl) + if cl == "" { + return -1 + } + n, err := strconv.ParseUint(cl, 10, 63) + if err != nil { + return -1 + } + return int64(n) +} diff --git a/server/v2/traffickers/http_response_writer.go b/server/v2/traffickers/http_response_writer.go deleted file mode 100644 index fcff5ac..0000000 --- a/server/v2/traffickers/http_response_writer.go +++ /dev/null @@ -1,58 +0,0 @@ -package traffickers - -import ( - "bytes" - "github.com/panjf2000/gnet/v2" - netHttp "net/http" - "strconv" - "sync" -) - -type httpResponseWriter struct { - c gnet.Conn - statusCode int - header netHttp.Header -} - -func (w *httpResponseWriter) init(c gnet.Conn) { - w.c = c - w.statusCode = 200 - w.header = make(netHttp.Header) -} - -func (w *httpResponseWriter) reset() { - w.c = nil - w.statusCode = 200 - w.header = nil -} - -func (w *httpResponseWriter) Header() netHttp.Header { - return w.header -} - -func (w *httpResponseWriter) Write(b []byte) (n int, err error) { - var buf bytes.Buffer - buf.WriteString("HTTP/1.1 ") - buf.WriteString(netHttp.StatusText(w.statusCode)) - buf.WriteString("\r\n") - w.header.Set("Content-Length", strconv.Itoa(len(b))) - if err = w.header.Write(&buf); err != nil { - return - } - buf.WriteString("\r\n") - buf.Write(b) - res := buf.Bytes() - var wg sync.WaitGroup - wg.Add(1) - err = w.c.AsyncWrite(res, func(c gnet.Conn, e error) error { - err = e - wg.Done() - return nil - }) - wg.Wait() - return len(res), err -} - -func (w *httpResponseWriter) WriteHeader(statusCode int) { - w.statusCode = statusCode -} diff --git a/server/v2/traffickers/websocket.go b/server/v2/traffickers/websocket.go new file mode 100644 index 0000000..4b8c7b1 --- /dev/null +++ b/server/v2/traffickers/websocket.go @@ -0,0 +1,64 @@ +package traffickers + +import ( + "fmt" + ws "github.com/gorilla/websocket" + "github.com/kercylan98/minotaur/server/v2" + "github.com/panjf2000/gnet/v2" + netHttp "net/http" +) + +func WebSocket[H netHttp.Handler](handler H, binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error)) server.Trafficker { + w := &websocket[H]{ + http: Http(handler).(*http[H]), + binder: binder, + upgrader: &ws.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + CheckOrigin: func(r *netHttp.Request) bool { + return true + }, + }, + } + binder(handler, w.OnUpgrade) + return w +} + +type websocket[H netHttp.Handler] struct { + *http[H] + binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error) + upgrader *ws.Upgrader +} + +func (w *websocket[H]) OnBoot(options *server.Options) error { + return w.http.OnBoot(options) +} + +func (w *websocket[H]) OnTraffic(c gnet.Conn, packet []byte) { + w.http.OnTraffic(c, packet) +} + +func (w *websocket[H]) OnUpgrade(writer netHttp.ResponseWriter, request *netHttp.Request) (err error) { + var ( + ip string + conn *ws.Conn + ) + + ip = request.Header.Get("X-Real-IP") + conn, err = w.upgrader.Upgrade(writer, request, nil) + if err != nil { + return + } + + fmt.Println("opened", ip) + go func() { + for { + mt, data, err := conn.ReadMessage() + if err != nil { + continue + } + conn.WriteMessage(mt, data) + } + }() + return nil +} diff --git a/server/v2/traffickers/websocket_conn.go b/server/v2/traffickers/websocket_conn.go new file mode 100644 index 0000000..c27dccb --- /dev/null +++ b/server/v2/traffickers/websocket_conn.go @@ -0,0 +1,16 @@ +package traffickers + +import ( + "github.com/panjf2000/gnet/v2" + "time" +) + +type websocketConn struct { + gnet.Conn + deadline time.Time +} + +func (c *websocketConn) SetDeadline(t time.Time) error { + c.deadline = t + return nil +}