-
Notifications
You must be signed in to change notification settings - Fork 0
/
datacenter.go
137 lines (118 loc) · 3.07 KB
/
datacenter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package mconsul
import (
"strings"
"sync"
"time"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"github.com/o-kit/micro-kit/misc/context"
)
// Consul datacenter, accessed over the WAN.
// DefaultDatacenter is built from discoverConfig.Datacenter. Per the original
// note it is meant to be the local datacenter, since within a LAN usually only
// the local Consul is accessed.
// NOTE(review): this initializer runs at package initialization, so it reads
// whatever discoverConfig.Datacenter holds at that moment — confirm the config
// is populated before then.
var DefaultDatacenter = NewDatacenter(discoverConfig.Datacenter)
// Datacenter names one Consul datacenter and holds the API client used to
// send requests to it.
type Datacenter struct {
Name string // datacenter name; per the original note, the first datacenter is "dc1" by default
instance *api.Client // client used for requests to this datacenter; assigned in getConsul
instanceOnce sync.Once // ensures the client is instantiated only once
}
// NewDatacenter returns a Datacenter with the given name. No Consul client is
// created here; the remaining fields are left at their zero values.
func NewDatacenter(name string) *Datacenter {
	dc := new(Datacenter)
	dc.Name = name
	return dc
}
// GetDefaultDatacenter returns the package-level DefaultDatacenter.
func GetDefaultDatacenter() *Datacenter {
	return DefaultDatacenter
}
// getConsul returns the Consul API client for this datacenter, creating it on
// first use. Creation runs inside instanceOnce, so api.NewClient is invoked at
// most once per Datacenter. The client is configured with
// discoverConfig.Address and this datacenter's name.
//
// It panics if api.NewClient returns an error.
func (dc *Datacenter) getConsul() *api.Client {
	dc.instanceOnce.Do(func() {
		cfg := &api.Config{
			Address:    discoverConfig.Address,
			Datacenter: dc.Name,
		}
		client, err := api.NewClient(cfg)
		if err != nil {
			panic(err)
		}
		dc.instance = client
	})
	return dc.instance
}
// getWriteOption builds the options for write requests to this datacenter:
// the token from discoverConfig (for authentication) and the datacenter name.
func (dc *Datacenter) getWriteOption() *api.WriteOptions {
	opts := new(api.WriteOptions)
	opts.Token = discoverConfig.Token
	opts.Datacenter = dc.Name
	return opts
}
// getQueryOption builds the options for read requests to this datacenter:
// the token from discoverConfig (for authentication) and the datacenter name.
func (dc *Datacenter) getQueryOption() *api.QueryOptions {
	opts := new(api.QueryOptions)
	opts.Token = discoverConfig.Token
	opts.Datacenter = dc.Name
	return opts
}
// GetDatacenters returns the datacenter names reported through
// DefaultDatacenter; it simply forwards to DefaultDatacenter.GetDatacenters.
func GetDatacenters() ([]string, error) {
	names, err := DefaultDatacenter.GetDatacenters()
	return names, err
}
// GetDatacenters fetches the list of datacenter names from the Consul catalog.
// See https://www.consul.io/api-docs/catalog#list-datacenters
//
// The call is repeated for as long as handleError reports the failure as
// retryable; the final result and error are returned unchanged.
func (dc *Datacenter) GetDatacenters() ([]string, error) {
	begin := time.Now()
	for {
		names, err := dc.getConsul().Catalog().Datacenters()
		if dc.handleError(begin, err) {
			continue
		}
		// When more than one datacenter is returned they could be sorted
		// here, which would affect the order callers see. No sorting is
		// applied at present.
		return names, err
	}
}
// GetNode asks the agent for its node name and then looks that node up in the
// catalog using this datacenter's query options.
//
// It returns the error from either request as-is, and an
// "unable to fetch node: <name>" error when the catalog answers without a node.
func (dc *Datacenter) GetNode() (*api.CatalogNode, error) {
	cli := dc.getConsul()

	nodeName, err := cli.Agent().NodeName()
	if err != nil {
		return nil, err
	}

	catalogNode, _, err := cli.Catalog().Node(nodeName, dc.getQueryOption())
	switch {
	case err != nil:
		return nil, err
	case catalogNode == nil:
		return nil, errors.New("unable to fetch node: " + nodeName)
	}
	return catalogNode, nil
}
// handleError reports whether the operation that produced err should be
// retried.
//
// start is the time the caller began its first attempt. The result is false
// when err is nil, when more than 10 seconds have passed since start, or when
// the error text matches none of the known transient failures. Otherwise the
// error is logged, the goroutine sleeps for one second, and true is returned.
//
// Matching is done on the error text because that is all the callers in this
// file have to go on.
func (dc *Datacenter) handleError(start time.Time, err error) bool {
	if err == nil {
		return false
	}
	// Give up once the overall retry budget is spent.
	if time.Since(start) > 10*time.Second {
		return false
	}

	msg := err.Error()
	switch {
	case strings.Contains(msg, "connection refused"):
		context.LogInfof("connect consul, retry, %v", err)
	case strings.Contains(msg, "No cluster leader"):
		context.LogInfof("no cluster leader, retry, %v", err)
	case strings.Contains(msg, "unable to fetch node"):
		// This is the error GetNode builds when the catalog returns no node.
		// It used to be logged as "no cluster leader", which misreported the
		// cause.
		context.LogInfof("unable to fetch node, retry, %v", err)
	default:
		return false
	}

	// Back off briefly before the caller tries again.
	time.Sleep(time.Second)
	return true
}
// GetNodeFromDC fetches the catalog node via dc.GetNode, repeating the call
// for as long as dc.handleError reports the failure as retryable.
//
// On success it returns the node and a nil error. When the failure is not
// retryable, or handleError stops allowing retries, the last error is returned
// with a nil node; it is no longer discarded, so callers never receive
// (nil, nil).
func GetNodeFromDC(dc *Datacenter) (*api.CatalogNode, error) {
	start := time.Now()
	for {
		node, err := dc.GetNode()
		if dc.handleError(start, err) {
			continue
		}
		if err != nil {
			return nil, err
		}
		return node, nil
	}
}