This repository has been archived by the owner on Oct 24, 2022. It is now read-only.
forked from kubeedge/kubeedge
/
server.go
365 lines (328 loc) · 11 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
package wsserver
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
bhLog "github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/channelq"
hubio "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/io"
emodel "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model"
)
// ExitCode says why one of a connection's worker loops (EventReadLoop or
// EventWriteLoop) stopped. Each loop sends exactly one ExitCode on the stop
// channel before returning, and ServeConn waits for the first one.
type ExitCode int

// Exit codes reported on the stop channel.
const (
	// webSocketReadFail: setting the read deadline or reading from the
	// connection failed (including a missed keepalive deadline).
	webSocketReadFail ExitCode = 0
	// webSocketWriteFail: setting the write deadline or writing an event to
	// the connection failed.
	webSocketWriteFail ExitCode = 1
	// eventQueueDisconnect: consuming from or publishing to the event queue
	// failed; ServeConn notifies the edge node about it.
	eventQueueDisconnect ExitCode = 2
	// nodeStop: a node-stopped event was consumed for this node.
	nodeStop ExitCode = 3
)
// MsgFormatError is the error text EventWriteLoop treats as "this one event is
// malformed": such an event is acknowledged and skipped instead of tearing
// down the connection. Presumably it is produced by the event queue consumer;
// the comparison is done on err.Error(), so the text must match exactly.
const MsgFormatError = "message format not correct"
// PathEvent is the mux route template for edge-node event connections. Its
// {project_id} and {node_id} variables are the ones ServeEvent reads via
// mux.Vars (the route registration itself is not shown here).
const PathEvent = "/{project_id}/{node_id}/events"
// EventHandler is a package-level *EventHandle. It is not assigned in the
// code shown here; presumably it is initialized during module start-up
// elsewhere in the package — confirm before relying on it being non-nil.
var (
	EventHandler *EventHandle
)
// AccessHandle is the HTTP-facing entry point for edge connections
// (ServeEvent) and event-queue status requests (ServeQueueWorkload).
type AccessHandle struct {
	// EventHandle serves upgraded connections and answers workload queries.
	EventHandle *EventHandle
	// NodeLimit caps concurrently connected nodes: ServeEvent rejects a new
	// connection with 429 once EventHandle.GetNodeCount() reaches it.
	NodeLimit int
}
// EventHandle moves events between the cloud-side event queue and the
// connections of edge nodes.
type EventHandle struct {
	// KeepaliveInterval is the read deadline, in seconds, applied before every
	// read; a node that sends nothing for longer is disconnected.
	KeepaliveInterval int
	// WriteTimeout is the write deadline, in seconds, applied before each
	// event is written to a node.
	WriteTimeout int
	// Nodes holds one entry (value true) per connected node ID; its size is
	// what GetNodeCount reports.
	Nodes sync.Map
	// nodeConns maps node ID to that node's hubio.CloudHubIO connection.
	nodeConns sync.Map
	// nodeLocks maps node ID to the *sync.Mutex held by webSocketWrite while
	// writing, so writes to one connection are serialized.
	nodeLocks sync.Map
	// EventQueue is where node events are published and consumed.
	EventQueue *channelq.ChannelEventQueue
	// Context is the beehive module context. It is not referenced by the code
	// shown here — NOTE(review): confirm where it is used.
	Context *context.Context
}
// dumpEventMetadata renders an event's routing metadata (IDs, group, source,
// resource and operation) as a single log-friendly line. The event content is
// deliberately left out so callers can choose whether to log it.
func dumpEventMetadata(event *emodel.Event) string {
	const layout = "id: %s, parent_id: %s, group: %s, source: %s, resource: %s, operation: %s"
	ug := event.UserGroup
	return fmt.Sprintf(layout, event.ID, event.ParentID, event.Group, event.Source, ug.Resource, ug.Operation)
}
// trimMessage removes the first two "/"-separated segments (presumably the
// "node/<nodeID>" routing prefix) from msg's resource when the resource starts
// with emodel.ResNode, keeping the operation unchanged. A matching resource
// with fewer than three segments is logged and left as is.
func trimMessage(msg *model.Message) {
	resource := msg.GetResource()
	if !strings.HasPrefix(resource, emodel.ResNode) {
		return
	}
	// At most three parts: the third one is everything after the second "/".
	parts := strings.SplitN(resource, "/", 3)
	if len(parts) < 3 {
		bhLog.LOGGER.Warnf("event resource %s starts with node but length less than 3", resource)
		return
	}
	msg.SetResourceOperation(parts[2], msg.GetOperation())
}
// EventReadLoop reads messages from an edge node's connection and publishes
// the ones originating from the edge to the event queue.
//
// Keepalive messages are only logged. Every other message gets its resource
// prefixed with "node/<nodeID>/" and is converted to an event; only events
// for which IsFromEdge() is true are published. The loop runs until an error
// occurs, then sends exactly one ExitCode on stop and returns:
// webSocketReadFail when setting the deadline or reading fails,
// eventQueueDisconnect when publishing fails.
func (eh *EventHandle) EventReadLoop(hi hubio.CloudHubIO, info *emodel.HubInfo, stop chan ExitCode) {
	for {
		// The read deadline doubles as heartbeat detection: if nothing (not
		// even a keepalive) arrives within KeepaliveInterval, the read fails.
		deadline := time.Now().Add(time.Duration(eh.KeepaliveInterval) * time.Second)
		if err := hi.SetReadDeadline(deadline); err != nil {
			bhLog.LOGGER.Errorf("SetReadDeadline error, %s", err.Error())
			stop <- webSocketReadFail
			return
		}

		var msg model.Message
		if _, err := hi.ReadData(&msg); err != nil {
			bhLog.LOGGER.Errorf("read error, connection for node %s will be closed, reason: %s", info.NodeID, err.Error())
			stop <- webSocketReadFail
			return
		}

		operation := msg.GetOperation()
		if operation == emodel.OpKeepalive {
			bhLog.LOGGER.Infof("Keepalive message received from node: %s", info.NodeID)
			continue
		}

		msg.SetResourceOperation(fmt.Sprintf("node/%s/%s", info.NodeID, msg.GetResource()), operation)
		event := emodel.MessageToEvent(&msg)
		bhLog.LOGGER.Infof("event received for node %s %s, content: %s", info.NodeID, dumpEventMetadata(&event), event.Content)
		if !event.IsFromEdge() {
			continue
		}

		if err := eh.EventQueue.Publish(info, &event); err != nil {
			// content is not logged since it may contain sensitive information
			bhLog.LOGGER.Errorf("fail to publish event for node %s, %s, reason: %s",
				info.NodeID, dumpEventMetadata(&event), err.Error())
			stop <- eventQueueDisconnect
			return
		}
	}
}
// handleNodeQuery answers "request_exist" events. Such an event is rewritten
// in place into a "response_exist" reply, with ID, ParentID and Timestamp
// taken from model.NewMessage(event.ID), and published back for the same
// node. It returns (true, publish error) for these queries and (false, nil)
// for any other event, which is left untouched.
func (eh *EventHandle) handleNodeQuery(info *emodel.HubInfo, event *emodel.Event) (bool, error) {
	if event.UserGroup.Operation == "request_exist" {
		reply := model.NewMessage(event.ID)
		event.ID, event.ParentID, event.Timestamp = reply.GetID(), reply.GetParentID(), reply.GetTimestamp()
		event.UserGroup.Operation = "response_exist"
		return true, eh.EventQueue.Publish(info, event)
	}
	return false, nil
}
// EventWriteLoop consumes the events queued for an edge node and writes the
// ones addressed to the edge to its connection.
//
// Events are handled in this order:
//   - a "request_exist" query is answered by handleNodeQuery and acknowledged;
//   - a node-stopped event is acknowledged and ends the loop with nodeStop;
//   - an event not addressed to the edge is acknowledged and skipped;
//   - anything else is converted to a message, trimmed and written.
//
// An event is acknowledged only after it has been handled, so an event whose
// write fails is not acknowledged. A MsgFormatError from the queue skips just
// that event. Any other failure sends one ExitCode on stop and returns.
func (eh *EventHandle) EventWriteLoop(hi hubio.CloudHubIO, info *emodel.HubInfo, stop chan ExitCode) {
	events, err := eh.EventQueue.Consume(info)
	if err != nil {
		bhLog.LOGGER.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
		stop <- eventQueueDisconnect
		return
	}

	for {
		event, err := events.Get()
		if err != nil {
			bhLog.LOGGER.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
			if err.Error() != MsgFormatError {
				stop <- eventQueueDisconnect
				return
			}
			// error format message should not impact other message
			events.Ack()
			continue
		}

		isQuery, err := eh.handleNodeQuery(info, event)
		if err != nil {
			bhLog.LOGGER.Errorf("failed to process node query event for node %s, reason %s", info.NodeID, err.Error())
		}

		switch {
		case isQuery:
			events.Ack()
			continue
		case event.IsNodeStopped():
			bhLog.LOGGER.Infof("node %s is stopped, will disconnect", info.NodeID)
			events.Ack()
			stop <- nodeStop
			return
		case !event.IsToEdge():
			bhLog.LOGGER.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpEventMetadata(event), event.Content)
			events.Ack()
			continue
		}

		bhLog.LOGGER.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpEventMetadata(event), event.Content)
		msg := emodel.EventToMessage(event)
		trimMessage(&msg)

		deadline := time.Now().Add(time.Duration(eh.WriteTimeout) * time.Second)
		if err := hi.SetWriteDeadline(deadline); err != nil {
			bhLog.LOGGER.Errorf("SetWriteDeadline error, %s", err.Error())
			stop <- webSocketWriteFail
			return
		}
		if err := eh.webSocketWrite(hi, info.NodeID, &msg); err != nil {
			bhLog.LOGGER.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
				info.NodeID, dumpEventMetadata(event), err.Error())
			stop <- webSocketWriteFail
			return
		}
		events.Ack()
	}
}
// webSocketWrite writes v to hi while holding nodeID's mutex from nodeLocks,
// so writes to one node's connection never overlap. It returns an error
// without writing when no mutex is registered for the node, i.e. the node is
// not (or no longer) registered by ServeConn.
func (eh *EventHandle) webSocketWrite(hi hubio.CloudHubIO, nodeID string, v interface{}) error {
	if lock, found := eh.nodeLocks.Load(nodeID); found {
		mu := lock.(*sync.Mutex)
		mu.Lock()
		defer mu.Unlock()
		return hi.WriteData(v)
	}
	return fmt.Errorf("node disconnected")
}
// notifyEventQueueError tells an edge node that the event queue connection
// was lost by writing a disconnect message routed to the node's resource.
// It does nothing for any code other than eventQueueDisconnect. A failed
// write is logged, not returned.
func notifyEventQueueError(hi hubio.CloudHubIO, code ExitCode, nodeID string) {
	if code != eventQueueDisconnect {
		return
	}
	resource := emodel.NewResource(emodel.ResNode, nodeID, nil)
	notice := model.NewMessage("").BuildRouter(emodel.GpResource, emodel.SrcCloudHub, resource, emodel.OpDisConnect)
	if err := hi.WriteData(notice); err != nil {
		bhLog.LOGGER.Errorf("fail to notify node %s event queue disconnected, reason: %s", nodeID, err.Error())
	}
}
// constructConnectEvent builds the cloudhub event announcing that info's node
// connected (isConnected == true) or disconnected. The operation is
// emodel.OpConnect or emodel.OpDisConnect, and the JSON content carries
// event_type, a Unix timestamp and client_id. ID, ParentID and Timestamp come
// from a fresh model.NewMessage("").
func constructConnectEvent(info *emodel.HubInfo, isConnected bool) *emodel.Event {
	operation := emodel.OpDisConnect
	if isConnected {
		operation = emodel.OpConnect
	}

	// The map holds only the operation constant, an int64 and a string, so
	// marshalling is not expected to fail; the error is intentionally ignored.
	content, _ := json.Marshal(map[string]interface{}{
		"event_type": operation,
		"timestamp":  time.Now().Unix(),
		"client_id":  info.NodeID,
	})

	template := model.NewMessage("")
	return &emodel.Event{
		Group:  emodel.GpResource,
		Source: emodel.SrcCloudHub,
		UserGroup: emodel.UserGroupInfo{
			Resource:  emodel.NewResource(emodel.ResNode, info.NodeID, nil),
			Operation: operation,
		},
		ID:        template.GetID(),
		ParentID:  template.GetParentID(),
		Timestamp: template.GetTimestamp(),
		Content:   string(content),
	}
}
// ServeEvent upgrades an edge node's HTTP request to a websocket connection
// and serves it until the node disconnects.
//
// project_id and node_id are taken from the mux route variables. The request
// is rejected with 429 Too Many Requests when NodeLimit nodes are already
// connected.
func (ah *AccessHandle) ServeEvent(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	projectID := vars["project_id"]
	nodeID := vars["node_id"]

	if ah.EventHandle.GetNodeCount() >= ah.NodeLimit {
		bhLog.LOGGER.Errorf("fail to serve node %s, reach node limit", nodeID)
		http.Error(w, "too many Nodes connected", http.StatusTooManyRequests)
		return
	}

	upgrader := websocket.Upgrader{}
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error
		// response. Writing a second one here would only trigger net/http's
		// "superfluous response.WriteHeader call" warning, or fail on a
		// connection Upgrade already hijacked.
		bhLog.LOGGER.Errorf("fail to build websocket connection for node %s, reason %s", nodeID, err.Error())
		return
	}

	info := &emodel.HubInfo{ProjectID: projectID, NodeID: nodeID}
	hi := &hubio.JSONWSIO{WSConn: conn}
	// ServeConn blocks until the connection terminates.
	ah.EventHandle.ServeConn(hi, info)
}
// ServeConn serves one edge-node connection and blocks until it terminates.
//
// Startup: ServeConn connects the node to the event queue and publishes a
// "connected" event. If either step fails, the edge is notified, the
// connection is closed (and the queue too, if it was connected), and
// ServeConn returns.
//
// Running: the node is registered in nodeConns, nodeLocks and Nodes, and
// EventReadLoop and EventWriteLoop run concurrently.
//
// Teardown: the first ExitCode either loop sends on stop starts it. The node
// is unregistered, a "disconnected" event is published, the edge is told
// about an event-queue failure, and the connection and queue are closed.
func (eh *EventHandle) ServeConn(hi hubio.CloudHubIO, info *emodel.HubInfo) {
	closeConn := func() {
		if err := hi.Close(); err != nil {
			bhLog.LOGGER.Errorf("fail to close connection, reason: %s", err.Error())
		}
	}
	// notify sends the event-queue disconnect notice (only for
	// eventQueueDisconnect, as notifyEventQueueError does). It sets a fresh
	// write deadline first. Otherwise the write runs with either no deadline,
	// and could block indefinitely on an unresponsive peer, or a stale
	// deadline left by EventWriteLoop that may make it fail immediately.
	notify := func(code ExitCode) {
		if code != eventQueueDisconnect {
			return
		}
		deadline := time.Now().Add(time.Duration(eh.WriteTimeout) * time.Second)
		if err := hi.SetWriteDeadline(deadline); err != nil {
			bhLog.LOGGER.Errorf("SetWriteDeadline error, %s", err.Error())
		}
		notifyEventQueueError(hi, code, info.NodeID)
	}

	if err := eh.EventQueue.Connect(info); err != nil {
		bhLog.LOGGER.Errorf("fail to connect to event queue for node %s, reason %s", info.NodeID, err.Error())
		notify(eventQueueDisconnect)
		closeConn()
		return
	}
	if err := eh.EventQueue.Publish(info, constructConnectEvent(info, true)); err != nil {
		bhLog.LOGGER.Errorf("fail to publish node connect event for node %s, reason %s", info.NodeID, err.Error())
		notify(eventQueueDisconnect)
		closeConn()
		eh.EventQueue.Close(info)
		return
	}

	// writeLock is the per-node mutex that webSocketWrite holds around writes.
	writeLock := &sync.Mutex{}
	eh.nodeConns.Store(info.NodeID, hi)
	eh.nodeLocks.Store(info.NodeID, writeLock)
	eh.Nodes.Store(info.NodeID, true)
	bhLog.LOGGER.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)

	// Each loop sends at most one code; room for both means the loop that
	// reports second never blocks.
	stop := make(chan ExitCode, 2)
	go eh.EventReadLoop(hi, info, stop)
	go eh.EventWriteLoop(hi, info, stop)

	code := <-stop
	bhLog.LOGGER.Infof("edge node %s for project %s disconnected", info.NodeID, info.ProjectID)
	eh.nodeLocks.Delete(info.NodeID)
	eh.nodeConns.Delete(info.NodeID)
	if err := eh.EventQueue.Publish(info, constructConnectEvent(info, false)); err != nil {
		bhLog.LOGGER.Errorf("fail to publish node disconnect event for node %s, reason %s", info.NodeID, err.Error())
	}

	// EventWriteLoop may still be inside webSocketWrite: it may have loaded
	// writeLock before the Delete above. The underlying connection
	// (gorilla/websocket for JSONWSIO) supports only one concurrent writer,
	// so the final notification must take the same lock.
	writeLock.Lock()
	notify(code)
	writeLock.Unlock()

	eh.Nodes.Delete(info.NodeID)
	closeConn()
	eh.EventQueue.Close(info)
}
// ServeQueueWorkload replies with the event queue's workload as a plain-text
// float formatted with "%f". If the workload cannot be read it responds 500.
// A failure to write the body is only logged.
func (ah *AccessHandle) ServeQueueWorkload(w http.ResponseWriter, r *http.Request) {
	workload, err := ah.EventHandle.GetWorkload()
	if err != nil {
		bhLog.LOGGER.Errorf("%s", err.Error())
		http.Error(w, "fail to get event queue workload", http.StatusInternalServerError)
		return
	}

	body := fmt.Sprintf("%f", workload)
	if _, writeErr := io.WriteString(w, body); writeErr != nil {
		bhLog.LOGGER.Errorf("fail to write string, reason: %s", writeErr.Error())
	}
}
// GetNodeCount returns the number of node IDs currently registered in Nodes,
// i.e. the connections ServeConn has registered and not yet torn down.
func (eh *EventHandle) GetNodeCount() int {
	count := 0
	eh.Nodes.Range(func(_, _ interface{}) bool {
		count++
		return true // keep iterating over every entry
	})
	return count
}
// GetWorkload reports the event queue's current workload, exactly as returned
// by EventQueue.Workload.
func (eh *EventHandle) GetWorkload() (float64, error) {
	workload, err := eh.EventQueue.Workload()
	return workload, err
}
// getEventQueueAvailability reports whether the event queue answers a
// workload query: 1 if GetWorkload succeeds, 0 (after logging the error)
// otherwise.
func (ah *AccessHandle) getEventQueueAvailability() int {
	if _, err := ah.EventHandle.GetWorkload(); err != nil {
		bhLog.LOGGER.Errorf("eventq is not available, reason %s", err.Error())
		return 0
	}
	return 1
}
// FilterWriter is an io.Writer that forwards output to os.Stderr but drops
// any write containing "http: TLS handshake error from". Presumably it backs
// an http.Server ErrorLog to silence noisy handshake failures — confirm at the
// call site.
type FilterWriter struct{}

// Write forwards p to os.Stderr unless p contains
// "http: TLS handshake error from", in which case p is silently discarded.
//
// A discarded write still reports len(p) bytes written. io.Writer requires a
// non-nil error whenever n < len(p), so returning 0 with a nil error would be
// treated as a short write (io.ErrShortWrite) by callers such as bufio.Writer
// or io.Copy.
func (f *FilterWriter) Write(p []byte) (n int, err error) {
	if strings.Contains(string(p), "http: TLS handshake error from") {
		return len(p), nil
	}
	return os.Stderr.Write(p)
}