-
Notifications
You must be signed in to change notification settings - Fork 50
/
rpc.go
131 lines (109 loc) · 2.99 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package event_plugin
import (
"encoding/gob"
"log"
"net/rpc"
"os"
"github.com/openconfig/gnmic/pkg/api/types"
"github.com/openconfig/gnmic/pkg/api/utils"
"github.com/openconfig/gnmic/pkg/formatters"
)
// Constants naming this processor and the prefix of the loggers it creates.
const (
// processorType is this processor's type name; in this file it is used to
// build loggingPrefix.
processorType = "event-plugin"
// loggingPrefix is the prefix passed to log.New by EventProcessorRPC.WithLogger.
loggingPrefix = "[" + processorType + "] "
)
// InitArgs is the argument payload sent with the "Plugin.Init" RPC call.
type InitArgs struct {
	// Cfg carries the processor configuration; the server side passes it
	// unchanged to the wrapped implementation's Init method.
	Cfg any
}
// ApplyArgs is the argument payload sent with the "Plugin.Apply" RPC call.
type ApplyArgs struct {
// Events are the event messages handed to the processor's Apply method.
Events []*formatters.EventMsg
}
// ApplyResponse is the reply of the "Plugin.Apply" RPC call.
type ApplyResponse struct {
// Events are the event messages returned by the processor's Apply method.
Events []*formatters.EventMsg
}
// InitResponse is the empty reply of the "Plugin.Init" RPC call.
type InitResponse struct{}

// Actionresponse is the empty reply of the "Plugin.WithActions" RPC call.
type Actionresponse struct{}

// Targetresponse is the empty reply of the "Plugin.WithTargets" RPC call.
type Targetresponse struct{}

// Proccessorresponse is the empty reply of the "Plugin.WithProcessors" RPC
// call.
type Proccessorresponse struct{}
// eventProcessorRPCServer exposes a formatters.EventProcessor through methods
// that each delegate to Impl.
type eventProcessorRPCServer struct {
// Impl is the processor implementation every method delegates to.
Impl formatters.EventProcessor
}
// init registers with gob the concrete container types that may travel inside
// interface-typed values (generic maps and slices), so they can be encoded and
// decoded.
func init() {
	for _, sample := range []any{
		map[string]any{},
		[]any{},
	} {
		gob.Register(sample)
	}
}
// Init passes the configuration carried in args to the wrapped
// implementation's Init method and returns whatever error it reports. The
// reply value is left untouched.
func (s *eventProcessorRPCServer) Init(args *InitArgs, _ *InitResponse) error {
	cfg := args.Cfg
	return s.Impl.Init(cfg)
}
// Apply runs the wrapped implementation's Apply over args.Events and stores
// the resulting events in resp. It always returns nil.
func (s *eventProcessorRPCServer) Apply(args *ApplyArgs, resp *ApplyResponse) error {
	processed := s.Impl.Apply(args.Events...)
	resp.Events = processed
	return nil
}
// WithActions forwards the received actions map to the wrapped
// implementation. The reply is unused and the returned error is always nil.
func (s *eventProcessorRPCServer) WithActions(actions map[string]map[string]any, _ *Actionresponse) error {
	s.Impl.WithActions(actions)
	return nil
}
// WithTargets forwards the received target configurations to the wrapped
// implementation. The reply is unused and the returned error is always nil.
func (s *eventProcessorRPCServer) WithTargets(targets map[string]*types.TargetConfig, _ *Targetresponse) error {
	s.Impl.WithTargets(targets)
	return nil
}
// WithProcessors forwards the received processors map to the wrapped
// implementation. The reply is unused and the returned error is always nil.
func (s *eventProcessorRPCServer) WithProcessors(procs map[string]map[string]any, _ *Proccessorresponse) error {
	s.Impl.WithProcessors(procs)
	return nil
}
// WithLogger is a no-op on the server side: it takes no arguments and always
// returns nil.
func (s *eventProcessorRPCServer) WithLogger() error {
return nil
}
// EventProcessorRPC is the client side of the plugin RPC: its methods forward
// calls to the "Plugin.*" RPC methods through client.
type EventProcessorRPC struct {
// client is the net/rpc client used for every "Plugin.*" call.
client *rpc.Client
// logger reports RPC call errors; it is set by WithLogger.
logger *log.Logger
}
// Init applies every option in opts to g, then sends cfg to the plugin with
// the "Plugin.Init" RPC call. It returns the error of that call, if any.
func (g *EventProcessorRPC) Init(cfg any, opts ...formatters.Option) error {
	for i := range opts {
		opts[i](g)
	}
	args := &InitArgs{Cfg: cfg}
	return g.client.Call("Plugin.Init", args, new(InitResponse))
}
// Apply sends the given events to the plugin with the "Plugin.Apply" RPC call
// and returns the events found in the reply. If the call fails, the error is
// logged and nil is returned.
func (g *EventProcessorRPC) Apply(event ...*formatters.EventMsg) []*formatters.EventMsg {
	reply := new(ApplyResponse)
	if err := g.client.Call("Plugin.Apply", &ApplyArgs{Events: event}, reply); err != nil {
		g.logger.Print("RPC client call error: ", err)
		return nil
	}
	return reply.Events
}
// WithActions sends the actions map to the plugin with the
// "Plugin.WithActions" RPC call. A failed call is logged, not returned.
func (g *EventProcessorRPC) WithActions(act map[string]map[string]any) {
	if err := g.client.Call("Plugin.WithActions", act, new(Actionresponse)); err != nil {
		g.logger.Print("RPC client call error: ", err)
	}
}
// WithTargets sends the target configurations to the plugin with the
// "Plugin.WithTargets" RPC call. A failed call is logged, not returned.
func (g *EventProcessorRPC) WithTargets(tcs map[string]*types.TargetConfig) {
	if err := g.client.Call("Plugin.WithTargets", tcs, new(Targetresponse)); err != nil {
		g.logger.Print("RPC client call error: ", err)
	}
}
// WithProcessors sends the processors map to the plugin with the
// "Plugin.WithProcessors" RPC call. A failed call is logged, not returned.
func (g *EventProcessorRPC) WithProcessors(procs map[string]map[string]any) {
	if err := g.client.Call("Plugin.WithProcessors", procs, new(Proccessorresponse)); err != nil {
		g.logger.Print("RPC client call error: ", err)
	}
}
// WithLogger sets the logger g uses to report RPC call errors.
//
// When l is non-nil, the new logger writes to l's writer and reuses l's flags.
// When l is nil, it writes to os.Stderr with utils.DefaultLoggingFlags. In
// both cases messages are prefixed with loggingPrefix.
func (g *EventProcessorRPC) WithLogger(l *log.Logger) {
	// Only read from l when it is set: calling l.Writer() or l.Flags() on a
	// nil *log.Logger panics.
	if l != nil {
		g.logger = log.New(l.Writer(), loggingPrefix, l.Flags())
		return
	}
	g.logger = log.New(os.Stderr, loggingPrefix, utils.DefaultLoggingFlags)
}