-
Notifications
You must be signed in to change notification settings - Fork 97
/
rpc.client.go
228 lines (206 loc) · 5.56 KB
/
rpc.client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package rpc
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/micro-plat/hydra/rpc/balancer"
"github.com/micro-plat/hydra/servers/rpc/pb"
"github.com/micro-plat/lib4go/jsons"
"github.com/micro-plat/lib4go/logger"
"github.com/micro-plat/lib4go/types"
"errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
//Client rpc client, 用于构建基础的RPC调用,并提供基于服务器的限流工具,轮询、本地优先等多种负载算法
type Client struct {
address string //RPC Server Address 或 registry target
conn *grpc.ClientConn
*clientOption
client pb.RPCClient
hasRunChecker bool
IsConnect bool
isClose bool
}
type clientOption struct {
connectionTimeout time.Duration
log *logger.Logger
balancer balancer.CustomerBalancer
resolver balancer.ServiceResolver
service string
tls []string
}
//ClientOption 客户端配置选项
type ClientOption func(*clientOption)
//WithConnectionTimeout 配置网络连接超时时长
func WithConnectionTimeout(t time.Duration) ClientOption {
return func(o *clientOption) {
o.connectionTimeout = t
}
}
//WithLogger 配置日志记录器
func WithLogger(log *logger.Logger) ClientOption {
return func(o *clientOption) {
o.log = log
}
}
//WithTLS 设置TLS证书(pem,key)
func WithTLS(tls []string) ClientOption {
return func(o *clientOption) {
if len(tls) == 2 {
o.tls = tls
}
}
}
//WithRoundRobinBalancer 配置为轮询负载均衡器
func WithRoundRobinBalancer(r balancer.ServiceResolver, service string, timeout time.Duration, limit map[string]int) ClientOption {
return func(o *clientOption) {
o.service = service
o.resolver = r
o.balancer = balancer.RoundRobin(service, r, limit, o.log)
}
}
//WithLocalFirstBalancer 配置为本地优先负载均衡器
func WithLocalFirstBalancer(r balancer.ServiceResolver, service string, local string, limit map[string]int) ClientOption {
return func(o *clientOption) {
o.service = service
o.resolver = r
o.balancer = balancer.LocalFirst(service, local, r, limit)
}
}
//WithBalancer 设置负载均衡器
func WithBalancer(service string, lb balancer.CustomerBalancer) ClientOption {
return func(o *clientOption) {
o.service = service
o.balancer = lb
}
}
//NewClient 创建RPC客户端,地址是远程RPC服务器地址或注册中心地址
func NewClient(address string, opts ...ClientOption) (*Client, error) {
client := &Client{address: address, clientOption: &clientOption{connectionTimeout: time.Second * 3}}
for _, opt := range opts {
opt(client.clientOption)
}
if client.log == nil {
client.log = logger.GetSession("rpc.client", logger.CreateSession())
}
//grpclog.SetLogger(client.log)
err := client.connect()
if err != nil {
err = fmt.Errorf("rpc.client连接到服务器失败:%s(%v)(err:%v)", address, client.connectionTimeout, err)
return nil, err
}
time.Sleep(time.Second)
return client, err
}
//Connect 连接到RPC服务器,如果当前无法连接系统会定时自动重连
func (c *Client) connect() (err error) {
if c.IsConnect {
return nil
}
if c.balancer == nil {
switch len(c.tls) {
case 2: //使用安全证书
creds, err := credentials.NewClientTLSFromFile(c.tls[0], c.tls[1])
if err != nil {
return err
}
c.conn, err = grpc.Dial(c.address,
grpc.WithInsecure(),
grpc.WithTimeout(c.connectionTimeout),
grpc.WithTransportCredentials(creds))
default:
c.conn, err = grpc.Dial(c.address,
grpc.WithInsecure(),
grpc.WithTimeout(c.connectionTimeout))
}
} else {
ctx, _ := context.WithTimeout(context.Background(), c.connectionTimeout)
switch len(c.tls) {
case 2: //使用安全证书
creds, err := credentials.NewClientTLSFromFile(c.tls[0], c.tls[1])
if err != nil {
return err
}
c.conn, err = grpc.DialContext(ctx,
c.address,
grpc.WithInsecure(),
grpc.WithBalancer(c.balancer),
grpc.WithTransportCredentials(creds))
default:
c.conn, err = grpc.DialContext(ctx,
c.address,
grpc.WithInsecure(),
grpc.WithBalancer(c.balancer))
}
}
if err != nil {
return
}
c.client = pb.NewRPCClient(c.conn)
return nil
}
//Request 发送Request请求
func (c *Client) Request(service string, method string, header map[string]string, form map[string]interface{}, failFast bool) (status int, result string, param map[string]string, err error) {
h, err := jsons.Marshal(header)
if err != nil {
return
}
if len(h) == 0 {
h = []byte("{}")
}
f, err := jsons.Marshal(form)
if err != nil {
return
}
if len(f) == 0 {
h = []byte("{}")
}
response, err := c.client.Request(context.Background(),
&pb.RequestContext{
Method: method,
Service: service,
Header: string(h),
Input: string(f),
},
grpc.FailFast(failFast))
if err != nil {
status = 500
return
}
if response.Header != "" {
mh, err := jsons.Unmarshal([]byte(response.Header))
if err != nil {
return 400, "", nil, err
}
param, _ = types.ToStringMap(mh)
}
status = int(response.Status)
result = response.GetResult()
var r string
dec := json.NewDecoder(strings.NewReader(result))
if err := dec.Decode(&r); err == nil {
result = r
}
return
}
//UpdateLimiter 修改服务器限流规则
func (c *Client) UpdateLimiter(limit map[string]int) error {
if c.balancer != nil {
c.balancer.UpdateLimiter(limit)
return nil
}
return errors.New("rpc.client.未指定balancer")
}
//Close 关闭RPC客户端连接
func (c *Client) Close() {
c.isClose = true
if c.resolver != nil {
c.resolver.Close()
}
if c.conn != nil {
c.conn.Close()
}
}