Skip to content

Commit

Permalink
fix dependencies distributor buildAttachedBinding Namespace
Browse files Browse the repository at this point in the history
Signed-off-by: huangyanfeng <huangyanfeng1992@gmail.com>
  • Loading branch information
yanfeng1992 committed Jan 29, 2023
1 parent 28223cf commit 67d5347
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 43 deletions.
70 changes: 27 additions & 43 deletions pkg/dependenciesdistributor/dependencies_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,36 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error {
return fmt.Errorf("invalid key")
}
klog.V(4).Infof("DependenciesDistributor start to reconcile object: %s", clusterWideKey)

bindingObjectList, err := d.resourceBindingLister.ByNamespace(clusterWideKey.Namespace).List(labels.Everything())
resourceBindingNamespaceKey := clusterWideKey.Namespace + "/" + names.GenerateBindingName(clusterWideKey.Kind, clusterWideKey.Name)
unstructuredObj, err := d.resourceBindingLister.Get(resourceBindingNamespaceKey)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("ResourceBinding(%s) not found.", resourceBindingNamespaceKey)
return nil
}
klog.Errorf("Failed to get ResourceBinding(%s): %v", resourceBindingNamespaceKey, err)
return err
}
bindingObject := &workv1alpha2.ResourceBinding{}
if err = helper.ConvertToTypedObject(unstructuredObj, bindingObject); err != nil {
klog.Errorf("Failed to convert ResourceBinding(%s) from unstructured object: %v", clusterWideKey.NamespaceKey(), err)
return err
}

var bindingObjectList []runtime.Object
for _, bindingSnapshot := range bindingObject.Spec.RequiredBy {
if bindingSnapshot.Namespace != "" {
binding, err := d.resourceBindingLister.Get(bindingSnapshot.Namespace + "/" + bindingSnapshot.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Errorf("ResourceBinding(%s) not found.", resourceBindingNamespaceKey)
continue
}
return err
}
bindingObjectList = append(bindingObjectList, binding)
}
}
bindingList, err := convertObjectsToResourceBindings(bindingObjectList)
if err != nil {
return err
Expand All @@ -185,17 +209,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error {
if !binding.DeletionTimestamp.IsZero() {
continue
}

matched, err := dependentObjectReferenceMatches(clusterWideKey, binding)
if err != nil {
klog.Errorf("Failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err)
errs = append(errs, err)
continue
} else if !matched {
klog.V(4).Infof("No need to sync binding(%s/%s)", binding.Namespace, binding.Name)
continue
}

klog.V(4).Infof("Resource binding(%s/%s) is matched for resource(%s/%s)", binding.Namespace, binding.Name, clusterWideKey.Namespace, clusterWideKey.Name)
bindingKey, err := detector.ClusterWideKeyFunc(binding)
if err != nil {
Expand All @@ -210,35 +223,6 @@ func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error {
return utilerrors.NewAggregate(errs)
}

// dependentObjectReferenceMatches tells if the given object is referred by current resource binding.
func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) {
dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey]
if !exist {
return false, nil
}

var dependenciesSlice []configv1alpha1.DependentObjectReference
err := json.Unmarshal([]byte(dependencies), &dependenciesSlice)
if err != nil {
return false, err
}

if len(dependenciesSlice) == 0 {
return false, nil
}

for _, dependence := range dependenciesSlice {
if objectKey.Version == dependence.APIVersion &&
objectKey.Kind == dependence.Kind &&
objectKey.Namespace == dependence.Namespace &&
objectKey.Name == dependence.Name {
return true, nil
}
}

return false, nil
}

// OnResourceBindingUpdate handles object update event and push the object to queue.
func (d *DependenciesDistributor) OnResourceBindingUpdate(oldObj, newObj interface{}) {
oldBindingObject := &workv1alpha2.ResourceBinding{}
Expand Down Expand Up @@ -549,7 +533,7 @@ func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstruc
return &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: names.GenerateBindingName(object.GetKind(), object.GetName()),
Namespace: binding.GetNamespace(),
Namespace: object.GetNamespace(),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(object, object.GroupVersionKind()),
},
Expand Down
222 changes: 222 additions & 0 deletions test/e2e/dependenciesdistributor_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package e2e

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/test/e2e/framework"
testhelper "github.com/karmada-io/karmada/test/helper"
Expand Down Expand Up @@ -326,4 +339,213 @@ var _ = ginkgo.Describe("[DependenciesDistributor] automatically propagate relev
})
})
})

