/
consul.go
131 lines (108 loc) · 2.7 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package consul
import (
"fmt"
"net"
"time"
"github.com/hashicorp/consul/api"
"github.com/itzmanish/go-micro/v2/config/source"
)
// consul is a config source backed by the Consul KV store.
// Currently a single consul reader.
type consul struct {
// prefix is the KV key prefix that Read lists and Watch observes.
prefix string
// stripPrefix is handed to makeMap and newWatcher; NewSource sets it to
// prefix when prefix stripping is enabled and leaves it empty otherwise.
stripPrefix string
// addr is the consul address taken from the client config; it is passed to
// newWatcher.
addr string
// opts holds the generic source options, including the Encoder used to
// build change sets.
opts source.Options
// client is the consul API client used by Read.
client *api.Client
}
// DefaultPrefix is the key prefix a consul source falls back to when the
// caller has not supplied one of their own.
var DefaultPrefix = "/micro/config/"
// Read lists every KV pair stored under the source's prefix, hands them to
// makeMap together with the prefix to strip, encodes the result with the
// configured Encoder and returns it as a checksummed ChangeSet.
//
// An error is returned when the consul client is missing, the KV listing
// fails, no keys exist under the prefix, or the data cannot be assembled or
// encoded.
func (c *consul) Read() (*source.ChangeSet, error) {
	// NewSource discards the error from api.NewClient, so the client may be
	// nil here; fail cleanly instead of panicking inside the consul API.
	if c.client == nil {
		return nil, fmt.Errorf("consul client not initialised")
	}

	kv, _, err := c.client.KV().List(c.prefix, nil)
	if err != nil {
		return nil, err
	}

	// len of a nil result is 0, so a single check covers "nil" and "empty".
	if len(kv) == 0 {
		return nil, fmt.Errorf("source not found: %s", c.prefix)
	}

	data, err := makeMap(c.opts.Encoder, kv, c.stripPrefix)
	if err != nil {
		// %w keeps the message text unchanged while letting callers unwrap.
		return nil, fmt.Errorf("error reading data: %w", err)
	}

	b, err := c.opts.Encoder.Encode(data)
	if err != nil {
		return nil, fmt.Errorf("error reading source: %w", err)
	}

	cs := &source.ChangeSet{
		Timestamp: time.Now(),
		Format:    c.opts.Encoder.String(),
		Source:    c.String(),
		Data:      b,
	}
	// The checksum is computed last so that it covers the populated fields.
	cs.Checksum = cs.Sum()

	return cs, nil
}
// Write is unsupported by the consul source: the given ChangeSet is ignored
// and nil is always returned, so callers get no indication that nothing was
// stored.
func (c *consul) Write(cs *source.ChangeSet) error {
return nil
}
// String returns the name of this source, "consul". Read uses it as the
// ChangeSet's Source field and Watch passes it on to newWatcher.
func (c *consul) String() string {
return "consul"
}
// Watch builds a watcher for the source's prefix by calling newWatcher with
// the consul address, the source name, the prefix to strip and the encoder.
// Any error from newWatcher is returned unchanged alongside a nil Watcher.
func (c *consul) Watch() (source.Watcher, error) {
	name := c.String()
	encoder := c.opts.Encoder

	watcher, err := newWatcher(c.prefix, c.addr, name, c.stripPrefix, encoder)
	if err != nil {
		// Return an explicit nil rather than whatever newWatcher produced.
		return nil, err
	}

	return watcher, nil
}
// NewSource creates a new consul source.
//
// Settings are taken from the option context:
//   - configKey: a *api.Config used instead of api.DefaultConfig(). The
//     supplied config is used directly, so the overrides below are written
//     into the caller's value.
//   - addressKey: consul address; port 8500 is appended when none is given.
//   - dcKey / tokenKey: datacenter and ACL token overrides.
//   - prefixKey: KV prefix to read, defaulting to DefaultPrefix.
//   - stripPrefixKey: when true, the prefix is recorded as the prefix to strip.
//
// The function has no error return, so a failure to build the consul client
// is not reported here.
func NewSource(opts ...source.Option) source.Source {
	options := source.NewOptions(opts...)

	// Start from the library defaults; a caller-supplied config replaces them.
	// A typed-nil config is ignored so the field writes below cannot panic.
	config := api.DefaultConfig()
	if co, ok := options.Context.Value(configKey{}).(*api.Config); ok && co != nil {
		config = co
	}

	// An explicit address overrides whatever the config carries.
	if a, ok := options.Context.Value(addressKey{}).(string); ok {
		host, port, err := net.SplitHostPort(a)
		if err == nil {
			// SplitHostPort strips the brackets from IPv6 literals;
			// JoinHostPort restores them, so "[::1]:8500" stays valid instead
			// of collapsing to "::1:8500".
			config.Address = net.JoinHostPort(host, port)
		} else if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
			// No port supplied: keep the host exactly as given and append the
			// default consul port.
			config.Address = a + ":8500"
		}
		// Any other malformed address leaves config.Address untouched.
	}

	if dc, ok := options.Context.Value(dcKey{}).(string); ok {
		config.Datacenter = dc
	}

	if token, ok := options.Context.Value(tokenKey{}).(string); ok {
		config.Token = token
	}

	// The error is deliberately dropped because NewSource cannot return it;
	// if client construction fails, the stored client may be nil.
	client, _ := api.NewClient(config)

	prefix := DefaultPrefix
	if f, ok := options.Context.Value(prefixKey{}).(string); ok {
		prefix = f
	}

	// The prefix to strip is the effective prefix, resolved above.
	sp := ""
	if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b {
		sp = prefix
	}

	return &consul{
		prefix:      prefix,
		stripPrefix: sp,
		addr:        config.Address,
		opts:        options,
		client:      client,
	}
}