forked from go-kratos/kratos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
opt.go
178 lines (165 loc) · 4.32 KB
/
opt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package naming
import (
"encoding/json"
"fmt"
"math/rand"
"net/url"
"os"
"sort"
"github.com/itering/kratos/pkg/conf/env"
"github.com/itering/kratos/pkg/log"
"github.com/dgryski/go-farm"
)
// BuildOptions build options.
type BuildOptions struct {
Filter func(map[string][]*Instance) map[string][]*Instance
Subset func([]*Instance, int) []*Instance
SubsetSize int
ClientZone string
Scheduler func(*InstancesInfo) []*Instance
}
// BuildOpt build option interface.
type BuildOpt interface {
Apply(*BuildOptions)
}
type funcOpt struct {
f func(*BuildOptions)
}
func (f *funcOpt) Apply(opt *BuildOptions) {
f.f(opt)
}
// Filter filter option.
func Filter(schema string, clusters map[string]struct{}) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.Filter = func(inss map[string][]*Instance) map[string][]*Instance {
newInss := make(map[string][]*Instance)
for zone := range inss {
var instances []*Instance
for _, ins := range inss[zone] {
//如果r.clusters的长度大于0说明需要进行集群选择
if len(clusters) > 0 {
if _, ok := clusters[ins.Metadata[MetaCluster]]; !ok {
continue
}
}
var addr string
for _, a := range ins.Addrs {
u, err := url.Parse(a)
if err == nil && u.Scheme == schema {
addr = u.Host
}
}
if addr == "" {
fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
continue
}
instances = append(instances, ins)
}
newInss[zone] = instances
}
return newInss
}
}}
}
func defulatSubset(inss []*Instance, size int) []*Instance {
backends := inss
if len(backends) <= int(size) {
return backends
}
clientID := env.Hostname
sort.Slice(backends, func(i, j int) bool {
return backends[i].Hostname < backends[j].Hostname
})
count := len(backends) / size
// hash得到ID
id := farm.Fingerprint64([]byte(clientID))
// 获得rand轮数
round := int64(id / uint64(count))
s := rand.NewSource(round)
ra := rand.New(s)
// 根据source洗牌
ra.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})
start := (id % uint64(count)) * uint64(size)
return backends[int(start) : int(start)+int(size)]
}
// Subset Subset option.
func Subset(defaultSize int) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.SubsetSize = defaultSize
opt.Subset = defulatSubset
}}
}
// ScheduleNode ScheduleNode option.
func ScheduleNode(clientZone string) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.ClientZone = clientZone
opt.Scheduler = func(app *InstancesInfo) (instances []*Instance) {
type Zone struct {
inss []*Instance
weight int64
name string
score float64
}
var zones []*Zone
if app.Scheduler != nil {
si, err := json.Marshal(app.Scheduler)
if err == nil {
log.Info("schedule info: %s", string(si))
}
if strategy, ok := app.Scheduler.Clients[clientZone]; ok {
var min *Zone
for name, zone := range strategy.Zones {
inss := app.Instances[name]
if len(inss) == 0 {
continue
}
z := &Zone{
inss: inss,
weight: zone.Weight,
name: name,
score: float64(len(inss)) / float64(zone.Weight),
}
if min == nil || z.score < min.score {
min = z
}
zones = append(zones, z)
}
if opt.SubsetSize != 0 && len(min.inss) > opt.SubsetSize {
min.score = float64(opt.SubsetSize) / float64(min.weight)
}
for _, z := range zones {
nums := int(min.score * float64(z.weight))
if nums == 0 {
nums = 1
}
if nums < len(z.inss) {
if opt.Subset != nil {
z.inss = opt.Subset(z.inss, nums)
} else {
z.inss = defulatSubset(z.inss, nums)
}
}
}
}
}
for _, zone := range zones {
for _, ins := range zone.inss {
instances = append(instances, ins)
}
}
//如果没有拿到节点,则选择直接获取
if len(instances) == 0 {
instances = app.Instances[clientZone]
if len(instances) == 0 {
for _, value := range app.Instances {
instances = append(instances, value...)
}
}
}
return
}
}}
}