/
client.go
124 lines (113 loc) · 3.32 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package client
import (
"github.com/tedcy/sheep/client/balancer"
"github.com/tedcy/sheep/client/breaker_notify"
"github.com/tedcy/sheep/client/weighter_notify"
"golang.org/x/net/context"
"google.golang.org/grpc"
"time"
"strings"
)
type BalancerType int
const (
DefaultBalancer BalancerType = iota
RespTimeBalancer
)
type DialConfig struct {
EnableBreak bool
BalancerType BalancerType
Timeout time.Duration
//etcd://172.16.176.38:2379,ip:port/path
TargetPath string
}
func splitTargetPath(targetPath string) (target, path string) {
index := strings.LastIndex(targetPath, "/")
target = targetPath[:index]
path = targetPath[index:]
return
}
func DialContext(ctx context.Context, config *DialConfig, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
target, path := splitTargetPath(config.TargetPath)
c := &client{}
c.ctx = ctx
c.Balancer, err = balancer.New(ctx, path, config.Timeout)
if err != nil {
return
}
if config.EnableBreak {
c.enableBreak()
}
c.withBalanceType(config.BalancerType)
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithBalancer(c))
opts = append(opts, grpc.WithUnaryInterceptor(c.clientIntercept()))
conn, err = grpc.DialContext(ctx, target, opts...)
if err != nil {
return
}
return
}
type client struct {
*balancer.Balancer
breaker breaker_notify.BreakerNotifyI
weighter weighter_notify.WeighterNotifyI
//打开熔断器时插入aop
//使用balance返回aop不为空就插入
intercepts []grpc.UnaryClientInterceptor
ctx context.Context
}
func (this *client) enableBreak() {
this.breaker = breaker_notify.New()
this.Balancer.SetNotifyOpen(this.breaker.NotifyOpen())
this.Balancer.SetNotifyClose(this.breaker.NotifyClose())
this.Balancer.SetNotifyHalfOpen(this.breaker.NotifyHalfOpen())
this.intercepts = append(this.intercepts, this.breaker.GrpcUnaryClientInterceptor)
}
func (this *client) withBalanceType(t BalancerType) {
switch t {
case RespTimeBalancer:
this.weighter = weighter_notify.New(this.ctx, weighter_notify.RespTimeWeighter)
case DefaultBalancer:
this.weighter = weighter_notify.New(this.ctx, weighter_notify.DefaultWeighter)
}
if this.weighter != nil {
this.Balancer.SetNotifyLbPolicyChange(this.weighter.NotifyWeighterChange())
this.intercepts = append(this.intercepts, this.weighter.GrpcUnaryClientInterceptor)
}
}
func (this *client) clientIntercept() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, handler grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
var index int
index = -1
var i grpc.UnaryInvoker
i = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
index++
if index >= len(this.intercepts) {
return handler(ctx, method, req, reply, cc, opts...)
} else {
return this.intercepts[index](ctx, method, req, reply, cc, i, opts...)
}
}
return i(ctx, method, req, reply, cc, opts...)
}
}
//will call by grpc.ClientConn Close()
func (this *client) Close() (err error) {
if this.breaker != nil {
err = this.breaker.Close()
if err != nil {
println(err)
}
}
if this.weighter != nil {
err = this.weighter.Close()
if err != nil {
println(err)
}
}
err = this.Balancer.Close()
if err != nil {
println(err.Error())
}
return nil
}