-
Notifications
You must be signed in to change notification settings - Fork 280
/
grpc_xds.go
136 lines (121 loc) · 3.92 KB
/
grpc_xds.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package controlplane
import (
"context"
"encoding/json"
"errors"
"fmt"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/internal/log"
)
func (srv *Server) registerXDSHandlers() {
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv)
}
// StreamAggregatedResources streams xDS resources based on incoming discovery requests.
//
// This is setup as 3 concurrent goroutines:
// - The first retrieves the requests from the client.
// - The third sends responses back to the client.
// - The second waits for either the client to request a new resource type
// or for the config to have been updated
// - in either case, we loop over all of the current client versions
// and if any of them are different from the current version, we send
// the updated resource
func (srv *Server) StreamAggregatedResources(stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
incoming := make(chan *envoy_service_discovery_v3.DiscoveryRequest)
outgoing := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
eg, ctx := errgroup.WithContext(stream.Context())
// receive requests
eg.Go(func() error {
return srv.streamAggregatedResourcesIncomingStep(ctx, stream, incoming)
})
eg.Go(func() error {
return srv.streamAggregatedResourcesProcessStep(ctx, incoming, outgoing)
})
// send responses
eg.Go(func() error {
return srv.streamAggregatedResourcesOutgoingStep(ctx, stream, outgoing)
})
return eg.Wait()
}
func (srv *Server) streamAggregatedResourcesIncomingStep(
ctx context.Context,
stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer,
incoming chan<- *envoy_service_discovery_v3.DiscoveryRequest,
) error {
for {
req, err := stream.Recv()
if err != nil {
return err
}
select {
case incoming <- req:
case <-ctx.Done():
return ctx.Err()
}
}
}
func (srv *Server) streamAggregatedResourcesProcessStep(
ctx context.Context,
incoming <-chan *envoy_service_discovery_v3.DiscoveryRequest,
outgoing chan<- *envoy_service_discovery_v3.DiscoveryResponse,
) error {
versions := map[string]string{}
for {
select {
case req := <-incoming:
if req.ErrorDetail != nil {
bs, _ := json.Marshal(req.ErrorDetail.Details)
log.Error().
Err(errors.New(req.ErrorDetail.Message)).
Int32("code", req.ErrorDetail.Code).
RawJSON("details", bs).Msg("error applying configuration")
continue
}
// update the currently stored version
// if this version is different from the current version
// we will send the response below
versions[req.TypeUrl] = req.VersionInfo
case <-srv.configUpdated:
case <-ctx.Done():
return ctx.Err()
}
current := srv.currentConfig.Load()
for typeURL, version := range versions {
// the versions are different, so the envoy config needs to be updated
if version != fmt.Sprint(current.version) {
res, err := srv.buildDiscoveryResponse(fmt.Sprint(current.version), typeURL, ¤t.Options)
if err != nil {
return err
}
select {
case outgoing <- res:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
func (srv *Server) streamAggregatedResourcesOutgoingStep(
ctx context.Context,
stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer,
outgoing <-chan *envoy_service_discovery_v3.DiscoveryResponse,
) error {
for {
var res *envoy_service_discovery_v3.DiscoveryResponse
select {
case res = <-outgoing:
case <-ctx.Done():
return ctx.Err()
}
err := stream.Send(res)
if err != nil {
return err
}
}
}
// DeltaAggregatedResources is not implemented.
func (srv *Server) DeltaAggregatedResources(in envoy_service_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return fmt.Errorf("DeltaAggregatedResources not implemented")
}