-
Notifications
You must be signed in to change notification settings - Fork 1
/
forwardout_deliverer.go
177 lines (148 loc) · 4.41 KB
/
forwardout_deliverer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package processor
import (
"git.golaxy.org/core/runtime"
"git.golaxy.org/core/service"
"git.golaxy.org/core/util/types"
"git.golaxy.org/core/util/uid"
"git.golaxy.org/framework/net/gap"
"git.golaxy.org/framework/net/gap/variant"
"git.golaxy.org/framework/net/netpath"
"git.golaxy.org/framework/plugins/dentq"
"git.golaxy.org/framework/plugins/dserv"
"git.golaxy.org/framework/plugins/gate"
"git.golaxy.org/framework/plugins/log"
"git.golaxy.org/framework/util/concurrent"
"github.com/elliotchance/pie/v2"
)
// NewForwardOutDeliverer creates the outbound-forwarding RPC deliverer, used for
// server-to-client (S->C) communication.
//
// gate is kept on the deliverer; it is later passed to MakeBroadcastAddr during
// Init and compared against the Service field of a distributed entity's nodes
// when resolving a forwarding address.
func NewForwardOutDeliverer(gate string) IDeliverer {
	deliverer := new(_ForwardOutDeliverer)
	deliverer.gate = gate
	return deliverer
}
// _ForwardOutDeliverer is the outbound-forwarding RPC deliverer, used for
// server-to-client (S->C) communication.
type _ForwardOutDeliverer struct {
// servCtx is the service context captured in Init; it is used as the logging context.
servCtx service.Context
// dist is the distributed service obtained via dserv.Using in Init; it sends the
// forward messages, supplies the futures container and builds the broadcast address.
dist dserv.IDistService
// dentq is the distributed entity querier obtained via dentq.Using in Init; it
// looks up the distributed entity for a destination entity id.
dentq dentq.IDistEntityQuerier
// gate is the name given to NewForwardOutDeliverer; entity nodes whose Service
// equals this value are selected as the forwarding target.
gate string
// multicastBCAddr is the result of dist.MakeBroadcastAddr(gate), computed in Init
// and used as the forwarding address for multicast destinations.
multicastBCAddr string
}
// Init initializes the deliverer: it stores the service context, resolves the
// distributed service and the distributed entity querier from ctx, and
// precomputes the broadcast address for the configured gate.
func (d *_ForwardOutDeliverer) Init(ctx service.Context) {
	d.servCtx = ctx

	distService := dserv.Using(ctx)
	d.dist = distService
	d.dentq = dentq.Using(ctx)

	// Computed once here so multicast notifications can reuse it.
	d.multicastBCAddr = distService.MakeBroadcastAddr(d.gate)

	log.Debugf(ctx, "rpc deliverer %q started", types.AnyFullName(*d))
}
// Shut is called when the deliverer stops; it only logs the shutdown using the
// service context stored by Init (the ctx argument is not used).
func (d *_ForwardOutDeliverer) Shut(ctx service.Context) {
	logCtx := d.servCtx
	log.Debugf(logCtx, "rpc deliverer %q stopped", types.AnyFullName(*d))
}
// Match reports whether this deliverer handles the destination dst.
//
// Only destinations inside the client domain are supported. A unicast (node
// subdomain) address matches for every kind of call; a multicast address
// matches only for one-way calls.
func (d *_ForwardOutDeliverer) Match(ctx service.Context, dst, path string, oneWay bool) bool {
	switch {
	case !gate.CliDetails.InDomain(dst):
		// Not a client-domain address at all.
		return false
	case gate.CliDetails.InNodeSubdomain(dst):
		// Unicast addresses are accepted for both requests and notifications.
		return true
	default:
		// Multicast addresses are accepted for one-way calls only.
		return oneWay && gate.CliDetails.InMulticastSubdomain(dst)
	}
}
// Request sends an RPC request to the client addressed by dst and returns an
// async result for the response.
//
// The request is marshalled into a gap.MsgRPCRequest, wrapped in a
// gap.MsgForward, and sent to the forwarding address resolved for the entity
// id derived from dst via netpath.Base. If any step fails, the pending future
// is cancelled with the error and the async result is returned as-is.
func (d *_ForwardOutDeliverer) Request(ctx service.Context, dst, path string, args []any) runtime.AsyncRet {
	ret := concurrent.MakeRespAsyncRet()
	future := concurrent.MakeFuture(d.dist.GetFutures(), nil, ret)

	// abort cancels the pending future with cause and hands back the async result.
	abort := func(cause error) runtime.AsyncRet {
		future.Cancel(cause)
		return ret.CastAsyncRet()
	}

	entId := uid.From(netpath.Base(gate.CliDetails.PathSeparator, dst))
	forwardAddr, err := d.getDistEntityForwardAddr(entId)
	if err != nil {
		return abort(err)
	}

	rpcArgs, err := variant.MakeArray(args)
	if err != nil {
		return abort(err)
	}

	request := &gap.MsgRPCRequest{
		CorrId: future.Id,
		Path:   path,
		Args:   rpcArgs,
	}

	payload, err := gap.Marshal(request)
	if err != nil {
		return abort(err)
	}
	// The marshalled buffer is released when Request returns.
	defer payload.Release()

	envelope := &gap.MsgForward{
		Dst:       dst,
		CorrId:    request.CorrId,
		TransId:   request.MsgId(),
		TransData: payload.Data(),
	}

	if err = d.dist.SendMsg(forwardAddr, envelope); err != nil {
		return abort(err)
	}

	log.Debugf(d.servCtx, "rpc request(%d) forwarding to dst:%q, path:%q ok", future.Id, forwardAddr, path)
	return ret.CastAsyncRet()
}
// Notify sends a one-way RPC (no response expected) to the destination dst.
//
// The call is marshalled into a gap.MsgOneWayRPC, wrapped in a gap.MsgForward,
// and sent to the forwarding address resolved by getForwardAddr. The first
// error encountered is returned unchanged; nil is returned on success.
func (d *_ForwardOutDeliverer) Notify(ctx service.Context, dst, path string, args []any) error {
	forwardAddr, err := d.getForwardAddr(dst)
	if err != nil {
		return err
	}

	rpcArgs, err := variant.MakeArray(args)
	if err != nil {
		return err
	}

	notice := &gap.MsgOneWayRPC{
		Path: path,
		Args: rpcArgs,
	}

	payload, err := gap.Marshal(notice)
	if err != nil {
		return err
	}
	// The marshalled buffer is released when Notify returns.
	defer payload.Release()

	envelope := &gap.MsgForward{
		Dst:       dst,
		TransId:   notice.MsgId(),
		TransData: payload.Data(),
	}

	err = d.dist.SendMsg(forwardAddr, envelope)
	if err != nil {
		return err
	}

	log.Debugf(d.servCtx, "rpc notify forwarding to dst:%q, path:%q ok", forwardAddr, path)
	return nil
}
// getForwardAddr resolves the address a message for dst must be forwarded to.
//
// A unicast (node subdomain) destination resolves to the forwarding address of
// the entity derived from dst; a multicast destination resolves to the
// precomputed broadcast address. Any other destination yields
// ErrIncorrectDestAddress.
func (d *_ForwardOutDeliverer) getForwardAddr(dst string) (string, error) {
	switch {
	case gate.CliDetails.InNodeSubdomain(dst):
		// Unicast: look up the forwarding service address of the target entity.
		entId := uid.From(netpath.Base(gate.CliDetails.PathSeparator, dst))
		return d.getDistEntityForwardAddr(entId)
	case gate.CliDetails.InMulticastSubdomain(dst):
		// Multicast: broadcast to all forwarding services.
		return d.multicastBCAddr, nil
	default:
		return "", ErrIncorrectDestAddress
	}
}
// getDistEntityForwardAddr returns the RemoteAddr of the first node of the
// distributed entity entId whose Service equals the configured gate.
//
// It returns ErrDistEntityNotFound when the entity is unknown to the querier
// and ErrDistEntityNodeNotFound when none of its nodes belongs to the gate.
func (d *_ForwardOutDeliverer) getDistEntityForwardAddr(entId uid.Id) (string, error) {
	dent, found := d.dentq.GetDistEntity(entId)
	if !found {
		return "", ErrDistEntityNotFound
	}

	// servedByGate selects nodes that belong to the configured gate service.
	servedByGate := func(node dentq.Node) bool {
		return node.Service == d.gate
	}

	if pos := pie.FindFirstUsing(dent.Nodes, servedByGate); pos >= 0 {
		return dent.Nodes[pos].RemoteAddr, nil
	}
	return "", ErrDistEntityNodeNotFound
}