forked from xmidt-org/kratos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
206 lines (178 loc) · 5.49 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package kratos
import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/wrp-go/v3"
)
// Client is what function calls we expose to the user of kratos
type Client interface {
Hostname() string
HandlerRegistry() HandlerRegistry
Send(message *wrp.Message)
Close() error
}
// sendWRPFunc is the function for sending a message downstream.
type sendWRPFunc func(*wrp.Message)
type client struct {
deviceID string
userAgent string
deviceProtocols string
hostname string
registry HandlerRegistry
handlePingMiss HandlePingMiss
encoderSender encoderSender
decoderSender decoderSender
connection websocketConnection
headerInfo *clientHeader
logger log.Logger
done chan struct{}
wg sync.WaitGroup
pingConfig PingConfig
once sync.Once
config ClientConfig
connected bool
connectedMutex sync.Mutex
}
// used to track everything that we want to know about the client headers
type clientHeader struct {
deviceName string
firmwareName string
modelName string
manufacturer string
token string
}
// websocketConnection maintains the websocket connection upstream (to XMiDT).
type websocketConnection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
Close() error
}
// Hostname provides the client's hostname.
func (c *client) Hostname() string {
return c.hostname
}
// HandlerRegistry returns the HandlerRegistry that the client maintains.
func (c *client) HandlerRegistry() HandlerRegistry {
return c.registry
}
// Send is used to open a channel for writing to XMiDT
func (c *client) Send(message *wrp.Message) {
if !c.isConnected() {
logging.Warn(c.logger).Log(logging.MessageKey(), "Send failed because connection is closed...")
return
}
c.encoderSender.EncodeAndSend(message)
}
// Close closes connections downstream and the socket upstream.
func (c *client) Close() error {
var connectionErr error
c.once.Do(func() {
logging.Info(c.logger).Log(logging.MessageKey(), "Closing client...")
close(c.done)
c.wg.Wait()
c.decoderSender.Close()
c.encoderSender.Close()
connectionErr = c.connection.Close()
c.connection = nil
// TODO: if this fails, can we really do anything. Is there potential for leaks?
// if err != nil {
// return emperror.Wrap(err, "Failed to close connection")
// }
logging.Info(c.logger).Log(logging.MessageKey(), "Client Closed")
})
return connectionErr
}
func (c *client) setConnected() {
c.connectedMutex.Lock()
c.connected = true
c.connectedMutex.Unlock()
}
func (c *client) RequestReconnect() {
logging.Info(c.logger).Log(logging.MessageKey(), "Client RequestReconnect...")
c.connectedMutex.Lock()
c.connected = false
if c.connection != nil {
c.connection.Close()
c.encoderSender.Close()
c.connection = nil
c.encoderSender = nil
}
c.connectedMutex.Unlock()
}
func (c *client) isConnected() (connected bool) {
c.connectedMutex.Lock()
connected = c.connected
c.connectedMutex.Unlock()
return connected
}
// going to be used to access the HandleMessage() function
func (c *client) read() {
defer c.wg.Done()
logging.Info(c.logger).Log(logging.MessageKey(), "Watching socket for messages.")
for {
select {
case <-c.done:
logging.Info(c.logger).Log(logging.MessageKey(), "Stopped reading from socket.")
return
default:
if !c.isConnected() {
c.reconnect()
continue
}
logging.Debug(c.logger).Log(logging.MessageKey(), "Reading message...")
mt, serverMessage, err := c.connection.ReadMessage()
if mt == websocket.CloseMessage {
logging.Info(c.logger).Log(logging.MessageKey(), "Received close message type.")
c.RequestReconnect()
continue
}
logging.Info(c.logger).Log(logging.MessageKey(), "Read Message")
if err != nil {
logging.Info(c.logger).Log(logging.MessageKey(), "Failed to read message.", logging.ErrorKey(), err.Error())
c.RequestReconnect()
continue
}
c.decoderSender.DecodeAndSend(serverMessage)
logging.Debug(c.logger).Log(logging.MessageKey(), "Message sent to be decoded")
}
}
}
func (c *client) reconnect() error {
config := c.config
inHeader := &clientHeader{
deviceName: config.DeviceName,
firmwareName: config.FirmwareName,
modelName: config.ModelName,
manufacturer: config.Manufacturer,
token: config.Token,
}
logging.Warn(c.logger).Log(logging.MessageKey(), "Reconnecting...")
newConnection, _, err := createConnection(inHeader, config.DestinationURL, config.TlsConfig)
if err != nil {
logging.Debug(c.logger).Log(logging.MessageKey(), "Failed to connect, retrying after a timeout", "error", err)
<-time.After(time.Second * 2)
return err
}
logging.Warn(c.logger).Log(logging.MessageKey(), "Reconnected successfully...")
pinged := make(chan string)
newConnection.SetPingHandler(func(appData string) error {
pinged <- appData
err := newConnection.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(writeWait))
return err
})
var logger log.Logger
if config.ClientLogger != nil {
logger = config.ClientLogger
} else {
logger = logging.DefaultLogger()
}
sender := NewSender(newConnection, config.OutboundQueue.MaxWorkers, config.OutboundQueue.Size, logger)
encoder := NewEncoderSender(sender, config.WRPEncoderQueue.MaxWorkers, config.WRPEncoderQueue.Size, logger)
c.connection = newConnection
c.encoderSender = encoder
c.setConnected()
return nil
}