diff --git a/v2/delivery/websocket.go b/v2/delivery/websocket.go index 001fb9b..699743e 100644 --- a/v2/delivery/websocket.go +++ b/v2/delivery/websocket.go @@ -1,9 +1,9 @@ package delivery import ( - "context" - "nhooyr.io/websocket" "time" + + "github.com/gorilla/websocket" ) // WsHandler handle raw websocket message @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig { } var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { - ctx, cancel := context.WithCancel(context.Background()) - c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil) + c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil) if err != nil { - cancel() return nil, nil, err } c.SetReadLimit(655350) @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don // websocket.Conn.ReadMessage or when the stopC channel is // closed by the client. defer close(doneC) - defer cancel() if WebsocketKeepalive { - go keepAlive(ctx, c, WebsocketTimeout) + keepAlive(c, WebsocketTimeout) } // Wait for the stopC channel to be closed. We do that in a // separate goroutine because ReadMessage is a blocking @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don silent = true case <-doneC: } - _ = c.Close(websocket.StatusNormalClosure, "normal closure") + c.Close() }() for { - _, message, readErr := c.Read(ctx) - if readErr != nil { + _, message, err := c.ReadMessage() + if err != nil { if !silent { - errHandler(readErr) + errHandler(err) } return } @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don return } -func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) { - t := time.NewTimer(d) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - } +func keepAlive(c *websocket.Conn, timeout time.Duration) { + ticker := time.NewTicker(timeout) - err := c.Ping(ctx) - if err != nil { - return - } + lastResponse := time.Now() + c.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) - t.Reset(d) - } + go func() { + defer ticker.Stop() + for { + deadline := time.Now().Add(10 * time.Second) + err := c.WriteControl(websocket.PingMessage, []byte{}, deadline) + if err != nil { + return + } + <-ticker.C + if time.Since(lastResponse) > timeout { + c.Close() + return + } + } + }() } diff --git a/v2/futures/websocket.go b/v2/futures/websocket.go index 31240e4..483f3b4 100644 --- a/v2/futures/websocket.go +++ b/v2/futures/websocket.go @@ -1,9 +1,9 @@ package futures import ( - "context" - "nhooyr.io/websocket" "time" + + "github.com/gorilla/websocket" ) // WsHandler handle raw websocket message @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig { } var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { - ctx, cancel := context.WithCancel(context.Background()) - c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil) + c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil) if err != nil { - cancel() return nil, nil, err } c.SetReadLimit(655350) @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don // websocket.Conn.ReadMessage or when the stopC channel is // closed by the client. defer close(doneC) - defer cancel() if WebsocketKeepalive { - go keepAlive(ctx, c, WebsocketTimeout) + keepAlive(c, WebsocketTimeout) } // Wait for the stopC channel to be closed. We do that in a // separate goroutine because ReadMessage is a blocking @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don silent = true case <-doneC: } - _ = c.Close(websocket.StatusNormalClosure, "normal closure") + c.Close() }() for { - _, message, readErr := c.Read(ctx) - if readErr != nil { + _, message, err := c.ReadMessage() + if err != nil { if !silent { - errHandler(readErr) + errHandler(err) } return } @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don return } -func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) { - t := time.NewTimer(d) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - } +func keepAlive(c *websocket.Conn, timeout time.Duration) { + ticker := time.NewTicker(timeout) - err := c.Ping(ctx) - if err != nil { - return - } + lastResponse := time.Now() + c.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) - t.Reset(d) - } + go func() { + defer ticker.Stop() + for { + deadline := time.Now().Add(10 * time.Second) + err := c.WriteControl(websocket.PingMessage, []byte{}, deadline) + if err != nil { + return + } + <-ticker.C + if time.Since(lastResponse) > timeout { + c.Close() + return + } + } + }() } diff --git a/v2/go.mod b/v2/go.mod index 63c3945..7d36b2d 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -5,8 +5,7 @@ go 1.13 require ( github.com/bitly/go-simplejson v0.5.0 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect - github.com/klauspost/compress v1.13.1 // indirect + github.com/gorilla/websocket v1.5.0 github.com/kr/pretty v0.2.0 // indirect github.com/stretchr/testify v1.4.0 - nhooyr.io/websocket v1.8.7 ) diff --git a/v2/go.sum b/v2/go.sum index d863739..3c2ca02 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -2,76 +2,22 @@ github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkN github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= -github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= -github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= -github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= -github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= -github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= -github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= -github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= -github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= -github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= -github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ= -github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= -github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= -github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= -github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= -nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/v2/websocket.go b/v2/websocket.go index 3f18d9c..b9d4cbe 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -1,9 +1,9 @@ package binance import ( - "context" - "nhooyr.io/websocket" "time" + + "github.com/gorilla/websocket" ) // WsHandler handle raw websocket message @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig { } var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { - ctx, cancel := context.WithCancel(context.Background()) - c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil) + c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil) if err != nil { - cancel() return nil, nil, err } c.SetReadLimit(655350) @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don // websocket.Conn.ReadMessage or when the stopC channel is // closed by the client. defer close(doneC) - defer cancel() if WebsocketKeepalive { - go keepAlive(ctx, c, WebsocketTimeout) + keepAlive(c, WebsocketTimeout) } // Wait for the stopC channel to be closed. We do that in a // separate goroutine because ReadMessage is a blocking @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don silent = true case <-doneC: } - _ = c.Close(websocket.StatusNormalClosure, "normal closure") + c.Close() }() for { - _, message, readErr := c.Read(ctx) - if readErr != nil { + _, message, err := c.ReadMessage() + if err != nil { if !silent { - errHandler(readErr) + errHandler(err) } return } @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don return } -func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) { - t := time.NewTimer(d) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - } +func keepAlive(c *websocket.Conn, timeout time.Duration) { + ticker := time.NewTicker(timeout) - err := c.Ping(ctx) - if err != nil { - return - } + lastResponse := time.Now() + c.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) - t.Reset(d) - } + go func() { + defer ticker.Stop() + for { + deadline := time.Now().Add(10 * time.Second) + err := c.WriteControl(websocket.PingMessage, []byte{}, deadline) + if err != nil { + return + } + <-ticker.C + if time.Since(lastResponse) > timeout { + c.Close() + return + } + } + }() }