This repository has been archived by the owner on Jul 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 277
/
response.go
104 lines (85 loc) · 4.3 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package eds
import (
"fmt"
"strconv"
"strings"
xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/service"
)
// NewResponse builds the Endpoint Discovery Service (EDS) resources for the given proxy.
//
// When the discovery request names specific resources, only those are answered
// via fulfillEDSRequest. When the request is nil or names no resources, the full
// endpoint configuration for the proxy is produced via generateEDSConfig.
// The configurator, certificate manager and proxy registry arguments are unused.
func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, _ configurator.Configurator, _ *certificate.Manager, _ *registry.ProxyRegistry) ([]types.Resource, error) {
	// Nothing specific was asked for: generate all endpoint configuration for this proxy.
	if request == nil || len(request.ResourceNames) == 0 {
		return generateEDSConfig(meshCatalog, proxy)
	}

	// The request lists explicit resource names, so only attempt to answer those.
	return fulfillEDSRequest(meshCatalog, proxy, request)
}
// fulfillEDSRequest answers only the resource names listed in the discovery request.
//
// A nil request yields an error. Each resource name is parsed with
// clusterToMeshSvc; names that fail to parse are logged and skipped, so the
// result may contain fewer entries than were requested. For every remaining
// name, the endpoints returned by the catalog for the proxy's identity are
// passed to newClusterLoadAssignment and the result is collected.
func fulfillEDSRequest(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest) ([]types.Resource, error) {
	if request == nil {
		return nil, fmt.Errorf("Endpoint discovery request for proxy %s cannot be nil", proxy.Identity)
	}

	var edsResources []types.Resource
	for _, clusterName := range request.ResourceNames {
		upstreamSvc, parseErr := clusterToMeshSvc(clusterName)
		if parseErr != nil {
			// One unparsable name must not prevent the others from being answered.
			log.Error().Err(parseErr).Msgf("Error retrieving MeshService from Cluster %s", clusterName)
			continue
		}

		allowedEndpoints := meshCatalog.ListAllowedUpstreamEndpointsForService(proxy.Identity, upstreamSvc)
		log.Trace().Msgf("Endpoints for upstream cluster %s for downstream proxy identity %s: %v", clusterName, proxy.Identity, allowedEndpoints)

		edsResources = append(edsResources, newClusterLoadAssignment(upstreamSvc, allowedEndpoints))
	}

	return edsResources, nil
}
// generateEDSConfig produces one resource per upstream service returned by
// getUpstreamEndpointsForProxyIdentity for the proxy's identity.
//
// The upstream services come from a map, so the order of the returned resources
// is not deterministic. The returned error is always nil.
func generateEDSConfig(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy) ([]types.Resource, error) {
	var resources []types.Resource

	for upstreamSvc, svcEndpoints := range getUpstreamEndpointsForProxyIdentity(meshCatalog, proxy.Identity) {
		resources = append(resources, newClusterLoadAssignment(upstreamSvc, svcEndpoints))
	}

	return resources, nil
}
// clusterToMeshSvc parses a cluster name into the MeshService it refers to.
//
// The name is split on '/' and '|' (empty fields are dropped) and must yield
// exactly three parts: namespace, service name and port. The port must parse as
// a base-10 unsigned integer that fits in 16 bits. An error is returned when
// either condition is not met.
func clusterToMeshSvc(cluster string) (service.MeshService, error) {
	parts := strings.FieldsFunc(cluster, func(c rune) bool {
		switch c {
		case '/', '|':
			return true
		}
		return false
	})
	if len(parts) != 3 {
		return service.MeshService{}, fmt.Errorf("Invalid cluster name. Expected: <namespace>/<name>|<port>, got: %s", cluster)
	}
	namespace, name, rawPort := parts[0], parts[1], parts[2]

	targetPort, err := strconv.ParseUint(rawPort, 10, 16)
	if err != nil {
		return service.MeshService{}, fmt.Errorf("Invalid cluster port %s, expected int value: %w", rawPort, err)
	}

	// The port always maps to MeshService.TargetPort and not MeshService.Port because
	// endpoints of a service are derived from its TargetPort and not Port.
	meshSvc := service.MeshService{
		Namespace:  namespace,
		Name:       name,
		TargetPort: uint16(targetPort),
	}
	return meshSvc, nil
}
// getUpstreamEndpointsForProxyIdentity maps every outbound service the catalog
// lists for the given proxy identity to the upstream endpoints the catalog
// allows that identity to reach for that service.
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func getUpstreamEndpointsForProxyIdentity(meshCatalog catalog.MeshCataloger, proxyIdentity identity.ServiceIdentity) map[service.MeshService][]endpoint.Endpoint {
	outboundSvcs := meshCatalog.ListOutboundServicesForIdentity(proxyIdentity)

	endpointsBySvc := make(map[service.MeshService][]endpoint.Endpoint, len(outboundSvcs))
	for _, upstreamSvc := range outboundSvcs {
		endpointsBySvc[upstreamSvc] = meshCatalog.ListAllowedUpstreamEndpointsForService(proxyIdentity, upstreamSvc)
	}

	log.Trace().Msgf("Allowed outbound service endpoints for proxy with identity %s: %v", proxyIdentity, endpointsBySvc)
	return endpointsBySvc
}