-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
118 lines (102 loc) · 2.36 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package chadango
import (
"context"
"time"
"golang.org/x/net/websocket"
)
// WebSocket represents a WebSocket connection.
// It wraps a `golang.org/x/net/websocket` connection and exposes incoming messages
// on a channel, allowing it to be select-able along with other channels.
type WebSocket struct {
// Connected is set to true by Connect after a successful dial and back to false by Close.
Connected bool
// Events receives every message read by pumpEvent; Connect allocates it as a buffered channel.
Events chan string
// OnError, when non-nil, is called by pumpEvent with the error that ended the receive loop.
OnError func(error)
// url is the address passed to Connect.
url string
// client is the underlying connection returned by websocket.Dial.
client *websocket.Conn
// runCtx is the cancellable context created by Sustain; keepAlive returns once it is done.
runCtx context.Context
// cancelFunc cancels runCtx; Close invokes it when it is non-nil.
cancelFunc context.CancelFunc
}
// Connect dials the given URL and, on success, marks the socket as connected
// and allocates a fresh buffered Events channel.
//
// Calling Connect on a socket that is already connected does nothing and
// returns nil. A dial failure is returned unchanged and leaves the socket
// disconnected.
func (w *WebSocket) Connect(url string) (err error) {
	if w.Connected {
		return nil
	}

	w.url = url

	conn, dialErr := websocket.Dial(url, "", WEBSOCKET_ORIGIN)
	// The dial result is stored even on failure, mirroring a direct assignment.
	w.client = conn
	if dialErr != nil {
		return dialErr
	}

	w.Connected = true
	w.Events = make(chan string, EVENT_BUFFER_SIZE)
	return nil
}
// Close tears down the connection: it clears the Connected flag, cancels the
// context created by Sustain (if any), and closes the underlying socket.
// It does nothing when the socket is not connected.
func (w *WebSocket) Close() {
	if !w.Connected {
		return
	}

	// Flip the flag first so a nested or repeated Close becomes a no-op.
	w.Connected = false

	if w.cancelFunc != nil {
		w.cancelFunc()
	}

	w.client.Close()
}
// Sustain starts pumping events and keeps the WebSocket connection alive.
//
// It derives a cancellable context from ctx, stores it on the receiver, and
// launches two goroutines: pumpEvent, which forwards received messages to
// Events, and keepAlive, which sends periodic pings. Both goroutines call
// Close on their way out, and Close cancels the derived context.
//
// Sustain is a no-op when the socket is not connected, or when the context
// from a previous Sustain call has not been cancelled yet. It performs no
// locking of its own.
func (w *WebSocket) Sustain(ctx context.Context) {
	// Without a live connection pumpEvent fails on its first Recv and then
	// sends EndFrame on an Events channel that is either nil (blocks forever)
	// or already closed by an earlier pump (panics). Close would also skip
	// cancelling the new context, leaking it.
	if !w.Connected {
		return
	}

	// A still-live runCtx means the goroutines from a previous call are
	// running. A second pumpEvent would close Events a second time, which
	// panics, so refuse to start another pair.
	if w.runCtx != nil && w.runCtx.Err() == nil {
		return
	}

	w.runCtx, w.cancelFunc = context.WithCancel(ctx)
	go w.pumpEvent()
	go w.keepAlive()
}
// pumpEvent forwards every received message to the Events channel until Recv
// fails. On failure it reports the error through OnError (when set) and then
// shuts down: the socket is closed, EndFrame is pushed as the final event, and
// the Events channel is closed.
func (w *WebSocket) pumpEvent() {
	// Order matters: close the socket, announce the end, then close the channel.
	shutdown := func() {
		w.Close()
		w.Events <- EndFrame
		close(w.Events)
	}
	defer shutdown()

	for {
		frame, recvErr := w.Recv()
		if recvErr == nil {
			w.Events <- frame
			continue
		}

		if w.OnError != nil {
			w.OnError(recvErr)
		}
		return
	}
}
// keepAlive writes a "\r\n" ping every PING_INTERVAL until either a send
// fails or the context created by Sustain is done. The socket is closed and
// the ticker stopped when it returns.
func (w *WebSocket) keepAlive() {
	pinger := time.NewTicker(PING_INTERVAL)
	defer pinger.Stop()

	// Guarantees the socket is closed even when the parent context is
	// cancelled without anyone calling Close explicitly.
	defer w.Close()

	const ping = "\r\n"
	for {
		select {
		case <-w.runCtx.Done():
			return
		case <-pinger.C:
			if sendErr := w.Send(ping); sendErr != nil {
				return
			}
		}
	}
}
// Send writes msg to the connection as a single WebSocket message.
// It returns ErrNotConnected when the socket is not connected; otherwise it
// returns whatever websocket.Message.Send reports.
func (w *WebSocket) Send(msg string) (err error) {
	if !w.Connected {
		return ErrNotConnected
	}
	return websocket.Message.Send(w.client, msg)
}
// Recv reads the next message from the connection via
// websocket.Message.Receive and returns it as a string.
// It returns ErrNotConnected, with an empty message, when the socket is not
// connected.
func (w *WebSocket) Recv() (msg string, err error) {
	if !w.Connected {
		return "", ErrNotConnected
	}
	err = websocket.Message.Receive(w.client, &msg)
	return msg, err
}