forked from graarh/golang-socketio
/
websocket.go
171 lines (141 loc) · 4.93 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package transport
import (
"crypto/tls"
"errors"
"io/ioutil"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/mtfelian/golang-socketio/logging"
)
const (
upgradeFailed = "Upgrade failed: "
wsDefaultPingInterval = 30 * time.Second
wsDefaultPingTimeout = 60 * time.Second
wsDefaultReceiveTimeout = 60 * time.Second
wsDefaultSendTimeout = 60 * time.Second
wsDefaultBufferSize = 1024 * 32
)
// WebsocketTransportParams is a parameters for getting non-default websocket transport
type WebsocketTransportParams struct {
Headers http.Header
TLSClientConfig *tls.Config
}
var (
errBinaryMessage = errors.New("binary messages are not supported")
errBadBuffer = errors.New("buffer error")
errPacketWrong = errors.New("wrong packet type error")
errMethodNotAllowed = errors.New("method not allowed")
errHttpUpgradeFailed = errors.New("http upgrade failed")
)
// WebsocketConnection represents websocket connection
type WebsocketConnection struct {
socket *websocket.Conn
transport *WebsocketTransport
}
// GetMessage from the connection
func (ws *WebsocketConnection) GetMessage() (string, error) {
logging.Log().Debug("WebsocketConnection.GetMessage() fired")
ws.socket.SetReadDeadline(time.Now().Add(ws.transport.ReceiveTimeout))
msgType, reader, err := ws.socket.NextReader()
if err != nil {
logging.Log().Debug("WebsocketConnection.GetMessage() ws.socket.NextReader() err:", err)
return "", err
}
// supports only text messages exchange
if msgType != websocket.TextMessage {
logging.Log().Debug("WebsocketConnection.GetMessage() returns errBinaryMessage")
return "", errBinaryMessage
}
data, err := ioutil.ReadAll(reader)
if err != nil {
logging.Log().Debug("WebsocketConnection.GetMessage() returns errBadBuffer")
return "", errBadBuffer
}
text := string(data)
logging.Log().Debug("WebsocketConnection.GetMessage() text:", text)
// empty messages are not allowed
if len(text) == 0 {
logging.Log().Debug("WebsocketConnection.GetMessage() returns errPacketWrong")
return "", errPacketWrong
}
return text, nil
}
// SetSid does nothing for the websocket transport, it's used only when transport changes (from)
func (t *WebsocketTransport) SetSid(string, Connection) {}
// WriteMessage message m into a connection
func (ws *WebsocketConnection) WriteMessage(m string) error {
logging.Log().Debug("WebsocketConnection.WriteMessage() fired with:", m)
ws.socket.SetWriteDeadline(time.Now().Add(ws.transport.SendTimeout))
writer, err := ws.socket.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
if _, err := writer.Write([]byte(m)); err != nil {
return err
}
return writer.Close()
}
// Close the connection
func (ws *WebsocketConnection) Close() error {
logging.Log().Debug("WebsocketConnection.Close() fired")
return ws.socket.Close()
}
// PingParams returns ping params
func (ws *WebsocketConnection) PingParams() (time.Duration, time.Duration) {
return ws.transport.PingInterval, ws.transport.PingTimeout
}
// WebsocketTransport implements websocket transport
type WebsocketTransport struct {
PingInterval time.Duration
PingTimeout time.Duration
ReceiveTimeout time.Duration
SendTimeout time.Duration
BufferSize int
Headers http.Header
TLSClientConfig *tls.Config
}
// Connect to the given url
func (t *WebsocketTransport) Connect(url string) (Connection, error) {
dialer := websocket.Dialer{TLSClientConfig: t.TLSClientConfig}
socket, _, err := dialer.Dial(url, t.Headers)
if err != nil {
return nil, err
}
return &WebsocketConnection{socket, t}, nil
}
// HandleConnection
func (t *WebsocketTransport) HandleConnection(w http.ResponseWriter, r *http.Request) (Connection, error) {
if r.Method != http.MethodGet {
http.Error(w, upgradeFailed+errMethodNotAllowed.Error(), http.StatusServiceUnavailable)
return nil, errMethodNotAllowed
}
socket, err := (&websocket.Upgrader{
ReadBufferSize: t.BufferSize,
WriteBufferSize: t.BufferSize,
}).Upgrade(w, r, nil)
if err != nil {
http.Error(w, upgradeFailed+err.Error(), http.StatusServiceUnavailable)
return nil, errHttpUpgradeFailed
}
return &WebsocketConnection{socket, t}, nil
}
// Serve does nothing here. Websocket connection does not require any additional processing
func (t *WebsocketTransport) Serve(w http.ResponseWriter, r *http.Request) {}
// DefaultWebsocketTransport returns websocket connection with default params
func DefaultWebsocketTransport() *WebsocketTransport {
return &WebsocketTransport{
PingInterval: wsDefaultPingInterval,
PingTimeout: wsDefaultPingTimeout,
ReceiveTimeout: wsDefaultReceiveTimeout,
SendTimeout: wsDefaultSendTimeout,
BufferSize: wsDefaultBufferSize,
}
}
// NewWebsocketTransport returns websocket transport with given params
func NewWebsocketTransport(params WebsocketTransportParams) *WebsocketTransport {
tr := DefaultWebsocketTransport()
tr.Headers = params.Headers
tr.TLSClientConfig = params.TLSClientConfig
return tr
}