Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Minor refactor for cronjobtrigger. Add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Andres committed Mar 22, 2018
1 parent cf3f36e commit 0759396
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 60 deletions.
73 changes: 22 additions & 51 deletions pkg/controller/cronjob_trigger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,10 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
}

// CronJob Trigger object should be deleted, so remove associated cronjob and remove the finalizer
_, err := c.clientset.BatchV2alpha1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", name), metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(ns).Delete(fmt.Sprintf("trigger-%s", name), &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}
err = c.clientset.BatchV1beta1().CronJobs(ns).Delete(name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}

// remove finalizer from the cronjob trigger object, so that we dont have to process any further and object can be deleted
Expand Down Expand Up @@ -237,18 +234,12 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return err
}

restIface := c.clientset.BatchV2alpha1().RESTClient()
groupVersion, err := c.getResouceGroupVersion("cronjobs")
if err != nil {
return err
}

functionObj, err := c.functionInformer.Lister().Functions(ns).Get(cronJobtriggerObj.Spec.FunctionName)
if err != nil {
c.logger.Errorf("Unable to find the function %s in the namespace %s. Received %s: ", cronJobtriggerObj.Spec.FunctionName, ns, err)
return err
}
err = utils.EnsureCronJob(restIface, functionObj, cronJobtriggerObj, or, groupVersion)
err = utils.EnsureCronJob(c.clientset, functionObj, cronJobtriggerObj, or)
if err != nil {
return err
}
Expand All @@ -257,53 +248,33 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return nil
}

func (c *CronJobTriggerController) getResouceGroupVersion(target string) (string, error) {
resources, err := c.clientset.Discovery().ServerResources()
if err != nil {
return "", err
}
groupVersion := ""
for _, resource := range resources {
for _, apiResource := range resource.APIResources {
if apiResource.Name == target {
groupVersion = resource.GroupVersion
break
}
}
}
if groupVersion == "" {
return "", fmt.Errorf("Resource %s not found in any group", target)
}
return groupVersion, nil
}

func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) {
func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) error {
functionObj, ok := obj.(*kubelessApi.Function)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
c.logger.Errorf("Couldn't get object from tombstone %#v", obj)
return
err := fmt.Errorf("Couldn't get object from tombstone %#v", obj)
c.logger.Errorf(err.Error())
return err
}
functionObj, ok = tombstone.Obj.(*kubelessApi.Function)
if !ok {
c.logger.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
err := fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)
c.logger.Errorf(err.Error())
return err
}
}

c.logger.Infof("Processing update to function object %s Namespace: %s", functionObj.Name, functionObj.Namespace)
if deleted {
//check if func is scheduled or not
cronJobName := fmt.Sprintf("trigger-%s", functionObj.ObjectMeta.Name)
_, err := c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Get(cronJobName, metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Delete(cronJobName, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjob %s created for the function %s in namespace %s, Error: %s", cronJobName, functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
}
c.logger.Infof("Function %s deleted. Removing associated cronjob trigger", functionObj.Name)
err := c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).Delete(functionObj.Name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjobtrigger created for the function %s in namespace %s, Error: %s", functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
return err
}
}
return nil
}

func (c *CronJobTriggerController) cronJobTriggerObjHasFinalizer(triggerObj *kubelessApi.CronJobTrigger) bool {
Expand Down Expand Up @@ -349,12 +320,12 @@ func cronJobTriggerObjChanged(oldObj, newObj *kubelessApi.CronJobTrigger) bool {
}
// If the new and old CronJob trigger object's resource version is same
if oldObj.ResourceVersion != newObj.ResourceVersion {
return true
return false
}
newSpec := &newObj.Spec
oldSpec := &oldObj.Spec

newSpec := newObj.Spec
oldSpec := oldObj.Spec
if newSpec.Schedule != oldSpec.Schedule {
logrus.Info("changed!")
return true
}

Expand Down
139 changes: 139 additions & 0 deletions pkg/controller/cronjob_trigger_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package controller

import (
"testing"
"time"

kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessFake "github.com/kubeless/kubeless/pkg/client/clientset/versioned/fake"
"github.com/sirupsen/logrus"
batchv1beta1 "k8s.io/api/batch/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

func TestFunctionAddedUpdated(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: myNsFoo,
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, false)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 1 || list.Items[0].ObjectMeta.Name != "foo" {
t.Errorf("Missing trigger in list: %v", list.Items)
}
}

func TestFunctionDeleted(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: myNsFoo,
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, true)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 0 {
t.Errorf("Trigger should be deleted from list: %v", list.Items)
}
}

