-
Notifications
You must be signed in to change notification settings - Fork 4
/
balancer.go
97 lines (83 loc) · 2.55 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package grpc
import (
"sync/atomic"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
netbalancer "github.com/trafficstars/registry/net/balancer"
)
// NewBalancerBuilder creates a new registry balancer builder.
func NewBalancerBuilder(name string) balancer.Builder {
return base.NewBalancerBuilder(name, ®istryPickerBuilder{}, base.Config{HealthCheck: true})
}
type registryPickerBuilder struct{}
func (*registryPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
grpclog.Infof("registryPicker: newPicker called with readySCs: %v", info.ReadySCs)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
if len(info.ReadySCs) == 1 {
for sc := range info.ReadySCs {
return &simplePicker{subConn: sc}
}
}
picker := ®istryPicker{
subConns: map[string]balancer.SubConn{},
}
for sc, scInfo := range info.ReadySCs {
switch meta := scInfo.Address.Metadata.(type) {
case nil:
case *grpcMetadata:
if meta.balancer != nil && picker.balancer == nil {
picker.balancer = meta.balancer
picker.serviceName = meta.serviceName
picker.servicePort = meta.servicePort
picker.maxRequestsByBackend = meta.maxRequestsByBackend
}
picker.subConns[scInfo.Address.Addr] = sc
}
picker.subConnList = append(picker.subConnList, sc)
}
return picker
}
// registryPicker picks among several ready sub-connections. It first asks the
// registry balancer (when one is set) for a backend and, if that yields no
// usable sub-connection, cycles through subConnList instead.
type registryPicker struct {
	// next is the cycling counter; Pick advances it with atomic.AddUint32.
	next uint32

	// serviceName is the first argument passed to balancer.Next.
	serviceName string

	// servicePort, when non-empty, replaces the backend's own address with
	// "<hostname>:<servicePort>" for the subConns lookup.
	servicePort string

	// balancer is the registry balancer consulted first; it may be nil.
	balancer netbalancer.Balancer

	// subConns indexes sub-connections by address string.
	subConns map[string]balancer.SubConn

	// subConnList holds the sub-connections used for the cycling fallback.
	subConnList []balancer.SubConn

	// maxRequestsByBackend is the second argument passed to balancer.Next.
	maxRequestsByBackend int
}

// Pick returns the sub-connection to use for an RPC.
//
// A sub-connection chosen through the registry balancer takes precedence. In
// that case the backend's concurrent-request counter is incremented now and
// decremented again by the returned Done callback. Otherwise the next entry
// of subConnList is returned with no Done callback. The returned error is
// always nil.
func (p *registryPicker) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
	if result, ok := p.pickByRegistry(); ok {
		return result, nil
	}

	idx := atomic.AddUint32(&p.next, 1) % uint32(len(p.subConnList))
	return balancer.PickResult{SubConn: p.subConnList[idx]}, nil
}

// pickByRegistry asks the registry balancer for a backend and maps it to a
// known sub-connection. The boolean is false when no balancer is set, when
// the balancer returns an error, or when the backend's address has no entry
// in subConns.
func (p *registryPicker) pickByRegistry() (balancer.PickResult, bool) {
	if p.balancer == nil {
		return balancer.PickResult{}, false
	}

	backend, err := p.balancer.Next(p.serviceName, p.maxRequestsByBackend)
	if err != nil {
		return balancer.PickResult{}, false
	}

	addr := backend.Address()
	if p.servicePort != "" {
		addr = backend.Hostname() + ":" + p.servicePort
	}

	conn, found := p.subConns[addr]
	if !found {
		return balancer.PickResult{}, false
	}

	backend.IncConcurrentRequest(1)
	release := func(balancer.DoneInfo) { backend.IncConcurrentRequest(-1) }
	return balancer.PickResult{SubConn: conn, Done: release}, true
}
// simplePicker is the picker used when exactly one sub-connection is ready.
type simplePicker struct {
	// subConn is the only sub-connection this picker ever returns.
	subConn balancer.SubConn
}

// Pick always returns the picker's single sub-connection, with no Done
// callback and a nil error.
func (p *simplePicker) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
	result := balancer.PickResult{SubConn: p.subConn}
	return result, nil
}