forked from fluxcd/flux
/
server.go
112 lines (96 loc) · 2.51 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package rpc
import (
"io"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/platform"
)
// ApplyResult is the reply type of the Apply and Regrade RPC methods. It
// maps a service ID to the message of the error reported for that service.
//
// net/rpc cannot serialise errors, so we transmit strings and
// reconstitute them on the other side.
type ApplyResult map[flux.ServiceID]string
// SyncResult is the reply type of the Sync RPC method. Likewise with
// ApplyResult, individual errors are carried as their message strings; the
// keys are the keys of the platform.SyncError returned by the platform.
type SyncResult map[string]string
// Server takes a platform and makes it available over RPC.
// Construct one with NewServer, then hand each connection to ServeConn.
type Server struct {
// server is the net/rpc server on which the RPCServer methods are registered.
server *rpc.Server
}
// NewServer builds an RPC server whose methods are answered by invoking
// the given (assumed local) platform. The platform is wrapped in an
// RPCServer, which is registered with a fresh net/rpc server.
//
// It returns the error from registration, if any; in that case the
// returned *Server is nil.
func NewServer(p platform.Platform) (*Server, error) {
	rpcServer := rpc.NewServer()
	handler := &RPCServer{p: p}

	err := rpcServer.Register(handler)
	if err != nil {
		return nil, err
	}

	return &Server{server: rpcServer}, nil
}
// ServeConn answers JSON-RPC requests arriving on conn with the methods
// registered on this server. It delegates to (*rpc.Server).ServeCodec,
// which per the net/rpc documentation blocks until the client hangs up.
func (c *Server) ServeConn(conn io.ReadWriteCloser) {
	codec := jsonrpc.NewServerCodec(conn)
	c.server.ServeCodec(codec)
}
// RPCServer is the receiver that NewServer registers with net/rpc. Each of
// its exported methods forwards the call to the wrapped platform.Platform.
type RPCServer struct {
// p is the platform that calls are delegated to.
p platform.Platform
}
// Ping forwards to the platform's Ping and returns its error unchanged.
// The request and reply arguments carry no data; they exist only to give
// the method the (args, *reply) shape that net/rpc requires.
func (p *RPCServer) Ping(_ struct{}, _ *struct{}) error {
	err := p.p.Ping()
	return err
}
// Version asks the platform for its version and stores it in resp.
// resp is written whether or not the platform reports an error; that
// error is returned as-is.
func (p *RPCServer) Version(_ struct{}, resp *string) error {
	var err error
	*resp, err = p.p.Version()
	return err
}
// AllServices forwards req.MaybeNamespace and req.Ignored to the
// platform's AllServices and stores the resulting services in resp.
//
// A nil result is replaced by an empty, non-nil slice (presumably so the
// JSON reply carries [] rather than null -- confirm against clients). The
// platform's error is returned unchanged, and resp is set even when that
// error is non-nil.
func (p *RPCServer) AllServices(req AllServicesRequestV4, resp *[]platform.Service) error {
	services, err := p.p.AllServices(req.MaybeNamespace, req.Ignored)
	if services != nil {
		*resp = services
		return err
	}

	*resp = []platform.Service{}
	return err
}
// SomeServices forwards ids to the platform's SomeServices and stores the
// resulting services in resp.
//
// A nil result is replaced by an empty, non-nil slice (presumably so the
// JSON reply carries [] rather than null -- confirm against clients). The
// platform's error is returned unchanged, and resp is set even when that
// error is non-nil.
func (p *RPCServer) SomeServices(ids []flux.ServiceID, resp *[]platform.Service) error {
	services, err := p.p.SomeServices(ids)
	if services != nil {
		*resp = services
		return err
	}

	*resp = []platform.Service{}
	return err
}
// Export stores the bytes returned by the platform's Export in resp.
// resp is written whether or not the platform reports an error; that
// error is returned as-is.
func (p *RPCServer) Export(_ struct{}, resp *[]byte) error {
	var err error
	*resp, err = p.p.Export()
	return err
}
// Regrade is the old name for Apply, kept so that callers still using it
// continue to work; everywhere else the operation is called "Apply".
// It passes its arguments straight to Apply and returns Apply's error.
func (p *RPCServer) Regrade(defs []platform.ServiceDefinition, applyResult *ApplyResult) error {
	err := p.Apply(defs, applyResult)
	return err
}
// Apply forwards defs to the platform's Apply and reports the outcome
// through applyResult.
//
// When the platform returns a platform.ApplyError, each entry is copied
// into the result as service ID -> error message and the RPC call itself
// succeeds (nil error). Any other error is returned unchanged alongside an
// empty result. applyResult is always set to a non-nil map.
func (p *RPCServer) Apply(defs []platform.ServiceDefinition, applyResult *ApplyResult) error {
	failures := ApplyResult{}

	err := p.p.Apply(defs)
	// The comma-ok assertion is false for a nil error, so success and
	// non-ApplyError failures both skip the conversion below.
	if perService, ok := err.(platform.ApplyError); ok {
		for id, serviceErr := range perService {
			failures[id] = serviceErr.Error()
		}
		// The failures now travel in the reply, not as the RPC error.
		err = nil
	}

	*applyResult = failures
	return err
}
// Sync forwards spec to the platform's Sync and reports the outcome
// through syncResult.
//
// When the platform returns a platform.SyncError, each entry is copied
// into the result as key -> error message and the RPC call itself succeeds
// (nil error). Any other error is returned unchanged alongside an empty
// result. syncResult is always set to a non-nil map.
func (p *RPCServer) Sync(spec platform.SyncDef, syncResult *SyncResult) error {
	failures := SyncResult{}

	err := p.p.Sync(spec)
	// The comma-ok assertion is false for a nil error, so success and
	// non-SyncError failures both skip the conversion below.
	if perItem, ok := err.(platform.SyncError); ok {
		for key, itemErr := range perItem {
			failures[key] = itemErr.Error()
		}
		// The failures now travel in the reply, not as the RPC error.
		err = nil
	}

	*syncResult = failures
	return err
}