-
Notifications
You must be signed in to change notification settings - Fork 2
/
outbound_dispatcher.go
162 lines (137 loc) · 4.46 KB
/
outbound_dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package processor
import (
"context"
"git.golaxy.org/core/ec"
"git.golaxy.org/core/runtime"
"git.golaxy.org/core/service"
"git.golaxy.org/core/util/generic"
"git.golaxy.org/core/util/types"
"git.golaxy.org/core/util/uid"
"git.golaxy.org/framework/net/gap"
"git.golaxy.org/framework/net/gap/codec"
"git.golaxy.org/framework/net/gap/variant"
"git.golaxy.org/framework/net/netpath"
"git.golaxy.org/framework/plugins/dserv"
"git.golaxy.org/framework/plugins/gate"
"git.golaxy.org/framework/plugins/log"
"git.golaxy.org/framework/plugins/router"
)
// NewOutboundDispatcher 创建出站方向RPC分发器,用于S->C的通信
func NewOutboundDispatcher() IDispatcher {
return &_OutboundDispatcher{
encoder: codec.MakeEncoder(),
}
}
// _OutboundDispatcher 出站方向RPC分发器,用于S->C的通信
type _OutboundDispatcher struct {
servCtx service.Context
dist dserv.IDistService
router router.IRouter
encoder codec.Encoder
watcher dserv.IWatcher
}
// Init 初始化
func (d *_OutboundDispatcher) Init(ctx service.Context) {
d.servCtx = ctx
d.dist = dserv.Using(ctx)
d.router = router.Using(ctx)
d.watcher = d.dist.WatchMsg(context.Background(), generic.CastDelegateFunc2(d.handleMsg))
log.Debugf(d.servCtx, "rpc dispatcher %q started", types.AnyFullName(*d))
}
// Shut 结束
func (d *_OutboundDispatcher) Shut(ctx service.Context) {
<-d.watcher.Terminate()
log.Debugf(d.servCtx, "rpc dispatcher %q stopped", types.AnyFullName(*d))
}
func (d *_OutboundDispatcher) handleMsg(topic string, mp gap.MsgPacket) error {
// 只支持服务域通信
if !d.dist.GetNodeDetails().InDomain(mp.Head.Src) {
return nil
}
switch mp.Head.MsgId {
case gap.MsgId_Forward:
d.acceptForward(mp.Head.Src, mp.Msg.(*gap.MsgForward))
}
return nil
}
func (d *_OutboundDispatcher) acceptForward(src string, req *gap.MsgForward) {
if gate.CliDetails.InNodeSubdomain(req.Dst) {
// 目标为单播地址,解析实体Id
entId := uid.From(netpath.Base(gate.CliDetails.PathSeparator, req.Dst))
// 为了保持消息时序,在实体线程中,向对端发送消息
asyncRet := d.servCtx.Call(entId, func(entity ec.Entity, _ ...any) runtime.Ret {
session, ok := d.router.LookupSession(entity.GetId())
if !ok {
return runtime.MakeRet(nil, ErrSessionNotFound)
}
bs, err := d.encoder.EncodeBytes(src, 0, &gap.MsgRaw{Id: req.TransId, Data: req.TransData})
if err != nil {
return runtime.MakeRet(nil, err)
}
defer bs.Release()
err = session.SendData(bs.Data())
if err != nil {
return runtime.MakeRet(nil, err)
}
return runtime.MakeRet(nil, nil)
})
go d.forwardingFinish(src, req, (<-asyncRet).Error)
return
} else if gate.CliDetails.InMulticastSubdomain(req.Dst) {
// 目标为组播地址,解析分组Id
groupId := uid.From(netpath.Base(gate.CliDetails.PathSeparator, req.Dst))
group, ok := d.router.GetGroup(groupId)
if !ok {
go d.forwardingFinish(src, req, ErrGroupNotFound)
return
}
bs, err := d.encoder.EncodeBytes(src, 0, &gap.MsgRaw{Id: req.TransId, Data: req.TransData})
if err != nil {
go d.forwardingFinish(src, req, err)
return
}
// 为了保持消息时序,使用分组发送数据的channel
select {
case group.SendDataChan() <- bs:
go d.forwardingFinish(src, req, nil)
default:
bs.Release()
go d.forwardingFinish(src, req, ErrGroupChanIsFull)
}
return
} else {
go d.forwardingFinish(src, req, ErrIncorrectDestAddress)
return
}
}
func (d *_OutboundDispatcher) forwardingFinish(src string, req *gap.MsgForward, err error) {
if err == nil {
if req.CorrId != 0 {
log.Debugf(d.servCtx, "forwarding src:%q rpc request(%d) to remote:%q finish", src, req.CorrId, req.Dst)
} else {
log.Debugf(d.servCtx, "forwarding src:%q rpc notify to remote:%q finish", src, req.Dst)
}
} else {
if req.CorrId != 0 {
log.Errorf(d.servCtx, "forwarding src:%q rpc request(%d) to remote:%q failed, %s", src, req.CorrId, req.Dst, err)
d.reply(src, req.CorrId, err)
} else {
log.Errorf(d.servCtx, "forwarding src:%q rpc notify to remote:%q failed, %s", src, req.Dst, err)
}
}
}
func (d *_OutboundDispatcher) reply(src string, corrId int64, retErr error) {
if corrId == 0 || retErr == nil {
return
}
msg := &gap.MsgRPCReply{
CorrId: corrId,
Error: *variant.MakeError(retErr),
}
err := d.dist.SendMsg(src, msg)
if err != nil {
log.Errorf(d.servCtx, "rpc reply(%d) to src:%q failed, %s", corrId, src, err)
return
}
log.Debugf(d.servCtx, "rpc reply(%d) to src:%q ok", corrId, src)
}