/
websocket.go
165 lines (141 loc) · 4.38 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package api
import (
"context"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"go.uber.org/zap"
apitypes "github.com/iotexproject/iotex-core/api/types"
"github.com/iotexproject/iotex-core/pkg/log"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 15 * 1024 * 1024
)
// WebsocketHandler handles requests from websocket protocol
type WebsocketHandler struct {
msgHandler Web3Handler
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// type safeWebsocketConn wraps websocket.Conn with a mutex
// to avoid concurrent write to the connection
// https://pkg.go.dev/github.com/gorilla/websocket#hdr-Concurrency
type safeWebsocketConn struct {
ws *websocket.Conn
mu sync.Mutex
}
// WiteJSON writes a JSON message to the connection in a thread-safe way
func (c *safeWebsocketConn) WriteJSON(message interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.ws.WriteJSON(message)
}
// WriteMessage writes a message to the connection in a thread-safe way
func (c *safeWebsocketConn) WriteMessage(messageType int, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.ws.WriteMessage(messageType, data)
}
// Close closes the underlying network connection without sending or waiting for a close frame
func (c *safeWebsocketConn) Close() error {
return c.ws.Close()
}
// SetWriteDeadline sets the write deadline on the underlying network connection
func (c *safeWebsocketConn) SetWriteDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.ws.SetWriteDeadline(t)
}
// NewWebsocketHandler creates a new websocket handler
func NewWebsocketHandler(web3Handler Web3Handler) *WebsocketHandler {
return &WebsocketHandler{
msgHandler: web3Handler,
}
}
func (wsSvr *WebsocketHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
upgrader.CheckOrigin = func(_ *http.Request) bool { return true }
// upgrade this connection to a WebSocket connection
ws, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Logger("api").Warn("failed to upgrade http server to websocket", zap.Error(err))
return
}
wsSvr.handleConnection(req.Context(), ws)
}
func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websocket.Conn) {
defer ws.Close()
if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Logger("api").Warn("failed to set read deadline timeout.", zap.Error(err))
}
ws.SetReadLimit(maxMessageSize)
ws.SetPongHandler(func(string) error {
if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Logger("api").Warn("failed to set read deadline timeout.", zap.Error(err))
}
return nil
})
ctx, cancel := context.WithCancel(ctx)
safeWs := &safeWebsocketConn{ws: ws}
go ping(ctx, safeWs, cancel)
for {
select {
case <-ctx.Done():
return
default:
_, reader, err := ws.NextReader()
if err != nil {
log.Logger("api").Debug("Client Disconnected", zap.Error(err))
cancel()
return
}
err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader,
apitypes.NewResponseWriter(
func(resp interface{}) (int, error) {
if err = safeWs.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
return 0, safeWs.WriteJSON(resp)
}),
)
if err != nil {
log.Logger("api").Warn("fail to respond request.", zap.Error(err))
cancel()
return
}
}
}
}
func ping(ctx context.Context, ws *safeWebsocketConn, cancel context.CancelFunc) {
pingTicker := time.NewTicker(pingPeriod)
defer func() {
pingTicker.Stop()
if err := ws.Close(); err != nil {
log.Logger("api").Warn("fail to close websocket connection.", zap.Error(err))
}
}()
for {
select {
case <-ctx.Done():
return
case <-pingTicker.C:
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Logger("api").Warn("fail to respond request.", zap.Error(err))
cancel()
return
}
}
}
}