diff --git a/admiral/pkg/clusters/envoyfilter.go b/admiral/pkg/clusters/envoyfilter.go index d46dbb3b..5840c265 100644 --- a/admiral/pkg/clusters/envoyfilter.go +++ b/admiral/pkg/clusters/envoyfilter.go @@ -1,6 +1,8 @@ package clusters import ( + "bytes" + "encoding/gob" "fmt" "github.com/gogo/protobuf/types" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" @@ -18,11 +20,18 @@ var ( ) const hostsKey = "hosts: " -func createOrUpdateEnvoyFilter( rc *RemoteController, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType, workloadIdentityKey string, admiralCache *AdmiralCache) (*networking.EnvoyFilter, error) { - +func createOrUpdateEnvoyFilter( rc *RemoteController, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType, workloadIdentityKey string, admiralCache *AdmiralCache, workloadSelectorString string) (*networking.EnvoyFilter, error) { workloadSelectors := make(map[string]string) - workloadSelectors[common.GetWorkloadIdentifier()] = workloadIdentityKey - workloadSelectors[common.GetEnvKey()] = common.GetRoutingPolicyEnv(routingPolicy) + buffer := new(bytes.Buffer) + buffer.WriteString(workloadSelectorString) + decoder := gob.NewDecoder(buffer) + + // Decoding the serialized data into map + err := decoder.Decode(&workloadSelectors) + if err != nil { + log.Error("error during decoding map data") + return nil, err + } envoyfilterSpec := constructEnvoyFilterStruct(routingPolicy, workloadSelectors) diff --git a/admiral/pkg/clusters/envoyfilter_test.go b/admiral/pkg/clusters/envoyfilter_test.go index b42d8347..2d8e8ffc 100644 --- a/admiral/pkg/clusters/envoyfilter_test.go +++ b/admiral/pkg/clusters/envoyfilter_test.go @@ -1,7 +1,9 @@ package clusters import ( + "bytes" "context" + "encoding/gob" "errors" "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" @@ -82,21 +84,28 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) { Status: v1.RoutingPolicyStatus{}, } + selectors := map[string]string{"one":"test1", "two":"test2"} + + workloadSelectorBuffer := new(bytes.Buffer) + encoder := gob.NewEncoder(workloadSelectorBuffer) + err := encoder.Encode(selectors) + assert.Nil(t, err) + getSha1 = getSha1Error - envoyfilter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "barstage", registry.AdmiralCache) + envoyfilter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "barstage", registry.AdmiralCache, workloadSelectorBuffer.String()) assert.NotNil(t, err) assert.Nil(t, envoyfilter) getSha1 = common.GetSha1 - envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar", registry.AdmiralCache) - assert.Equal(t, "bar", envoyfilter.Spec.WorkloadSelector.GetLabels()["identity"]) - assert.Equal(t, "stage", envoyfilter.Spec.WorkloadSelector.GetLabels()["admiral.io/env"]) + envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar", registry.AdmiralCache, workloadSelectorBuffer.String()) + assert.Equal(t, "test1", envoyfilter.Spec.WorkloadSelector.GetLabels()["one"]) + assert.Equal(t, "test2", envoyfilter.Spec.WorkloadSelector.GetLabels()["two"]) assert.Equal(t, "test-dynamicrouting-d0fdd-1.13", envoyfilter.Name) - envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Update, "bar", registry.AdmiralCache) + envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Update, "bar", registry.AdmiralCache, workloadSelectorBuffer.String()) assert.Nil(t, err) @@ -105,7 +114,7 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) { return true, nil, errors.New("error creating envoyfilter") }, ) - envoyfilter3, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar2", registry.AdmiralCache) + envoyfilter3, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar2", registry.AdmiralCache, workloadSelectorBuffer.String()) assert.NotNil(t, err) assert.Nil(t, envoyfilter3) diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index 5fc242d8..fc76b064 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -60,6 +60,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis w.AdmiralCache = &AdmiralCache{ IdentityClusterCache: common.NewMapOfMaps(), + WorkloadSelectorCache: common.NewMapOfMaps(), CnameClusterCache: common.NewMapOfMaps(), CnameDependentClusterCache: common.NewMapOfMaps(), ClusterLocalityCache: common.NewMapOfMaps(), diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index 6ad3934d..341b61db 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -141,6 +141,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s } remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID) + workloadSelectors := GetServiceSelector(rc.ClusterID, serviceInstance) + if workloadSelectors != "" { + remoteRegistry.AdmiralCache.WorkloadSelectorCache.Put(sourceIdentity, rc.ClusterID, workloadSelectors) + } remoteRegistry.AdmiralCache.CnameClusterCache.Put(cname, rc.ClusterID, rc.ClusterID) remoteRegistry.AdmiralCache.CnameIdentityCache.Store(cname, sourceIdentity) sourceServices[rc.ClusterID] = serviceInstance diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index dd7009fd..4862e655 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -43,6 +43,7 @@ type AdmiralCache struct { CnameDependentClusterCache *common.MapOfMaps CnameIdentityCache *sync.Map IdentityClusterCache *common.MapOfMaps + WorkloadSelectorCache *common.MapOfMaps ClusterLocalityCache *common.MapOfMaps IdentityDependencyCache *common.MapOfMaps SubsetServiceEntryIdentityCache *sync.Map @@ -242,13 +243,15 @@ func (r RoutingPolicyHandler) processroutingPolicy(dependents map[string]string, // Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok { + if selectors, ok := r.RemoteRegistry.AdmiralCache.WorkloadSelectorCache.Get(dependent).Copy()[remoteController.ClusterID]; ok { - filter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache) - if err != nil { - // Best effort create - log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err) - } else { - log.Infof("msg=%s name=%s cluster=%s", "created envoyfilter", filter.Name, remoteController.ClusterID) + filter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache, selectors) + if err != nil { + // Best effort create + log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err) + } else { + log.Infof("msg=%s name=%s cluster=%s", "created envoyfilter", filter.Name, remoteController.ClusterID) + } } } } diff --git a/admiral/pkg/clusters/util.go b/admiral/pkg/clusters/util.go index e4f1550b..0a65f6b7 100644 --- a/admiral/pkg/clusters/util.go +++ b/admiral/pkg/clusters/util.go @@ -1,6 +1,8 @@ package clusters import ( + "bytes" + "encoding/gob" "errors" argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" @@ -27,6 +29,23 @@ func GetMeshPortsForRollout(clusterName string, destService *k8sV1.Service, return ports } +func GetServiceSelector(clusterName string, destService *k8sV1.Service) string { + var selectors = destService.Spec.Selector + if len(selectors) == 0{ + return "" + } + buffer := new(bytes.Buffer) + encoder := gob.NewEncoder(buffer) + // Encoding the map + err := encoder.Encode(selectors) + if err != nil { + log.Warnf(LogErrFormat, "GetServiceLabels", "Failed to encode map", destService.Name, clusterName, err) + }else { + log.Debugf(LogFormat, "GetServiceLabels", "Encoded Map", destService.Name, clusterName, selectors) + } + return buffer.String() +} + func getMeshPortsHelper(meshPorts string, destService *k8sV1.Service, clusterName string) map[string]uint32 { var ports = make(map[string]uint32) diff --git a/admiral/pkg/clusters/util_test.go b/admiral/pkg/clusters/util_test.go index 216a483b..582b2ab1 100644 --- a/admiral/pkg/clusters/util_test.go +++ b/admiral/pkg/clusters/util_test.go @@ -1,18 +1,20 @@ package clusters import ( + "bytes" + "encoding/gob" "errors" argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" + "github.com/stretchr/testify/assert" k8sAppsV1 "k8s.io/api/apps/v1" coreV1 "k8s.io/api/core/v1" + k8sV1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "reflect" "strconv" "testing" - - k8sV1 "k8s.io/api/core/v1" ) func TestGetMeshPorts(t *testing.T) { @@ -215,6 +217,62 @@ func TestValidateConfigmapBeforePutting(t *testing.T) { } +func TestGetServiceSelector(t *testing.T) { + + selector := map[string]string {"app":"test1"} + buffer := new(bytes.Buffer) + + e := gob.NewEncoder(buffer) + + // Encoding the map + err := e.Encode(selector) + assert.Nil(t, err) + + testCases := []struct { + name string + clusterName string + service k8sV1.Service + expected string + }{ + { + name: "should return a selectors based on service", + clusterName: "test-cluster", + service: k8sV1.Service{ + ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}}, + Spec: k8sV1.ServiceSpec{Selector: selector}, + }, + expected: buffer.String(), + }, + { + name: "should return empty selectors", + clusterName: "test-cluster", + service: k8sV1.Service{ + ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}}, + Spec: k8sV1.ServiceSpec{Selector: map[string]string{}}, + }, + expected: "", + }, + { + name: "should return nil", + clusterName: "test-cluster", + service: k8sV1.Service{ + ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}}, + Spec: k8sV1.ServiceSpec{Selector: nil}, + }, + expected: "", + }, + } + + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + selectors := GetServiceSelector(c.clusterName,&c.service) + if !reflect.DeepEqual(selectors, c.expected) { + t.Errorf("Wanted selectors: %v, got: %v", c.expected, selectors) + } + }) + } +} + func TestGetMeshPortsForRollout(t *testing.T) { annotatedPort := 8090