-
Notifications
You must be signed in to change notification settings - Fork 4
/
consul.go
116 lines (104 loc) · 2.75 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package consul
import (
"errors"
"io/fs"
"log/slog"
"strings"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/mss-boot-io/mss-boot/pkg"
"github.com/mss-boot-io/mss-boot/pkg/config/source"
)
/*
* @Author: lwnmengjing<lwnmengjing@qq.com>
* @Date: 2024/4/28 22:26:27
* @Last Modified by: lwnmengjing<lwnmengjing@qq.com>
* @Last Modified time: 2024/4/28 22:26:27
*/
// Source is a configuration source backed by the Consul key/value store.
type Source struct {
	// opt holds the source options; the methods read opt.Dir to build the
	// Consul key, and ReadFile stores the matched extension in opt.Extend.
	opt *source.Options
	// name is the document name passed to the last successful ReadFile call;
	// Watch uses it to locate the keys it reloads.
	name string
}
// GetExtend returns the extension scheme currently recorded in the source's
// options. ReadFile overwrites that value with the extension of the key it
// found.
func (s *Source) GetExtend() source.Scheme {
	opts := s.opt
	return opts.Extend
}
// Open is not supported by the Consul source; it always returns a nil file
// and a non-nil error. Use ReadFile to fetch a document instead.
func (s *Source) Open(_ string) (fs.File, error) {
	// The message previously named a non-existent "Get" method, which pointed
	// readers of the error at the wrong call.
	return nil, errors.New("method Open not implemented")
}
// ReadFile fetches the configuration document called name from the Consul KV
// store using a client built from consul.DefaultConfig().
//
// The lookup key is "<dir>/<name>.<ext>", where <dir> is s.opt.Dir with
// backslashes replaced by forward slashes and <ext> is each entry of
// source.Extends in turn. On the first key that exists, the source records
// name and the matching extension (Watch reads both) and the raw value is
// returned.
//
// A client construction or KV lookup error is returned unchanged. When none of
// the candidate keys exist, an *fs.PathError wrapping fs.ErrNotExist is
// returned rather than a nil slice with a nil error, so callers can detect a
// missing document with errors.Is(err, fs.ErrNotExist).
func (s *Source) ReadFile(name string) (rb []byte, err error) {
	client, err := consul.NewClient(consul.DefaultConfig())
	if err != nil {
		return nil, err
	}

	// Consul keys are slash-separated; normalize Windows-style directories.
	prefix := strings.ReplaceAll(s.opt.Dir, "\\", "/") + "/" + name

	for i := range source.Extends {
		ext := source.Extends[i]
		pair, _, getErr := client.KV().Get(prefix+"."+string(ext), nil)
		if getErr != nil {
			return nil, getErr
		}
		if pair == nil {
			// Key absent for this extension; try the next one.
			continue
		}
		s.name = name
		s.opt.Extend = ext
		return pair.Value, nil
	}

	return nil, &fs.PathError{Op: "readfile", Path: prefix, Err: fs.ErrNotExist}
}
// Watch starts a background goroutine that reloads c whenever the Consul KV
// entries under the source directory change.
//
// c is the entity to refresh and unm decodes a raw document into it. Watch
// returns an error only when the Consul client cannot be created; afterwards
// all failures are logged by the goroutine, which loops forever and has no
// stop mechanism.
//
// Each iteration lists the keys under the directory prefix with WaitIndex and
// a 10 minute WaitTime. The first successful response only records the current
// index. When a later response carries a higher index, the base document
// "<dir>/<name>.<ext>" and then the stage document
// "<dir>/<name>-<stage>.<ext>" are decoded into c (each only if present), and
// c.OnChange is called.
func (s *Source) Watch(c source.Entity, unm func([]byte, any) error) error {
	client, err := consul.NewClient(consul.DefaultConfig())
	if err != nil {
		return err
	}
	key := strings.ReplaceAll(s.opt.Dir, "\\", "/")

	// Watch for configuration changes.
	go func(sc *Source, cfg source.Entity, decoder func([]byte, any) error) {
		// retryDelay throttles the loop when a query fails or yields no usable
		// index; without it an unreachable Consul makes this goroutine spin
		// and flood the log.
		const retryDelay = time.Second

		params := &consul.QueryOptions{WaitIndex: 0, WaitTime: 10 * time.Minute}
		for {
			// Fetch the latest state of the watched prefix.
			pairs, meta, listErr := client.KV().List(key, params)
			if listErr != nil {
				slog.Error("Error watching for config changes", slog.Any("err", listErr))
				time.Sleep(retryDelay)
				continue
			}

			if params.WaitIndex == 0 {
				// First successful response: remember the index, reload nothing.
				params.WaitIndex = meta.LastIndex
				if meta.LastIndex == 0 {
					// An index of 0 keeps the next query non-blocking, so
					// wait before asking again.
					time.Sleep(retryDelay)
				}
				continue
			}

			if meta.LastIndex > params.WaitIndex {
				byKey := make(map[string]*consul.KVPair, len(pairs))
				for i := range pairs {
					byKey[pairs[i].Key] = pairs[i]
				}

				base := key + "/" + sc.name
				ext := "." + string(sc.opt.Extend)
				// Base document first, then the stage-specific one, so the
				// stage document is decoded on top of the base values.
				for _, k := range [2]string{base + ext, base + "-" + pkg.GetStage() + ext} {
					pair := byKey[k]
					if pair == nil {
						continue
					}
					if decErr := decoder(pair.Value, cfg); decErr != nil {
						slog.Error("Failed to decode config", slog.Any("error", decErr))
					}
				}
				cfg.OnChange()
			}
			params.WaitIndex = meta.LastIndex
		}
	}(s, c, unm)

	return nil
}
// New builds a Consul Source. It starts from source.DefaultOptions() and
// applies each supplied option to it in order. The returned error is always
// nil.
func New(options ...source.Option) (*Source, error) {
	opts := source.DefaultOptions()
	for i := range options {
		options[i](opts)
	}
	return &Source{opt: opts}, nil
}