/
rpc.go
153 lines (134 loc) · 4.28 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package connection
import (
"context"
"io"
"net"
"time"
"github.com/rs/zerolog"
"zombiezen.com/go/capnproto2/rpc"
"github.com/khulnasoft/netscale/tunnelrpc"
tunnelpogs "github.com/khulnasoft/netscale/tunnelrpc/pogs"
)
type tunnelServerClient struct {
client tunnelpogs.TunnelServer_PogsClient
transport rpc.Transport
}
// NewTunnelRPCClient creates and returns a new RPC client, which will communicate using a stream on the given muxer.
// This method is exported for supervisor to call Authenticate RPC
func NewTunnelServerClient(
ctx context.Context,
stream io.ReadWriteCloser,
log *zerolog.Logger,
) *tunnelServerClient {
transport := tunnelrpc.NewTransportLogger(log, rpc.StreamTransport(stream))
conn := rpc.NewConn(
transport,
tunnelrpc.ConnLog(log),
)
registrationClient := tunnelpogs.RegistrationServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn}
return &tunnelServerClient{
client: tunnelpogs.TunnelServer_PogsClient{RegistrationServer_PogsClient: registrationClient, Client: conn.Bootstrap(ctx), Conn: conn},
transport: transport,
}
}
func (tsc *tunnelServerClient) Authenticate(ctx context.Context, classicTunnel *ClassicTunnelProperties, registrationOptions *tunnelpogs.RegistrationOptions) (tunnelpogs.AuthOutcome, error) {
authResp, err := tsc.client.Authenticate(ctx, classicTunnel.OriginCert, classicTunnel.Hostname, registrationOptions)
if err != nil {
return nil, err
}
return authResp.Outcome(), nil
}
func (tsc *tunnelServerClient) Close() {
// Closing the client will also close the connection
_ = tsc.client.Close()
_ = tsc.transport.Close()
}
type NamedTunnelRPCClient interface {
RegisterConnection(
c context.Context,
config *NamedTunnelProperties,
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
edgeAddress net.IP,
observer *Observer,
) (*tunnelpogs.ConnectionDetails, error)
SendLocalConfiguration(
c context.Context,
config []byte,
observer *Observer,
) error
GracefulShutdown(ctx context.Context, gracePeriod time.Duration)
Close()
}
type registrationServerClient struct {
client tunnelpogs.RegistrationServer_PogsClient
transport rpc.Transport
}
func newRegistrationRPCClient(
ctx context.Context,
stream io.ReadWriteCloser,
log *zerolog.Logger,
) NamedTunnelRPCClient {
transport := tunnelrpc.NewTransportLogger(log, rpc.StreamTransport(stream))
conn := rpc.NewConn(
transport,
tunnelrpc.ConnLog(log),
)
return ®istrationServerClient{
client: tunnelpogs.RegistrationServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn},
transport: transport,
}
}
func (rsc *registrationServerClient) RegisterConnection(
ctx context.Context,
properties *NamedTunnelProperties,
options *tunnelpogs.ConnectionOptions,
connIndex uint8,
edgeAddress net.IP,
observer *Observer,
) (*tunnelpogs.ConnectionDetails, error) {
conn, err := rsc.client.RegisterConnection(
ctx,
properties.Credentials.Auth(),
properties.Credentials.TunnelID,
connIndex,
options,
)
if err != nil {
if err.Error() == DuplicateConnectionError {
observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
return nil, errDuplicationConnection
}
observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
return nil, serverRegistrationErrorFromRPC(err)
}
observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
return conn, nil
}
func (rsc *registrationServerClient) SendLocalConfiguration(ctx context.Context, config []byte, observer *Observer) (err error) {
observer.metrics.localConfigMetrics.pushes.Inc()
defer func() {
if err != nil {
observer.metrics.localConfigMetrics.pushesErrors.Inc()
}
}()
return rsc.client.SendLocalConfiguration(ctx, config)
}
func (rsc *registrationServerClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
ctx, cancel := context.WithTimeout(ctx, gracePeriod)
defer cancel()
_ = rsc.client.UnregisterConnection(ctx)
}
func (rsc *registrationServerClient) Close() {
// Closing the client will also close the connection
_ = rsc.client.Close()
// Closing the transport also closes the stream
_ = rsc.transport.Close()
}
type rpcName string
const (
register rpcName = "register"
reconnect rpcName = "reconnect"
unregister rpcName = "unregister"
authenticate rpcName = " authenticate"
)