ginkgo.Context("across namespace dependencies propagation testing", func() {
var crdGroup string
var randStr string
var crdSpecNames apiextensionsv1.CustomResourceDefinitionNames
var crd *apiextensionsv1.CustomResourceDefinition
var crdPolicy *policyv1alpha1.ClusterPropagationPolicy
var crNamespace, crName string
var crGVR schema.GroupVersionResource
var crAPIVersion string
var cr *unstructured.Unstructured
var crPolicy *policyv1alpha1.PropagationPolicy
var initClusterNames, updateClusterNames []string

ginkgo.BeforeEach(func() {
initClusterNames = []string{"member1"}
updateClusterNames = []string{"member2"}
crdGroup = fmt.Sprintf("example-%s.karmada.io", rand.String(RandomStrLength))
randStr = rand.String(RandomStrLength)
crdSpecNames = apiextensionsv1.CustomResourceDefinitionNames{
Kind: fmt.Sprintf("Foo%s", randStr),
ListKind: fmt.Sprintf("Foo%sList", randStr),
Plural: fmt.Sprintf("foo%ss", randStr),
Singular: fmt.Sprintf("foo%s", randStr),
}
crd = testhelper.NewCustomResourceDefinition(crdGroup, crdSpecNames, apiextensionsv1.NamespaceScoped)
crdPolicy = testhelper.NewClusterPropagationPolicy(crd.Name, []policyv1alpha1.ResourceSelector{
{
APIVersion: crd.APIVersion,
Kind: crd.Kind,
Name: crd.Name,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
ClusterNames: framework.ClusterNames(),
},
})
})

ginkgo.BeforeEach(func() {
framework.CreateClusterPropagationPolicy(karmadaClient, crdPolicy)
framework.CreateCRD(dynamicClient, crd)
framework.WaitCRDPresentOnClusters(karmadaClient, framework.ClusterNames(),
fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1"), crd.Spec.Names.Kind)
ginkgo.DeferCleanup(func() {
framework.RemoveClusterPropagationPolicy(karmadaClient, crdPolicy.Name)
framework.RemoveCRD(dynamicClient, crd.Name)
framework.WaitCRDDisappearedOnClusters(framework.ClusterNames(), crd.Name)
})
})

ginkgo.JustBeforeEach(func() {
ginkgo.By(fmt.Sprintf("create cr(%s/%s)", crNamespace, crName), func() {
_, err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Create(context.TODO(), cr, metav1.CreateOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
framework.CreatePropagationPolicy(karmadaClient, crPolicy)
ginkgo.DeferCleanup(func() {
ginkgo.By(fmt.Sprintf("remove cr(%s/%s)", crNamespace, crName), func() {
err := dynamicClient.Resource(crGVR).Namespace(crNamespace).Delete(context.TODO(), crName, metav1.DeleteOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
framework.RemovePropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name)
})
})

ginkgo.When("across namespace configmap propagate automatically", func() {
var configMapName string
var configMap *corev1.ConfigMap
var customizationConfigMap *configv1alpha1.ResourceInterpreterCustomization
ginkgo.BeforeEach(func() {
configMapName = configMapNamePrefix + rand.String(RandomStrLength)
configMap = testhelper.NewConfigMap("default", configMapName, map[string]string{"user": "karmada"})
crNamespace = testNamespace
crName = crdNamePrefix + rand.String(RandomStrLength)
crGVR = schema.GroupVersionResource{Group: crd.Spec.Group, Version: "v1alpha1", Resource: crd.Spec.Names.Plural}
crAPIVersion = fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1")
cr = testhelper.NewCustomResourceWithConfigMap(crAPIVersion, crd.Spec.Names.Kind, crNamespace, crName, configMapName)
customizationConfigMap = testhelper.NewResourceInterpreterCustomization(
"interpreter-customization"+rand.String(RandomStrLength),
configv1alpha1.CustomizationTarget{
APIVersion: cr.GetAPIVersion(),
Kind: cr.GetKind(),
},
configv1alpha1.CustomizationRules{
DependencyInterpretation: &configv1alpha1.DependencyInterpretation{
LuaScript: `
function GetDependencies(desiredObj)
dependencies = {}
if desiredObj.spec.resource ~= nil and desiredObj.spec.resource.kind == 'ConfigMap' then
dependObj = {}
dependObj.apiVersion = 'v1'
dependObj.kind = 'ConfigMap'
dependObj.name = desiredObj.spec.resource.name
dependObj.namespace = desiredObj.spec.resource.namespace
dependencies[1] = dependObj
end
return dependencies
end `,
},
})
crPolicy = testhelper.NewPropagationPolicy(crNamespace, crName, []policyv1alpha1.ResourceSelector{
{
APIVersion: crAPIVersion,
Kind: crd.Spec.Names.Kind,
Name: crName,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
ClusterNames: initClusterNames,
},
})
crPolicy.Spec.PropagateDeps = true
})

ginkgo.BeforeEach(func() {
framework.CreateConfigMap(kubeClient, configMap)
framework.CreateResourceInterpreterCustomization(karmadaClient, customizationConfigMap)
// Wait for resource interpreter informer synced.
time.Sleep(time.Second)
ginkgo.DeferCleanup(func() {
framework.RemoveConfigMap(kubeClient, configMap.Namespace, configMapName)
framework.DeleteResourceInterpreterCustomization(karmadaClient, customizationConfigMap.Name)
})
})

ginkgo.It("across namespace configmap automatically propagation testing", func() {
ginkgo.By("check if cr present on member clusters", func() {
for _, cluster := range initClusterNames {
clusterDynamicClient := framework.GetClusterDynamicClient(cluster)
gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil())
klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
_, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
})

ginkgo.By("check if the configmap is propagated automatically", func() {
framework.WaitConfigMapPresentOnClustersFitWith(initClusterNames, configMap.Namespace, configMapName,
func(configmap *corev1.ConfigMap) bool {
return true
})
})

ginkgo.By("updating propagation policy's clusterNames", func() {
patch := []map[string]interface{}{
{
"op": "replace",
"path": "/spec/placement/clusterAffinity/clusterNames",
"value": updateClusterNames,
},
}

framework.PatchPropagationPolicy(karmadaClient, crPolicy.Namespace, crPolicy.Name, patch, types.JSONPatchType)
ginkgo.By("check if cr present on member clusters", func() {
for _, cluster := range updateClusterNames {
clusterDynamicClient := framework.GetClusterDynamicClient(cluster)
gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil())
klog.Infof("Waiting for cr(%s/%s) present on cluster(%s)", crNamespace, crName, cluster)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
_, err = clusterDynamicClient.Resource(crGVR).Namespace(crNamespace).Get(context.TODO(), crName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
})
framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName,
func(configmap *corev1.ConfigMap) bool {
return true
})
})
ginkgo.By("updating configmap's data", func() {
patch := []map[string]interface{}{
{
"op": "replace",
"path": "/data/user",
"value": "karmada-e2e",
},
}

framework.UpdateConfigMapWithPatch(kubeClient, configMap.Namespace, configMapName, patch, types.JSONPatchType)
framework.WaitConfigMapPresentOnClustersFitWith(updateClusterNames, configMap.Namespace, configMapName,
func(configmap *corev1.ConfigMap) bool {
for key, value := range configmap.Data {
if key == "user" && value == "karmada-e2e" {
return true
}
}
return false
})
})
})
})
})
})
23 changes: 23 additions & 0 deletions test/helper/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,29 @@ func NewCustomResource(apiVersion, kind, namespace, name string) *unstructured.U
}
}

// NewCustomResourceWithConfigMap will build a CR object depended on across ns ConfigMap.
func NewCustomResourceWithConfigMap(apiVersion, kind, namespace, name, configMapName string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": kind,
"metadata": map[string]string{
"namespace": namespace,
"name": name,
},
"spec": map[string]interface{}{
"resource": map[string]string{
"apiVersion": "v1",
"kind": "ConfigMap",
"name": configMapName,
"namespace": "default",
},
"clusters": []map[string]string{},
},
},
}
}

// NewJob will build a job object.
func NewJob(namespace string, name string) *batchv1.Job {
return &batchv1.Job{
Expand Down

0 comments on commit 67d5347

Please sign in to comment.