/
stub.go
131 lines (112 loc) · 3.13 KB
/
stub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package test
import (
"encoding/json"
"io"
"net/http"
"github.com/gorilla/websocket"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"github.com/kubeedge/beehive/pkg/core"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/edge/pkg/common/modules"
)
func init() {
core.Register(&stubCloudHub{enable: true})
}
type Attributes struct {
RoleName string `json:"iam_role"`
ProjectID string `json:"project_id"`
}
type stubCloudHub struct {
wsConn *websocket.Conn
enable bool
}
var _ core.Module = (*stubCloudHub)(nil)
func (*stubCloudHub) Name() string {
return "stubCloudHub"
}
func (*stubCloudHub) Group() string {
//return core.MetaGroup
return modules.MetaGroup
}
func (tm *stubCloudHub) Enable() bool {
return tm.enable
}
func (tm *stubCloudHub) eventReadLoop(conn *websocket.Conn, stop chan bool) {
for {
var event interface{}
err := conn.ReadJSON(&event)
if err != nil {
klog.Errorf("read error, connection will be closed: %v", err)
stop <- true
return
}
klog.Infof("cloud hub receive message %+v", event)
}
}
func (tm *stubCloudHub) serveEvent(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
klog.Errorf("fail to build websocket connection: %v", err)
http.Error(w, "fail to upgrade to websocket protocol", http.StatusInternalServerError)
return
}
tm.wsConn = conn
stop := make(chan bool, 1)
klog.Info("edge connected")
go tm.eventReadLoop(conn, stop)
<-stop
tm.wsConn = nil
klog.Info("edge disconnected")
}
func (tm *stubCloudHub) podHandler(w http.ResponseWriter, req *http.Request) {
if req.Body != nil {
body, err := io.ReadAll(req.Body)
if err != nil {
klog.Errorf("read body error %v", err)
w.Write([]byte("read request body error"))
return
}
klog.Infof("request body is %s\n", string(body))
var pod v1.Pod
if err = json.Unmarshal(body, &pod); err != nil {
klog.Errorf("unmarshal request body error %v", err)
w.Write([]byte("unmarshal request body error"))
return
}
var msgReq *model.Message
switch req.Method {
case http.MethodPost:
msgReq = model.NewMessage("").BuildRouter("edgecontroller", "resource",
"node/fake_node_id/pod/"+string(pod.UID), model.InsertOperation).FillBody(pod)
case http.MethodDelete:
msgReq = model.NewMessage("").BuildRouter("edgecontroller", "resource",
"node/fake_node_id/pod/"+string(pod.UID), model.DeleteOperation).FillBody(pod)
}
if tm.wsConn != nil {
tm.wsConn.WriteJSON(*msgReq)
klog.Infof("send message to edgehub is %+v\n", *msgReq)
}
io.WriteString(w, "OK\n")
}
}
func (tm *stubCloudHub) Start() {
defer tm.Cleanup()
mux := http.NewServeMux()
mux.HandleFunc("/{group_id}/events", tm.serveEvent) // for edge-hub
mux.HandleFunc("/pod", tm.podHandler) // for pod test
s := http.Server{
Addr: "127.0.0.1:20000",
Handler: mux,
}
klog.Info("Start cloud hub service")
err := s.ListenAndServe()
if err != nil {
klog.Errorf("ListenAndServe: %v", err)
}
}
func (tm *stubCloudHub) Cleanup() {
beehiveContext.Cleanup(tm.Name())
}