forked from kubeedge/kubeedge
/
metaclient.go
109 lines (90 loc) · 2.63 KB
/
metaclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package client
import (
"time"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager"
"k8s.io/apimachinery/pkg/util/wait"
)
// Tuning knobs for SendSync's retry loop.
const (
// syncPeriod is the polling interval handed to wait.Poll in SendSync.
syncPeriod = 10 * time.Millisecond
// syncMsgRespTimeout is used twice in SendSync: as the overall wait.Poll
// timeout and as the timeout passed to each individual context.SendSync call.
syncMsgRespTimeout = 1 * time.Minute
)
// CoreInterface is the interface of the metaclient. It embeds one getter
// interface per resource kind; New returns an implementation of it.
type CoreInterface interface {
PodsGetter
PodStatusGetter
ConfigMapsGetter
NodesGetter
NodeStatusGetter
SecretsGetter
EndpointsGetter
ServiceGetter
}
// metaClient is the CoreInterface implementation returned by New. Each of its
// methods builds a namespace-scoped resource client from the two fields below.
type metaClient struct {
// context is passed unchanged to every resource-client constructor.
context *context.Context
// send is the synchronous sender shared by the resource clients.
send SendInterface
}
// Pods returns a PodsInterface for the given namespace, built by newPods from
// the client's context and sender.
func (m *metaClient) Pods(namespace string) PodsInterface {
return newPods(namespace, m.context, m.send)
}
// ConfigMaps returns a ConfigMapsInterface for the given namespace, built by
// newConfigMaps from the client's context and sender.
func (m *metaClient) ConfigMaps(namespace string) ConfigMapsInterface {
return newConfigMaps(namespace, m.context, m.send)
}
// Nodes returns a NodesInterface for the given namespace, built by newNodes
// from the client's context and sender.
func (m *metaClient) Nodes(namespace string) NodesInterface {
return newNodes(namespace, m.context, m.send)
}
// NodeStatus returns a NodeStatusInterface for the given namespace, built by
// newNodeStatus from the client's context and sender.
func (m *metaClient) NodeStatus(namespace string) NodeStatusInterface {
return newNodeStatus(namespace, m.context, m.send)
}
// Secrets returns a SecretsInterface for the given namespace, built by
// newSecrets from the client's context and sender.
func (m *metaClient) Secrets(namespace string) SecretsInterface {
return newSecrets(namespace, m.context, m.send)
}
// PodStatus returns a PodStatusInterface for the given namespace, built by
// newPodStatus from the client's context and sender.
func (m *metaClient) PodStatus(namespace string) PodStatusInterface {
return newPodStatus(namespace, m.context, m.send)
}
// Endpoints returns an EndpointsInterface for the given namespace, built by
// newEndpoints from the client's context and sender.
func (m *metaClient) Endpoints(namespace string) EndpointsInterface {
return newEndpoints(namespace, m.context, m.send)
}
// Services returns a ServiceInterface for the given namespace, built by
// newServices from the client's context and sender.
func (m *metaClient) Services(namespace string) ServiceInterface {
return newServices(namespace, m.context, m.send)
}
// New creates a metaclient bound to the given beehive context and returns it
// as a CoreInterface. The same context is also used to build the client's
// synchronous sender (see newSend).
func New(c *context.Context) CoreInterface {
	mc := new(metaClient)
	mc.context = c
	mc.send = newSend(c)
	return mc
}
// SendInterface abstracts synchronous message delivery: SendSync sends a
// message and returns the response message or an error.
type SendInterface interface {
SendSync(message *model.Message) (*model.Message, error)
}
// send is the SendInterface implementation created by newSend.
type send struct {
// context is the beehive context whose SendSync method performs the delivery.
context *context.Context
}
// newSend wraps the given context in a send value and exposes it through
// SendInterface.
func newSend(c *context.Context) SendInterface {
	sender := &send{context: c}
	return sender
}
// SendSync delivers message to the meta manager module through the underlying
// context and waits for its response, retrying on failure.
//
// Attempts are driven by wait.Poll with interval syncPeriod and an overall
// timeout of syncMsgRespTimeout; each attempt also passes syncMsgRespTimeout
// to context.SendSync. The first two failed attempts are logged and retried;
// the third failure stops polling and its error is returned. If the overall
// timeout is reached first, the error reported by wait.Poll is returned.
//
// The returned pointer is never nil: it refers to whatever the most recent
// attempt produced (the zero Message if no attempt ran), so callers must check
// the error before trusting the message.
func (s *send) SendSync(message *model.Message) (*model.Message, error) {
	var (
		reply    model.Message
		attempts int
	)

	pollErr := wait.Poll(syncPeriod, syncMsgRespTimeout, func() (bool, error) {
		var sendErr error
		reply, sendErr = s.context.SendSync(metamanager.MetaManagerModuleName, *message, syncMsgRespTimeout)
		attempts++

		switch {
		case sendErr == nil:
			log.LOGGER.Infof("send sync message %s successed and response: %v", message.GetResource(), reply)
			return true, nil
		case attempts < 3:
			// Swallow the error so that Poll schedules another attempt.
			log.LOGGER.Errorf("send sync message %s failed, error:%v, retries: %d", message.GetResource(), sendErr, attempts)
			return false, nil
		default:
			// Out of retries: a non-nil error ends the poll and is what Poll returns.
			return true, sendErr
		}
	})

	return &reply, pollErr
}