/
client.go
179 lines (159 loc) · 4.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package client
import (
"context"
"fmt"
"github.com/lsm1998/tinygo"
mRpc "github.com/lsm1998/tinygo/pkg/server/middleware/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"log"
"time"
)
// DailTimeout is the deadline placed on the context that dial passes to
// grpc.DialContext.
//
// NOTE(review): the name looks like a misspelling of "DialTimeout"; it is
// exported, so renaming it would break callers.
const DailTimeout = 3 * time.Second
// connectMethod identifies which dialing strategy a Client uses.
// The zero value is deliberately not a valid method.
type connectMethod int

const (
	// cmRegister dials through a resolver.Builder.
	cmRegister connectMethod = iota + 1
	// cmDefault dials the configured address directly.
	cmDefault
	// cmProxy dials a proxy address.
	cmProxy
)
// Client bundles a gRPC connection with the service stub built on top of it.
type Client struct {
	conn *grpc.ClientConn // connection established by createConn
	Cli  interface{}      // value returned by the factory passed to NewClient
	opt  Options          // options accumulated from the Option functions
	conf RpcClientConf    // configuration handed to NewClient
}

// Option mutates an Options value; options are applied in order by newClient.
type Option func(options *Options)

// Options holds the tunables of a Client.
type Options struct {
	// Timeout is passed to mRpc.TimeoutMiddleware in updateDialOptions.
	Timeout time.Duration
	// DialOptions are handed to grpc.DialContext.
	DialOptions []grpc.DialOption
	// Builder, when non-nil and no proxy address is set, selects dialing
	// through the builder's scheme.
	Builder resolver.Builder
	// Debug makes dialByBuilder print the dial target.
	Debug bool
	// ProxyAddr, when non-empty, is dialed instead of the service itself.
	ProxyAddr string
	// connectMethod is derived from the fields above by updateConnectMethod.
	connectMethod connectMethod
}
// WithDebug returns an Option that switches Options.Debug on.
func WithDebug() Option {
	return func(o *Options) {
		o.Debug = true
	}
}
// NewClient applies opts, dials the service described by c and stores the
// result of f(conn) in the returned Client's Cli field.
//
// A dial error does not come back to the caller: createConn reports it with
// log.Fatal.
func NewClient(c RpcClientConf, f func(*grpc.ClientConn) interface{}, opts ...Option) *Client {
	cl := newClient(opts...)
	cl.conf = c

	// Order matters: the connect method decides which dial options are
	// added, and both must be settled before the connection is created.
	cl.updateConnectMethod()
	cl.updateDialOptions()
	cl.createConn()

	cl.Cli = f(cl.conn)
	return cl
}
// dialByDns dials addr directly. serviceName is used only to make the error
// message more helpful.
//
// On failure the underlying error is wrapped with %w, so callers can still
// inspect it with errors.Is / errors.As.
//
// TODO: provide an interface for implementing dial.
func (c *Client) dialByDns(addr string, serviceName string) (*grpc.ClientConn, error) {
	conn, err := c.dial(addr)
	if err != nil {
		return nil, fmt.Errorf("rpc dialByDns: %s, error: %w, make sure rpc service %s is already started",
			addr, err, serviceName)
	}
	return conn, nil
}
// dialByProxy dials the proxy address stored in the client's options.
//
// On failure the underlying error is wrapped with %w and annotated with the
// address that was dialed, so the failure can be attributed to the proxy.
func (c *Client) dialByProxy() (*grpc.ClientConn, error) {
	conn, err := c.dial(c.opt.ProxyAddr)
	if err != nil {
		return nil, fmt.Errorf("rpc dialByProxy: %s, error: %w", c.opt.ProxyAddr, err)
	}
	return conn, nil
}
// dialByBuilder dials serviceName through the scheme registered by builder.
// The target has the form "<scheme>://docer_discov/<tinygo.AppMod>/<serviceName>".
//
// When Options.Debug is set the target is printed to stdout before dialing.
// On failure the underlying error is wrapped with %w, so callers can still
// inspect it with errors.Is / errors.As.
func (c *Client) dialByBuilder(builder resolver.Builder, serviceName string) (*grpc.ClientConn, error) {
	target := fmt.Sprintf("%s://%s/%s/%s", builder.Scheme(), "docer_discov", tinygo.AppMod, serviceName)
	if c.opt.Debug {
		fmt.Println("target", target)
	}
	conn, err := c.dial(target)
	if err != nil {
		return nil, fmt.Errorf("rpc dialByBuilder: %s, error: %w, make sure rpc service %s is already started",
			target, err, serviceName)
	}
	return conn, nil
}
// dial opens a gRPC connection to target using the client's dial options,
// under a context that expires after DailTimeout.
//
// NOTE(review): if none of the dial options requests a blocking dial,
// grpc.DialContext presumably returns before the connection is established
// and the timeout would then have no practical effect — confirm that this is
// intended.
func (c *Client) dial(target string) (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), DailTimeout)
	defer cancel()
	return grpc.DialContext(ctx, target, c.opt.DialOptions...)
}
// WithBuilder returns an Option that stores builder in Options.Builder.
// With a non-nil builder and no proxy address, the client dials through the
// builder's scheme.
func WithBuilder(builder resolver.Builder) Option {
	return func(o *Options) {
		o.Builder = builder
	}
}
// WithTimeout returns an Option that stores timeout in Options.Timeout, the
// value later passed to mRpc.TimeoutMiddleware.
func WithTimeout(timeout time.Duration) Option {
	return func(o *Options) {
		o.Timeout = timeout
	}
}
// WithProxy returns an Option that routes the client through a proxy. When
// the client is created, the application domain and listening port of the
// gRPC service to be proxied are specified in metadata, which makes it
// possible to reach a gRPC service running on kae from a local machine.
//
// The option does nothing when tinygo.AppMod is "prod". Otherwise the last
// element of addrs becomes the proxy address, and applying the option with no
// address panics.
func WithProxy(addrs ...string) Option {
	return func(o *Options) {
		if tinygo.AppMod == "prod" {
			return
		}
		if len(addrs) == 0 {
			panic("WithProxy empty")
		}
		o.ProxyAddr = addrs[len(addrs)-1]
	}
}
// WithMiddleware returns an Option that appends interceptor, as a chained
// unary client interceptor, to Options.DialOptions.
func WithMiddleware(interceptor grpc.UnaryClientInterceptor) Option {
	return func(o *Options) {
		chained := grpc.WithChainUnaryInterceptor(interceptor)
		o.DialOptions = append(o.DialOptions, chained)
	}
}
// updateConnectMethod derives the connect method from the options:
// a proxy address wins, then a resolver builder, otherwise the default.
func (c *Client) updateConnectMethod() {
	method := cmDefault
	switch {
	case c.opt.ProxyAddr != "":
		method = cmProxy
	case c.opt.Builder != nil:
		method = cmRegister
	}
	c.opt.connectMethod = method
}
// createConn dials according to the selected connect method and stores the
// connection in c.conn. A dial error terminates the process via log.Fatal.
// If the connect method matches none of the known values, nothing is dialed
// and c.conn is left as it was.
func (c *Client) createConn() {
	var dialErr error
	switch c.opt.connectMethod {
	case cmProxy:
		c.conn, dialErr = c.dialByProxy()
	case cmDefault:
		c.conn, dialErr = c.dialByDns(c.conf.Addr, c.conf.ServiceName)
	case cmRegister:
		c.conn, dialErr = c.dialByBuilder(c.opt.Builder, c.conf.ServiceName)
	}
	if dialErr == nil {
		return
	}
	log.Fatal(dialErr)
}
// updateDialOptions assembles the final dial options for the client.
//
// The base options (insecure transport credentials and the timeout
// middleware) are placed in front of any options already collected from the
// Option functions. Depending on the connect method, one further option is
// prepended:
//   - cmRegister: a default service config selecting round-robin balancing;
//   - cmProxy: a unary interceptor that sends c.conf.Addr to the proxy in the
//     "target-endpoint" metadata key.
//
// It must run after updateConnectMethod, because it reads the connect method.
func (c *Client) updateDialOptions() {
	base := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(
			mRpc.TimeoutMiddleware(c.opt.Timeout),
		),
	}
	c.opt.DialOptions = append(base, c.opt.DialOptions...)

	switch c.opt.connectMethod {
	case cmRegister:
		s := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)
		c.opt.DialOptions = append([]grpc.DialOption{grpc.WithDefaultServiceConfig(s)}, c.opt.DialOptions...)
	case cmProxy:
		proxyTarget := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			// Merge into the caller's outgoing metadata instead of replacing
			// it: a fresh MD here would silently drop anything the caller
			// attached to ctx.
			md, _ := metadata.FromOutgoingContext(ctx)
			md = md.Copy() // never mutate the caller's map; also yields a non-nil MD
			md.Set("target-endpoint", c.conf.Addr)
			return invoker(metadata.NewOutgoingContext(ctx, md), method, req, reply, cc, opts...)
		}
		c.opt.DialOptions = append([]grpc.DialOption{grpc.WithUnaryInterceptor(proxyTarget)}, c.opt.DialOptions...)
	}
}
// newClient allocates a Client and applies every option, in order, to its
// Options. Nothing is dialed here.
func newClient(opts ...Option) *Client {
	c := new(Client)
	for i := range opts {
		opts[i](&c.opt)
	}
	return c
}