From 86ecd8f596fb650f63801bbdba73064f8208552f Mon Sep 17 00:00:00 2001 From: huangyanfeng Date: Thu, 12 Jan 2023 11:20:43 +0800 Subject: [PATCH] dependencies distributor support resource accros namespace Signed-off-by: huangyanfeng --- .../dependencies_distributor.go | 93 +++----- test/e2e/dependenciesdistributor_test.go | 222 ++++++++++++++++++ test/helper/resource.go | 23 ++ 3 files changed, 280 insertions(+), 58 deletions(-) diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 5eeb9f794716..eabf624e113c 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -2,10 +2,12 @@ package dependenciesdistributor import ( "context" - "encoding/json" "fmt" + "hash/fnv" + "strings" "time" + "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -15,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -34,6 +37,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" + hashutil "github.com/karmada-io/karmada/pkg/util/hash" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -42,9 +46,9 @@ const ( // bindingDependedByLabelKeyPrefix is the prefix to a label key specifying an attached binding referred by which independent binding. // the key is in the label of an attached binding which should be unique, because resource like secret can be referred by multiple deployments. bindingDependedByLabelKeyPrefix = "resourcebinding.karmada.io/depended-by-" - // bindingDependenciesAnnotationKey represents the key of dependencies data (json serialized) - // in the annotations of an independent binding. - bindingDependenciesAnnotationKey = "resourcebinding.karmada.io/dependencies" + //bindingDependedOnLabelKeyPrefix is the prefix to a label key specifying a binding depended on one object + // the key is in the label of a binding which should be unique + bindingDependedOnLabelKeyPrefix = "resourcebinding.karmada.io/depended-on-" ) var supportedTypes = []schema.GroupVersionResource{ @@ -170,7 +174,9 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { } klog.V(4).Infof("DependenciesDistributor start to reconcile object: %s", clusterWideKey) - bindingObjectList, err := d.resourceBindingLister.ByNamespace(clusterWideKey.Namespace).List(labels.Everything()) + bindingDependedLabelKey := generateBindingDependedLabelKey(clusterWideKey.Kind, clusterWideKey.Version, clusterWideKey.Name, clusterWideKey.Namespace) + selector := labels.SelectorFromSet(map[string]string{bindingDependedLabelKey: "true"}) + bindingObjectList, err := d.resourceBindingLister.List(selector) if err != nil { return err } @@ -185,17 +191,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { if !binding.DeletionTimestamp.IsZero() { continue } - - matched, err := dependentObjectReferenceMatches(clusterWideKey, binding) - if err != nil { - klog.Errorf("Failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err) - errs = append(errs, err) - continue - } else if !matched { - klog.V(4).Infof("No need to sync binding(%s/%s)", binding.Namespace, binding.Name) - continue - } - klog.V(4).Infof("Resource binding(%s/%s) is matched for resource(%s/%s)", binding.Namespace, binding.Name, clusterWideKey.Namespace, clusterWideKey.Name) bindingKey, err := detector.ClusterWideKeyFunc(binding) if err != nil { @@ -210,35 +205,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { return utilerrors.NewAggregate(errs) } -// dependentObjectReferenceMatches tells if the given object is referred by current resource binding. -func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) { - dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey] - if !exist { - return false, nil - } - - var dependenciesSlice []configv1alpha1.DependentObjectReference - err := json.Unmarshal([]byte(dependencies), &dependenciesSlice) - if err != nil { - return false, err - } - - if len(dependenciesSlice) == 0 { - return false, nil - } - - for _, dependence := range dependenciesSlice { - if objectKey.Version == dependence.APIVersion && - objectKey.Kind == dependence.Kind && - objectKey.Namespace == dependence.Namespace && - objectKey.Name == dependence.Name { - return true, nil - } - } - - return false, nil -} - // OnResourceBindingUpdate handles object update event and push the object to queue. func (d *DependenciesDistributor) OnResourceBindingUpdate(oldObj, newObj interface{}) { oldBindingObject := &workv1alpha2.ResourceBinding{} @@ -414,26 +380,30 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding * } func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error { - dependenciesBytes, err := json.Marshal(dependencies) - if err != nil { - klog.Errorf("Failed to marshal dependencies of binding(%s/%s): %v", binding.Namespace, binding.Name, err) - return err + objectLabels := binding.GetLabels() + if objectLabels == nil { + objectLabels = make(map[string]string) } - objectAnnotation := binding.GetAnnotations() - if objectAnnotation == nil { - objectAnnotation = make(map[string]string, 1) + newObjectLabels := make(map[string]string) + for k, v := range objectLabels { + if !strings.HasPrefix(k, bindingDependedOnLabelKeyPrefix) { + newObjectLabels[k] = v + } } - // dependencies are not updated, no need to update annotation. - if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == string(dependenciesBytes) { - return nil + for _, dependence := range dependencies { + dependencyKey := generateBindingDependedLabelKey(dependence.Kind, dependence.APIVersion, dependence.Name, dependence.Namespace) + newObjectLabels[dependencyKey] = "true" } - objectAnnotation[bindingDependenciesAnnotationKey] = string(dependenciesBytes) + // labels are not updated, no need to update binding. + if cmp.Equal(objectLabels, newObjectLabels) { + return nil + } return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - binding.SetAnnotations(objectAnnotation) + binding.SetLabels(newObjectLabels) updateErr := d.Client.Update(context.TODO(), binding) if updateErr == nil { return nil @@ -495,6 +465,13 @@ func generateBindingDependedByLabelKey(bindingNamespace, bindingName string) str return fmt.Sprintf(bindingDependedByLabelKeyPrefix + bindHashKey) } +func generateBindingDependedLabelKey(kind, apiVersion, name, namespace string) string { + dependedName := generateDependencyKey(kind, apiVersion, name, namespace) + hash := fnv.New32a() + hashutil.DeepHashObject(hash, dependedName) + return fmt.Sprintf(bindingDependedOnLabelKeyPrefix + rand.SafeEncodeString(fmt.Sprint(hash.Sum32()))) +} + func generateDependencyKey(kind, apiVersion, name, namespace string) string { if len(namespace) == 0 { return kind + "-" + apiVersion + "-" + name @@ -549,7 +526,7 @@ func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstruc return &workv1alpha2.ResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: names.GenerateBindingName(object.GetKind(), object.GetName()), - Namespace: binding.GetNamespace(), + Namespace: object.GetNamespace(), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(object, object.GroupVersionKind()), }, diff --git a/test/e2e/dependenciesdistributor_test.go b/test/e2e/dependenciesdistributor_test.go index 652909f66892..4b16a8d3d4fc 100644 --- a/test/e2e/dependenciesdistributor_test.go +++ b/test/e2e/dependenciesdistributor_test.go @@ -1,13 +1,26 @@ package e2e import ( + "context" + "fmt" + "time" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" "github.com/karmada-io/karmada/test/e2e/framework" testhelper "github.com/karmada-io/karmada/test/helper" @@ -326,4 +339,213 @@ var _ = ginkgo.Describe("[DependenciesDistributor] automatically propagate relev }) }) }) + + ginkgo.Context("across namespace dependencies propagation testing", func() { + var crdGroup string + var randStr string + var crdSpecNames apiextensionsv1.CustomResourceDefinitionNames + var crd *apiextensionsv1.CustomResourceDefinition + var crdPolicy *policyv1alpha1.ClusterPropagationPolicy + var crNamespace, crName string + var crGVR schema.GroupVersionResource + var crAPIVersion string + var cr *unstructured.Unstructured + var crPolicy *policyv1alpha1.PropagationPolicy + var initClusterNames, updateClusterNames []string + + ginkgo.BeforeEach(func() { + initClusterNames = []string{"member1"} + updateClusterNames = []string{"member2"} + crdGroup = fmt.Sprintf("example-%s.karmada.io", rand.String(RandomStrLength)) + randStr = rand.String(RandomStrLength) + crdSpecNames = apiextensionsv1.CustomResourceDefinitionNames{ + Kind: fmt.Sprintf("Foo%s", randStr), + ListKind: fmt.Sprintf("Foo%sList", randStr), + Plural: fmt.Sprintf("foo%ss", randStr), + Singular: fmt.Sprintf("foo%s", randStr), + } + crd = testhelper.NewCustomResourceDefinition(crdGroup, crdSpecNames, apiextensionsv1.NamespaceScoped) + crdPolicy = testhelper.NewClusterPropagationPolicy(crd.Name, []policyv1alpha1.ResourceSelector{ + { + APIVersion: crd.APIVersion, + Kind: crd.Kind, + Name: crd.Name, + }, + }, policyv1alpha1.Placement{ + ClusterAffinity: &policyv1alpha1.ClusterAffinity{ + ClusterNames: framework.ClusterNames(), + }, + }) + }) + + ginkgo.BeforeEach(func() { + framework.CreateClusterPropagationPolicy(karmadaClient, crdPolicy) + framework.CreateCRD(dynamicClient, crd) + framework.WaitCRDPresentOnClusters(karmadaClient, framework.ClusterNames(), + fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1"), crd.Spec.Names.Kind) + ginkgo.DeferCleanup(func() { + framework.RemoveClusterPropagationPolicy(karmadaClient, crdPolicy.Name) + framework.RemoveCRD(dynamicClient, crd.Name) + framework.WaitCRDDisappearedOnClusters(framework.ClusterNames(), crd.Name) + }) + }) + + ginkgo.JustBeforeEach(func() { + ginkgo.By(fmt.Sprintf("create cr(%s/%s)", crNamespace, crName), func() { + _, err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Create(context.TODO(), cr, metav1.CreateOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + framework.CreatePropagationPolicy(karmadaClient, crPolicy) + ginkgo.DeferCleanup(func() { + ginkgo.By(fmt.Sprintf("remove cr(%s/%s)", crNamespace, crName), func() { + err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Delete(context.TODO(), crName, metav1.DeleteOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + framework.RemovePropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name) + }) + }) + + ginkgo.When("across namespace configmap propagate automatically", func() { + var configMapName string + var configMap *corev1.ConfigMap + var customizationConfigMap *configv1alpha1.ResourceInterpreterCustomization + ginkgo.BeforeEach(func() { + configMapName = configMapNamePrefix + rand.String(RandomStrLength) + configMap = testhelper.NewConfigMap("default", configMapName, map[string]string{"user": "karmada"}) + crNamespace = testNamespace + crName = crdNamePrefix + rand.String(RandomStrLength) + crGVR = schema.GroupVersionResource{Group: crd.Spec.Group, Version: "v1alpha1", Resource: crd.Spec.Names.Plural} + crAPIVersion = fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1") + cr = testhelper.NewCustomResourceWithConfigMap(crAPIVersion, crd.Spec.Names.Kind, crNamespace, crName, configMapName) + customizationConfigMap = testhelper.NewResourceInterpreterCustomization( + "interpreter-customization"+rand.String(RandomStrLength), + configv1alpha1.CustomizationTarget{ + APIVersion: cr.GetAPIVersion(), + Kind: cr.GetKind(), + }, + configv1alpha1.CustomizationRules{ + DependencyInterpretation: &configv1alpha1.DependencyInterpretation{ + LuaScript: ` +function GetDependencies(desiredObj) + dependencies = {} + if desiredObj.spec.resource ~= nil and desiredObj.spec.resource.kind == 'ConfigMap' then + dependObj = {} + dependObj.apiVersion = 'v1' + dependObj.kind = 'ConfigMap' + dependObj.name = desiredObj.spec.resource.name + dependObj.namespace = desiredObj.spec.resource.namespace + dependencies[1] = dependObj + end + return dependencies +end `, + }, + }) + crPolicy = testhelper.NewPropagationPolicy(crNamespace, crName, []policyv1alpha1.ResourceSelector{ + { + APIVersion: crAPIVersion, + Kind: crd.Spec.Names.Kind, + Name: crName, + }, + }, policyv1alpha1.Placement{ + ClusterAffinity: &policyv1alpha1.ClusterAffinity{ + ClusterNames: initClusterNames, + }, + }) + crPolicy.Spec.PropagateDeps = true + }) + + ginkgo.BeforeEach(func() { + framework.CreateConfigMap(kubeClient, configMap) + framework.CreateResourceInterpreterCustomization(karmadaClient, customizationConfigMap) + // Wait for resource interpreter informer synced. + time.Sleep(time.Second) + ginkgo.DeferCleanup(func() { + framework.RemoveConfigMap(kubeClient, configMap.Namespace, configMapName) + framework.DeleteResourceInterpreterCustomization(karmadaClient, customizationConfigMap.Name) + }) + }) + + ginkgo.It("across namespace configmap automatically propagation testing", func() { + ginkgo.By("check if cr present on member clusters", func() { + for _, cluster := range initClusterNames { + clusterDynamicClient := framework.GetClusterDynamicClient(cluster) + gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil()) + klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster) + err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) { + _, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + + ginkgo.By("check if the configmap is propagated automatically", func() { + framework.WaitConfigMapPresentOnClustersFitWith(initClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + return true + }) + }) + + ginkgo.By("updating propagation policy's clusterNames", func() { + patch := []map[string]interface{}{ + { + "op": "replace", + "path": "/spec/placement/clusterAffinity/clusterNames", + "value": updateClusterNames, + }, + } + + framework.PatchPropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name, patch, types.JSONPatchType) + ginkgo.By("check if cr present on member clusters", func() { + for _, cluster := range updateClusterNames { + clusterDynamicClient := framework.GetClusterDynamicClient(cluster) + gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil()) + klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster) + err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) { + _, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + return true + }) + }) + ginkgo.By("updating configmap's data", func() { + patch := []map[string]interface{}{ + { + "op": "replace", + "path": "/data/user", + "value": "karmada-e2e", + }, + } + + framework.UpdateConfigMapWithPatch(kubeClient, configMap.Namespace, configMapName, patch, types.JSONPatchType) + framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + for key, value := range configmap.Data { + if key == "user" && value == "karmada-e2e" { + return true + } + } + return false + }) + }) + }) + }) + }) }) diff --git a/test/helper/resource.go b/test/helper/resource.go index e8a4ceeb874f..3e6b2934a544 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -302,6 +302,29 @@ func NewCustomResource(apiVersion, kind, namespace, name string) *unstructured.U } } +// NewCustomResourceWithConfigMap will build a CR object depended on across ns ConfigMap. +func NewCustomResourceWithConfigMap(apiVersion, kind, namespace, name, configMapName string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]string{ + "namespace": namespace, + "name": name, + }, + "spec": map[string]interface{}{ + "resource": map[string]string{ + "apiVersion": "v1", + "kind": "ConfigMap", + "name": configMapName, + "namespace": "default", + }, + "clusters": []map[string]string{}, + }, + }, + } +} + // NewJob will build a job object. func NewJob(namespace string, name string) *batchv1.Job { return &batchv1.Job{