forked from lalamove/konfig
-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcdloader.go
174 lines (147 loc) · 4.01 KB
/
etcdloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package kletcd
import (
"bytes"
"context"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/lalamove/konfig"
"github.com/lalamove/konfig/parser"
"github.com/lalamove/konfig/watcher/kwpoll"
"github.com/lalamove/nui/ncontext"
"github.com/lalamove/nui/nstrings"
"go.etcd.io/etcd/clientv3"
)
var (
defaultTimeout = 5 * time.Second
_ konfig.Loader = (*Loader)(nil)
)
const (
defaultName = "etcd"
)
// Key is an Etcd Key to load
type Key struct {
// Key is the etcd key
Key string
// Parser is the parser for the key
// If nil, the value is casted to a string before adding to the config.Store
Parser parser.Parser
}
// Config is the structure representing the config of a Loader
type Config struct {
// Name is the name of the loader
Name string
// StopOnFailure tells whether a failure to load configs should closed the config and all registered closers
StopOnFailure bool
// Client is the etcd client
Client *clientv3.Client
// Keys is the list of keys to fetch
Keys []Key
// Timeout is the timeout duration when fetching a key
Timeout time.Duration
// Prefix is a prefix to prepend keys when adding into the konfig.Store
Prefix string
// Replacer is a Replacer for the key before adding to the konfig.Store
Replacer nstrings.Replacer
// Watch tells whether there should be a watcher with the loader
Watch bool
// Rater is the rater to pass to the poll watcher
Rater kwpoll.Rater
// MaxRetry is the maximum number of times we can retry to load if it fails
MaxRetry int
// RetryDelay is the time between each retry when a load fails
RetryDelay time.Duration
// Debug sets debug mode on the etcdloader
Debug bool
// Contexter provides a context, default value is contexter wrapping context package. It is used mostly for testing.
Contexter ncontext.Contexter
kvClient clientv3.KV
}
// Loader is the structure of a loader
type Loader struct {
*kwpoll.PollWatcher
cfg *Config
}
// New returns a new loader with the given config
func New(cfg *Config) *Loader {
if cfg.Timeout == 0 {
cfg.Timeout = defaultTimeout
}
if cfg.Contexter == nil {
cfg.Contexter = ncontext.DefaultContexter
}
if cfg.Name == "" {
cfg.Name = defaultName
}
if cfg.kvClient == nil {
cfg.kvClient = cfg.Client.KV
}
var l = &Loader{
cfg: cfg,
}
if cfg.Watch {
var v = konfig.Values{}
var err = l.Load(v)
if err != nil {
panic(err)
}
l.PollWatcher = kwpoll.New(&kwpoll.Config{
Loader: l,
Rater: cfg.Rater,
InitValue: v,
Debug: cfg.Debug,
Diff: true,
})
}
return l
}
// Name returns the name of the loader
func (l *Loader) Name() string { return l.cfg.Name }
// Load loads the values from the keys defined by the config in the konfig.Store
func (l *Loader) Load(s konfig.Values) error {
for _, k := range l.cfg.Keys {
values, err := l.keyValue(k.Key)
if err != nil {
return err
}
for _, v := range values {
var configKey = l.cfg.Prefix + string(v.Key)
if l.cfg.Replacer != nil {
configKey = l.cfg.Replacer.Replace(configKey)
}
// if the key has a parser, we parse the key value using the provided Parser
// else we just convert the value to a string
if k.Parser != nil {
if err := k.Parser.Parse(bytes.NewReader(v.Value), s); err != nil {
return err
}
} else {
s.Set(configKey, string(v.Value))
}
}
}
return nil
}
// MaxRetry is the maximum number of time to retry when a load fails
func (l *Loader) MaxRetry() int {
return l.cfg.MaxRetry
}
// RetryDelay is the delay between each retry
func (l *Loader) RetryDelay() time.Duration {
return l.cfg.RetryDelay
}
func (l *Loader) keyValue(k string) ([]*mvccpb.KeyValue, error) {
var ctx, cancel = l.cfg.Contexter.WithTimeout(
context.Background(),
l.cfg.Timeout,
)
defer cancel()
values, err := l.cfg.kvClient.Get(ctx, k)
if err != nil {
return nil, err
}
return values.Kvs, nil
}
// StopOnFailure returns whether a load failure should stop the config and the registered closers
func (l *Loader) StopOnFailure() bool {
return l.cfg.StopOnFailure
}