func TestCronJobTriggerObjChanged(t *testing.T) {
type testObj struct {
old *kubelessApi.CronJobTrigger
new *kubelessApi.CronJobTrigger
expectedChanged bool
}
t1 := metav1.Time{
Time: time.Now(),
}
t2 := metav1.Time{
Time: time.Now(),
}
testObjs := []testObj{
testObj{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
expectedChanged: false,
},
testObj{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t1}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t2}},
expectedChanged: true,
},
testObj{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: false,
},
testObj{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "*/10 * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: true,
},
}
for _, to := range testObjs {
changed := cronJobTriggerObjChanged(to.old, to.new)
if changed != to.expectedChanged {
t.Errorf("%v != %v expected to be %v", to.old, to.new, to.expectedChanged)
}
}
}
18 changes: 9 additions & 9 deletions pkg/utils/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

"k8s.io/api/autoscaling/v2beta1"
batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
clientsetAPIExtensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -960,7 +960,7 @@ func doRESTReq(restIface rest.Interface, groupVersion, verb, resource, elem, nam
}

// EnsureCronJob creates/updates a function cron job
func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJobObj *kubelessApi.CronJobTrigger, or []metav1.OwnerReference, groupVersion string) error {
func EnsureCronJob(client kubernetes.Interface, funcObj *kubelessApi.Function, cronJobObj *kubelessApi.CronJobTrigger, or []metav1.OwnerReference) error {
var maxSucccessfulHist, maxFailedHist int32
maxSucccessfulHist = 3
maxFailedHist = 1
Expand All @@ -986,18 +986,18 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob
headersString = headersString + " -H \"event-time: " + timestamp.String() + "\""
headersString = headersString + " -H \"event-type: application/json\""
headersString = headersString + " -H \"event-namespace: cronjobtrigger.kubeless.io\""
job := &batchv2alpha1.CronJob{
job := &batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: funcObj.ObjectMeta.Namespace,
Labels: funcObj.ObjectMeta.Labels,
OwnerReferences: or,
},
Spec: batchv2alpha1.CronJobSpec{
Spec: batchv1beta1.CronJobSpec{
Schedule: cronJobObj.Spec.Schedule,
SuccessfulJobsHistoryLimit: &maxSucccessfulHist,
FailedJobsHistoryLimit: &maxFailedHist,
JobTemplate: batchv2alpha1.JobTemplateSpec{
JobTemplate: batchv1beta1.JobTemplateSpec{
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Template: v1.PodTemplateSpec{
Expand All @@ -1019,17 +1019,17 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob

// We need to use directly the REST API since the endpoint
// for CronJobs changes from Kubernetes 1.8
err = doRESTReq(client, groupVersion, "create", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, job, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Create(job)
if err != nil && k8sErrors.IsAlreadyExists(err) {
newCronJob := batchv2alpha1.CronJob{}
err = doRESTReq(client, groupVersion, "get", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, nil, &newCronJob)
newCronJob := &batchv1beta1.CronJob{}
newCronJob, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return err
}
newCronJob.ObjectMeta.Labels = funcObj.ObjectMeta.Labels
newCronJob.ObjectMeta.OwnerReferences = or
newCronJob.Spec = job.Spec
err = doRESTReq(client, groupVersion, "update", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, &newCronJob, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Update(newCronJob)
}
return err
}
Expand Down
11 changes: 11 additions & 0 deletions script/libtest.bash
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,15 @@ sts_restart() {
k8s_wait_for_uniq_pod -l kubeless=kafka -n kubeless
wait_for_kubeless_kafka_server_ready
}
verify_clean_object() {
local type=${1:?}; shift
local name=${1:?}; shift
echo_info "Checking if "${type}" exists for function "${name}"... "
local -i cnt=${TEST_MAX_WAIT_SEC:?}
until [[ ! $(kubectl get ${type} | grep ${name}) ]]; do
((cnt=cnt-1)) || return 1
sleep 1
done
echo_info "${type}/${name} is gone"
}
# vim: sw=4 ts=4 et si
2 changes: 2 additions & 0 deletions tests/integration-tests.bats
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ load ../script/libtest
# without having to wait
verify_function scheduled-get-python
kubeless_function_delete scheduled-get-python
verify_clean_object cronjobtrigger scheduled-get-python
verify_clean_object cronjob trigger-scheduled-get-python
}
@test "Test no-errors" {
if kubectl logs -n kubeless -l kubeless=controller | grep "level=error"; then
Expand Down

0 comments on commit 0759396

Please sign in to comment.