diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 5eeb9f794716..b5de46962594 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -169,12 +169,36 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { return fmt.Errorf("invalid key") } klog.V(4).Infof("DependenciesDistributor start to reconcile object: %s", clusterWideKey) - - bindingObjectList, err := d.resourceBindingLister.ByNamespace(clusterWideKey.Namespace).List(labels.Everything()) + resourceBindingNamespaceKey := clusterWideKey.Namespace + "/" + names.GenerateBindingName(clusterWideKey.Kind, clusterWideKey.Name) + unstructuredObj, err := d.resourceBindingLister.Get(resourceBindingNamespaceKey) if err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("ResourceBinding(%s) not found.", resourceBindingNamespaceKey) + return nil + } + klog.Errorf("Failed to get ResourceBinding(%s): %v", resourceBindingNamespaceKey, err) + return err + } + bindingObject := &workv1alpha2.ResourceBinding{} + if err = helper.ConvertToTypedObject(unstructuredObj, bindingObject); err != nil { + klog.Errorf("Failed to convert ResourceBinding(%s) from unstructured object: %v", clusterWideKey.NamespaceKey(), err) return err } + var bindingObjectList []runtime.Object + for _, bindingSnapshot := range bindingObject.Spec.RequiredBy { + if bindingSnapshot.Namespace != "" { + binding, err := d.resourceBindingLister.Get(bindingSnapshot.Namespace + "/" + bindingSnapshot.Name) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Errorf("ResourceBinding(%s) not found.", resourceBindingNamespaceKey) + continue + } + return err + } + bindingObjectList = append(bindingObjectList, binding) + } + } bindingList, err := convertObjectsToResourceBindings(bindingObjectList) if err != nil { return err @@ -185,17 +209,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { if !binding.DeletionTimestamp.IsZero() { continue } - - matched, err := dependentObjectReferenceMatches(clusterWideKey, binding) - if err != nil { - klog.Errorf("Failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err) - errs = append(errs, err) - continue - } else if !matched { - klog.V(4).Infof("No need to sync binding(%s/%s)", binding.Namespace, binding.Name) - continue - } - klog.V(4).Infof("Resource binding(%s/%s) is matched for resource(%s/%s)", binding.Namespace, binding.Name, clusterWideKey.Namespace, clusterWideKey.Name) bindingKey, err := detector.ClusterWideKeyFunc(binding) if err != nil { @@ -210,35 +223,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error { return utilerrors.NewAggregate(errs) } -// dependentObjectReferenceMatches tells if the given object is referred by current resource binding. -func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) { - dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey] - if !exist { - return false, nil - } - - var dependenciesSlice []configv1alpha1.DependentObjectReference - err := json.Unmarshal([]byte(dependencies), &dependenciesSlice) - if err != nil { - return false, err - } - - if len(dependenciesSlice) == 0 { - return false, nil - } - - for _, dependence := range dependenciesSlice { - if objectKey.Version == dependence.APIVersion && - objectKey.Kind == dependence.Kind && - objectKey.Namespace == dependence.Namespace && - objectKey.Name == dependence.Name { - return true, nil - } - } - - return false, nil -} - // OnResourceBindingUpdate handles object update event and push the object to queue. func (d *DependenciesDistributor) OnResourceBindingUpdate(oldObj, newObj interface{}) { oldBindingObject := &workv1alpha2.ResourceBinding{} @@ -549,7 +533,7 @@ func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstruc return &workv1alpha2.ResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: names.GenerateBindingName(object.GetKind(), object.GetName()), - Namespace: binding.GetNamespace(), + Namespace: object.GetNamespace(), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(object, object.GroupVersionKind()), }, diff --git a/test/e2e/dependenciesdistributor_test.go b/test/e2e/dependenciesdistributor_test.go index 652909f66892..4b16a8d3d4fc 100644 --- a/test/e2e/dependenciesdistributor_test.go +++ b/test/e2e/dependenciesdistributor_test.go @@ -1,13 +1,26 @@ package e2e import ( + "context" + "fmt" + "time" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" "github.com/karmada-io/karmada/test/e2e/framework" testhelper "github.com/karmada-io/karmada/test/helper" @@ -326,4 +339,213 @@ var _ = ginkgo.Describe("[DependenciesDistributor] automatically propagate relev }) }) }) + + ginkgo.Context("across namespace dependencies propagation testing", func() { + var crdGroup string + var randStr string + var crdSpecNames apiextensionsv1.CustomResourceDefinitionNames + var crd *apiextensionsv1.CustomResourceDefinition + var crdPolicy *policyv1alpha1.ClusterPropagationPolicy + var crNamespace, crName string + var crGVR schema.GroupVersionResource + var crAPIVersion string + var cr *unstructured.Unstructured + var crPolicy *policyv1alpha1.PropagationPolicy + var initClusterNames, updateClusterNames []string + + ginkgo.BeforeEach(func() { + initClusterNames = []string{"member1"} + updateClusterNames = []string{"member2"} + crdGroup = fmt.Sprintf("example-%s.karmada.io", rand.String(RandomStrLength)) + randStr = rand.String(RandomStrLength) + crdSpecNames = apiextensionsv1.CustomResourceDefinitionNames{ + Kind: fmt.Sprintf("Foo%s", randStr), + ListKind: fmt.Sprintf("Foo%sList", randStr), + Plural: fmt.Sprintf("foo%ss", randStr), + Singular: fmt.Sprintf("foo%s", randStr), + } + crd = testhelper.NewCustomResourceDefinition(crdGroup, crdSpecNames, apiextensionsv1.NamespaceScoped) + crdPolicy = testhelper.NewClusterPropagationPolicy(crd.Name, []policyv1alpha1.ResourceSelector{ + { + APIVersion: crd.APIVersion, + Kind: crd.Kind, + Name: crd.Name, + }, + }, policyv1alpha1.Placement{ + ClusterAffinity: &policyv1alpha1.ClusterAffinity{ + ClusterNames: framework.ClusterNames(), + }, + }) + }) + + ginkgo.BeforeEach(func() { + framework.CreateClusterPropagationPolicy(karmadaClient, crdPolicy) + framework.CreateCRD(dynamicClient, crd) + framework.WaitCRDPresentOnClusters(karmadaClient, framework.ClusterNames(), + fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1"), crd.Spec.Names.Kind) + ginkgo.DeferCleanup(func() { + framework.RemoveClusterPropagationPolicy(karmadaClient, crdPolicy.Name) + framework.RemoveCRD(dynamicClient, crd.Name) + framework.WaitCRDDisappearedOnClusters(framework.ClusterNames(), crd.Name) + }) + }) + + ginkgo.JustBeforeEach(func() { + ginkgo.By(fmt.Sprintf("create cr(%s/%s)", crNamespace, crName), func() { + _, err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Create(context.TODO(), cr, metav1.CreateOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + framework.CreatePropagationPolicy(karmadaClient, crPolicy) + ginkgo.DeferCleanup(func() { + ginkgo.By(fmt.Sprintf("remove cr(%s/%s)", crNamespace, crName), func() { + err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Delete(context.TODO(), crName, metav1.DeleteOptions{}) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) + framework.RemovePropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name) + }) + }) + + ginkgo.When("across namespace configmap propagate automatically", func() { + var configMapName string + var configMap *corev1.ConfigMap + var customizationConfigMap *configv1alpha1.ResourceInterpreterCustomization + ginkgo.BeforeEach(func() { + configMapName = configMapNamePrefix + rand.String(RandomStrLength) + configMap = testhelper.NewConfigMap("default", configMapName, map[string]string{"user": "karmada"}) + crNamespace = testNamespace + crName = crdNamePrefix + rand.String(RandomStrLength) + crGVR = schema.GroupVersionResource{Group: crd.Spec.Group, Version: "v1alpha1", Resource: crd.Spec.Names.Plural} + crAPIVersion = fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1") + cr = testhelper.NewCustomResourceWithConfigMap(crAPIVersion, crd.Spec.Names.Kind, crNamespace, crName, configMapName) + customizationConfigMap = testhelper.NewResourceInterpreterCustomization( + "interpreter-customization"+rand.String(RandomStrLength), + configv1alpha1.CustomizationTarget{ + APIVersion: cr.GetAPIVersion(), + Kind: cr.GetKind(), + }, + configv1alpha1.CustomizationRules{ + DependencyInterpretation: &configv1alpha1.DependencyInterpretation{ + LuaScript: ` +function GetDependencies(desiredObj) + dependencies = {} + if desiredObj.spec.resource ~= nil and desiredObj.spec.resource.kind == 'ConfigMap' then + dependObj = {} + dependObj.apiVersion = 'v1' + dependObj.kind = 'ConfigMap' + dependObj.name = desiredObj.spec.resource.name + dependObj.namespace = desiredObj.spec.resource.namespace + dependencies[1] = dependObj + end + return dependencies +end `, + }, + }) + crPolicy = testhelper.NewPropagationPolicy(crNamespace, crName, []policyv1alpha1.ResourceSelector{ + { + APIVersion: crAPIVersion, + Kind: crd.Spec.Names.Kind, + Name: crName, + }, + }, policyv1alpha1.Placement{ + ClusterAffinity: &policyv1alpha1.ClusterAffinity{ + ClusterNames: initClusterNames, + }, + }) + crPolicy.Spec.PropagateDeps = true + }) + + ginkgo.BeforeEach(func() { + framework.CreateConfigMap(kubeClient, configMap) + framework.CreateResourceInterpreterCustomization(karmadaClient, customizationConfigMap) + // Wait for resource interpreter informer synced. + time.Sleep(time.Second) + ginkgo.DeferCleanup(func() { + framework.RemoveConfigMap(kubeClient, configMap.Namespace, configMapName) + framework.DeleteResourceInterpreterCustomization(karmadaClient, customizationConfigMap.Name) + }) + }) + + ginkgo.It("across namespace configmap automatically propagation testing", func() { + ginkgo.By("check if cr present on member clusters", func() { + for _, cluster := range initClusterNames { + clusterDynamicClient := framework.GetClusterDynamicClient(cluster) + gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil()) + klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster) + err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) { + _, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + + ginkgo.By("check if the configmap is propagated automatically", func() { + framework.WaitConfigMapPresentOnClustersFitWith(initClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + return true + }) + }) + + ginkgo.By("updating propagation policy's clusterNames", func() { + patch := []map[string]interface{}{ + { + "op": "replace", + "path": "/spec/placement/clusterAffinity/clusterNames", + "value": updateClusterNames, + }, + } + + framework.PatchPropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name, patch, types.JSONPatchType) + ginkgo.By("check if cr present on member clusters", func() { + for _, cluster := range updateClusterNames { + clusterDynamicClient := framework.GetClusterDynamicClient(cluster) + gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil()) + klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster) + err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) { + _, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + return true + }) + }) + ginkgo.By("updating configmap's data", func() { + patch := []map[string]interface{}{ + { + "op": "replace", + "path": "/data/user", + "value": "karmada-e2e", + }, + } + + framework.UpdateConfigMapWithPatch(kubeClient, configMap.Namespace, configMapName, patch, types.JSONPatchType) + framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName, + func(configmap *corev1.ConfigMap) bool { + for key, value := range configmap.Data { + if key == "user" && value == "karmada-e2e" { + return true + } + } + return false + }) + }) + }) + }) + }) }) diff --git a/test/helper/resource.go b/test/helper/resource.go index e8a4ceeb874f..3e6b2934a544 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -302,6 +302,29 @@ func NewCustomResource(apiVersion, kind, namespace, name string) *unstructured.U } } +// NewCustomResourceWithConfigMap will build a CR object depended on across ns ConfigMap. +func NewCustomResourceWithConfigMap(apiVersion, kind, namespace, name, configMapName string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]string{ + "namespace": namespace, + "name": name, + }, + "spec": map[string]interface{}{ + "resource": map[string]string{ + "apiVersion": "v1", + "kind": "ConfigMap", + "name": configMapName, + "namespace": "default", + }, + "clusters": []map[string]string{}, + }, + }, + } +} + // NewJob will build a job object. func NewJob(namespace string, name string) *batchv1.Job { return &batchv1.Job{