-
Notifications
You must be signed in to change notification settings - Fork 1
/
rpc.go
106 lines (85 loc) · 2.33 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package rpc
import (
"git.golaxy.org/core/runtime"
"git.golaxy.org/core/service"
"git.golaxy.org/core/util/option"
"git.golaxy.org/framework/plugins/log"
"git.golaxy.org/framework/plugins/rpc/processor"
"git.golaxy.org/framework/util/concurrent"
"sync/atomic"
)
// IRPC is the RPC capability exposed by this plugin.
type IRPC interface {
// RPC issues an RPC call to path on destination dst, passing args, and
// returns a runtime.AsyncRet through which the outcome is delivered.
RPC(dst, path string, args ...any) runtime.AsyncRet
// OneWayRPC issues a one-way RPC call to path on destination dst, passing
// args. Only an error is returned; no call result is handed back.
OneWayRPC(dst, path string, args ...any) error
}
// newRPC builds the RPC plugin instance. The plugin options start from
// With.Default() and are then adjusted by the supplied settings.
func newRPC(settings ...option.Setting[RPCOptions]) IRPC {
	plugin := new(_RPC)
	plugin.options = option.Make(With.Default(), settings...)
	return plugin
}
// _RPC is the IRPC implementation returned by newRPC.
type _RPC struct {
// options holds the plugin options assembled in newRPC.
options RPCOptions
// servCtx is the service context captured in InitSP and passed to processors.
servCtx service.Context
// terminated is set to true by ShutSP; RPC and OneWayRPC check it before dispatching.
terminated atomic.Bool
// deliverers are the configured processors that implement processor.IDeliverer,
// collected in InitSP in configuration order.
deliverers []processor.IDeliverer
}
// InitSP initializes the service plugin: it records the service context,
// gathers every configured processor that can deliver calls, and then runs
// the Init hook of each processor that provides one.
func (r *_RPC) InitSP(ctx service.Context) {
	log.Infof(ctx, "init plugin %q", self.Name)

	r.servCtx = ctx

	// First pass: register all deliverers before any Init hook runs.
	for _, proc := range r.options.Processors {
		d, isDeliverer := proc.(processor.IDeliverer)
		if !isDeliverer {
			continue
		}
		r.deliverers = append(r.deliverers, d)
	}

	// Second pass: invoke the optional Init hook, in configuration order.
	for _, proc := range r.options.Processors {
		initializer, hasInit := proc.(processor.LifecycleInit)
		if !hasInit {
			continue
		}
		initializer.Init(r.servCtx)
	}
}
// ShutSP shuts the service plugin down: it marks the plugin as terminated,
// so later RPC and OneWayRPC calls are rejected, and then runs the Shut hook
// of each configured processor that provides one.
func (r *_RPC) ShutSP(ctx service.Context) {
	log.Infof(ctx, "shut plugin %q", self.Name)

	r.terminated.Store(true)

	for _, proc := range r.options.Processors {
		stopper, hasShut := proc.(processor.LifecycleShut)
		if !hasShut {
			continue
		}
		stopper.Shut(r.servCtx)
	}
}
// RPC issues an RPC call to path on destination dst. The first deliverer
// whose Match accepts the call handles it via Request. If the plugin has been
// shut down, the returned result carries processor.ErrTerminated; if no
// deliverer matches, it carries processor.ErrUndeliverable.
func (r *_RPC) RPC(dst, path string, args ...any) runtime.AsyncRet {
	// failed wraps err in an async result that holds no value.
	failed := func(err error) runtime.AsyncRet {
		asyncRet := concurrent.MakeRespAsyncRet()
		asyncRet.Push(concurrent.MakeRet[any](nil, err))
		return asyncRet.CastAsyncRet()
	}

	if r.terminated.Load() {
		return failed(processor.ErrTerminated)
	}

	for _, d := range r.deliverers {
		// The final Match argument is false here and true in OneWayRPC —
		// presumably a one-way flag; confirm against processor.IDeliverer.
		if d.Match(r.servCtx, dst, path, false) {
			return d.Request(r.servCtx, dst, path, args)
		}
	}

	return failed(processor.ErrUndeliverable)
}
// OneWayRPC issues a one-way RPC call to path on destination dst. The first
// deliverer whose Match accepts the call handles it via Notify. It returns
// processor.ErrTerminated once the plugin has been shut down, and
// processor.ErrUndeliverable when no deliverer matches.
func (r *_RPC) OneWayRPC(dst, path string, args ...any) error {
	if r.terminated.Load() {
		return processor.ErrTerminated
	}

	for _, d := range r.deliverers {
		// The final Match argument is true here and false in RPC —
		// presumably a one-way flag; confirm against processor.IDeliverer.
		if d.Match(r.servCtx, dst, path, true) {
			return d.Notify(r.servCtx, dst, path, args)
		}
	}

	return processor.ErrUndeliverable
}