Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generate additional endpoints based on labels set on deployments or rollouts #285

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func GetRootCmd(args []string) *cobra.Command {
"List of identities which should be excluded from getting processed")
rootCmd.PersistentFlags().StringArrayVar(&params.AdditionalEndpointSuffixes, "additional_endpoint_suffixes", []string{},
"Suffixes that Admiral should use to generate additional endpoints through VirtualServices")
rootCmd.PersistentFlags().StringArrayVar(&params.AdditionalEndpointLabelFilters, "additional_endpoint_label_filters", []string{},
"Labels that admiral will check on deployment/rollout before creating additional endpoints. '*' would indicate generating additional endpoints for all deployment/rollouts")
return rootCmd
}

Expand Down
82 changes: 56 additions & 26 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,23 @@ func modifyServiceEntryForNewServiceOrPod(
}

var (
cname string
namespace string
serviceInstance *k8sV1.Service
rollout *argo.Rollout
deployment *k8sAppsV1.Deployment
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
gtps = make(map[string][]*v1.GlobalTrafficPolicy)
weightedServices = make(map[string]*WeightedService)
cnames = make(map[string]string)
sourceServices = make(map[string]*k8sV1.Service)
sourceWeightedServices = make(map[string]map[string]*WeightedService)
sourceDeployments = make(map[string]*k8sAppsV1.Deployment)
sourceRollouts = make(map[string]*argo.Rollout)
serviceEntries = make(map[string]*networking.ServiceEntry)
cname string
namespace string
serviceInstance *k8sV1.Service
rollout *argo.Rollout
deployment *k8sAppsV1.Deployment
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
gtps = make(map[string][]*v1.GlobalTrafficPolicy)
weightedServices = make(map[string]*WeightedService)
cnames = make(map[string]string)
sourceServices = make(map[string]*k8sV1.Service)
sourceWeightedServices = make(map[string]map[string]*WeightedService)
sourceDeployments = make(map[string]*k8sAppsV1.Deployment)
sourceRollouts = make(map[string]*argo.Rollout)
serviceEntries = make(map[string]*networking.ServiceEntry)
isAdditionalEndpointGenerationEnabled bool
)

for _, clusterId := range clusters {
Expand Down Expand Up @@ -193,17 +194,25 @@ func modifyServiceEntryForNewServiceOrPod(
var meshPorts map[string]uint32
blueGreenStrategy := isBlueGreenStrategy(sourceRollouts[sourceCluster])

var deploymentRolloutLabels map[string]string
if len(sourceDeployments) > 0 {
meshPorts = GetMeshPorts(sourceCluster, serviceInstance, sourceDeployments[sourceCluster])
deployment = sourceDeployments[sourceCluster]
deploymentRolloutLabels = deployment.Labels
} else {
meshPorts = GetMeshPortsForRollout(sourceCluster, serviceInstance, sourceRollouts[sourceCluster])
rollout := sourceRollouts[sourceCluster]
deploymentRolloutLabels = rollout.Labels
}

// check if additional endpoint generation is required
isAdditionalEndpointGenerationEnabled = doGenerateAdditionalEndpoints(deploymentRolloutLabels)

for key, serviceEntry := range serviceEntries {
if len(serviceEntry.Endpoints) == 0 {
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
}
clusterIngress, _ := rc.ServiceController.Cache.GetLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem)
for _, ep := range serviceEntry.Endpoints {
Expand All @@ -215,7 +224,7 @@ func modifyServiceEntryForNewServiceOrPod(
updateEndpointsForBlueGreen(sourceRollouts[sourceCluster], sourceWeightedServices[sourceCluster], cnames, ep, sourceCluster, key)
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
//swap it back to use for next iteration
ep.Address = clusterIngress
ep.Ports = oldPorts
Expand All @@ -226,14 +235,14 @@ func modifyServiceEntryForNewServiceOrPod(
updateEndpointsForWeightedServices(se, sourceWeightedServices[sourceCluster], clusterIngress, meshPorts)
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: se})
map[string]*networking.ServiceEntry{key: se}, isAdditionalEndpointGenerationEnabled)
} else {
ep.Address = localFqdn
oldPorts := ep.Ports
ep.Ports = meshPorts
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
// swap it back to use for next iteration
ep.Address = clusterIngress
ep.Ports = oldPorts
Expand Down Expand Up @@ -270,7 +279,7 @@ func modifyServiceEntryForNewServiceOrPod(
remoteRegistry.AdmiralCache.CnameDependentClusterCache.Put(cname, clusterId, clusterId)
}

AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries)
AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries, isAdditionalEndpointGenerationEnabled)

util.LogElapsedTimeSince("WriteServiceEntryToDependentClusters", sourceIdentity, env, "", start)

Expand Down Expand Up @@ -510,7 +519,9 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {
}

