/
kubecm.go
177 lines (163 loc) · 5.4 KB
/
kubecm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/joy12825/gf.
// Package kubecm implements gcfg.Adapter using kubernetes configmap.
package kubecm
import (
"context"
"fmt"
kubeMetaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/joy12825/gf/encoding/gjson"
"github.com/joy12825/gf/errors/gerror"
"github.com/joy12825/gf/frame/g"
"github.com/joy12825/gf/os/gcfg"
"github.com/joy12825/gf/util/gutil"
)
// Client implements gcfg.Adapter.
type Client struct {
config Config // Config object when created.
client *kubernetes.Clientset // Kubernetes client.
value *g.Var // Configmap content cached. It is `*gjson.Json` value internally.
}
// Config for Client.
type Config struct {
ConfigMap string `v:"required"` // ConfigMap name.
DataItem string `v:"required"` // DataItem is the key item in Configmap data.
Namespace string // Specify the namespace for configmap.
RestConfig *rest.Config // Custom rest config for kube client.
KubeClient *kubernetes.Clientset // Custom kube client.
Watch bool // Watch watches remote configuration updates, which updates local configuration in memory immediately when remote configuration changes.
}
// New creates and returns gcfg.Adapter implementing using kubernetes configmap.
func New(ctx context.Context, config Config) (adapter gcfg.Adapter, err error) {
// Data validation.
err = g.Validator().Data(config).Run(ctx)
if err != nil {
return nil, err
}
// Kubernetes client creating.
if config.KubeClient == nil {
if config.RestConfig == nil {
config.RestConfig, err = NewDefaultKubeConfig(ctx)
if err != nil {
return nil, gerror.Wrapf(err, `create kube config failed`)
}
}
config.KubeClient, err = kubernetes.NewForConfig(config.RestConfig)
if err != nil {
return nil, gerror.Wrapf(err, `create kube client failed`)
}
}
adapter = &Client{
config: config,
client: config.KubeClient,
value: g.NewVar(nil, true),
}
return
}
// Available checks and returns the backend configuration service is available.
// The optional parameter `resource` specifies certain configuration resource.
//
// Note that this function does not return error as it just does simply check for
// backend configuration service.
func (c *Client) Available(ctx context.Context, configMap ...string) (ok bool) {
if len(configMap) == 0 && !c.value.IsNil() {
return true
}
var (
namespace = gutil.GetOrDefaultStr(Namespace(), c.config.Namespace)
configMapName = gutil.GetOrDefaultStr(c.config.ConfigMap, configMap...)
)
_, err := c.config.KubeClient.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, kubeMetaV1.GetOptions{})
if err != nil {
return false
}
return true
}
// Get retrieves and returns value by specified `pattern` in current resource.
// Pattern like:
// "x.y.z" for map item.
// "x.0.y" for slice item.
func (c *Client) Get(ctx context.Context, pattern string) (value interface{}, err error) {
if c.value.IsNil() {
if err = c.updateLocalValueAndWatch(ctx); err != nil {
return nil, err
}
}
return c.value.Val().(*gjson.Json).Get(pattern).Val(), nil
}
// Data retrieves and returns all configuration data in current resource as map.
// Note that this function may lead lots of memory usage if configuration data is too large,
// you can implement this function if necessary.
func (c *Client) Data(ctx context.Context) (data map[string]interface{}, err error) {
if c.value.IsNil() {
if err = c.updateLocalValueAndWatch(ctx); err != nil {
return nil, err
}
}
return c.value.Val().(*gjson.Json).Map(), nil
}
// init retrieves and caches the configmap content.
func (c *Client) updateLocalValueAndWatch(ctx context.Context) (err error) {
var namespace = gutil.GetOrDefaultStr(Namespace(), c.config.Namespace)
err = c.doUpdate(ctx, namespace)
if err != nil {
return err
}
err = c.doWatch(ctx, namespace)
if err != nil {
return err
}
return nil
}
func (c *Client) doUpdate(ctx context.Context, namespace string) (err error) {
cm, err := c.client.CoreV1().ConfigMaps(namespace).Get(ctx, c.config.ConfigMap, kubeMetaV1.GetOptions{})
if err != nil {
return gerror.Wrapf(
err,
`retrieve configmap "%s" from namespace "%s" failed`,
c.config.ConfigMap, namespace,
)
}
var j *gjson.Json
if j, err = gjson.LoadContent(cm.Data[c.config.DataItem]); err != nil {
return gerror.Wrapf(
err,
`parse config map item from %s[%s] failed`, c.config.ConfigMap, c.config.DataItem,
)
}
c.value.Set(j)
return nil
}
func (c *Client) doWatch(ctx context.Context, namespace string) (err error) {
if !c.config.Watch {
return nil
}
var watchHandler watch.Interface
watchHandler, err = c.client.CoreV1().ConfigMaps(namespace).Watch(ctx, kubeMetaV1.ListOptions{
FieldSelector: fmt.Sprintf(`metadata.name=%s`, c.config.ConfigMap),
Watch: true,
})
if err != nil {
return gerror.Wrapf(
err,
`watch configmap "%s" from namespace "%s" failed`,
c.config.ConfigMap, namespace,
)
}
go func() {
for {
event := <-watchHandler.ResultChan()
switch event.Type {
case watch.Modified:
_ = c.doUpdate(ctx, namespace)
}
}
}()
return nil
}