diff --git a/kubeless-rbac.jsonnet b/kubeless-rbac.jsonnet index 5325fdb6e..8a334de14 100644 --- a/kubeless-rbac.jsonnet +++ b/kubeless-rbac.jsonnet @@ -30,7 +30,7 @@ local controller_roles = [ { apiGroups: ["kubeless.io"], resources: ["functions", "kafkatriggers", "httptriggers", "cronjobtriggers"], - verbs: ["get", "list", "watch", "update"], + verbs: ["get", "list", "watch", "update", "delete"], }, { apiGroups: ["batch"], diff --git a/pkg/controller/cronjob_trigger_controller.go b/pkg/controller/cronjob_trigger_controller.go index b1093c24e..694e5735a 100644 --- a/pkg/controller/cronjob_trigger_controller.go +++ b/pkg/controller/cronjob_trigger_controller.go @@ -203,13 +203,10 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error { } // CronJob Trigger object should be deleted, so remove associated cronjob and remove the finalizer - _, err := c.clientset.BatchV2alpha1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", name), metav1.GetOptions{}) - if err == nil { - err = c.clientset.BatchV2alpha1().CronJobs(ns).Delete(fmt.Sprintf("trigger-%s", name), &metav1.DeleteOptions{}) - if err != nil && !k8sErrors.IsNotFound(err) { - c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err) - return err - } + err = c.clientset.BatchV1beta1().CronJobs(ns).Delete(name, &metav1.DeleteOptions{}) + if err != nil && !k8sErrors.IsNotFound(err) { + c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err) + return err } // remove finalizer from the cronjob trigger object, so that we dont have to process any further and object can be deleted @@ -237,18 +234,12 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error { return err } - restIface := c.clientset.BatchV2alpha1().RESTClient() - groupVersion, err := c.getResouceGroupVersion("cronjobs") - if err != nil { - return err - } - functionObj, err := c.functionInformer.Lister().Functions(ns).Get(cronJobtriggerObj.Spec.FunctionName) if err != nil { c.logger.Errorf("Unable to find the function %s in the namespace %s. Received %s: ", cronJobtriggerObj.Spec.FunctionName, ns, err) return err } - err = utils.EnsureCronJob(restIface, functionObj, cronJobtriggerObj, or, groupVersion) + err = utils.EnsureCronJob(c.clientset, functionObj, cronJobtriggerObj.Spec.Schedule, or) if err != nil { return err } @@ -257,53 +248,41 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error { return nil } -func (c *CronJobTriggerController) getResouceGroupVersion(target string) (string, error) { - resources, err := c.clientset.Discovery().ServerResources() - if err != nil { - return "", err - } - groupVersion := "" - for _, resource := range resources { - for _, apiResource := range resource.APIResources { - if apiResource.Name == target { - groupVersion = resource.GroupVersion - break - } - } - } - if groupVersion == "" { - return "", fmt.Errorf("Resource %s not found in any group", target) - } - return groupVersion, nil -} - -func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) { +func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) error { functionObj, ok := obj.(*kubelessApi.Function) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - c.logger.Errorf("Couldn't get object from tombstone %#v", obj) - return + err := fmt.Errorf("Couldn't get object from tombstone %#v", obj) + c.logger.Errorf(err.Error()) + return err } functionObj, ok = tombstone.Obj.(*kubelessApi.Function) if !ok { - c.logger.Errorf("Tombstone contained object that is not a Pod %#v", obj) - return + err := fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj) + c.logger.Errorf(err.Error()) + return err } } c.logger.Infof("Processing update to function object %s Namespace: %s", functionObj.Name, functionObj.Namespace) if deleted { - //check if func is scheduled or not - cronJobName := fmt.Sprintf("trigger-%s", functionObj.ObjectMeta.Name) - _, err := c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Get(cronJobName, metav1.GetOptions{}) - if err == nil { - err = c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Delete(cronJobName, &metav1.DeleteOptions{}) - if err != nil && !k8sErrors.IsNotFound(err) { - c.logger.Errorf("Failed to delete cronjob %s created for the function %s in namespace %s, Error: %s", cronJobName, functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err) + c.logger.Infof("Function %s deleted. Removing associated cronjob trigger", functionObj.Name) + cjtList, err := c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).List(metav1.ListOptions{}) + if err != nil { + return err + } + for _, cjt := range cjtList.Items { + if cjt.Spec.FunctionName == functionObj.Name { + err = c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).Delete(cjt.Name, &metav1.DeleteOptions{}) + if err != nil && !k8sErrors.IsNotFound(err) { + c.logger.Errorf("Failed to delete cronjobtrigger created for the function %s in namespace %s, Error: %s", functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err) + return err + } } } } + return nil } func (c *CronJobTriggerController) cronJobTriggerObjHasFinalizer(triggerObj *kubelessApi.CronJobTrigger) bool { @@ -351,9 +330,8 @@ func cronJobTriggerObjChanged(oldObj, newObj *kubelessApi.CronJobTrigger) bool { if oldObj.ResourceVersion != newObj.ResourceVersion { return true } - newSpec := &newObj.Spec - oldSpec := &oldObj.Spec - + newSpec := newObj.Spec + oldSpec := oldObj.Spec if newSpec.Schedule != oldSpec.Schedule { return true } diff --git a/pkg/controller/cronjob_trigger_controller_test.go b/pkg/controller/cronjob_trigger_controller_test.go new file mode 100644 index 000000000..eb0b59826 --- /dev/null +++ b/pkg/controller/cronjob_trigger_controller_test.go @@ -0,0 +1,150 @@ +package controller + +import ( + "testing" + "time" + + kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1" + kubelessFake "github.com/kubeless/kubeless/pkg/client/clientset/versioned/fake" + "github.com/sirupsen/logrus" + batchv1beta1 "k8s.io/api/batch/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestFunctionAddedUpdated(t *testing.T) { + myNsFoo := metav1.ObjectMeta{ + Namespace: "myns", + Name: "foo", + } + + f := kubelessApi.Function{ + ObjectMeta: myNsFoo, + } + + cjtrigger := kubelessApi.CronJobTrigger{ + ObjectMeta: myNsFoo, + } + + triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger) + + cronjob := batchv1beta1.CronJob{ + ObjectMeta: myNsFoo, + } + clientset := fake.NewSimpleClientset(&cronjob) + + controller := CronJobTriggerController{ + clientset: clientset, + kubelessclient: triggerClientset, + logger: logrus.WithField("controller", "cronjob-trigger-controller"), + } + + // no-op for when the function is not deleted + err := controller.functionAddedDeletedUpdated(&f, false) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if len(list.Items) != 1 || list.Items[0].ObjectMeta.Name != "foo" { + t.Errorf("Missing trigger in list: %v", list.Items) + } +} + +func TestFunctionDeleted(t *testing.T) { + myNsFoo := metav1.ObjectMeta{ + Namespace: "myns", + Name: "foo", + } + + f := kubelessApi.Function{ + ObjectMeta: myNsFoo, + } + + cjtrigger := kubelessApi.CronJobTrigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "myns", + Name: "foo-trigger", + }, + Spec: kubelessApi.CronJobTriggerSpec{ + FunctionName: "foo", + }, + } + + triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger) + + cronjob := batchv1beta1.CronJob{ + ObjectMeta: myNsFoo, + } + clientset := fake.NewSimpleClientset(&cronjob) + + controller := CronJobTriggerController{ + clientset: clientset, + kubelessclient: triggerClientset, + logger: logrus.WithField("controller", "cronjob-trigger-controller"), + } + + // no-op for when the function is not deleted + err := controller.functionAddedDeletedUpdated(&f, true) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if len(list.Items) != 0 { + t.Errorf("Trigger should be deleted from list: %v", list.Items) + } +} + +func TestCronJobTriggerObjChanged(t *testing.T) { + type testObj struct { + old *kubelessApi.CronJobTrigger + new *kubelessApi.CronJobTrigger + expectedChanged bool + } + t1 := metav1.Time{ + Time: time.Now(), + } + t2 := metav1.Time{ + Time: time.Now(), + } + testObjs := []testObj{ + { + old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + expectedChanged: false, + }, + { + old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t1}}, + new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t2}}, + expectedChanged: true, + }, + { + old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + expectedChanged: true, + }, + { + old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}}, + new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}}, + expectedChanged: false, + }, + { + old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "*/10 * * * *"}}, + new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}}, + expectedChanged: true, + }, + } + for _, to := range testObjs { + changed := cronJobTriggerObjChanged(to.old, to.new) + if changed != to.expectedChanged { + t.Errorf("%v != %v expected to be %v", to.old, to.new, to.expectedChanged) + } + } +} diff --git a/pkg/utils/k8sutil.go b/pkg/utils/k8sutil.go index 854b9fc4e..c5426d168 100644 --- a/pkg/utils/k8sutil.go +++ b/pkg/utils/k8sutil.go @@ -36,7 +36,7 @@ import ( "k8s.io/api/autoscaling/v2beta1" batchv1 "k8s.io/api/batch/v1" - batchv2alpha1 "k8s.io/api/batch/v2alpha1" + batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" clientsetAPIExtensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -1093,7 +1093,7 @@ func doRESTReq(restIface rest.Interface, groupVersion, verb, resource, elem, nam } // EnsureCronJob creates/updates a function cron job -func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJobObj *kubelessApi.CronJobTrigger, or []metav1.OwnerReference, groupVersion string) error { +func EnsureCronJob(client kubernetes.Interface, funcObj *kubelessApi.Function, schedule string, or []metav1.OwnerReference) error { var maxSucccessfulHist, maxFailedHist int32 maxSucccessfulHist = 3 maxFailedHist = 1 @@ -1119,18 +1119,18 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob headersString = headersString + " -H \"event-time: " + timestamp.String() + "\"" headersString = headersString + " -H \"event-type: application/json\"" headersString = headersString + " -H \"event-namespace: cronjobtrigger.kubeless.io\"" - job := &batchv2alpha1.CronJob{ + job := &batchv1beta1.CronJob{ ObjectMeta: metav1.ObjectMeta{ Name: jobName, Namespace: funcObj.ObjectMeta.Namespace, Labels: funcObj.ObjectMeta.Labels, OwnerReferences: or, }, - Spec: batchv2alpha1.CronJobSpec{ - Schedule: cronJobObj.Spec.Schedule, + Spec: batchv1beta1.CronJobSpec{ + Schedule: schedule, SuccessfulJobsHistoryLimit: &maxSucccessfulHist, FailedJobsHistoryLimit: &maxFailedHist, - JobTemplate: batchv2alpha1.JobTemplateSpec{ + JobTemplate: batchv1beta1.JobTemplateSpec{ Spec: batchv1.JobSpec{ ActiveDeadlineSeconds: &activeDeadlineSeconds, Template: v1.PodTemplateSpec{ @@ -1152,17 +1152,17 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob // We need to use directly the REST API since the endpoint // for CronJobs changes from Kubernetes 1.8 - err = doRESTReq(client, groupVersion, "create", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, job, nil) + _, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Create(job) if err != nil && k8sErrors.IsAlreadyExists(err) { - newCronJob := batchv2alpha1.CronJob{} - err = doRESTReq(client, groupVersion, "get", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, nil, &newCronJob) + newCronJob := &batchv1beta1.CronJob{} + newCronJob, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Get(jobName, metav1.GetOptions{}) if err != nil { return err } newCronJob.ObjectMeta.Labels = funcObj.ObjectMeta.Labels newCronJob.ObjectMeta.OwnerReferences = or newCronJob.Spec = job.Spec - err = doRESTReq(client, groupVersion, "update", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, &newCronJob, nil) + _, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Update(newCronJob) } return err } diff --git a/pkg/utils/k8sutil_test.go b/pkg/utils/k8sutil_test.go index fc9902e79..12d3938d8 100644 --- a/pkg/utils/k8sutil_test.go +++ b/pkg/utils/k8sutil_test.go @@ -16,7 +16,6 @@ import ( "github.com/kubeless/kubeless/pkg/langruntime" v2beta1 "k8s.io/api/autoscaling/v2beta1" - batchv2alpha1 "k8s.io/api/batch/v2alpha1" "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" xv1beta1 "k8s.io/api/extensions/v1beta1" @@ -26,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/apimachinery" "k8s.io/apimachinery/pkg/apimachinery/registered" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" @@ -636,145 +634,55 @@ func TestEnsureCronJob(t *testing.T) { Timeout: "120", }, } - c := &kubelessApi.CronJobTrigger{ - ObjectMeta: metav1.ObjectMeta{ - Name: f1Name, - Namespace: ns, - }, - Spec: kubelessApi.CronJobTriggerSpec{ - Schedule: "*/10 * * * *", - FunctionName: f1Name, - }, - } expectedMeta := metav1.ObjectMeta{ Name: "trigger-" + f1Name, Namespace: ns, OwnerReferences: or, } - client := fakeRESTClient(func(req *http.Request) (*http.Response, error) { - header := http.Header{} - header.Set("Content-Type", runtime.ContentTypeJSON) - listObj := batchv2alpha1.CronJobList{} - if req.Method == "POST" { - reqCronJobBytes, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Fatal(err) - } - cronJob := batchv2alpha1.CronJob{} - err = json.Unmarshal(reqCronJobBytes, &cronJob) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(expectedMeta, cronJob.ObjectMeta) { - t.Errorf("Unexpected metadata metadata. Expecting\n%+v \nReceived:\n%+v", expectedMeta, cronJob.ObjectMeta) - } - if *cronJob.Spec.SuccessfulJobsHistoryLimit != int32(3) { - t.Errorf("Unexpected SuccessfulJobsHistoryLimit: %d", *cronJob.Spec.SuccessfulJobsHistoryLimit) - } - if *cronJob.Spec.FailedJobsHistoryLimit != int32(1) { - t.Errorf("Unexpected FailedJobsHistoryLimit: %d", *cronJob.Spec.FailedJobsHistoryLimit) - } - if *cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds != int64(120) { - t.Errorf("Unexpected ActiveDeadlineSeconds: %d", *cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds) - } - expectedCommand := []string{"curl", "-Lv", fmt.Sprintf("http://%s.%s.svc.cluster.local:8080", f1Name, ns)} - args := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Args - // skip event headers data (i.e -H "event-id: cronjob-controller-2018-03-05T05:55:41.990784027Z" etc) - foundCommand := []string{args[0], args[1], args[len(args)-1]} - if !reflect.DeepEqual(foundCommand, expectedCommand) { - t.Errorf("Unexpected command %s expexted %s", foundCommand, expectedCommand) - } - } else { - t.Fatalf("unexpected verb %s", req.Method) - } - switch req.URL.Path { - case "/apis/batch/v2alpha1/namespaces/default/cronjobs": - return &http.Response{ - StatusCode: 200, - Header: header, - Body: objBody(&listObj), - }, nil - default: - t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) - return nil, nil - } - }) - err := EnsureCronJob(client, f1, c, or, "batch/v2alpha1") + clientset := fake.NewSimpleClientset() + + err := EnsureCronJob(clientset, f1, "* * * * *", or) + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + cronJob, err := clientset.BatchV1beta1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", f1.Name), metav1.GetOptions{}) if err != nil { t.Errorf("Unexpected error: %s", err) } + if !reflect.DeepEqual(expectedMeta, cronJob.ObjectMeta) { + t.Errorf("Unexpected metadata metadata. Expecting\n%+v \nReceived:\n%+v", expectedMeta, cronJob.ObjectMeta) + } + if *cronJob.Spec.SuccessfulJobsHistoryLimit != int32(3) { + t.Errorf("Unexpected SuccessfulJobsHistoryLimit: %d", *cronJob.Spec.SuccessfulJobsHistoryLimit) + } + if *cronJob.Spec.FailedJobsHistoryLimit != int32(1) { + t.Errorf("Unexpected FailedJobsHistoryLimit: %d", *cronJob.Spec.FailedJobsHistoryLimit) + } + if *cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds != int64(120) { + t.Errorf("Unexpected ActiveDeadlineSeconds: %d", *cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds) + } + expectedCommand := []string{"curl", "-Lv", fmt.Sprintf("http://%s.%s.svc.cluster.local:8080", f1Name, ns)} + args := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Args + // skip event headers data (i.e -H "event-id: cronjob-controller-2018-03-05T05:55:41.990784027Z" etc) + foundCommand := []string{args[0], args[1], args[len(args)-1]} + if !reflect.DeepEqual(foundCommand, expectedCommand) { + t.Errorf("Unexpected command %s expexted %s", foundCommand, expectedCommand) + } // It should update the existing cronJob if it is already created - updateCalled := false - client = fakeRESTClient(func(req *http.Request) (*http.Response, error) { - header := http.Header{} - header.Set("Content-Type", runtime.ContentTypeJSON) - switch req.Method { - case "POST": - return &http.Response{ - StatusCode: http.StatusConflict, - Header: header, - Body: objBody(nil), - }, nil - case "GET": - previousCronJob := batchv2alpha1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "123456", - }, - } - return &http.Response{ - StatusCode: 200, - Header: header, - Body: objBody(&previousCronJob), - }, nil - case "PUT": - updateCalled = true - reqCronJobBytes, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Fatal(err) - } - cronJob := batchv2alpha1.CronJob{} - err = json.Unmarshal(reqCronJobBytes, &cronJob) - if err != nil { - t.Fatal(err) - } - if cronJob.ObjectMeta.ResourceVersion != "123456" { - t.Error("Expecting that the object to update contains the previous information") - } - listObj := batchv2alpha1.CronJobList{} - return &http.Response{ - StatusCode: 200, - Header: header, - Body: objBody(&listObj), - }, nil - default: - t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) - return nil, nil - } - }) - err = EnsureCronJob(client, f1, c, or, "batch/v2alpha1") + newSchedule := "*/10 * * * *" + err = EnsureCronJob(clientset, f1, newSchedule, or) if err != nil { t.Errorf("Unexpected error: %s", err) } - if !updateCalled { - t.Errorf("Expect the update method to be called") + updatedCronJob, err := clientset.BatchV1beta1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", f1.Name), metav1.GetOptions{}) + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + if updatedCronJob.Spec.Schedule != newSchedule { + t.Errorf("Unexpected schedule %s expecting %s", updatedCronJob.Spec.Schedule, newSchedule) } - - // IT should change the endpoint - client = fakeRESTClient(func(req *http.Request) (*http.Response, error) { - header := http.Header{} - header.Set("Content-Type", runtime.ContentTypeJSON) - if req.URL.Path != "/apis/batch/v1beta1/namespaces/default/cronjobs" { - t.Errorf("Unexpected URL %s", req.URL.Path) - } - return &http.Response{ - StatusCode: 200, - Header: header, - Body: objBody(nil), - }, nil - }) - err = EnsureCronJob(client, f1, c, or, "batch/v1beta1") } func doesNotContain(envs []v1.EnvVar, env v1.EnvVar) bool { diff --git a/script/libtest.bash b/script/libtest.bash index 754fb30f6..38726fd8e 100644 --- a/script/libtest.bash +++ b/script/libtest.bash @@ -335,4 +335,16 @@ sts_restart() { k8s_wait_for_uniq_pod -l kubeless=kafka -n kubeless wait_for_kubeless_kafka_server_ready } +verify_clean_object() { + local type=${1:?}; shift + local name=${1:?}; shift + echo_info "Checking if "${type}" exists for function "${name}"... " + local -i cnt=${TEST_MAX_WAIT_SEC:?} + until [[ ! $(kubectl get ${type} 2>&1 | grep ${name}) ]]; do + ((cnt=cnt-1)) || return 1 + sleep 1 + echo_info "$(kubectl get ${type} 2>&1 | grep ${name})" + done + echo_info "${type}/${name} is gone" +} # vim: sw=4 ts=4 et si diff --git a/tests/integration-tests.bats b/tests/integration-tests.bats index a8ce4b597..ce3fb22bc 100644 --- a/tests/integration-tests.bats +++ b/tests/integration-tests.bats @@ -161,6 +161,8 @@ load ../script/libtest # without having to wait verify_function scheduled-get-python kubeless_function_delete scheduled-get-python + verify_clean_object cronjobtrigger scheduled-get-python + verify_clean_object cronjob trigger-scheduled-get-python } @test "Test no-errors" { if kubectl logs -n kubeless -l kubeless=controller | grep "level=error"; then