-
Notifications
You must be signed in to change notification settings - Fork 1
/
deliverer_distributed.go
97 lines (77 loc) · 2.15 KB
/
deliverer_distributed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package rpc
import (
"git.golaxy.org/core/runtime"
"git.golaxy.org/core/service"
"git.golaxy.org/core/util/types"
"git.golaxy.org/plugins/dist"
"git.golaxy.org/plugins/gap"
"git.golaxy.org/plugins/gap/variant"
"git.golaxy.org/plugins/log"
"git.golaxy.org/plugins/util/concurrent"
"strings"
)
// DistributedDeliverer 分布式服务的RPC投递器
type DistributedDeliverer struct {
servCtx service.Context
dist dist.IDistService
}
// Init 初始化
func (d *DistributedDeliverer) Init(ctx service.Context) {
d.servCtx = ctx
d.dist = dist.Using(ctx)
log.Debugf(d.servCtx, "rpc deliverer %q started", types.AnyFullName(*d))
}
// Shut 结束
func (d *DistributedDeliverer) Shut(ctx service.Context) {
log.Debugf(d.servCtx, "rpc deliverer %q stopped", types.AnyFullName(*d))
}
// Match 是否匹配
func (d *DistributedDeliverer) Match(ctx service.Context, dst, path string, oneWay bool) bool {
addr := d.dist.GetAddress()
if !strings.HasPrefix(dst, addr.Domain) {
return false
}
if !oneWay {
if !strings.HasPrefix(dst, addr.BalanceSubdomain) && !strings.HasPrefix(dst, addr.NodeSubdomain) {
return false
}
}
return true
}
// Request 请求
func (d *DistributedDeliverer) Request(ctx service.Context, dst, path string, args []any) runtime.AsyncRet {
ret := concurrent.MakeRespAsyncRet()
future := concurrent.MakeFuture(d.dist.GetFutures(), nil, ret)
vargs, err := variant.MakeArray(args)
if err != nil {
future.Cancel(err)
return ret.CastAsyncRet()
}
msg := &gap.MsgRPCRequest{
CorrId: future.Id,
Path: path,
Args: vargs,
}
if err = d.dist.SendMsg(dst, msg); err != nil {
future.Cancel(err)
return ret.CastAsyncRet()
}
log.Debugf(d.servCtx, "rpc request(%d) to dst:%q, path:%q ok", future.Id, dst, path)
return ret.CastAsyncRet()
}
// Notify 通知
func (d *DistributedDeliverer) Notify(ctx service.Context, dst, path string, args []any) error {
vargs, err := variant.MakeArray(args)
if err != nil {
return err
}
msg := &gap.MsgOneWayRPC{
Path: path,
Args: vargs,
}
if err = d.dist.SendMsg(dst, msg); err != nil {
return err
}
log.Debugf(d.servCtx, "rpc notify to dst:%q, path:%q ok", dst, path)
return nil
}