Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Minor refactor for cronjobtrigger. Add some tests (#640)
Browse files Browse the repository at this point in the history
* Minor refactor for cronjobtrigger. Add some tests

* Fix tests

* Add debug info

* Allow trigger deletion with RBAC

* Review
  • Loading branch information
andresmgot committed Mar 26, 2018
1 parent ecfb926 commit fab33b7
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 186 deletions.
2 changes: 1 addition & 1 deletion kubeless-rbac.jsonnet
Expand Up @@ -30,7 +30,7 @@ local controller_roles = [
{
apiGroups: ["kubeless.io"],
resources: ["functions", "kafkatriggers", "httptriggers", "cronjobtriggers"],
verbs: ["get", "list", "watch", "update"],
verbs: ["get", "list", "watch", "update", "delete"],
},
{
apiGroups: ["batch"],
Expand Down
76 changes: 27 additions & 49 deletions pkg/controller/cronjob_trigger_controller.go
Expand Up @@ -203,13 +203,10 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
}

// CronJob Trigger object should be deleted, so remove associated cronjob and remove the finalizer
_, err := c.clientset.BatchV2alpha1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", name), metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(ns).Delete(fmt.Sprintf("trigger-%s", name), &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}
err = c.clientset.BatchV1beta1().CronJobs(ns).Delete(name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}

// remove finalizer from the cronjob trigger object, so that we dont have to process any further and object can be deleted
Expand Down Expand Up @@ -237,18 +234,12 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return err
}

restIface := c.clientset.BatchV2alpha1().RESTClient()
groupVersion, err := c.getResouceGroupVersion("cronjobs")
if err != nil {
return err
}

functionObj, err := c.functionInformer.Lister().Functions(ns).Get(cronJobtriggerObj.Spec.FunctionName)
if err != nil {
c.logger.Errorf("Unable to find the function %s in the namespace %s. Received %s: ", cronJobtriggerObj.Spec.FunctionName, ns, err)
return err
}
err = utils.EnsureCronJob(restIface, functionObj, cronJobtriggerObj, or, groupVersion)
err = utils.EnsureCronJob(c.clientset, functionObj, cronJobtriggerObj.Spec.Schedule, or)
if err != nil {
return err
}
Expand All @@ -257,53 +248,41 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return nil
}

func (c *CronJobTriggerController) getResouceGroupVersion(target string) (string, error) {
resources, err := c.clientset.Discovery().ServerResources()
if err != nil {
return "", err
}
groupVersion := ""
for _, resource := range resources {
for _, apiResource := range resource.APIResources {
if apiResource.Name == target {
groupVersion = resource.GroupVersion
break
}
}
}
if groupVersion == "" {
return "", fmt.Errorf("Resource %s not found in any group", target)
}
return groupVersion, nil
}

func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) {
func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) error {
functionObj, ok := obj.(*kubelessApi.Function)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
c.logger.Errorf("Couldn't get object from tombstone %#v", obj)
return
err := fmt.Errorf("Couldn't get object from tombstone %#v", obj)
c.logger.Errorf(err.Error())
return err
}
functionObj, ok = tombstone.Obj.(*kubelessApi.Function)
if !ok {
c.logger.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
err := fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)
c.logger.Errorf(err.Error())
return err
}
}

c.logger.Infof("Processing update to function object %s Namespace: %s", functionObj.Name, functionObj.Namespace)
if deleted {
//check if func is scheduled or not
cronJobName := fmt.Sprintf("trigger-%s", functionObj.ObjectMeta.Name)
_, err := c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Get(cronJobName, metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Delete(cronJobName, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjob %s created for the function %s in namespace %s, Error: %s", cronJobName, functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
c.logger.Infof("Function %s deleted. Removing associated cronjob trigger", functionObj.Name)
cjtList, err := c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).List(metav1.ListOptions{})
if err != nil {
return err
}
for _, cjt := range cjtList.Items {
if cjt.Spec.FunctionName == functionObj.Name {
err = c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).Delete(cjt.Name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjobtrigger created for the function %s in namespace %s, Error: %s", functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
return err
}
}
}
}
return nil
}

