-
Notifications
You must be signed in to change notification settings - Fork 5
/
client.go
329 lines (315 loc) · 10.8 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package client
import (
"context"
"errors"
"fmt"
"github.com/nyan233/littlerpc/core/common/errorhandler"
"github.com/nyan233/littlerpc/core/common/logger"
"github.com/nyan233/littlerpc/core/common/metadata"
transport2 "github.com/nyan233/littlerpc/core/common/transport"
metaDataUtil "github.com/nyan233/littlerpc/core/common/utils/metadata"
container2 "github.com/nyan233/littlerpc/core/container"
"github.com/nyan233/littlerpc/core/middle/loadbalance"
error2 "github.com/nyan233/littlerpc/core/protocol/error"
"github.com/nyan233/littlerpc/core/protocol/message"
"github.com/nyan233/littlerpc/core/utils/random"
"github.com/nyan233/littlerpc/internal/pool"
"reflect"
"time"
)
// Complete bundles a protocol message with an error description.
// NOTE(review): presumably the completion record of a finished call; no use
// of this type is visible in this file, so confirm against its callers.
type Complete struct {
// Message is the protocol message carried by this record.
Message *message.Message
// Error is the error description carried alongside Message.
Error error2.LErrorDesc
}
// Client is the RPC client. It owns the transport engine, the connection
// manager, the table of bound services and the helper components that New
// initializes.
//
// Using synchronous and asynchronous calls together on the same Client causes
// a synchronous call to block the sending of every asynchronous call request
// on that connection.
type Client struct {
// cfg is the configuration assembled by New (defaults plus options).
cfg *Config
// cm is used for connection management.
cm *connManager
// engine is the client's event-driven transport engine.
engine transport2.ClientBuilder
// connSourceSet maps each connection to the resources allocated for it.
connSourceSet *container2.RWMutexMap[transport2.ConnAdapter, *connSource]
// contextM is the context manager created by New.
contextM *contextManager
// contextInitId is the starting value for context ids; it is assigned
// randomly when the client is created.
contextInitId uint64
// services holds the bound processes keyed by "<source>.<method>", which
// allows calls on different instances.
// All of its operations are thread-safe (per the original author's note).
services container2.RCUMap[string, *metadata.Process]
// logger receives the client's log output, e.g. panics reported by the
// goroutine pool. (The original comment here read "used for keepalive",
// which does not describe this field.)
logger logger.LLogger
// gp is the goroutine pool used for timeout management and for simulating
// asynchronous calls. New leaves it nil when Config.PoolSize <= 0.
gp pool.TaskPool[string]
// pluginManager drives the client-side plugins.
pluginManager *pluginManager
// eHandle is the error-handling interface used to build error descriptions.
eHandle error2.LErrors
}
// New builds a Client from the default configuration with opts applied on
// top, in the order given.
//
// It starts the transport engine selected by Config.NetWork, wires up the
// connection manager (optional balancer, selector, optional resolver), creates
// the goroutine pool (left nil when Config.PoolSize <= 0, which disables async
// mode) and initializes the plugin manager, the service map and the context
// manager.
//
// An error is returned when the engine cannot be started or when the resolver
// cannot be created. In the latter case the already-started engine is stopped
// again so that a failed New does not leak it.
func New(opts ...Option) (*Client, error) {
	config := &Config{}
	WithDefault()(config)
	for _, opt := range opts {
		opt(config)
	}
	client := &Client{
		cfg:     config,
		logger:  config.Logger,
		eHandle: config.ErrHandler,
	}

	// Init the engine and hook the client into its event callbacks.
	client.engine = transport2.Manager.GetClientEngine(config.NetWork)()
	eventD := client.engine.EventDriveInter()
	eventD.OnOpen(client.onOpen)
	eventD.OnMessage(client.onMessage)
	eventD.OnClose(client.onClose)
	err := client.engine.Client().Start()
	if err != nil {
		return nil, err
	}

	// Init load balancing / connection management.
	client.connSourceSet = &container2.RWMutexMap[transport2.ConnAdapter, *connSource]{}
	client.cm = new(connManager)
	client.cm.cfg = config
	if config.BalancerFactory != nil {
		client.cm.balancer = config.BalancerFactory()
	}
	client.cm.selector = config.SelectorFactory(
		config.MuxConnection,
		func(node loadbalance.RpcNode) (transport2.ConnAdapter, error) {
			return client.engine.Client().NewConn(transport2.NetworkClientConfig{
				ServerAddr: node.Address,
				KeepAlive:  config.KeepAlive,
				Dialer:     nil,
			})
		},
	)
	if config.ResolverFactory != nil {
		client.cm.resolver, err = config.ResolverFactory(config.ResolverParseUrl, client.cm.balancer, time.Second)
		if err != nil {
			// The engine is already running at this point. Stop it so the
			// failed constructor does not leak it; the resolver error is the
			// one worth reporting, so a secondary Stop error is dropped.
			_ = client.engine.Client().Stop()
			return nil, err
		}
	}

	// Init the goroutine pool. Both builders share the same panic reporter.
	onPoolPanic := func(poolId int, err interface{}) {
		client.logger.Error(fmt.Sprintf("poolId : %d -> Panic : %v", poolId, err))
	}
	switch {
	case config.PoolSize <= 0:
		// Async mode is disabled.
		client.gp = nil
	case config.ExecPoolBuilder != nil:
		client.gp = config.ExecPoolBuilder.Builder(
			pool.MaxTaskPoolSize/4, config.PoolSize, config.PoolSize*2, onPoolPanic)
	default:
		client.gp = pool.NewTaskPool[string](
			pool.MaxTaskPoolSize/4, config.PoolSize, config.PoolSize*2, onPoolPanic)
	}

	// plugins
	client.pluginManager = newPluginManager(config.Plugins)
	// init service map
	client.services = *container2.NewRCUMap[string, *metadata.Process]()
	// init context manager
	client.contextM = newContextManager()
	client.contextInitId = uint64(random.FastRand())
	return client, nil
}
// BindFunc registers every exported method of i as a service named
// "<sourceName>.<MethodName>".
//
// It returns an error when i is nil, when sourceName is empty, when i has no
// methods, or when one of the resulting service names is already registered.
// The services are stored in a single StoreMulti call, which is only reached
// when none of the names was found to be taken.
func (c *Client) BindFunc(sourceName string, i interface{}) error {
	switch {
	case i == nil:
		return errors.New("register elem is nil")
	case sourceName == "":
		return errors.New("the typ name is not defined")
	}

	instance := reflect.ValueOf(i)
	methodCount := instance.NumMethod()
	source := &metadata.Source{
		InstanceType: reflect.TypeOf(i),
		ProcessSet:   make(map[string]*metadata.Process, methodCount),
	}
	// NOTE: the number of bound methods cannot be derived from the map's
	// len/cap. The capacity passed to make() is only a hint to the function
	// that really creates the map, and len() of a map that has never been
	// written to is always 0, so the reflected method count is used instead.
	if methodCount == 0 {
		return errors.New("instance no method")
	}
	for idx := 0; idx < methodCount; idx++ {
		method := source.InstanceType.Method(idx)
		if !method.IsExported() {
			continue
		}
		process := &metadata.Process{Value: instance.Method(idx)}
		metaDataUtil.IFContextOrStream(process, method.Type)
		source.ProcessSet[method.Name] = process
	}

	elems := make([]container2.RCUMapElement[string, *metadata.Process], 0, len(source.ProcessSet))
	for methodName, process := range source.ProcessSet {
		serviceName := sourceName + "." + methodName
		if _, taken := c.services.LoadOk(serviceName); taken {
			return errors.New("service name already usage")
		}
		elems = append(elems, container2.RCUMapElement[string, *metadata.Process]{
			Key:   serviceName,
			Value: process,
		})
	}
	c.services.StoreMulti(elems)
	return nil
}
// RawCall differs from Client.Call: it does not identify the Method and its
// corresponding in/out lists. Apart from context.Context/stream.LStream, the
// args/reps are serialized directly.
//
// It forwards to the internal call with no options, no caller-supplied reply
// slots and checking switched off.
func (c *Client) RawCall(service string, args ...interface{}) ([]interface{}, error) {
	replies, callErr := c.call(service, nil, args, nil, false)
	return replies, callErr
}
// Request performs a req/rep style RPC call: ctx and request are sent as the
// call arguments and the reply is decoded into response.
//
// A nil response is rejected here with ErrCallArgsType. The original note
// additionally states that response must be a pointer type; that check is not
// visible in this function, so it presumably happens further down the call
// path.
func (c *Client) Request(service string, ctx context.Context, request interface{}, response interface{}, opts ...CallOption) error {
	if response == nil {
		return c.eHandle.LWarpErrorDesc(errorhandler.ErrCallArgsType, "response pointer equal nil")
	}
	callArgs := []interface{}{ctx, request}
	replySlots := []interface{}{response}
	_, callErr := c.call(service, opts, callArgs, replySlots, false)
	return callErr
}
// Requests performs one call carrying multiple requests and multiple
// responses: requests are sent as the call arguments and the replies are
// decoded into responses.
//
// It returns ErrCallArgsType when responses is nil or empty, or when any
// element of responses is nil.
func (c *Client) Requests(service string, requests []interface{}, responses []interface{}, opts ...CallOption) error {
	// The original test was `len(responses) > 0`, which rejected every
	// non-empty slice and let only empty ones through. len() of a nil slice
	// is 0, so this single check also covers responses == nil.
	if len(responses) == 0 {
		return c.eHandle.LWarpErrorDesc(errorhandler.ErrCallArgsType, "responses length equal zero")
	}
	for _, response := range responses {
		if response == nil {
			return c.eHandle.LWarpErrorDesc(errorhandler.ErrCallArgsType, "response pointer equal nil")
		}
	}
	_, err := c.call(service, opts, requests, responses, false)
	return err
}
// Call invokes service with args and returns its results.
//
// The returned error may be produced by the Server/Client itself or returned
// by the invoked user procedure; Call treats all of these as errors. args are
// the user arguments, so a context.Context and a stream.LStream are placed
// there too when present.
//
// Call implements context.Context propagation: when the passed Context is
// cancelled, the client also cancels the Context on the server side. This does
// not affect the call itself, so if the remote process does not return after
// the cancel, this call blocks.
//
// For a procedure with registered metadata, the number of results returned is
// always its own result count minus one, because the error is not included in
// reps. That holds regardless of which error occurred, unless the registered
// metadata cannot be found.
func (c *Client) Call(service string, args ...interface{}) ([]interface{}, error) {
	replies, callErr := c.call(service, nil, args, nil, true)
	return replies, callErr
}
// call is the shared implementation behind Call, RawCall, Request and
// Requests.
//
// It takes a connection for service, encodes args into a pooled message, sends
// it, then reads and decodes the reply into reps. The client plugins are
// invoked around these steps.
//
// Parameters:
//   - opts: per-call options; currently not consulted by this function.
//   - reps: caller-supplied reply slots. When empty and check is true, slots
//     are allocated from the method's result count minus one (the trailing
//     error is not part of the replies).
//   - check: true for the metadata-checked path (Call). It is passed inverted
//     to identArgAndEncode. On failure with no replies, it also makes the
//     deferred hook below size the result slice from the registered service,
//     if one is found.
//
// A failure to obtain a connection always aborts the call. Previously this
// only happened when check was true, so the unchecked paths continued with a
// nil connection source.
func (c *Client) call(
	service string,
	opts []CallOption,
	args []interface{},
	reps []interface{},
	check bool,
) (completeReps []interface{}, completeErr error2.LErrorDesc) {
	defer func() {
		if completeErr == nil || !check || len(completeReps) != 0 {
			return
		}
		// On a checked call that failed without replies, still hand back a
		// slice with one slot per non-error result of the registered service.
		if serviceInstance, ok := c.services.LoadOk(service); ok {
			completeReps = make([]interface{}, serviceInstance.Value.Type().NumOut()-1)
		}
	}()
	cs, err := c.takeConnSource(service)
	if err != nil {
		return nil, c.eHandle.LWarpErrorDesc(errorhandler.ErrClient, err)
	}
	mp := sharedPool.TakeMessagePool()
	writeMsg := mp.Get().(*message.Message)
	defer mp.Put(writeMsg)
	writeMsg.Reset()
	pCtx := c.pluginManager.GetContext()
	defer c.pluginManager.FreeContext(pCtx)
	if err := c.pluginManager.Request4C(pCtx, args, writeMsg); err != nil {
		return nil, err
	}
	method, ctx, ctxId, err := c.identArgAndEncode(service, writeMsg, args, !check)
	if err != nil {
		// The plugins are only notified of the encode failure here; the
		// encode error itself is what the caller receives.
		_ = c.pluginManager.Send4C(pCtx, writeMsg, err)
		return nil, err
	}
	// TODO: start a retry on connection errors (error2.ConnectionErr).
	if err = c.sendCallMsg(pCtx, ctxId, writeMsg, cs, false); err != nil {
		return nil, err
	}
	if len(reps) == 0 && check {
		reps = make([]interface{}, method.Type().NumOut()-1)
	}
	reps, err = c.readMsgAndDecodeReply(ctx, pCtx, writeMsg.GetMsgId(), cs, method, reps)
	// A plugin error interrupts all further processing.
	if err != nil && err.Code() == errorhandler.ErrPlugin.Code() {
		return reps, err
	}
	if err := c.pluginManager.AfterReceive4C(pCtx, reps, err); err != nil {
		return reps, err
	}
	// TODO: start a retry on connection errors (error2.ConnectionErr).
	return reps, err
}
// AsyncCall encodes the call synchronously and then pushes the send/receive
// work onto the client's goroutine pool. callBack is invoked from the pool
// with the decoded results, or with the error that stopped the call.
//
// TODO: improve this ill-fitting API.
//
// By the time this function returns, the data has at least been serialized by
// the Codec, and the caller is responsible for checking the returned error.
// That error may come from the Codec or from internal components, because the
// encoding happens before the message is sent.
//
// An error is returned when callBack is nil, when async mode is disabled (New
// leaves the pool nil for Config.PoolSize <= 0), when encoding fails, or when
// the pool rejects the task.
func (c *Client) AsyncCall(service string, args []interface{}, callBack func(results []interface{}, err error)) error {
	if callBack == nil {
		return c.eHandle.LWarpErrorDesc(errorhandler.ErrCallArgsType, "callBack is empty")
	}
	// Without this guard a client built with async mode disabled would panic
	// on the nil pool below.
	if c.gp == nil {
		return c.eHandle.LWarpErrorDesc(errorhandler.ErrClient, "async mode is disabled: goroutine pool is not initialized")
	}
	msg := message.New()
	method, ctx, ctxId, err := c.identArgAndEncode(service, msg, args, false)
	if err != nil {
		return err
	}
	return c.gp.Push(service, func() {
		// Take a connection of the underlying transport.
		cs, err := c.takeConnSource(service)
		if err != nil {
			callBack(nil, err)
			return
		}
		err = c.sendCallMsg(nil, ctxId, msg, cs, false)
		if err != nil {
			callBack(nil, err)
			return
		}
		// One slot per result except the trailing error.
		reps := make([]interface{}, method.Type().NumOut()-1)
		reps, err = c.readMsgAndDecodeReply(ctx, nil, msg.GetMsgId(), cs, method, reps)
		callBack(reps, err)
	})
}
// takeConnSource asks the connection manager for a connection serving service
// and returns the per-connection resources registered for it.
//
// Both failure modes are reported as ErrClient: the connection manager
// returning an error, and the connection having no entry in connSourceSet.
func (c *Client) takeConnSource(service string) (*connSource, error2.LErrorDesc) {
	conn, takeErr := c.cm.TakeConn(service)
	if takeErr != nil {
		return nil, c.eHandle.LWarpErrorDesc(errorhandler.ErrClient, takeErr)
	}
	if source, found := c.connSourceSet.LoadOk(conn); found {
		return source, nil
	}
	return nil, c.eHandle.LWarpErrorDesc(errorhandler.ErrClient, "connection source not found")
}
// Close shuts the client down: it stops the goroutine pool (when async mode
// is enabled), stops the transport engine and exits the connection manager.
//
// Every step is attempted even if an earlier one fails, so a failing pool or
// engine stop no longer leaves the remaining components running. The first
// error encountered is returned.
func (c *Client) Close() error {
	var firstErr error
	if c.gp != nil {
		if err := c.gp.Stop(); err != nil {
			firstErr = err
		}
	}
	if err := c.engine.Client().Stop(); err != nil && firstErr == nil {
		firstErr = err
	}
	if err := c.cm.Exit(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}