/
etcd.go
105 lines (89 loc) · 2.37 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package etcd
import (
"context"
"fmt"
"time"
"github.com/jellydator/ttlcache/v3"
log "github.com/sirupsen/logrus"
"github.com/ushakovn/boiler/internal/pkg/stringer"
"github.com/ushakovn/boiler/pkg/config/provider"
"github.com/ushakovn/boiler/pkg/config/types"
v3 "go.etcd.io/etcd/client/v3"
)
// etcd is a values provider that reads from an etcd v3 client and keeps
// fetched values in a TTL cache.
type etcd struct {
// appName is prefixed to every requested key by buildKey.
appName string
// client is the etcd v3 client used by get and Watch.
client *v3.Client
// cachedKeys stores values under their fully built key (see buildKey).
cachedKeys *ttlcache.Cache[string, types.Value]
}
// config holds the settings New reads when constructing the provider.
// NOTE(review): presumably populated by the Option functions via callOptions,
// which are defined outside this view — confirm.
type config struct {
// client is the etcd v3 client configuration handed to v3.New.
client v3.Config
// appName becomes the provider's key prefix.
appName string
// cacheTTL is the TTL the value cache is created with.
cacheTTL time.Duration
}
// New builds an etcd-backed provider.Values from the given options.
//
// It creates the etcd v3 client from the configured client settings and a TTL
// cache for fetched values. Because the signature has no error return, a
// failure to create the client terminates the process via log.Fatalf.
func New(calls ...Option) provider.Values {
	options := callOptions(calls...)

	client, err := v3.New(options.client)
	if err != nil {
		log.Fatalf("config: failed to create etcd values provider: %v", err)
	}

	cache := ttlcache.New[string, types.Value](
		ttlcache.WithTTL[string, types.Value](options.cacheTTL),
		// By default ttlcache extends an item's expiration on every Get.
		// For configuration values that would let a frequently read key
		// stay cached forever and never be re-read from etcd, so the TTL
		// is made a hard upper bound on staleness instead.
		ttlcache.WithDisableTouchOnHit[string, types.Value](),
	)

	return &etcd{
		appName:    options.appName,
		client:     client,
		cachedKeys: cache,
	}
}
// Get returns the value stored under key for this application.
//
// The key is first expanded with buildKey. A non-nil cached value is returned
// directly; otherwise the value is fetched through get, which queries etcd.
func (e *etcd) Get(ctx context.Context, key string) types.Value {
	fullKey := e.buildKey(key)

	cached := e.getCached(fullKey)
	if cached.IsNil() {
		// Cache miss: fall back to etcd.
		return e.get(ctx, fullKey)
	}
	return cached
}
// Watch blocks, observing key in etcd and invoking action for every create or
// modify event until ctx is done or the watch channel is closed.
//
// The key is expanded with buildKey before the watch is opened. Each observed
// value is written to the cache with the default TTL before action is called.
// Events other than create/modify, and events without a key-value payload,
// are skipped.
func (e *etcd) Watch(ctx context.Context, key string, action func(value types.Value)) {
	key = e.buildKey(key)
	ch := e.client.Watch(ctx, key)

	for {
		select {
		case <-ctx.Done():
			return

		case resp, ok := <-ch:
			if !ok {
				// A closed channel yields zero-value responses forever;
				// without this return the loop would spin at full CPU
				// until ctx is cancelled.
				return
			}
			if err := resp.Err(); err != nil {
				log.Warnf("config: etcd watch %q: %v", key, err)
			}

			for _, event := range resp.Events {
				isCreateOrUpdate := event.IsCreate() || event.IsModify()
				if !isCreateOrUpdate || event.Kv == nil {
					continue
				}

				value := types.NewValue(string(event.Kv.Value))
				e.cachedKeys.Set(key, value, ttlcache.DefaultTTL)
				action(value)
			}
		}
	}
}
// get reads key from etcd, caches the result with the default TTL and returns
// it.
//
// A nil value is returned when the request fails or when the response holds no
// key-values. Request errors are logged, because the return type cannot carry
// them and an unreachable etcd would otherwise look the same as a missing key.
func (e *etcd) get(ctx context.Context, key string) types.Value {
	resp, err := e.client.Get(ctx, key,
		v3.WithSort(v3.SortByVersion, v3.SortDescend),
	)
	if err != nil {
		log.Warnf("config: etcd get %q: %v", key, err)
		return types.NewNilValue()
	}
	if resp.Count == 0 || len(resp.Kvs) == 0 {
		return types.NewNilValue()
	}

	// The last returned key-value is used, as in the original implementation.
	kv := resp.Kvs[len(resp.Kvs)-1]
	value := types.NewValue(string(kv.Value))
	e.cachedKeys.Set(key, value, ttlcache.DefaultTTL)

	return value
}
// getCached looks key up in the local cache and returns the stored value, or a
// nil value when the cache has no item for it.
func (e *etcd) getCached(key string) types.Value {
	item := e.cachedKeys.Get(key)
	if item == nil {
		return types.NewNilValue()
	}
	return item.Value()
}
// buildKey derives the storage key for key: the application name and key are
// joined with an underscore and the result is passed through
// stringer.StringToSnakeCase.
func (e *etcd) buildKey(key string) string {
	prefixed := fmt.Sprintf("%s_%s", e.appName, key)
	return stringer.StringToSnakeCase(prefixed)
}