-
Notifications
You must be signed in to change notification settings - Fork 0
/
websocket.go
209 lines (180 loc) · 4.62 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package websocket
import (
"crypto/tls"
"crypto/x509"
"errors"
"log"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Package-level state shared by every Ws value in this package.
var (
	// syncLock is held by Connect for the duration of each call.
	syncLock = &sync.Mutex{}
	// reconnectLock is used by errCheck to coordinate reconnect attempts.
	reconnectLock = &sync.Mutex{}
	// reconnectBool is not referenced by any code visible in this file.
	reconnectBool bool
)
// Ws wraps a gorilla/websocket client connection together with the settings
// that Connect uses to (re)establish it.
type Ws struct {
// websocket connection; nil until Connect has succeeded at least once
conn *websocket.Conn
// certificate pool used as the TLS root CAs for secure connections
caPool *x509.CertPool
// set to true to dial with a TLS config built from caPool
secure bool
// url contains the url to connect to
url url.URL
// set to true to send the initMsg when a connection is made
sendInitMsg bool
// message that is to be sent when a connection is made
initMsg []byte
// set to true to automatically try to reconnect
reconnect bool
// true while errCheck is busy re-establishing the connection
reconnecting bool
// close handler that Connect installs on every new connection
closeHandler func(int, string) error
}
// init gives the package-level Websocket value a new, empty certificate pool.
// This is needed because certificates can not be appended to a pool that has
// not been created yet.
func init() {
	pool := x509.NewCertPool()
	Websocket.caPool = pool
}
// version is the version number of this package, written according to
// Semantic Versioning 2.0.
const version = "1.1.0"
// Version returns the current version number of this package.
func (w *Ws) Version() string {
	current := version
	return current
}
// Read reads the next message from the websocket connection.
//
// It returns the message type, the payload and the error reported by the
// underlying connection. When there is no connection yet, Read starts a
// Connect attempt and returns an error without reading anything, even when
// that attempt succeeds.
func (w *Ws) Read() (int, []byte, error) {
	if w.conn == nil {
		// Best effort: the result of the dial is deliberately not reported
		// here; the caller sees the outcome on its next call.
		_ = w.Connect()
		return 0, []byte{}, errors.New("can not read when there is no connection, trying to reconnect")
	}
	t, d, err := w.conn.ReadMessage()
	if err != nil {
		// errCheck does nothing for a nil error, so only failures are handed
		// to it; this avoids spawning a goroutine for every successful read.
		go w.errCheck(err)
	}
	return t, d, err
}
// ReadJSON reads the next message from the websocket connection and decodes
// it as JSON into the value pointed to by v.
//
// When there is no connection yet, ReadJSON starts a Connect attempt and
// returns an error without reading anything, even when that attempt succeeds.
func (w *Ws) ReadJSON(v interface{}) error {
	if w.conn == nil {
		// Best effort: the result of the dial is deliberately not reported
		// here; the caller sees the outcome on its next call.
		_ = w.Connect()
		return errors.New("can not read when there is no connection, trying to reconnect")
	}
	err := w.conn.ReadJSON(v)
	if err != nil {
		// errCheck does nothing for a nil error, so only failures are handed
		// to it; this avoids spawning a goroutine for every successful read.
		go w.errCheck(err)
	}
	return err
}
// WriteMessage writes data to the websocket connection as a single message
// of the given messageType (one of the gorilla/websocket message type
// constants, such as websocket.TextMessage).
//
// When there is no connection yet, WriteMessage starts a Connect attempt and
// returns an error without writing anything, even when that attempt succeeds.
func (w *Ws) WriteMessage(messageType int, data []byte) error {
	if w.conn == nil {
		// Best effort: the result of the dial is deliberately not reported
		// here; the caller sees the outcome on its next call.
		_ = w.Connect()
		return errors.New("can not write when there is no connection, trying to reconnect")
	}
	err := w.conn.WriteMessage(messageType, data)
	if err != nil {
		// errCheck does nothing for a nil error, so only failures are handed
		// to it; this avoids spawning a goroutine for every successful write.
		go w.errCheck(err)
	}
	return err
}
// WriteJSON encodes v as JSON and writes it to the websocket connection as a
// message.
//
// When there is no connection yet, WriteJSON starts a Connect attempt and
// returns an error without writing anything, even when that attempt succeeds.
func (w *Ws) WriteJSON(v interface{}) error {
	if w.conn == nil {
		// Best effort: the result of the dial is deliberately not reported
		// here; the caller sees the outcome on its next call.
		_ = w.Connect()
		return errors.New("can not write when there is no connection, trying to reconnect")
	}
	err := w.conn.WriteJSON(v)
	if err != nil {
		// errCheck does nothing for a nil error, so only failures are handed
		// to it; this avoids spawning a goroutine for every successful write.
		go w.errCheck(err)
	}
	return err
}
// AppendCertsFromPem parses the PEM encoded certificates in pemCerts and adds
// them to the certificate pool that is used for secure connections. It
// returns the result of x509.CertPool.AppendCertsFromPEM, which reports
// whether any certificate was parsed successfully.
func (w *Ws) AppendCertsFromPem(pemCerts []byte) bool {
	if w.caPool == nil {
		// Only the package-level Websocket value receives a pool from init.
		// Any other Ws starts with a nil pool, and adding a certificate to a
		// nil pool dereferences a nil pointer, so create the pool on demand.
		w.caPool = x509.NewCertPool()
	}
	return w.caPool.AppendCertsFromPEM(pemCerts)
}
// SetUrl stores the address that Connect dials, assembled from the given
// scheme, host and path. Any previously stored url is replaced as a whole.
func (w *Ws) SetUrl(scheme, host, path string) {
	var target url.URL
	target.Scheme = scheme
	target.Host = host
	target.Path = path
	w.url = target
}
// Connect dials the configured url and, on success, makes the new connection
// the current one.
//
// The steps, all performed while holding syncLock:
//   - dial the url, using caPool as TLS root CAs when secure is set;
//   - if the dial fails, return the error and leave the old connection as is;
//   - close the previous connection, if any (a close error is only logged);
//   - install the stored close handler on the new connection;
//   - when an init message was set, send it and return the result of that write.
func (w *Ws) Connect() error {
	syncLock.Lock()
	defer syncLock.Unlock()

	dialer := websocket.Dialer{}
	if w.secure {
		dialer.TLSClientConfig = &tls.Config{RootCAs: w.caPool}
	}

	fresh, _, err := dialer.Dial(w.url.String(), nil)
	if err != nil {
		return err
	}

	// The old connection is only given up once the new one is established.
	if w.conn != nil {
		if closeErr := w.Close(); closeErr != nil {
			log.Println(closeErr)
		}
	}

	w.conn = fresh
	fresh.SetCloseHandler(w.closeHandler)

	if !w.sendInitMsg {
		return nil
	}
	return w.WriteMessage(websocket.TextMessage, w.initMsg)
}
// SetInitMsg stores msg and enables sending it: from now on Connect writes
// msg to every connection it establishes. The slice is stored as given, not
// copied.
func (w *Ws) SetInitMsg(msg []byte) {
	w.initMsg, w.sendInitMsg = msg, true
}
// SetCloseHandler stores f as the close handler. Connect installs the stored
// handler on each connection it establishes, so a handler set here takes
// effect at the next Connect; the current connection is not touched.
func (w *Ws) SetCloseHandler(f func(int, string) error) {
	handler := f
	w.closeHandler = handler
}
// Reconnect switches automatic reconnecting on (b is true) or off (b is
// false). The flag is read by errCheck when a read or write has failed.
func (w *Ws) Reconnect(b bool) {
	enabled := b
	w.reconnect = enabled
}
// Close sends a close message to the peer and then closes the underlying
// connection. It returns nil when there is no connection.
//
// The returned error is the one from closing the connection; a failure to
// send the close message is only logged.
func (w *Ws) Close() error {
	if w.conn == nil {
		return nil
	}
	// Write the close message directly on the connection rather than through
	// w.WriteMessage: that method hands write failures to errCheck, which
	// would start an automatic reconnect for a connection that is being
	// closed on purpose.
	if err := w.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
		log.Println(err)
	}
	return w.conn.Close()
}
// errCheck handles the error of a read or write. A nil error is ignored.
// Any other error is logged and, when automatic reconnecting is enabled and
// no reconnect is already running, Connect is retried until it succeeds.
//
// errCheck is started as a goroutine by the read and write methods, so
// several instances can run at the same time for one Ws.
func (w *Ws) errCheck(err error) {
	if err == nil {
		return
	}
	log.Println(err)
	if !w.reconnect {
		return
	}

	// Check and set the reconnecting flag under the lock, so that of several
	// concurrent errCheck calls exactly one becomes the reconnector. The lock
	// is not held while dialing: Connect can start further errCheck calls
	// (through the init message write) and those must be able to see the flag
	// and return.
	reconnectLock.Lock()
	if w.reconnecting {
		reconnectLock.Unlock()
		return
	}
	w.reconnecting = true
	reconnectLock.Unlock()

	for err != nil {
		err = w.Connect()
		// The pause also follows a successful attempt, which keeps the
		// reconnecting flag set for another second.
		time.Sleep(1 * time.Second)
	}

	reconnectLock.Lock()
	w.reconnecting = false
	reconnectLock.Unlock()
}
// SetSecure sets the secure flag. When it is true, Connect dials with a TLS
// configuration whose root CAs are the certificate pool of this Ws.
func (w *Ws) SetSecure(b bool) {
	secure := b
	w.secure = secure
}
// WriteQueue starts a goroutine that reads messages from c and writes each
// one to the websocket as a text message. The goroutine ends when c is closed
// and drained.
//
// When a write fails, the error is sent to e (this send blocks until the
// error is received, so e must be read or buffered), the message is put back
// on c if there is room, and the goroutine pauses for one second. When c has
// no room the message is thrown away; with an unbuffered c a failed message
// can therefore never be requeued.
//
// c must not be closed while messages can still fail, because requeueing on a
// closed channel panics.
//
// todo create a way to remove old messages when the queue is filling up so
// that there is space for new messages
func (w *Ws) WriteQueue(c chan []byte, e chan error) {
	go func() {
		for msg := range c {
			err := w.WriteMessage(websocket.TextMessage, msg)
			if err == nil {
				continue
			}
			go w.errCheck(err)
			e <- err
			// Requeue without blocking. This goroutine is the consumer of c,
			// so a blocking send on a full queue could never complete and
			// would stall the queue for good.
			select {
			case c <- msg:
			default:
			}
			time.Sleep(1 * time.Second)
		}
	}()
}
// Websocket is the package-level Ws value, exported as symbol named
// "Websocket". The init function of this package gives it an empty
// certificate pool.
// NOTE(review): the "exported as symbol" wording suggests this value is
// looked up by name (for example through the plugin package) - confirm.
var Websocket Ws