func (c *CronJobTriggerController) cronJobTriggerObjHasFinalizer(triggerObj *kubelessApi.CronJobTrigger) bool {
Expand Down Expand Up @@ -351,9 +330,8 @@ func cronJobTriggerObjChanged(oldObj, newObj *kubelessApi.CronJobTrigger) bool {
if oldObj.ResourceVersion != newObj.ResourceVersion {
return true
}
newSpec := &newObj.Spec
oldSpec := &oldObj.Spec

newSpec := newObj.Spec
oldSpec := oldObj.Spec
if newSpec.Schedule != oldSpec.Schedule {
return true
}
Expand Down
150 changes: 150 additions & 0 deletions pkg/controller/cronjob_trigger_controller_test.go
@@ -0,0 +1,150 @@
package controller

import (
"testing"
"time"

kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessFake "github.com/kubeless/kubeless/pkg/client/clientset/versioned/fake"
"github.com/sirupsen/logrus"
batchv1beta1 "k8s.io/api/batch/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

func TestFunctionAddedUpdated(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: myNsFoo,
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, false)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 1 || list.Items[0].ObjectMeta.Name != "foo" {
t.Errorf("Missing trigger in list: %v", list.Items)
}
}

func TestFunctionDeleted(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "myns",
Name: "foo-trigger",
},
Spec: kubelessApi.CronJobTriggerSpec{
FunctionName: "foo",
},
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, true)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 0 {
t.Errorf("Trigger should be deleted from list: %v", list.Items)
}
}

func TestCronJobTriggerObjChanged(t *testing.T) {
type testObj struct {
old *kubelessApi.CronJobTrigger
new *kubelessApi.CronJobTrigger
expectedChanged bool
}
t1 := metav1.Time{
Time: time.Now(),
}
t2 := metav1.Time{
Time: time.Now(),
}
testObjs := []testObj{
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
expectedChanged: false,
},
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t1}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t2}},
expectedChanged: true,
},
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
expectedChanged: true,
},
{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: false,
},
{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "*/10 * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: true,
},
}
for _, to := range testObjs {
changed := cronJobTriggerObjChanged(to.old, to.new)
if changed != to.expectedChanged {
t.Errorf("%v != %v expected to be %v", to.old, to.new, to.expectedChanged)
}
}
}
20 changes: 10 additions & 10 deletions pkg/utils/k8sutil.go
Expand Up @@ -36,7 +36,7 @@ import (

"k8s.io/api/autoscaling/v2beta1"
batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
clientsetAPIExtensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func doRESTReq(restIface rest.Interface, groupVersion, verb, resource, elem, nam
}

// EnsureCronJob creates/updates a function cron job
func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJobObj *kubelessApi.CronJobTrigger, or []metav1.OwnerReference, groupVersion string) error {
func EnsureCronJob(client kubernetes.Interface, funcObj *kubelessApi.Function, schedule string, or []metav1.OwnerReference) error {
var maxSucccessfulHist, maxFailedHist int32
maxSucccessfulHist = 3
maxFailedHist = 1
Expand All @@ -1119,18 +1119,18 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob
headersString = headersString + " -H \"event-time: " + timestamp.String() + "\""
headersString = headersString + " -H \"event-type: application/json\""
headersString = headersString + " -H \"event-namespace: cronjobtrigger.kubeless.io\""
job := &batchv2alpha1.CronJob{
job := &batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: funcObj.ObjectMeta.Namespace,
Labels: funcObj.ObjectMeta.Labels,
OwnerReferences: or,
},
Spec: batchv2alpha1.CronJobSpec{
Schedule: cronJobObj.Spec.Schedule,
Spec: batchv1beta1.CronJobSpec{
Schedule: schedule,
SuccessfulJobsHistoryLimit: &maxSucccessfulHist,
FailedJobsHistoryLimit: &maxFailedHist,
JobTemplate: batchv2alpha1.JobTemplateSpec{
JobTemplate: batchv1beta1.JobTemplateSpec{
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Template: v1.PodTemplateSpec{
Expand All @@ -1152,17 +1152,17 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob

// We need to use directly the REST API since the endpoint
// for CronJobs changes from Kubernetes 1.8
err = doRESTReq(client, groupVersion, "create", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, job, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Create(job)
if err != nil && k8sErrors.IsAlreadyExists(err) {
newCronJob := batchv2alpha1.CronJob{}
err = doRESTReq(client, groupVersion, "get", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, nil, &newCronJob)
newCronJob := &batchv1beta1.CronJob{}
newCronJob, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return err
}
newCronJob.ObjectMeta.Labels = funcObj.ObjectMeta.Labels
newCronJob.ObjectMeta.OwnerReferences = or
newCronJob.Spec = job.Spec
err = doRESTReq(client, groupVersion, "update", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, &newCronJob, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Update(newCronJob)
}
return err
}
Expand Down

0 comments on commit fab33b7

Please sign in to comment.