//AddServiceEntriesWithDr will create the default service entries and also additional ones specified in GTP
func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string, serviceEntries map[string]*networking.ServiceEntry) {
func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string,
serviceEntries map[string]*networking.ServiceEntry, isAdditionalEndpointsEnabled bool) {

cache := rr.AdmiralCache
syncNamespace := common.GetSyncNamespace()
for _, se := range serviceEntries {
Expand Down Expand Up @@ -587,11 +598,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
cache.SeClusterCache.Delete(seDr.ServiceEntry.Hosts[0])

// Delete additional endpoints if any
if isAdditionalEndpointsEnabled() {
if isAdditionalEndpointsEnabled {
err := deleteAdditionalEndpoints(ctx, rc, identityId, env, syncNamespace)
if err != nil {
log.Error(err)
}
} else {
log.Infof(LogFormat, "Delete", "VirtualService", env+"."+identityId, sourceCluster, "skipped deleting additional endpoints through VirtualService in "+syncNamespace+" namespace")
}

}
Expand Down Expand Up @@ -621,11 +634,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
cache.SeClusterCache.Put(newServiceEntry.Spec.Hosts[0], rc.ClusterID, rc.ClusterID)

// Create additional endpoints if necessary
if isAdditionalEndpointsEnabled() {
if isAdditionalEndpointsEnabled {
err := createAdditionalEndpoints(ctx, rc, identityId, env, newServiceEntry.Spec.Hosts[0], syncNamespace)
if err != nil {
log.Error(err)
}
} else {
log.Infof(LogFormat, "Create", "VirtualService", env+"."+identityId, sourceCluster, "skipped creating additional endpoints through VirtualService in "+syncNamespace+" namespace")
}
}
}
Expand All @@ -642,13 +657,28 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
}
}

func isAdditionalEndpointsEnabled() bool {
// This func returns a bool to indicate if additional endpoints generation is needed
// based on the following conditions.
// 1. Additional endpoint suffixes have been configured in the admiral params
// 2. The rollout/deployment labels passed contains any of the allowed labels
// configured in the admiral params 'additional_endpoint_label_filters'
func doGenerateAdditionalEndpoints(labels map[string]string) bool {
additionalEndpointSuffixes := common.GetAdditionalEndpointSuffixes()
if len(additionalEndpointSuffixes) <= 0 {
log.Debugf("no additional endpoints configured")
return false
}
return true
// Check if admiral configured allowed labels are in the passed labels map
additionalEndpointAnnotationFilters := common.GetAdditionalEndpointLabelFilters()
for _, filter := range additionalEndpointAnnotationFilters {
if filter == "*" {
return true
}
if _, ok := labels[filter]; ok {
return true
}
}
return false
}

func validateAdditionalEndpointParams(identity, env string) error {
Expand Down
64 changes: 63 additions & 1 deletion admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func TestAddServiceEntriesWithDr(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries)
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries, false)
if tt.dnsPrefix != "" && tt.dnsPrefix != "default" {
tt.serviceEntries["se1"].Hosts = []string{tt.dnsPrefix + ".e2e.foo.global"}
}
Expand Down Expand Up @@ -2781,3 +2781,65 @@ func TestGetAdmiralGeneratedVirtualService(t *testing.T) {
})
}
}

func TestDoGenerateAdditionalEndpoints(t *testing.T) {

testcases := []struct {
name string
labels map[string]string
additionalEndpointSuffixes []string
additionalEndpointLabelFilters []string
expectedResult bool
}{
{
name: "Given additional endpoint suffixes and labels, when no additional endpoint suffixes are set, then the func should return false",
labels: map[string]string{"foo": "bar"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when no additional endpoint labels filters are set, then the func should return false",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains '*', then the func should return true",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"*"},
expectedResult: true,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint label filters do not include any key in labels, then it should return false",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"baz"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains one of the keys in the labels, then it should return true",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"foo"},
expectedResult: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

admiralParams := common.AdmiralParams{
AdditionalEndpointSuffixes: tc.additionalEndpointSuffixes,
AdditionalEndpointLabelFilters: tc.additionalEndpointLabelFilters,
}
common.ResetSync()
common.InitializeConfig(admiralParams)

actual := doGenerateAdditionalEndpoints(tc.labels)

if actual != tc.expectedResult {
t.Errorf("expected %t, got %t", tc.expectedResult, actual)
}
})
}
}
4 changes: 4 additions & 0 deletions admiral/pkg/controller/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func GetAdditionalEndpointSuffixes() []string {
return admiralParams.AdditionalEndpointSuffixes
}

func GetAdditionalEndpointLabelFilters() []string {
return admiralParams.AdditionalEndpointLabelFilters
}

func GetHostnameSuffix() string {
return admiralParams.HostnameSuffix
}
Expand Down
49 changes: 25 additions & 24 deletions admiral/pkg/controller/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,31 @@ type SidecarEgressMap struct {
}

type AdmiralParams struct {
ArgoRolloutsEnabled bool
KubeconfigPath string
CacheRefreshDuration time.Duration
ClusterRegistriesNamespace string
DependenciesNamespace string
SyncNamespace string
EnableSAN bool
SANPrefix string
SecretResolver string
LabelSet *LabelSet
LogLevel int
HostnameSuffix string
PreviewHostnamePrefix string
MetricsEnabled bool
WorkloadSidecarUpdate string
WorkloadSidecarName string
AdmiralStateCheckerName string
DRStateStoreConfigPath string
ServiceEntryIPPrefix string
EnvoyFilterVersion string
EnvoyFilterAdditionalConfig string
EnableRoutingPolicy bool
ExcludedIdentityList []string
AdditionalEndpointSuffixes []string
ArgoRolloutsEnabled bool
KubeconfigPath string
CacheRefreshDuration time.Duration
ClusterRegistriesNamespace string
DependenciesNamespace string
SyncNamespace string
EnableSAN bool
SANPrefix string
SecretResolver string
LabelSet *LabelSet
LogLevel int
HostnameSuffix string
PreviewHostnamePrefix string
MetricsEnabled bool
WorkloadSidecarUpdate string
WorkloadSidecarName string
AdmiralStateCheckerName string
DRStateStoreConfigPath string
ServiceEntryIPPrefix string
EnvoyFilterVersion string
EnvoyFilterAdditionalConfig string
EnableRoutingPolicy bool
ExcludedIdentityList []string
AdditionalEndpointSuffixes []string
AdditionalEndpointLabelFilters []string
}

func (b AdmiralParams) String() string {
Expand Down