From 3a8fcf439dc6363d14ed008fb71f3e3f0feb1510 Mon Sep 17 00:00:00 2001 From: aattuluri <44482891+aattuluri@users.noreply.github.com> Date: Mon, 2 May 2022 15:09:27 -0700 Subject: [PATCH] Optimize GlobalTrafficPolicy fetching and endpoint generation time (#206) Signed-off-by: psikka1 --- .gitignore | 3 +- admiral/pkg/clusters/registry.go | 4 - admiral/pkg/clusters/registry_test.go | 32 - admiral/pkg/clusters/serviceentry.go | 50 +- admiral/pkg/clusters/serviceentry_test.go | 76 +- admiral/pkg/clusters/types.go | 271 +------ admiral/pkg/clusters/types_test.go | 751 ------------------ .../pkg/controller/admiral/globaltraffic.go | 106 ++- .../controller/admiral/globaltraffic_test.go | 178 ++++- admiral/pkg/controller/common/common.go | 127 +-- admiral/pkg/controller/common/common_test.go | 270 +------ 11 files changed, 413 insertions(+), 1455 deletions(-) diff --git a/.gitignore b/.gitignore index 358f6f8d..3fd17e09 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ .idea/vcs.xml out* -*.tar.gz \ No newline at end of file +*.tar.gz +*.out diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index e4412a0d..acba5273 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -5,12 +5,10 @@ import ( "fmt" "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/istio" - v12 "k8s.io/api/apps/v1" "k8s.io/client-go/rest" "sync" "time" - argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret" @@ -46,8 +44,6 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis gtpCache := &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) gtpCache.mutex = &sync.Mutex{} w.AdmiralCache = &AdmiralCache{ diff --git a/admiral/pkg/clusters/registry_test.go b/admiral/pkg/clusters/registry_test.go index cac474b4..e05fa775 100644 --- a/admiral/pkg/clusters/registry_test.go +++ b/admiral/pkg/clusters/registry_test.go @@ -312,38 +312,6 @@ func TestAdded(t *testing.T) { } -func TestPodHandler(t *testing.T) { - - p := common.AdmiralParams{ - KubeconfigPath: "testdata/fake.config", - } - - rr, _ := InitAdmiral(context.Background(), p) - - rc, _ := createMockRemoteController(func(i interface{}) { - - }) - rr.RemoteControllers["test.cluster"] = rc - - ph := PodHandler{ - RemoteRegistry: rr, - } - - pod := k8sCoreV1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "test", - }, - Spec: k8sCoreV1.PodSpec{ - Hostname: "test.local", - }, - } - - ph.Added(&pod) - - ph.Deleted(&pod) -} - func TestGetServiceForDeployment(t *testing.T) { baseRc, _ := createMockRemoteController(func(i interface{}) { //res := i.(istio.Config) diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index 526a6ab4..04ad0ff6 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -6,6 +6,7 @@ import ( "math" "math/rand" "reflect" + "sort" "strconv" "strings" "time" @@ -70,6 +71,11 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s var serviceInstance *k8sV1.Service var weightedServices map[string]*WeightedService var rollout *admiral.RolloutClusterEntry + var gtps = make(map[string][]*v1.GlobalTrafficPolicy) + + var namespace string + + var gtpKey = common.ConstructGtpKey(sourceIdentity, env) start := time.Now() for _, rc := range remoteRegistry.RemoteControllers { @@ -87,7 +93,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s if serviceInstance == nil { continue } - + namespace = deploymentInstance.Namespace localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deploymentInstance) cname = common.GetCname(deploymentInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix()) @@ -106,7 +112,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s serviceInstance = sInstance.Service break } - + namespace = rolloutInstance.Namespace localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rolloutInstance) cname = common.GetCnameForRollout(rolloutInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix()) @@ -117,6 +123,11 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s continue } + gtpsInNamespace := rc.GlobalTraffic.Cache.Get(gtpKey, namespace) + if len(gtps) > 0 { + gtps[rc.ClusterID] = gtpsInNamespace + } + remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID) remoteRegistry.AdmiralCache.CnameClusterCache.Put(cname, rc.ClusterID, rc.ClusterID) remoteRegistry.AdmiralCache.CnameIdentityCache.Store(cname, sourceIdentity) @@ -126,6 +137,9 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s util.LogElapsedTimeSince("BuildServiceEntry", sourceIdentity, env, "", start) + //cache the latest GTP in global cache to be reused during DR creation + updateGlobalGtpCache(remoteRegistry.AdmiralCache, sourceIdentity, env, gtps) + dependents := remoteRegistry.AdmiralCache.IdentityDependencyCache.Get(sourceIdentity).Copy() //handle local updates (source clusters first) @@ -215,6 +229,38 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s return serviceEntries } +//Does two things; +//i) Picks the GTP that was created most recently from the passed in GTP list (GTPs from all clusters) +//ii) Updates the global GTP cache with the selected GTP in i) +func updateGlobalGtpCache(cache *AdmiralCache, identity, env string, gtps map[string][]*v1.GlobalTrafficPolicy) { + defer util.LogElapsedTime("updateGlobalGtpCache", identity, env, "")() + gtpsOrdered := make([]*v1.GlobalTrafficPolicy, 0) + for _, gtpsInCluster := range gtps { + gtpsOrdered = append(gtpsOrdered, gtpsInCluster...) + } + if len(gtpsOrdered) == 0 { + cache.GlobalTrafficCache.Delete(env, identity) + return + } else if len(gtpsOrdered) > 1 { + //sort by creation time with most recent at the beginning + sort.Slice(gtpsOrdered, func(i, j int) bool { + iTime := gtpsOrdered[i].CreationTimestamp.Nanosecond() + jTime := gtpsOrdered[j].CreationTimestamp.Nanosecond() + return iTime > jTime + }) + } + + mostRecentGtp := gtpsOrdered[0] + + err := cache.GlobalTrafficCache.Put(mostRecentGtp) + + if err != nil { + log.Errorf("Error in updating GTP with name=%s in namespace=%s as actively used for identity=%s with err=%v", mostRecentGtp.Name, mostRecentGtp.Namespace, common.GetGtpKey(mostRecentGtp), err) + } else { + log.Infof("GTP with name=%s in namespace=%s is actively used for identity=%s", mostRecentGtp.Name, mostRecentGtp.Namespace, common.GetGtpKey(mostRecentGtp)) + } +} + func updateEndpointsForBlueGreen(rollout *argo.Rollout, weightedServices map[string]*WeightedService, cnames map[string]string, ep *networking.ServiceEntry_Endpoint, sourceCluster string, meshHost string) { activeServiceName := rollout.Spec.Strategy.BlueGreen.ActiveService diff --git a/admiral/pkg/clusters/serviceentry_test.go b/admiral/pkg/clusters/serviceentry_test.go index 2e9a1edf..40dee1a8 100644 --- a/admiral/pkg/clusters/serviceentry_test.go +++ b/admiral/pkg/clusters/serviceentry_test.go @@ -88,7 +88,6 @@ func TestAddServiceEntriesWithDr(t *testing.T) { gtpCache := &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v13.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v14.Deployment) gtpCache.mutex = &sync.Mutex{} admiralCache.GlobalTrafficCache = gtpCache @@ -890,6 +889,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) { } s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300)) + gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300)) + cacheWithEntry := ServiceEntryAddressStore{ EntryAddresses: map[string]string{"test.test.mesh-se": common.LocalAddressPrefix + ".10.1"}, Addresses: []string{common.LocalAddressPrefix + ".10.1"}, @@ -912,6 +913,7 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) { RolloutController: r, ServiceController: s, VirtualServiceController: v, + GlobalTraffic: gtpc, } rc.ClusterID = "test.cluster" rr.RemoteControllers["test.cluster"] = rc @@ -1019,6 +1021,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) { t.Fail() } s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300)) + gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300)) cacheWithEntry := ServiceEntryAddressStore{ EntryAddresses: map[string]string{ @@ -1045,6 +1048,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) { RolloutController: r, ServiceController: s, VirtualServiceController: v, + GlobalTraffic: gtpc, } rc.ClusterID = "test.cluster" rr.RemoteControllers["test.cluster"] = rc @@ -1331,6 +1335,76 @@ func TestUpdateEndpointsForWeightedServices(t *testing.T) { } +func TestUpdateGlobalGtpCache(t *testing.T) { + + var ( + admiralCache = &AdmiralCache{GlobalTrafficCache: &globalTrafficCache{identityCache: make(map[string]*v13.GlobalTrafficPolicy), mutex: &sync.Mutex{}}} + + identity1 = "identity1" + + env_stage = "stage" + + gtp = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-30))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}}, + },} + + gtp2 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp2"}}, + },} + + gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp3"}}, + },} + + ) + + testCases := []struct { + name string + identity string + env string + gtps map[string][]*v13.GlobalTrafficPolicy + expectedGtp *v13.GlobalTrafficPolicy + }{ { + name: "Should return nil when no GTP present", + gtps: map[string][]*v13.GlobalTrafficPolicy{}, + identity: identity1, + env: env_stage, + expectedGtp: nil, + }, + { + name: "Should return the only existing gtp", + gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}}, + identity: identity1, + env: env_stage, + expectedGtp: gtp, + }, + { + name: "Should return the gtp recently created within the cluster", + gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}}, + identity: identity1, + env: env_stage, + expectedGtp: gtp2, + }, + { + name: "Should return the gtp recently created from another cluster", + gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}}, + identity: identity1, + env: env_stage, + expectedGtp: gtp3, + }, + } + + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + updateGlobalGtpCache(admiralCache, c.identity, c.env, c.gtps) + gtp := admiralCache.GlobalTrafficCache.GetFromIdentity(c.identity, c.env) + if !reflect.DeepEqual(c.expectedGtp, gtp) { + t.Errorf("Test %s failed expected gtp: %v got %v", c.name, c.expectedGtp, gtp) + } + }) + } +} + func isLower(s string) bool { for _, r := range s { if !unicode.IsLower(r) && unicode.IsLetter(r) { diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index 41fef53f..f0c34af9 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -3,7 +3,6 @@ package clusters import ( "context" "errors" - "fmt" argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral" @@ -12,7 +11,6 @@ import ( "github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret" log "github.com/sirupsen/logrus" k8sAppsV1 "k8s.io/api/apps/v1" - k8sV1 "k8s.io/api/core/v1" k8s "k8s.io/client-go/kubernetes" "sync" "time" @@ -115,119 +113,37 @@ type globalTrafficCache struct { //map of global traffic policies key=environment.identity, value: GlobalTrafficPolicy object identityCache map[string]*v1.GlobalTrafficPolicy - //map of dependencies. key=namespace.globaltrafficpolicy name. value Deployment object - dependencyCache map[string]*k8sAppsV1.Deployment - - //map of dependencies. key=namespace.globaltrafficpolicy name. value Rollout object - dependencyRolloutCache map[string]*argo.Rollout - mutex *sync.Mutex } func (g *globalTrafficCache) GetFromIdentity(identity string, environment string) *v1.GlobalTrafficPolicy { - return g.identityCache[getCacheKey(environment, identity)] -} - -func (g *globalTrafficCache) GetDeployment(gtpName string) *k8sAppsV1.Deployment { - return g.dependencyCache[gtpName] -} - -func (g *globalTrafficCache) GetRollout(gtpName string) *argo.Rollout { - return g.dependencyRolloutCache[gtpName] -} - -func (g *globalTrafficCache) Put(gtp *v1.GlobalTrafficPolicy, deployment *k8sAppsV1.Deployment) error { - if gtp.Name == "" { - //no GTP, throw error - return errors.New("cannot add an empty globaltrafficpolicy to the cache") - } - defer g.mutex.Unlock() - g.mutex.Lock() - var gtpEnv = common.GetGtpEnv(gtp) - if deployment != nil && deployment.Labels != nil { - log.Infof("Adding Deployment with name %v and gtp with name %v to GTP cache. LabelMatch=%v env=%v", deployment.Name, gtp.Name, gtp.Labels[common.GetGlobalTrafficDeploymentLabel()], gtpEnv) - //we have a valid deployment - env := common.GetEnv(deployment) - identity := deployment.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) - g.identityCache[key] = gtp - } else if g.dependencyCache[gtp.Name] != nil { - log.Infof("Adding gtp with name %v to GTP cache. LabelMatch=%v env=%v", gtp.Name, gtp.Labels[common.GetGlobalTrafficDeploymentLabel()], gtpEnv) - //The old GTP matched a deployment, the new one doesn't. So we need to clear that cache. - oldDeployment := g.dependencyCache[gtp.Name] - env := common.GetEnv(oldDeployment) - identity := oldDeployment.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) - delete(g.identityCache, key) - } - - g.dependencyCache[gtp.Name] = deployment - - return nil + return g.identityCache[common.ConstructGtpKey(environment, identity)] } -func (g *globalTrafficCache) PutRollout(gtp *v1.GlobalTrafficPolicy, rollout *argo.Rollout) error { +func (g *globalTrafficCache) Put(gtp *v1.GlobalTrafficPolicy) error { if gtp.Name == "" { //no GTP, throw error return errors.New("cannot add an empty globaltrafficpolicy to the cache") } defer g.mutex.Unlock() g.mutex.Lock() + var gtpIdentity = gtp.Labels[common.GetGlobalTrafficDeploymentLabel()] var gtpEnv = common.GetGtpEnv(gtp) - if rollout != nil && rollout.Labels != nil { - log.Infof("Adding Rollout with name %v and gtp with name %v to GTP cache. LabelMatch=%v env=%v", rollout.Name, gtp.Name, gtp.Labels[common.GetGlobalTrafficDeploymentLabel()], gtpEnv) - //we have a valid rollout - env := common.GetEnvForRollout(rollout) - identity := rollout.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) - g.identityCache[key] = gtp - } else if g.dependencyRolloutCache[gtp.Name] != nil { - log.Infof("Adding gtp with name %v to GTP cache. LabelMatch=%v env=%v", gtp.Name, gtp.Labels[common.GetGlobalTrafficDeploymentLabel()], gtpEnv) - //The old GTP matched a rollout, the new one doesn't. So we need to clear that cache. - oldRollout := g.dependencyRolloutCache[gtp.Name] - env := common.GetEnvForRollout(oldRollout) - identity := oldRollout.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) - delete(g.identityCache, key) - } - g.dependencyRolloutCache[gtp.Name] = rollout + log.Infof("Adding GTP with name %v to GTP cache. LabelMatch=%v env=%v", gtp.Name, gtpIdentity, gtpEnv) + identity := gtp.Labels[common.GetGlobalTrafficDeploymentLabel()] + key := common.ConstructGtpKey(gtpEnv, identity) + g.identityCache[key] = gtp return nil } -func (g *globalTrafficCache) Delete(gtp *v1.GlobalTrafficPolicy) { - if gtp.Name == "" { - //no GTP, nothing to delete - return - } - defer g.mutex.Unlock() - g.mutex.Lock() - log.Infof("Deleting gtp with name %v to GTP cache. LabelMatch=%v env=%v", gtp.Name, gtp.Labels[common.GetGlobalTrafficDeploymentLabel()], common.GetGtpEnv(gtp)) - - deployment := g.dependencyCache[gtp.Name] - - if deployment != nil && deployment.Labels != nil { - - //we have a valid deployment - env := common.GetEnv(deployment) - identity := deployment.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) - delete(g.identityCache, key) - } - rollout := g.dependencyRolloutCache[gtp.Name] - - if rollout != nil && rollout.Labels != nil { - - //we have a valid rollout - env := common.GetEnvForRollout(rollout) - identity := rollout.Labels[common.GetWorkloadIdentifier()] - key := getCacheKey(env, identity) +func (g *globalTrafficCache) Delete(identity string, environment string) { + key := common.ConstructGtpKey(environment, identity) + if _, ok := g.identityCache[key]; ok { + log.Infof("Deleting gtp with key=%s from global GTP cache", key) delete(g.identityCache, key) } - - delete(g.dependencyCache, gtp.Name) - delete(g.dependencyRolloutCache, gtp.Name) } type DeploymentHandler struct { @@ -235,11 +151,6 @@ type DeploymentHandler struct { ClusterID string } -type PodHandler struct { - RemoteRegistry *RemoteRegistry - ClusterID string -} - type NodeHandler struct { RemoteRegistry *RemoteRegistry ClusterID string @@ -286,105 +197,15 @@ func (dh *DependencyHandler) Deleted(obj *v1.Dependency) { } func (gtp *GlobalTrafficHandler) Added(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Added", "trafficpolicy", obj.Name, gtp.ClusterID, "received") - - var matchedDeployments []k8sAppsV1.Deployment - var matchedRollouts []argo.Rollout - - //IMPORTANT: The deployment/Rollout matched with a GTP will not necessarily be from the same cluster. This is because the same service could be deployed in multiple clusters and we need to guarantee consistent behavior - for _, remoteCluster := range gtp.RemoteRegistry.RemoteControllers { - matchedDeployments = append(matchedDeployments, remoteCluster.DeploymentController.GetDeploymentByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - if gtp.RemoteRegistry.AdmiralCache.argoRolloutsEnabled { - matchedRollouts = append(matchedRollouts, remoteCluster.RolloutController.GetRolloutByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - } - } - - deployments := common.MatchDeploymentsToGTP(obj, matchedDeployments) - rollouts := common.MatchRolloutsToGTP(obj, matchedRollouts) - - if len(deployments) != 0 { - for _, deployment := range deployments { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.Put(obj, &deployment) - if err != nil { - log.Errorf("Failed to add nw GTP to cache. Error=%v", err) - log.Infof(LogFormat, "Added", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } - } - } - - if len(rollouts) != 0 { - for _, rollout := range rollouts { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.PutRollout(obj, &rollout) - if err != nil { - log.Errorf("Failed to add new GTP to cache. Error=%v", err) - log.Errorf(LogErrFormat, "Added", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } - } - } - - if len(deployments) == 0 && len(rollouts) == 0 { - log.Infof(LogErrFormat, "Added", "trafficpolicy", obj.Name, gtp.ClusterID, "Skipping, no matched deployments/rollouts") - } - + log.Infof(LogFormat, "Added", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") } func (gtp *GlobalTrafficHandler) Updated(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "received") - - var matchedDeployments []k8sAppsV1.Deployment - var matchedRollouts []argo.Rollout - - //IMPORTANT: The deployment/Rollout matched with a GTP will not necessarily be from the same cluster. This is because the same service could be deployed in multiple clusters and we need to guarantee consistent behavior - for _, remoteCluster := range gtp.RemoteRegistry.RemoteControllers { - matchedDeployments = append(matchedDeployments, remoteCluster.DeploymentController.GetDeploymentByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - if gtp.RemoteRegistry.AdmiralCache.argoRolloutsEnabled { - matchedRollouts = append(matchedRollouts, remoteCluster.RolloutController.GetRolloutByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - } - } - - deployments := common.MatchDeploymentsToGTP(obj, matchedDeployments) - rollouts := common.MatchRolloutsToGTP(obj, matchedRollouts) - - if len(deployments) != 0 { - for _, deployment := range deployments { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.Put(obj, &deployment) - if err != nil { - log.Errorf("Failed to add updated GTP to cache. Error=%v", err) - log.Infof(LogFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } - } - } else { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.Put(obj, nil) - if err != nil { - log.Errorf("Failed to add updated GTP to cache. Error=%v", err) - log.Infof(LogFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } else { - log.Infof(LogErrFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Skipping, no matched deployments") - } - } - - if len(rollouts) != 0 { - for _, rollout := range rollouts { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.PutRollout(obj, &rollout) - if err != nil { - log.Errorf("Failed to add updated GTP to cache. Error=%v", err) - log.Infof(LogFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } - } - } else { - err := gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.PutRollout(obj, nil) - if err != nil { - log.Errorf("Failed to add updated GTP to cache. Error=%v", err) - log.Infof(LogFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Failed") - } else { - log.Infof(LogErrFormat, "Updated", "trafficpolicy", obj.Name, gtp.ClusterID, "Skipping, no matched rollouts") - } - } + log.Infof(LogFormat, "Updated", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") } func (gtp *GlobalTrafficHandler) Deleted(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Deleted", "trafficpolicy", obj.Name, gtp.ClusterID, "received") - gtp.RemoteRegistry.AdmiralCache.GlobalTrafficCache.Delete(obj) + log.Infof(LogFormat, "Deleted", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") } func (pc *DeploymentHandler) Added(obj *k8sAppsV1.Deployment) { @@ -395,28 +216,6 @@ func (pc *DeploymentHandler) Deleted(obj *k8sAppsV1.Deployment) { HandleEventForDeployment(admiral.Delete, obj, pc.RemoteRegistry, pc.ClusterID) } -func (pc *PodHandler) Added(obj *k8sV1.Pod) { - log.Infof(LogFormat, "Event", "deployment", obj.Name, pc.ClusterID, "Received") - - globalIdentifier := common.GetPodGlobalIdentifier(obj) - - if len(globalIdentifier) == 0 { - log.Infof(LogFormat, "Event", "deployment", obj.Name, "", "Skipped as '"+common.GetWorkloadIdentifier()+" was not found', namespace="+obj.Namespace) - return - } - - //TODO Skip pod events until GTP is implemented - //modifyServiceEntryForNewServiceOrPod(obj.Namespace, globalIdentifier, pc.RemoteRegistry) -} - -func (pc *PodHandler) Deleted(obj *k8sV1.Pod) { - //TODO update subset service entries -} - -func getCacheKey(environment string, identity string) string { - return fmt.Sprintf("%s.%s", environment, identity) -} - func (rh *RolloutHandler) Added(obj *argo.Rollout) { HandleEventForRollout(admiral.Add, obj, rh.RemoteRegistry, rh.ClusterID) } @@ -440,27 +239,6 @@ func HandleEventForRollout(event admiral.EventType, obj *argo.Rollout, remoteReg return } - var matchedGTPs []v1.GlobalTrafficPolicy - for _, remoteCluster := range remoteRegistry.RemoteControllers { - matchedGTPs = append(matchedGTPs, remoteCluster.GlobalTraffic.GetGTPByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - } - - gtp := common.MatchGTPsToRollout(matchedGTPs, obj) - - if gtp != nil { - if event == admiral.Add { - err := remoteRegistry.AdmiralCache.GlobalTrafficCache.PutRollout(gtp, obj) - if err != nil { - log.Errorf("Failed to add Rollout to GTP cache. Error=%v", err) - } else { - log.Infof(LogFormat, "Event", "rollout", obj.Name, clusterName, "Matched to GTP name="+gtp.Name) - } - } else if event == admiral.Delete { - remoteRegistry.AdmiralCache.GlobalTrafficCache.Delete(gtp) - log.Infof(LogFormat, event, "rollout", obj.Name, clusterName, "Matched to GTP name="+gtp.Name) - } - } - env := common.GetEnvForRollout(obj) // Use the same function as added deployment function to update and put new service entry in place to replace old one @@ -478,27 +256,6 @@ func HandleEventForDeployment(event admiral.EventType, obj *k8sAppsV1.Deployment return } - var matchedGTPs []v1.GlobalTrafficPolicy - for _, remoteCluster := range remoteRegistry.RemoteControllers { - matchedGTPs = append(matchedGTPs, remoteCluster.GlobalTraffic.GetGTPByLabel(obj.Labels[common.GetGlobalTrafficDeploymentLabel()], obj.Namespace)...) - } - - gtp := common.MatchGTPsToDeployment(matchedGTPs, obj) - - if gtp != nil { - if event == admiral.Add { - err := remoteRegistry.AdmiralCache.GlobalTrafficCache.Put(gtp, obj) - if err != nil { - log.Errorf("Failed to add Deployment to GTP cache. Error=%v", err) - } else { - log.Infof(LogFormat, "Event", "deployment", obj.Name, clusterName, "Matched to GTP name="+gtp.Name) - } - } else if event == admiral.Delete { - remoteRegistry.AdmiralCache.GlobalTrafficCache.Delete(gtp) - log.Infof(LogFormat, event, "deployment", obj.Name, clusterName, "Matched to GTP name="+gtp.Name) - } - } - env := common.GetEnv(obj) // Use the same function as added deployment function to update and put new service entry in place to replace old one diff --git a/admiral/pkg/clusters/types_test.go b/admiral/pkg/clusters/types_test.go index f2fb24dc..3110ff8e 100644 --- a/admiral/pkg/clusters/types_test.go +++ b/admiral/pkg/clusters/types_test.go @@ -2,14 +2,11 @@ package clusters import ( "context" - "log" "sync" "testing" "time" argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" - argofake "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" admiralFake "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned/fake" @@ -18,7 +15,6 @@ import ( v12 "k8s.io/api/apps/v1" v13 "k8s.io/api/core/v1" time2 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" ) var ignoreUnexported = cmpopts.IgnoreUnexported(v1.GlobalTrafficPolicy{}.Status) @@ -43,352 +39,6 @@ func init() { common.InitializeConfig(p) } -func TestGlobalTrafficHandler(t *testing.T) { - //lots of setup - handler := GlobalTrafficHandler{} - registry := &RemoteRegistry{} - admiralCacle := &AdmiralCache{} - - gtpCache := &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - - fakeClient := fake.NewSimpleClientset() - - deployment := v12.Deployment{} - deployment.Namespace = "namespace" - deployment.Name = "fake-app-deployment-qal" - deployment.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "qal"}, - }, - }, - } - deployment.Labels = map[string]string{"identity": "app1"} - - deployment2 := v12.Deployment{} - deployment2.Namespace = "namespace" - deployment2.Name = "fake-app-deployment-e2e" - deployment2.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - deployment2.Labels = map[string]string{"identity": "app1"} - - deployment3 := v12.Deployment{} - deployment3.Namespace = "namespace" - deployment3.Name = "fake-app-deployment-prf-1" - deployment3.CreationTimestamp = time2.Now() - deployment3.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment3.Labels = map[string]string{"identity": "app1"} - - deployment4 := v12.Deployment{} - deployment4.Namespace = "namespace" - deployment4.Name = "fake-app-deployment-prf-2" - deployment4.CreationTimestamp = time2.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - deployment4.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment4.Labels = map[string]string{"identity": "app1"} - - _, err := fakeClient.AppsV1().Deployments("namespace").Create(&deployment) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment2) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment3) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment4) - if err != nil { - log.Fatalf("Failed to set up mock k8s client. Failing test. Error=%v", err) - } - - deploymentController := &admiral.DeploymentController{K8sClient: fakeClient} - remoteController := &RemoteController{} - remoteController.DeploymentController = deploymentController - - noRolloutsClient := argofake.NewSimpleClientset().ArgoprojV1alpha1() - rolloutController := &admiral.RolloutController{K8sClient: fakeClient, RolloutClient: noRolloutsClient} - remoteController.RolloutController = rolloutController - - registry.RemoteControllers = map[string]*RemoteController{"cluster-1": remoteController} - - admiralCacle.GlobalTrafficCache = gtpCache - registry.AdmiralCache = admiralCacle - handler.RemoteRegistry = registry - - e2eGtp := v1.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - noMatchGtp := v1.GlobalTrafficPolicy{} - noMatchGtp.Labels = map[string]string{"identity": "app2", "env": "e2e"} - noMatchGtp.Namespace = "namespace" - noMatchGtp.Name = "myGTP" - - prfGtp := v1.GlobalTrafficPolicy{} - prfGtp.Labels = map[string]string{"identity": "app1", "env": "prf"} - prfGtp.Namespace = "namespace" - prfGtp.Name = "myGTP" - - testCases := []struct { - name string - gtp *v1.GlobalTrafficPolicy - expectedIdentity string - expectedEnv string - expectedIdentityCacheValue *v1.GlobalTrafficPolicy - expectedDeploymentCacheValue *v12.Deployment - }{ - { - name: "Should return matching environment", - gtp: &e2eGtp, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedDeploymentCacheValue: &deployment2, - expectedIdentityCacheValue: &e2eGtp, - }, - { - name: "Should return nothing when identity labels don't match", - gtp: &noMatchGtp, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedDeploymentCacheValue: nil, - expectedIdentityCacheValue: nil, - }, - { - name: "Should return oldest deployment when multiple match", - gtp: &prfGtp, - expectedIdentity: "app1", - expectedEnv: "prf", - expectedDeploymentCacheValue: &deployment4, - expectedIdentityCacheValue: &prfGtp, - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - //clearing admiralCache - gtpCache = &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache = gtpCache - - //run the method under test then make assertions - handler.Added(c.gtp) - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), c.expectedIdentityCacheValue, ignoreUnexported) { - t.Fatalf("GTP Mismatch. Diff: %v", cmp.Diff(c.expectedIdentityCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), ignoreUnexported)) - } - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name), c.expectedDeploymentCacheValue, ignoreUnexported) { - t.Fatalf("Deployment Mismatch. Diff: %v", cmp.Diff(c.expectedDeploymentCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name), ignoreUnexported)) - } - - handler.Deleted(c.gtp) - - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv) != nil { - t.Fatalf("Delete failed for Identity Cache") - } - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name) != nil { - t.Fatalf("Delete failed for Dependency Cache") - } - - }) - } -} - -func TestGlobalTrafficHandler_Updated(t *testing.T) { - //lots of setup - handler := GlobalTrafficHandler{} - registry := &RemoteRegistry{} - admiralCacle := &AdmiralCache{} - - gtpCache := &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - - fakeClient := fake.NewSimpleClientset() - - deployment := v12.Deployment{} - deployment.Namespace = "namespace" - deployment.Name = "fake-app-deployment-qal" - deployment.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "qal"}, - }, - }, - } - deployment.Labels = map[string]string{"identity": "app1"} - - deployment2 := v12.Deployment{} - deployment2.Namespace = "namespace" - deployment2.Name = "fake-app-deployment-e2e" - deployment2.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - deployment2.Labels = map[string]string{"identity": "app1"} - - deployment3 := v12.Deployment{} - deployment3.Namespace = "namespace" - deployment3.Name = "fake-app-deployment-prf-1" - deployment3.CreationTimestamp = time2.Now() - deployment3.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment3.Labels = map[string]string{"identity": "app1"} - - deployment4 := v12.Deployment{} - deployment4.Namespace = "namespace" - deployment4.Name = "fake-app-deployment-prf-2" - deployment4.CreationTimestamp = time2.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - deployment4.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment4.Labels = map[string]string{"identity": "app1"} - - _, err := fakeClient.AppsV1().Deployments("namespace").Create(&deployment) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment2) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment3) - _, err = fakeClient.AppsV1().Deployments("namespace").Create(&deployment4) - if err != nil { - log.Fatalf("Failed to set up mock k8s client. Failing test. Error=%v", err) - } - - deploymentController := &admiral.DeploymentController{K8sClient: fakeClient} - remoteController := &RemoteController{} - remoteController.DeploymentController = deploymentController - - noRolloutsClient := argofake.NewSimpleClientset().ArgoprojV1alpha1() - rolloutController := &admiral.RolloutController{K8sClient: fakeClient, RolloutClient: noRolloutsClient} - remoteController.RolloutController = rolloutController - - registry.RemoteControllers = map[string]*RemoteController{"cluster-1": remoteController} - - admiralCacle.GlobalTrafficCache = gtpCache - registry.AdmiralCache = admiralCacle - handler.RemoteRegistry = registry - - e2eGtp := v1.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - e2eGtpExtraLabel := v1.GlobalTrafficPolicy{} - e2eGtpExtraLabel.Labels = map[string]string{"identity": "app1", "env": "e2e", "random": "foobar"} - e2eGtpExtraLabel.Namespace = "namespace" - e2eGtpExtraLabel.Name = "myGTP" - - noMatchGtp := v1.GlobalTrafficPolicy{} - noMatchGtp.Labels = map[string]string{"identity": "app2", "env": "e2e"} - noMatchGtp.Namespace = "namespace" - noMatchGtp.Name = "myGTP" - - prfGtp := v1.GlobalTrafficPolicy{} - prfGtp.Labels = map[string]string{"identity": "app1", "env": "prf"} - prfGtp.Namespace = "namespace" - prfGtp.Name = "myGTP" - - testCases := []struct { - name string - gtp *v1.GlobalTrafficPolicy - updatedGTP *v1.GlobalTrafficPolicy - expectedIdentity string - expectedEnv string - expectedIdentityCacheValue *v1.GlobalTrafficPolicy - expectedDeploymentCacheValue *v12.Deployment - }{ - { - name: "Should return matching environment", - gtp: &e2eGtp, - updatedGTP: &e2eGtpExtraLabel, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedDeploymentCacheValue: &deployment2, - expectedIdentityCacheValue: &e2eGtpExtraLabel, - }, - { - name: "Should return nothing when identity labels don't match after update", - gtp: &e2eGtp, - updatedGTP: &noMatchGtp, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedDeploymentCacheValue: nil, - expectedIdentityCacheValue: nil, - }, - { - name: "Should return oldest deployment when multiple match", - gtp: &e2eGtp, - updatedGTP: &prfGtp, - expectedIdentity: "app1", - expectedEnv: "prf", - expectedDeploymentCacheValue: &deployment4, - expectedIdentityCacheValue: &prfGtp, - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - //clearing admiralCache - gtpCache = &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache = gtpCache - - //run the method under test then make assertions - handler.Added(c.gtp) - handler.Updated(c.updatedGTP) - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), c.expectedIdentityCacheValue, ignoreUnexported) { - t.Fatalf("GTP Mismatch. Diff: %v", cmp.Diff(c.expectedIdentityCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), ignoreUnexported)) - } - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name), c.expectedDeploymentCacheValue, ignoreUnexported) { - t.Fatalf("Deployment Mismatch. Diff: %v", cmp.Diff(c.expectedDeploymentCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name), ignoreUnexported)) - } - - handler.Deleted(c.updatedGTP) - - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv) != nil { - t.Fatalf("Delete failed for Identity Cache") - } - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetDeployment(c.gtp.Name) != nil { - t.Fatalf("Delete failed for Dependency Cache") - } - - }) - } -} - func TestDeploymentHandler(t *testing.T) { p := common.AdmiralParams{ @@ -401,7 +51,6 @@ func TestDeploymentHandler(t *testing.T) { gtpCache := &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) gtpCache.mutex = &sync.Mutex{} fakeCrdClient := admiralFake.NewSimpleClientset() @@ -461,7 +110,6 @@ func TestDeploymentHandler(t *testing.T) { t.Run(c.name, func(t *testing.T) { gtpCache = &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) gtpCache.mutex = &sync.Mutex{} handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache = gtpCache @@ -471,114 +119,6 @@ func TestDeploymentHandler(t *testing.T) { } } -func TestGlobalTrafficCache(t *testing.T) { - deployment := v12.Deployment{} - deployment.Namespace = "namespace" - deployment.Name = "fake-app-deployment-qal" - deployment.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - deployment.Labels = map[string]string{"identity": "app1"} - - deploymentNoEnv := v12.Deployment{} - deploymentNoEnv.Namespace = "namespace" - deploymentNoEnv.Name = "fake-app-deployment-qal" - deploymentNoEnv.Spec = v12.DeploymentSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1"}, - }, - }, - } - deploymentNoEnv.Labels = map[string]string{"identity": "app1"} - - e2eGtp := v1.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - gtpCache := &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.mutex = &sync.Mutex{} - - //Struct of test case info. Name is required. - testCases := []struct { - name string - gtp *v1.GlobalTrafficPolicy - deployment *v12.Deployment - identity string - environment string - gtpName string - }{ - { - name: "Base case", - gtp: &e2eGtp, - deployment: &deployment, - identity: "app1", - environment: "e2e", - gtpName: "myGTP", - }, - { - name: "No Deployment", - gtp: &e2eGtp, - deployment: nil, - identity: "app1", - environment: "e2e", - gtpName: "myGTP", - }, - { - name: "Handles lack of environment label properly", - gtp: &e2eGtp, - deployment: &deploymentNoEnv, - identity: "app1", - environment: "default", - gtpName: "myGTP", - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.mutex = &sync.Mutex{} - err := gtpCache.Put(c.gtp, c.deployment) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - foundGtp := gtpCache.GetFromIdentity(c.identity, c.environment) - if c.deployment != nil && !cmp.Equal(foundGtp, c.gtp, ignoreUnexported) { - t.Fatalf("GTP Mismatch: %v", cmp.Diff(foundGtp, c.gtp, ignoreUnexported)) - } - - //no deployment means there should be nothing in the identity cache - if c.deployment == nil && foundGtp != nil { - t.Fatalf("GTP Mismatch: %v", cmp.Diff(foundGtp, nil, ignoreUnexported)) - } - - foundDeployment := gtpCache.GetDeployment(c.gtpName) - if !cmp.Equal(foundDeployment, c.deployment, ignoreUnexported) { - t.Fatalf("Deployment Mismatch: %v", cmp.Diff(foundDeployment, c.deployment, ignoreUnexported)) - } - - gtpCache.Delete(c.gtp) - if len(gtpCache.dependencyCache) != 0 { - t.Fatalf("Dependency cache not cleared properly on delete") - } - if len(gtpCache.identityCache) != 0 { - t.Fatalf("Identity cache not cleared properly on delete") - } - }) - } - -} - func TestRolloutHandler(t *testing.T) { p := common.AdmiralParams{ @@ -591,7 +131,6 @@ func TestRolloutHandler(t *testing.T) { gtpCache := &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) gtpCache.mutex = &sync.Mutex{} fakeCrdClient := admiralFake.NewSimpleClientset() @@ -656,7 +195,6 @@ func TestRolloutHandler(t *testing.T) { t.Run(c.name, func(t *testing.T) { gtpCache = &globalTrafficCache{} gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) gtpCache.mutex = &sync.Mutex{} handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache = gtpCache @@ -666,292 +204,3 @@ func TestRolloutHandler(t *testing.T) { }) } } - -func TestGlobalTrafficCacheForRollout(t *testing.T) { - rollout := argo.Rollout{} - rollout.Namespace = "namespace" - rollout.Name = "fake-app-rollout-qal" - rollout.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - rollout.Labels = map[string]string{"identity": "app1"} - - rolloutNoEnv := argo.Rollout{} - rolloutNoEnv.Namespace = "namespace" - rolloutNoEnv.Name = "fake-app-rollout-qal" - rolloutNoEnv.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1"}, - }, - }, - } - rolloutNoEnv.Labels = map[string]string{"identity": "app1"} - - e2eGtp := v1.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - gtpCache := &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - - //Struct of test case info. Name is required. - testCases := []struct { - name string - gtp *v1.GlobalTrafficPolicy - rollout *argo.Rollout - identity string - environment string - gtpName string - }{ - { - name: "Base case", - gtp: &e2eGtp, - rollout: &rollout, - identity: "app1", - environment: "e2e", - gtpName: "myGTP", - }, - { - name: "No rollout", - gtp: &e2eGtp, - rollout: nil, - identity: "app1", - environment: "e2e", - gtpName: "myGTP", - }, - { - name: "Handles lack of environment label properly", - gtp: &e2eGtp, - rollout: &rolloutNoEnv, - identity: "app1", - environment: "default", - gtpName: "myGTP", - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.mutex = &sync.Mutex{} - err := gtpCache.PutRollout(c.gtp, c.rollout) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - foundGtp := gtpCache.GetFromIdentity(c.identity, c.environment) - if c.rollout != nil && !cmp.Equal(foundGtp, c.gtp, ignoreUnexported) { - t.Fatalf("GTP Mismatch: %v", cmp.Diff(foundGtp, c.gtp, ignoreUnexported)) - } - - //no rollout means there should be nothing in the identity cache - if c.rollout == nil && foundGtp != nil { - t.Fatalf("GTP Mismatch: %v", cmp.Diff(foundGtp, nil, ignoreUnexported)) - } - - foundRollout := gtpCache.GetRollout(c.gtpName) - if !cmp.Equal(foundRollout, c.rollout, ignoreUnexported) { - t.Fatalf("Rollout Mismatch: %v", cmp.Diff(foundRollout, c.rollout, ignoreUnexported)) - } - - gtpCache.Delete(c.gtp) - if len(gtpCache.dependencyCache) != 0 { - t.Fatalf("Dependency cache not cleared properly on delete") - } - if len(gtpCache.identityCache) != 0 { - t.Fatalf("Identity cache not cleared properly on delete") - } - }) - } - -} - -func TestGlobalTrafficHandler_Updated_ForRollouts(t *testing.T) { - //lots of setup - handler := GlobalTrafficHandler{} - registry := &RemoteRegistry{} - admiralCacle := &AdmiralCache{} - - gtpCache := &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.mutex = &sync.Mutex{} - - fakeClient := fake.NewSimpleClientset() - - rollout := argo.Rollout{} - rollout.Namespace = "namespace" - rollout.Name = "fake-app-rollout-qal" - rollout.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "qal"}, - }, - }, - } - rollout.Labels = map[string]string{"identity": "app1"} - - rollout2 := argo.Rollout{} - rollout2.Namespace = "namespace" - rollout2.Name = "fake-app-rollout-e2e" - rollout2.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - rollout2.Labels = map[string]string{"identity": "app1"} - - rollout3 := argo.Rollout{} - rollout3.Namespace = "namespace" - rollout3.Name = "fake-app-rollout-prf-1" - rollout3.CreationTimestamp = time2.Now() - rollout3.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - rollout3.Labels = map[string]string{"identity": "app1"} - - rollout4 := argo.Rollout{} - rollout4.Namespace = "namespace" - rollout4.Name = "fake-app-rollout-prf-2" - rollout4.CreationTimestamp = time2.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - rollout4.Spec = argo.RolloutSpec{ - Template: v13.PodTemplateSpec{ - ObjectMeta: time2.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - rollout4.Labels = map[string]string{"identity": "app1"} - - deploymentController := &admiral.DeploymentController{K8sClient: fakeClient} - remoteController := &RemoteController{} - remoteController.DeploymentController = deploymentController - - noRolloutsClient := argofake.NewSimpleClientset().ArgoprojV1alpha1() - noRolloutsClient.Rollouts("namespace").Create(&rollout) - - _, err := noRolloutsClient.Rollouts("namespace").Create(&rollout) - _, err = noRolloutsClient.Rollouts("namespace").Create(&rollout2) - _, err = noRolloutsClient.Rollouts("namespace").Create(&rollout3) - _, err = noRolloutsClient.Rollouts("namespace").Create(&rollout4) - if err != nil { - log.Fatalf("Failed to set up mock k8s client. Failing test. Error=%v", err) - } - - rolloutController := &admiral.RolloutController{K8sClient: fakeClient, RolloutClient: noRolloutsClient} - remoteController.RolloutController = rolloutController - - registry.RemoteControllers = map[string]*RemoteController{"cluster-1": remoteController} - - admiralCacle.GlobalTrafficCache = gtpCache - registry.AdmiralCache = admiralCacle - admiralCacle.argoRolloutsEnabled = true - handler.RemoteRegistry = registry - - e2eGtp := v1.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - e2eGtpExtraLabel := v1.GlobalTrafficPolicy{} - e2eGtpExtraLabel.Labels = map[string]string{"identity": "app1", "env": "e2e", "random": "foobar"} - e2eGtpExtraLabel.Namespace = "namespace" - e2eGtpExtraLabel.Name = "myGTP" - - noMatchGtp := v1.GlobalTrafficPolicy{} - noMatchGtp.Labels = map[string]string{"identity": "app2", "env": "e2e"} - noMatchGtp.Namespace = "namespace" - noMatchGtp.Name = "myGTP" - - prfGtp := v1.GlobalTrafficPolicy{} - prfGtp.Labels = map[string]string{"identity": "app1", "env": "prf"} - prfGtp.Namespace = "namespace" - prfGtp.Name = "myGTP" - - testCases := []struct { - name string - gtp *v1.GlobalTrafficPolicy - updatedGTP *v1.GlobalTrafficPolicy - expectedIdentity string - expectedEnv string - expectedIdentityCacheValue *v1.GlobalTrafficPolicy - expectedRolloutCacheValue *argo.Rollout - }{ - { - name: "Should return matching environment", - gtp: &e2eGtp, - updatedGTP: &e2eGtpExtraLabel, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedRolloutCacheValue: &rollout2, - expectedIdentityCacheValue: &e2eGtpExtraLabel, - }, - { - name: "Should return nothing when identity labels don't match after update", - gtp: &e2eGtp, - updatedGTP: &noMatchGtp, - expectedIdentity: "app1", - expectedEnv: "e2e", - expectedRolloutCacheValue: nil, - expectedIdentityCacheValue: nil, - }, - { - name: "Should return oldest rollout when multiple match", - gtp: &e2eGtp, - updatedGTP: &prfGtp, - expectedIdentity: "app1", - expectedEnv: "prf", - expectedRolloutCacheValue: &rollout4, - expectedIdentityCacheValue: &prfGtp, - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - //clearing admiralCache - gtpCache = &globalTrafficCache{} - gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy) - gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout) - gtpCache.dependencyCache = make(map[string]*v12.Deployment) - gtpCache.mutex = &sync.Mutex{} - handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache = gtpCache - - //run the method under test then make assertions - handler.Added(c.gtp) - handler.Updated(c.updatedGTP) - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), c.expectedIdentityCacheValue, ignoreUnexported) { - t.Fatalf("GTP Mismatch. Diff: %v", cmp.Diff(c.expectedIdentityCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv), ignoreUnexported)) - } - if !cmp.Equal(handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetRollout(c.gtp.Name), c.expectedRolloutCacheValue, ignoreUnexported) { - t.Fatalf("Rollout Mismatch. Diff: %v", cmp.Diff(c.expectedRolloutCacheValue, handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetRollout(c.gtp.Name), ignoreUnexported)) - } - - handler.Deleted(c.updatedGTP) - - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(c.expectedIdentity, c.expectedEnv) != nil { - t.Fatalf("Delete failed for Identity Cache") - } - if handler.RemoteRegistry.AdmiralCache.GlobalTrafficCache.GetRollout(c.gtp.Name) != nil { - t.Fatalf("Delete failed for Dependency Cache") - } - - }) - } -} diff --git a/admiral/pkg/controller/admiral/globaltraffic.go b/admiral/pkg/controller/admiral/globaltraffic.go index a7f3ccb5..61760266 100644 --- a/admiral/pkg/controller/admiral/globaltraffic.go +++ b/admiral/pkg/controller/admiral/globaltraffic.go @@ -8,6 +8,7 @@ import ( meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "sync" "time" clientset "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned" @@ -24,15 +25,85 @@ type GlobalTrafficHandler interface { type GlobalTrafficController struct { CrdClient clientset.Interface GlobalTrafficHandler GlobalTrafficHandler + Cache *gtpCache informer cache.SharedIndexInformer } +type gtpCache struct { + //map of gtps key=identity+env value is a map of gtps namespace -> map name -> gtp + cache map[string]map[string]map[string]*v1.GlobalTrafficPolicy + mutex *sync.Mutex +} + +func (p *gtpCache) Put(obj *v1.GlobalTrafficPolicy) { + defer p.mutex.Unlock() + p.mutex.Lock() + key := common.GetGtpKey(obj) + namespacesWithGtps := p.cache[key] + if namespacesWithGtps == nil { + namespacesWithGtps = make(map[string]map[string]*v1.GlobalTrafficPolicy) + } + namespaceGtps := namespacesWithGtps[obj.Namespace] + if namespaceGtps == nil { + namespaceGtps = make(map[string]*v1.GlobalTrafficPolicy) + } + if common.ShouldIgnoreResource(obj.ObjectMeta){ + delete(namespaceGtps, obj.Name) + } else { + namespaceGtps[obj.Name] = obj + } + + namespacesWithGtps[obj.Namespace] = namespaceGtps + p.cache[key] = namespacesWithGtps +} + +func (p *gtpCache) Delete(obj *v1.GlobalTrafficPolicy) { + defer p.mutex.Unlock() + p.mutex.Lock() + key := common.GetGtpKey(obj) + namespacesWithGtps := p.cache[key] + if namespacesWithGtps == nil { + return + } + namespaceGtps := namespacesWithGtps[obj.Namespace] + if namespaceGtps == nil { + return + } + delete(namespaceGtps, obj.Name) + namespacesWithGtps[obj.Namespace] = namespaceGtps + p.cache[key] = namespacesWithGtps +} + +//fetch gtps for a key from namespace +func (p *gtpCache) Get(key, namespace string) []*v1.GlobalTrafficPolicy { + defer p.mutex.Unlock() + p.mutex.Lock() + namespacesWithGtp := p.cache[key] + matchedGtps := make([]*v1.GlobalTrafficPolicy, 0) + for ns, gtps := range namespacesWithGtp { + if namespace == ns { + for _, gtp := range gtps { + logrus.Debugf("GTP match for identity=%s, from namespace=%v", key, ns) + //make a copy for safer iterations elsewhere + matchedGtps = append(matchedGtps, gtp.DeepCopy()) + } + } + } + return matchedGtps +} + func NewGlobalTrafficController(stopCh <-chan struct{}, handler GlobalTrafficHandler, configPath *rest.Config, resyncPeriod time.Duration) (*GlobalTrafficController, error) { globalTrafficController := GlobalTrafficController{} globalTrafficController.GlobalTrafficHandler = handler + gtpCache := gtpCache{} + gtpCache.cache = make(map[string]map[string]map[string]*v1.GlobalTrafficPolicy) + gtpCache.mutex = &sync.Mutex{} + + globalTrafficController.Cache = >pCache + var err error globalTrafficController.CrdClient, err = AdmiralCrdClientFromConfig(configPath) @@ -53,34 +124,19 @@ func NewGlobalTrafficController(stopCh <-chan struct{}, handler GlobalTrafficHan } func (d *GlobalTrafficController) Added(ojb interface{}) { - dep := ojb.(*v1.GlobalTrafficPolicy) - d.GlobalTrafficHandler.Added(dep) + gtp := ojb.(*v1.GlobalTrafficPolicy) + d.Cache.Put(gtp) + d.GlobalTrafficHandler.Added(gtp) } func (d *GlobalTrafficController) Updated(ojb interface{}, oldObj interface{}) { - dep := ojb.(*v1.GlobalTrafficPolicy) - d.GlobalTrafficHandler.Updated(dep) + gtp := ojb.(*v1.GlobalTrafficPolicy) + d.Cache.Put(gtp) + d.GlobalTrafficHandler.Updated(gtp) } func (d *GlobalTrafficController) Deleted(ojb interface{}) { - dep := ojb.(*v1.GlobalTrafficPolicy) - d.GlobalTrafficHandler.Deleted(dep) -} - -func (g *GlobalTrafficController) GetGTPByLabel(labelValue string, namespace string) []v1.GlobalTrafficPolicy { - matchLabel := common.GetGlobalTrafficDeploymentLabel() - labelOptions := meta_v1.ListOptions{} - labelOptions.LabelSelector = fmt.Sprintf("%s=%s", matchLabel, labelValue) - matchedDeployments, err := g.CrdClient.AdmiralV1().GlobalTrafficPolicies(namespace).List(labelOptions) - - if err != nil { - logrus.Errorf("Failed to list GTPs in cluster, error: %v", err) - return nil - } - - if matchedDeployments.Items == nil { - return []v1.GlobalTrafficPolicy{} - } - - return matchedDeployments.Items -} + gtp := ojb.(*v1.GlobalTrafficPolicy) + d.Cache.Delete(gtp) + d.GlobalTrafficHandler.Deleted(gtp) +} \ No newline at end of file diff --git a/admiral/pkg/controller/admiral/globaltraffic_test.go b/admiral/pkg/controller/admiral/globaltraffic_test.go index 89164eb0..5799ad82 100644 --- a/admiral/pkg/controller/admiral/globaltraffic_test.go +++ b/admiral/pkg/controller/admiral/globaltraffic_test.go @@ -4,9 +4,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" + "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" "github.com/istio-ecosystem/admiral/admiral/pkg/test" v12 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" + "reflect" + "sync" "testing" "time" ) @@ -74,28 +77,173 @@ func TestGlobalTrafficAddUpdateDelete(t *testing.T) { } -func TestGlobalTrafficGetByLabel(t *testing.T) { - config, err := clientcmd.BuildConfigFromFlags("", "../../test/resources/admins@fake-cluster.k8s.local") - if err != nil { - t.Errorf("%v", err) - } - stop := make(chan struct{}) - handler := test.MockGlobalTrafficHandler{} +func TestGlobalTrafficController_Updated(t *testing.T) { - globalTrafficController, err := NewGlobalTrafficController(stop, &handler, config, time.Duration(1000)) + var ( - if err != nil { - t.Errorf("Unexpected err %v", err) + gth = test.MockGlobalTrafficHandler{} + cache = gtpCache{ + cache: make(map[string]map[string]map[string]*v1.GlobalTrafficPolicy), + mutex: &sync.Mutex{}, + } + gtpController = GlobalTrafficController{ + GlobalTrafficHandler: >h, + Cache: &cache, + } + gtp = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}}, + },} + gtpUpdated = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "helloUpdated"}}, + },} + gtpUpdatedToIgnore = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}, Annotations: map[string]string{"admiral.io/ignore": "true"}}} + + ) + + //add the base object to cache + gtpController.Added(>p) + + testCases := []struct { + name string + gtp *v1.GlobalTrafficPolicy + expectedGtps []*v1.GlobalTrafficPolicy + }{ + { + name: "Gtp with should be updated", + gtp: >pUpdated, + expectedGtps: []*v1.GlobalTrafficPolicy{>pUpdated}, + }, + { + name: "Should remove gtp from cache when update with Ignore annotation", + gtp: >pUpdatedToIgnore, + expectedGtps: []*v1.GlobalTrafficPolicy{}, + }, + } + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + gtpController.Updated(c.gtp, gtp) + gtpKey := common.GetGtpKey(c.gtp) + matchedGtps := gtpController.Cache.Get(gtpKey, c.gtp.Namespace) + if !reflect.DeepEqual(c.expectedGtps, matchedGtps) { + t.Errorf("Test %s failed; expected %v, got %v", c.name, c.expectedGtps, matchedGtps) + } + }) } +} - if globalTrafficController == nil { - t.Errorf("GlobalTraffic controller should never be nil without an error thrown") +func TestGlobalTrafficController_Deleted(t *testing.T) { + + var ( + gth = test.MockGlobalTrafficHandler{} + cache = gtpCache{ + cache: make(map[string]map[string]map[string]*v1.GlobalTrafficPolicy), + mutex: &sync.Mutex{}, + } + gtpController = GlobalTrafficController{ + GlobalTrafficHandler: >h, + Cache: &cache, + } + gtp = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}}, + },} + + gtp2 = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp2"}}, + },} + + gtp3 = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", Labels: map[string]string{"identity": "id2", "admiral.io/env": "stage"}}, Spec: model.GlobalTrafficPolicy{ + Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp3"}}, + },} + ) + + //add the base object to cache + gtpController.Added(>p) + gtpController.Added(>p2) + + testCases := []struct { + name string + gtp *v1.GlobalTrafficPolicy + expectedGtps []*v1.GlobalTrafficPolicy + }{ + { + name: "Should delete gtp", + gtp: >p, + expectedGtps: []*v1.GlobalTrafficPolicy{>p2}, + }, + { + name: "Deleting non existing gtp should be a no-op", + gtp: >p3, + expectedGtps: []*v1.GlobalTrafficPolicy{}, + }, } + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + gtpController.Deleted(c.gtp) + gtpKey := common.GetGtpKey(c.gtp) + matchedGtps := gtpController.Cache.Get(gtpKey, c.gtp.Namespace) + if !reflect.DeepEqual(c.expectedGtps, matchedGtps) { + t.Errorf("Test %s failed; expected %v, got %v", c.name, c.expectedGtps, matchedGtps) + } + }) + } +} + +func TestGlobalTrafficController_Added(t *testing.T) { + + var ( - gtps := globalTrafficController.GetGTPByLabel("payments", "namespace1") + gth = test.MockGlobalTrafficHandler{} + cache = gtpCache{ + cache: make(map[string]map[string]map[string]*v1.GlobalTrafficPolicy), + mutex: &sync.Mutex{}, + } + gtpController = GlobalTrafficController{ + GlobalTrafficHandler: >h, + Cache: &cache, + } + gtp = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}},} + gtpWithIgnoreLabels = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtpWithIgnoreLabels", Namespace: "namespace2", Labels: map[string]string{"identity": "id2", "admiral.io/env": "stage"}, Annotations: map[string]string{"admiral.io/ignore": "true"}}} - if gtps != nil || len(gtps) > 0 { - t.Errorf("gtps is not empty") + gtp2 = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}} + + gtp3 = v1.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace3", Labels: map[string]string{"identity": "id", "admiral.io/env": "stage"}}} + ) + + testCases := []struct { + name string + gtp *v1.GlobalTrafficPolicy + expectedGtps []*v1.GlobalTrafficPolicy + }{ + { + name: "Gtp should be added to the cache", + gtp: >p, + expectedGtps: []*v1.GlobalTrafficPolicy{>p}, + }, + { + name: "Gtp with ignore annotation should not be added to the cache", + gtp: >pWithIgnoreLabels, + expectedGtps: []*v1.GlobalTrafficPolicy{}, + }, + { + name: "Should cache multiple gtps in a namespace", + gtp: >p2, + expectedGtps: []*v1.GlobalTrafficPolicy{>p, >p2}, + }, + { + name: "Should cache gtps in from multiple namespaces", + gtp: >p3, + expectedGtps: []*v1.GlobalTrafficPolicy{>p3}, + }, + } + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + gtpController.Added(c.gtp) + gtpKey := common.GetGtpKey(c.gtp) + matchedGtps := gtpController.Cache.Get(gtpKey, c.gtp.Namespace) + if !reflect.DeepEqual(c.expectedGtps, matchedGtps) { + t.Errorf("Test %s failed; expected %v, got %v", c.name, c.expectedGtps, matchedGtps) + } + }) } } diff --git a/admiral/pkg/controller/common/common.go b/admiral/pkg/controller/common/common.go index 77aa2e35..94a6ae10 100644 --- a/admiral/pkg/controller/common/common.go +++ b/admiral/pkg/controller/common/common.go @@ -1,7 +1,8 @@ package common import ( - "sort" + "fmt" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" "strings" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" @@ -142,114 +143,44 @@ func GetValueForKeyFromDeployment(key string, deployment *k8sAppsV1.Deployment) return value } -//Returns the list of deployments to which this GTP should apply. It is assumed that all inputs already are an identity match -//If the GTP has an identity label, it should match all deployments which share that label -//If the GTP does not have an identity label, it should return all deployments without an identity label -//IMPORTANT: If an environment label is specified on either the GTP or the deployment, the same value must be specified on the other for them to match -func MatchDeploymentsToGTP(gtp *v1.GlobalTrafficPolicy, deployments []k8sAppsV1.Deployment) []k8sAppsV1.Deployment { - if gtp == nil || gtp.Name == "" { - log.Warn("Nil or empty GlobalTrafficPolicy provided for deployment match. Returning nil.") - return nil - } - - gtpEnv := GetGtpEnv(gtp) - - if len(deployments) == 0 { - return nil - } - - var envMatchedDeployments []k8sAppsV1.Deployment - - for _, deployment := range deployments { - deploymentEnvironment := GetEnv(&deployment) - if deploymentEnvironment == gtpEnv { - envMatchedDeployments = append(envMatchedDeployments, deployment) - } - } - - if len(envMatchedDeployments) == 0 { - return nil - } - - for _, deployment := range deployments { - log.Infof("Newly added GTP with name=%v matched with Deployment %v in namespace %v. Env=%v", gtp.Name, deployment.Name, deployment.Namespace, gtpEnv) - } - return envMatchedDeployments -} - -//Find the GTP that best matches the deployment. -//It's assumed that the set of GTPs passed in has already been matched via the GtpDeploymentLabel. Now it's our job to choose the best one. -//In order: -// - If one and only one GTP matches the env label of the deployment - use that one. Use "default" as the default env label for all GTPs and deployments. -// - If multiple GTPs match the deployment label, use the oldest one (Using an old one has less chance of new behavior which could impact workflows) -//IMPORTANT: If an environment label is specified on either the GTP or the deployment, the same value must be specified on the other for them to match -func MatchGTPsToDeployment(gtpList []v1.GlobalTrafficPolicy, deployment *k8sAppsV1.Deployment) *v1.GlobalTrafficPolicy { - if deployment == nil || deployment.Name == "" { - log.Warn("Nil or empty GlobalTrafficPolicy provided for deployment match. Returning nil.") - return nil - } - deploymentEnvironment := GetEnv(deployment) - - //If one and only one GTP matches the env label of the deployment - use that one - if len(gtpList) == 1 { - gtpEnv := GetGtpEnv(>pList[0]) - if gtpEnv == deploymentEnvironment { - log.Infof("Newly added deployment with name=%v matched with GTP %v in namespace %v. Env=%v", deployment.Name, gtpList[0].Name, deployment.Namespace, gtpEnv) - return >pList[0] - } else { - return nil - } - } - - if len(gtpList) == 0 { - return nil - } - - var envMatchedGTPList []v1.GlobalTrafficPolicy - - for _, gtp := range gtpList { - gtpEnv := GetGtpEnv(>p) - if gtpEnv == deploymentEnvironment { - envMatchedGTPList = append(envMatchedGTPList, gtp) - } - } - - //if one matches the environment from the gtp, return it - if len(envMatchedGTPList) == 1 { - log.Infof("Newly added deployment with name=%v matched with GTP %v in namespace %v. Env=%v", deployment.Name, envMatchedGTPList[0].Name, deployment.Namespace, deploymentEnvironment) - return &envMatchedGTPList[0] - } - - //No GTPs matched the environment label - if len(envMatchedGTPList) == 0 { - return nil - } - - //Using age as a tiebreak - sort.Slice(envMatchedGTPList, func(i, j int) bool { - iTime := envMatchedGTPList[i].CreationTimestamp.Nanosecond() - jTime := envMatchedGTPList[j].CreationTimestamp.Nanosecond() - return iTime < jTime - }) - - log.Warnf("Multiple GTPs found that match the deployment with name=%v in namespace %v. Using the oldest one, you may want to clean up your configs to prevent this in the future", deployment.Name, deployment.Namespace) - //return oldest gtp - log.Infof("Newly added deployment with name=%v matched with GTP %v in namespace %v. Env=%v", deployment.Name, envMatchedGTPList[0].Name, deployment.Namespace, deploymentEnvironment) - return &envMatchedGTPList[0] - -} - func GetGtpEnv(gtp *v1.GlobalTrafficPolicy) string { var environment = gtp.Annotations[GetEnvKey()] if len(environment) == 0 { environment = gtp.Labels[GetEnvKey()] } + if len(environment) == 0 { + environment = gtp.Spec.Selector[GetEnvKey()] + } if len(environment) == 0 { environment = gtp.Labels[Env] log.Warnf("Using deprecated approach to use env label for GTP, name=%v in namespace=%v", gtp.Name, gtp.Namespace) } + if len(environment) == 0 { + environment = gtp.Spec.Selector[Env] + log.Warnf("Using deprecated approach to use env label for GTP, name=%v in namespace=%v", gtp.Name, gtp.Namespace) + } if len(environment) == 0 { environment = Default } return environment } + +func GetGtpIdentity(gtp *v1.GlobalTrafficPolicy) string { + identity := gtp.Labels[GetGlobalTrafficDeploymentLabel()] + if len(identity) == 0 { + identity = gtp.Spec.Selector[GetGlobalTrafficDeploymentLabel()] + } + return identity +} + +func GetGtpKey(gtp *v1.GlobalTrafficPolicy) string { + return ConstructGtpKey(GetGtpEnv(gtp), GetGtpIdentity(gtp)) +} + +func ConstructGtpKey(env, identity string) string { + return fmt.Sprintf("%s.%s", env, identity) +} + +func ShouldIgnoreResource(metadata v12.ObjectMeta) bool { + return metadata.Annotations[AdmiralIgnoreAnnotation] == "true" || metadata.Labels[AdmiralIgnoreAnnotation] == "true" +} diff --git a/admiral/pkg/controller/common/common_test.go b/admiral/pkg/controller/common/common_test.go index dd0f94e9..d1daa8ff 100644 --- a/admiral/pkg/controller/common/common_test.go +++ b/admiral/pkg/controller/common/common_test.go @@ -293,274 +293,6 @@ func TestGetEnv(t *testing.T) { } } -func TestMatchDeploymentsToGTP(t *testing.T) { - deployment := k8sAppsV1.Deployment{} - deployment.Namespace = "namespace" - deployment.Name = "fake-app-deployment-qal" - deployment.CreationTimestamp = v1.Now() - deployment.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "qal"}, - }, - }, - } - deployment.Labels = map[string]string{"identity": "app1"} - - deployment2 := k8sAppsV1.Deployment{} - deployment2.Namespace = "namespace" - deployment2.Name = "fake-app-deployment-e2e" - deployment2.CreationTimestamp = v1.Now() - deployment2.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "e2e"}, - }, - }, - } - deployment2.Labels = map[string]string{"identity": "app1"} - - deployment3 := k8sAppsV1.Deployment{} - deployment3.Namespace = "namespace" - deployment3.Name = "fake-app-deployment-prf-1" - deployment3.CreationTimestamp = v1.Now() - deployment3.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment3.Labels = map[string]string{"identity": "app1"} - - deployment4 := k8sAppsV1.Deployment{} - deployment4.Namespace = "namespace" - deployment4.Name = "fake-app-deployment-prf-2" - deployment4.CreationTimestamp = v1.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - deployment4.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "prf"}, - }, - }, - } - deployment4.Labels = map[string]string{"identity": "app1"} - - e2eGtp := v12.GlobalTrafficPolicy{} - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP" - - prfGtp := v12.GlobalTrafficPolicy{} - prfGtp.Labels = map[string]string{"identity": "app1", "env": "prf"} - prfGtp.Namespace = "namespace" - prfGtp.Name = "myGTP" - - //Struct of test case info. Name is required. - testCases := []struct { - name string - gtp *v12.GlobalTrafficPolicy - deployments *[]k8sAppsV1.Deployment - expectedDeployments []k8sAppsV1.Deployment - }{ - { - name: "Should return nil when none have a matching environment", - gtp: &e2eGtp, - deployments: &[]k8sAppsV1.Deployment{deployment, deployment3, deployment4}, - expectedDeployments: nil, - }, - { - name: "Should return Match when there's one match", - gtp: &e2eGtp, - deployments: &[]k8sAppsV1.Deployment{deployment2}, - expectedDeployments: []k8sAppsV1.Deployment{deployment2}, - }, - { - name: "Should return Match when there's one match from a bigger list", - gtp: &e2eGtp, - deployments: &[]k8sAppsV1.Deployment{deployment, deployment2, deployment3, deployment4}, - expectedDeployments: []k8sAppsV1.Deployment{deployment2}, - }, - { - name: "Should return nil when there's no match", - gtp: &e2eGtp, - deployments: &[]k8sAppsV1.Deployment{}, - expectedDeployments: nil, - }, - { - name: "Should return nil when the GTP is invalid", - gtp: &v12.GlobalTrafficPolicy{}, - deployments: &[]k8sAppsV1.Deployment{deployment}, - expectedDeployments: nil, - }, - { - name: "Returns multiple matches", - gtp: &prfGtp, - deployments: &[]k8sAppsV1.Deployment{deployment, deployment2, deployment3, deployment4}, - expectedDeployments: []k8sAppsV1.Deployment{deployment3, deployment4}, - }, - } - - //Run the test for every provided case - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - returned := MatchDeploymentsToGTP(c.gtp, *c.deployments) - if !cmp.Equal(returned, c.expectedDeployments) { - t.Fatalf("Deployment mismatch. Diff: %v", cmp.Diff(returned, c.expectedDeployments)) - } - }) - } -} - -func TestMatchGTPsToDeployment(t *testing.T) { - deployment := k8sAppsV1.Deployment{} - deployment.Namespace = "namespace" - deployment.Name = "fake-app-deployment-qal" - deployment.CreationTimestamp = v1.Now() - deployment.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "qal"}, - }, - }, - } - deployment.Labels = map[string]string{"identity": "app1"} - - otherEnvDeployment := k8sAppsV1.Deployment{} - otherEnvDeployment.Namespace = "namespace" - otherEnvDeployment.Name = "fake-app-deployment-qal" - otherEnvDeployment.CreationTimestamp = v1.Now() - otherEnvDeployment.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1", "env": "random"}, - }, - }, - } - otherEnvDeployment.Labels = map[string]string{"identity": "app1"} - - noEnvDeployment := k8sAppsV1.Deployment{} - noEnvDeployment.Namespace = "namespace" - noEnvDeployment.Name = "fake-app-deployment-qal" - noEnvDeployment.CreationTimestamp = v1.Now() - noEnvDeployment.Spec = k8sAppsV1.DeploymentSpec{ - Template: k8sCoreV1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"identity": "app1"}, - }, - }, - } - noEnvDeployment.Labels = map[string]string{"identity": "app1"} - - e2eGtp := v12.GlobalTrafficPolicy{} - e2eGtp.CreationTimestamp = v1.Now() - e2eGtp.Labels = map[string]string{"identity": "app1", "env": "e2e"} - e2eGtp.Namespace = "namespace" - e2eGtp.Name = "myGTP-e2e" - - prfGtp := v12.GlobalTrafficPolicy{} - prfGtp.CreationTimestamp = v1.Now() - prfGtp.Labels = map[string]string{"identity": "app1", "env": "prf"} - prfGtp.Namespace = "namespace" - prfGtp.Name = "myGTP-prf" - - qalGtp := v12.GlobalTrafficPolicy{} - qalGtp.CreationTimestamp = v1.Now() - qalGtp.Labels = map[string]string{"identity": "app1", "env": "qal"} - qalGtp.Namespace = "namespace" - qalGtp.Name = "myGTP" - - qalGtpOld := v12.GlobalTrafficPolicy{} - qalGtpOld.CreationTimestamp = v1.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - qalGtpOld.Labels = map[string]string{"identity": "app1", "env": "qal"} - qalGtpOld.Namespace = "namespace" - qalGtpOld.Name = "myGTP" - - noEnvGTP := v12.GlobalTrafficPolicy{} - noEnvGTP.CreationTimestamp = v1.Now() - noEnvGTP.Labels = map[string]string{"identity": "app1"} - noEnvGTP.Namespace = "namespace" - noEnvGTP.Name = "myGTP" - - noEnvGTPOld := v12.GlobalTrafficPolicy{} - noEnvGTPOld.CreationTimestamp = v1.Date(2020, 1, 1, 1, 1, 1, 1, time.UTC) - noEnvGTPOld.Labels = map[string]string{"identity": "app1"} - noEnvGTPOld.Namespace = "namespace" - noEnvGTPOld.Name = "myGTP" - - testCases := []struct { - name string - gtp *[]v12.GlobalTrafficPolicy - deployment *k8sAppsV1.Deployment - expectedGTP *v12.GlobalTrafficPolicy - }{ - { - name: "Should return no deployment when none have a matching env", - gtp: &[]v12.GlobalTrafficPolicy{e2eGtp, prfGtp, qalGtp, qalGtpOld}, - deployment: &otherEnvDeployment, - expectedGTP: nil, - }, - { - name: "Should return no deployment when the GTP doesn't have an environment", - gtp: &[]v12.GlobalTrafficPolicy{noEnvGTP, noEnvGTPOld}, - deployment: &otherEnvDeployment, - expectedGTP: nil, - }, - { - name: "Should return no deployment when no deployments have an environment", - gtp: &[]v12.GlobalTrafficPolicy{e2eGtp, prfGtp}, - deployment: &noEnvDeployment, - expectedGTP: nil, - }, - { - name: "Should match a GTP and deployment when both have no env label", - gtp: &[]v12.GlobalTrafficPolicy{e2eGtp, prfGtp, qalGtp, qalGtpOld, noEnvGTP, noEnvGTPOld}, - deployment: &noEnvDeployment, - expectedGTP: &noEnvGTPOld, - }, - { - name: "Should return Match when there's one match", - gtp: &[]v12.GlobalTrafficPolicy{qalGtp}, - deployment: &deployment, - expectedGTP: &qalGtp, - }, - { - name: "Should return Match when there's one match from a bigger list", - gtp: &[]v12.GlobalTrafficPolicy{e2eGtp, prfGtp, qalGtp}, - deployment: &deployment, - expectedGTP: &qalGtp, - }, - { - name: "Should handle multiple matches properly", - gtp: &[]v12.GlobalTrafficPolicy{e2eGtp, prfGtp, qalGtp, qalGtpOld}, - deployment: &deployment, - expectedGTP: &qalGtpOld, - }, - { - name: "Should return nil when there's no match", - gtp: &[]v12.GlobalTrafficPolicy{}, - deployment: &deployment, - expectedGTP: nil, - }, - { - name: "Should return nil the deployment is invalid", - gtp: &[]v12.GlobalTrafficPolicy{}, - deployment: &k8sAppsV1.Deployment{}, - expectedGTP: nil, - }, - } - - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - returned := MatchGTPsToDeployment(*c.gtp, c.deployment) - if !cmp.Equal(returned, c.expectedGTP, ignoreUnexported) { - t.Fatalf("Deployment mismatch. Diff: %v", cmp.Diff(returned, c.expectedGTP, ignoreUnexported)) - } - }) - } - -} - func TestGetGtpEnv(t *testing.T) { envNewAnnotationGtp := v12.GlobalTrafficPolicy{} @@ -624,4 +356,4 @@ func TestGetGtpEnv(t *testing.T) { }) } -} +} \ No newline at end of file