// cluster.go (321 lines, 293 loc, 11.7 KB)
// Note: this repository was archived by its owner on Jul 11, 2023 and is now read-only.
package cds
import (
"strings"
"time"
xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)
// replacer rewrites every "." and ":" in a string to "_". It is used to derive
// an Envoy cluster's altStatName from the cluster's name.
var replacer = strings.NewReplacer(".", "_", ":", "_")
// getUpstreamServiceCluster builds the Envoy cluster for the upstream service described by the given
// mesh cluster config. The cluster is discovered through EDS (using the ADS config source), load balances
// round-robin, and carries a TLS transport socket whose context is produced by envoy.GetUpstreamTLSContext
// from the downstream identity and config.Service. Active health checks are attached when
// config.EnableEnvoyActiveHealthChecks is set.
// Returns nil, after logging the error, if the HTTP2 protocol options or the TLS context cannot be built.
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func getUpstreamServiceCluster(downstreamIdentity identity.ServiceIdentity, config trafficpolicy.MeshClusterConfig) *xds_cluster.Cluster {
	http2Options, err := envoy.GetHTTP2ProtocolOptions()
	if err != nil {
		log.Error().Err(err).Msgf("Error marshalling HTTP2ProtocolOptions for upstream cluster %s", config.Name)
		return nil
	}

	tlsContext := envoy.GetUpstreamTLSContext(downstreamIdentity, config.Service)
	tlsContextAny, err := ptypes.MarshalAny(tlsContext)
	if err != nil {
		log.Error().Err(err).Msgf("Error marshalling UpstreamTLSContext for upstream cluster %s", config.Name)
		return nil
	}

	upstream := &xds_cluster.Cluster{
		Name:                          config.Name,
		TypedExtensionProtocolOptions: http2Options,
		TransportSocket: &xds_core.TransportSocket{
			Name:       wellknown.TransportSocketTls,
			ConfigType: &xds_core.TransportSocket_TypedConfig{TypedConfig: tlsContextAny},
		},
		// Service discovery: endpoints are obtained via EDS
		ClusterDiscoveryType: &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_EDS},
		EdsClusterConfig:     &xds_cluster.Cluster_EdsClusterConfig{EdsConfig: envoy.GetADSConfigSource()},
		LbPolicy:             xds_cluster.Cluster_ROUND_ROBIN,
	}

	if config.EnableEnvoyActiveHealthChecks {
		enableHealthChecksOnCluster(upstream, config.Service)
	}

	return upstream
}
// getMulticlusterGatewayUpstreamServiceCluster builds the Envoy cluster for the given upstream service as
// seen by the multicluster gateway. The cluster is named after upstreamSvc.ServerName(), is resolved with
// STRICT_DNS, and has a single endpoint whose address is built from that server name and the service's
// TargetPort. Active health checks are attached when withActiveHealthChecks is true.
// An error is returned if the HTTP2 protocol options cannot be built.
func getMulticlusterGatewayUpstreamServiceCluster(catalog catalog.MeshCataloger, upstreamSvc service.MeshService, withActiveHealthChecks bool) (*xds_cluster.Cluster, error) {
	http2Options, err := envoy.GetHTTP2ProtocolOptions()
	if err != nil {
		return nil, err
	}

	// The server name doubles as the cluster name and as the DNS host of the endpoint.
	serverName := upstreamSvc.ServerName()

	gatewayEndpoint := &xds_endpoint.LbEndpoint{
		HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
			Endpoint: &xds_endpoint.Endpoint{
				Address: envoy.GetAddress(serverName, uint32(upstreamSvc.TargetPort)),
			},
		},
	}

	gatewayCluster := &xds_cluster.Cluster{
		Name:                          serverName,
		ClusterDiscoveryType:          &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS},
		LbPolicy:                      xds_cluster.Cluster_ROUND_ROBIN,
		TypedExtensionProtocolOptions: http2Options,
		LoadAssignment: &xds_endpoint.ClusterLoadAssignment{
			ClusterName: serverName,
			Endpoints: []*xds_endpoint.LocalityLbEndpoints{
				{LbEndpoints: []*xds_endpoint.LbEndpoint{gatewayEndpoint}},
			},
		},
	}

	if withActiveHealthChecks {
		enableHealthChecksOnCluster(gatewayCluster, upstreamSvc)
	}

	return gatewayCluster, nil
}
// enableHealthChecksOnCluster sets a single active HTTP health check on the given cluster, overwriting
// whatever cluster.HealthChecks held before. The probe targets envoy.EnvoyActiveHealthCheckPath with the
// upstream service's ServerName() as host, and adds the envoy.EnvoyActiveHealthCheckHeaderKey header with
// value "1" to each request. It uses a 1s timeout, a 10s interval, and healthy/unhealthy thresholds of 1 and 3.
func enableHealthChecksOnCluster(cluster *xds_cluster.Cluster, upstreamSvc service.MeshService) {
	// Header added to every health check request
	probeHeader := &xds_core.HeaderValueOption{
		Header: &xds_core.HeaderValue{
			Key:   envoy.EnvoyActiveHealthCheckHeaderKey,
			Value: "1",
		},
	}

	httpProbe := &xds_core.HealthCheck_HttpHealthCheck{
		Host:                upstreamSvc.ServerName(),
		Path:                envoy.EnvoyActiveHealthCheckPath,
		RequestHeadersToAdd: []*xds_core.HeaderValueOption{probeHeader},
	}

	healthCheck := &xds_core.HealthCheck{
		Timeout:            durationpb.New(1 * time.Second),
		Interval:           durationpb.New(10 * time.Second),
		HealthyThreshold:   wrapperspb.UInt32(1),
		UnhealthyThreshold: wrapperspb.UInt32(3),
		HealthChecker:      &xds_core.HealthCheck_HttpHealthCheck_{HttpHealthCheck: httpProbe},
	}

	cluster.HealthChecks = []*xds_core.HealthCheck{healthCheck}
}
// getLocalServiceCluster builds the Envoy cluster for the local service described by the given mesh
// cluster config. The cluster uses STRICT_DNS discovery (IPv4 lookups only, respecting DNS TTLs) and has
// a single endpoint, built from config.Address and config.Port, weighted to accept all traffic.
// Both the cluster name and its AltStatName are set to config.Name.
// Returns nil, after logging the error, if the HTTP2 protocol options cannot be built.
func getLocalServiceCluster(config trafficpolicy.MeshClusterConfig) *xds_cluster.Cluster {
	http2Options, err := envoy.GetHTTP2ProtocolOptions()
	if err != nil {
		log.Error().Err(err).Msgf("Error marshalling HTTP2ProtocolOptions for local cluster %s", config.Name)
		return nil
	}

	// The one local endpoint; the local cluster accepts all traffic.
	localEndpoint := &xds_endpoint.LbEndpoint{
		HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
			Endpoint: &xds_endpoint.Endpoint{
				Address: envoy.GetAddress(config.Address, config.Port),
			},
		},
		LoadBalancingWeight: &wrappers.UInt32Value{
			Value: constants.ClusterWeightAcceptAll,
		},
	}

	loadAssignment := &xds_endpoint.ClusterLoadAssignment{
		ClusterName: config.Name,
		Endpoints: []*xds_endpoint.LocalityLbEndpoints{
			{
				Locality:    &xds_core.Locality{Zone: "zone"},
				LbEndpoints: []*xds_endpoint.LbEndpoint{localEndpoint},
			},
		},
	}

	return &xds_cluster.Cluster{
		Name:                          config.Name,
		AltStatName:                   config.Name,
		LbPolicy:                      xds_cluster.Cluster_ROUND_ROBIN,
		RespectDnsTtl:                 true,
		ClusterDiscoveryType:          &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS},
		DnsLookupFamily:               xds_cluster.Cluster_V4_ONLY,
		LoadAssignment:                loadAssignment,
		TypedExtensionProtocolOptions: http2Options,
	}
}
// getPrometheusCluster builds the Envoy cluster used for Prometheus metrics scraping. It is a STATIC
// cluster named constants.EnvoyMetricsCluster with a single endpoint at
// constants.LocalhostIPAddress:constants.EnvoyAdminPort, weighted to accept all traffic.
func getPrometheusCluster() *xds_cluster.Cluster {
	adminEndpoint := &xds_endpoint.LbEndpoint{
		HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
			Endpoint: &xds_endpoint.Endpoint{
				Address: envoy.GetAddress(constants.LocalhostIPAddress, constants.EnvoyAdminPort),
			},
		},
		LoadBalancingWeight: &wrappers.UInt32Value{
			Value: constants.ClusterWeightAcceptAll,
		},
	}

	loadAssignment := &xds_endpoint.ClusterLoadAssignment{
		ClusterName: constants.EnvoyMetricsCluster,
		Endpoints: []*xds_endpoint.LocalityLbEndpoints{
			{LbEndpoints: []*xds_endpoint.LbEndpoint{adminEndpoint}},
		},
	}

	return &xds_cluster.Cluster{
		Name:                 constants.EnvoyMetricsCluster,
		AltStatName:          constants.EnvoyMetricsCluster,
		ClusterDiscoveryType: &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STATIC},
		LbPolicy:             xds_cluster.Cluster_ROUND_ROBIN,
		LoadAssignment:       loadAssignment,
	}
}
// getEgressClusters returns a slice of XDS cluster objects for the given egress cluster configs.
// A config without a Host yields an original-destination cluster (used for TCP based clusters); a config
// with a Host yields a DNS-resolved cluster (used for HTTP based clusters).
// If a cluster config is nil or invalid, an error is logged and that config is ignored.
// Returns nil when clusterConfigs is nil.
func getEgressClusters(clusterConfigs []*trafficpolicy.EgressClusterConfig) []*xds_cluster.Cluster {
	if clusterConfigs == nil {
		return nil
	}

	var egressClusters []*xds_cluster.Cluster
	for _, config := range clusterConfigs {
		if config == nil {
			// A nil entry has no Host/Name to inspect; skip it rather than panic on the dereference below.
			log.Error().Msg("Ignoring nil egress cluster config")
			continue
		}

		switch config.Host {
		case "":
			// Cluster config does not have a Host specified, route it to its original destination.
			// Used for TCP based clusters
			cluster, err := getOriginalDestinationEgressCluster(config.Name)
			if err != nil {
				log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrGettingOrgDstEgressCluster)).
					Msg("Error building the original destination cluster for the given egress cluster config")
				continue
			}
			egressClusters = append(egressClusters, cluster)

		default:
			// Cluster config has a Host specified, route it based on the Host resolved using DNS.
			// Used for HTTP based clusters
			cluster, err := getDNSResolvableEgressCluster(config)
			if err != nil {
				log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrGettingDNSEgressCluster)).
					Msg("Error building cluster for the given egress cluster config")
				continue
			}
			egressClusters = append(egressClusters, cluster)
		}
	}

	return egressClusters
}
// getDNSResolvableEgressCluster builds the STRICT_DNS Envoy cluster for the given egress cluster config.
// The cluster has a single endpoint at config.Host:config.Port, weighted to accept all traffic, and its
// AltStatName is the Prometheus-safe form of config.Name.
// An error is returned if the config is nil or if its Name, Host or Port is unset.
func getDNSResolvableEgressCluster(config *trafficpolicy.EgressClusterConfig) (*xds_cluster.Cluster, error) {
	// Validate the config; checks run in this order and the first failure wins.
	switch {
	case config == nil:
		return nil, errors.New("Invalid egress cluster config: nil type")
	case config.Name == "":
		return nil, errors.New("Invalid egress cluster config: Name unspecified")
	case config.Host == "":
		return nil, errors.New("Invalid egress cluster config: Host unspecified")
	case config.Port == 0:
		return nil, errors.New("Invalid egress cluster config: Port unspecified")
	}

	egressEndpoint := &xds_endpoint.LbEndpoint{
		HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
			Endpoint: &xds_endpoint.Endpoint{
				Address: envoy.GetAddress(config.Host, uint32(config.Port)),
			},
		},
		LoadBalancingWeight: &wrappers.UInt32Value{
			Value: constants.ClusterWeightAcceptAll,
		},
	}

	egressCluster := &xds_cluster.Cluster{
		Name:                 config.Name,
		AltStatName:          formatAltStatNameForPrometheus(config.Name),
		ClusterDiscoveryType: &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS},
		LbPolicy:             xds_cluster.Cluster_ROUND_ROBIN,
		LoadAssignment: &xds_endpoint.ClusterLoadAssignment{
			ClusterName: config.Name,
			Endpoints: []*xds_endpoint.LocalityLbEndpoints{
				{LbEndpoints: []*xds_endpoint.LbEndpoint{egressEndpoint}},
			},
		},
	}

	return egressCluster, nil
}
// getOriginalDestinationEgressCluster builds an ORIGINAL_DST Envoy cluster with the given name, i.e. a
// cluster that routes traffic to its original destination: the IP address and port the connection was
// headed to before it was redirected to the sidecar proxy. Load balancing is CLUSTER_PROVIDED.
// An error is returned if the HTTP2 protocol options cannot be built.
func getOriginalDestinationEgressCluster(name string) (*xds_cluster.Cluster, error) {
	http2Options, err := envoy.GetHTTP2ProtocolOptions()
	if err != nil {
		return nil, err
	}

	originalDstCluster := &xds_cluster.Cluster{
		Name:                          name,
		ClusterDiscoveryType:          &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_ORIGINAL_DST},
		LbPolicy:                      xds_cluster.Cluster_CLUSTER_PROVIDED,
		TypedExtensionProtocolOptions: http2Options,
	}
	return originalDstCluster, nil
}
// formatAltStatNameForPrometheus derives an altStatName for an Envoy cluster from the cluster's name.
// Periods and colons in the name are substituted (via the package-level replacer) so that the name is
// correctly interpreted by Envoy when generating stats/prometheus. The cluster's own name can stay
// unchanged; only the returned value is meant to be assigned to the cluster's altStatName.
func formatAltStatNameForPrometheus(clusterName string) string {
	altStatName := replacer.Replace(clusterName)
	return altStatName
}
// upstreamClustersFromClusterConfigs builds one upstream Envoy cluster per mesh cluster config, on behalf
// of the given downstream identity.
// Nil configs are skipped, as are configs for which getUpstreamServiceCluster returns nil (it does so,
// after logging, when the cluster cannot be built), so the returned slice never contains nil entries.
// The result is nil when no cluster was produced.
func upstreamClustersFromClusterConfigs(downstreamIdentity identity.ServiceIdentity, configs []*trafficpolicy.MeshClusterConfig) []*xds_cluster.Cluster {
	var clusters []*xds_cluster.Cluster

	for _, c := range configs {
		if c == nil {
			// Dereferencing a nil config would panic; there is nothing to build from it.
			continue
		}
		if cluster := getUpstreamServiceCluster(downstreamIdentity, *c); cluster != nil {
			clusters = append(clusters, cluster)
		}
	}

	return clusters
}
// localClustersFromClusterConfigs builds one local Envoy cluster per mesh cluster config.
// Nil configs are skipped, as are configs for which getLocalServiceCluster returns nil (it does so,
// after logging, when the cluster cannot be built), so the returned slice never contains nil entries.
// The result is nil when no cluster was produced.
func localClustersFromClusterConfigs(configs []*trafficpolicy.MeshClusterConfig) []*xds_cluster.Cluster {
	var clusters []*xds_cluster.Cluster

	for _, c := range configs {
		if c == nil {
			// Dereferencing a nil config would panic; there is nothing to build from it.
			continue
		}
		if cluster := getLocalServiceCluster(*c); cluster != nil {
			clusters = append(clusters, cluster)
		}
	}

	return clusters
}