Skip to content

Commit

Permalink
refactor: several functions for Status & Conditions handling into pkg…
Browse files Browse the repository at this point in the history
… util functions

Signed-off-by: Tobo Atchou <tobo.atchou@gmail.com>
  • Loading branch information
tobotg committed May 1, 2023
1 parent 57908a4 commit 7a59733
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 64 deletions.
5 changes: 3 additions & 2 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
kedautil "github.com/kedacore/keda/v2/pkg/util"
version "github.com/kedacore/keda/v2/version"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg
status := scaledObject.Status.DeepCopy()
status.HpaName = hpaName

err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used hpaName")
return err
Expand Down Expand Up @@ -237,7 +238,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,

updateHealthStatus(scaledObject, externalMetricNames, status)

err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledjobs;scaledjobs/finalizers;scaledjobs/status,verbs="*"
Expand Down Expand Up @@ -124,7 +124,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
Expand Down
7 changes: 4 additions & 3 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs="*"
Expand Down Expand Up @@ -166,7 +167,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
// ensure Status Conditions are initialized
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -188,7 +189,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -325,7 +326,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
status.PausedReplicaCount = nil
}

if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
if err := kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
return gvkr, err
}
logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down
74 changes: 35 additions & 39 deletions pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
Expand Down Expand Up @@ -62,52 +63,47 @@ func NewScaleExecutor(client runtimeclient.Client, scaleClient scale.ScalesGette
}

func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, logger logr.Logger, object interface{}) error {
var patch runtimeclient.Patch

now := metav1.Now()
runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.LastActiveTime = &now
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
}

err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
now, ok := target.(metav1.Time)
if !ok {
return fmt.Errorf("transform target is not metav1.Time type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status.LastActiveTime = &now
case *kedav1alpha1.ScaledJob:
obj.Status.LastActiveTime = &now
default:
}
return nil
}
return err
return kedautil.TransformObject(ctx, e.client, logger, object, now, transform)
}

func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string, setCondition func(kedav1alpha1.Conditions, metav1.ConditionStatus, string, string)) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
setCondition(obj.Status.Conditions, status, reason, message)
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status")
return err
type transformStruct struct {
status metav1.ConditionStatus
reason string
message string
}

err := e.client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status")
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
transformObj := target.(*transformStruct)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message)
case *kedav1alpha1.ScaledJob:
setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message)
default:
}
return nil
}
target := transformStruct{
status: status,
reason: reason,
message: message,
}
return err
return kedautil.TransformObject(ctx, e.client, logger, object, &target, transform)
}

func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) {
Expand Down Expand Up @@ -106,7 +107,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
err = kedautil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
Expand Down
63 changes: 47 additions & 16 deletions controllers/keda/util/status.go → pkg/util/status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 The KEDA Authors
Copyright 2023 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,36 +28,67 @@ import (

// SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error.
func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
conditions, ok := target.(*kedav1alpha1.Conditions)
if !ok {
return fmt.Errorf("transform target is not kedav1alpha1.Conditions type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status.Conditions = *conditions
case *kedav1alpha1.ScaledJob:
obj.Status.Conditions = *conditions
default:
}
return nil
}
return TransformObject(ctx, client, logger, object, conditions, transform)
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
transform := func(runtimeObj runtimeclient.Object, target interface{}) error {
status, ok := target.(*kedav1alpha1.ScaledObjectStatus)
if !ok {
return fmt.Errorf("transform target is not kedav1alpha1.ScaledObjectStatus type %v", target)
}
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
obj.Status = *status
default:
}
return nil
}
return TransformObject(ctx, client, logger, scaledObject, status, transform)
}

// TransformObject patches the given object with the targeted passed to it through a transformer function or returns an error.
func TransformObject(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, target interface{}, transform func(runtimeclient.Object, interface{}) error) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
switch obj := runtimeObj.(type) {
case *kedav1alpha1.ScaledObject:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
if err := transform(obj, target); err != nil {
logger.Error(err, "Failed to patch ScaledObject")
return err
}
case *kedav1alpha1.ScaledJob:
patch = runtimeclient.MergeFrom(obj.DeepCopy())
obj.Status.Conditions = *conditions
if err := transform(obj, target); err != nil {
logger.Error(err, "Failed to patch ScaledJob")
return err
}
default:
err := fmt.Errorf("unknown scalable object type %v", obj)
logger.Error(err, "Failed to patch Objects Status with Conditions")
logger.Error(err, "Failed to patch Objects")
return err
}

err := client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status with Conditions")
}
return err
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())
scaledObject.Status = *status
err := client.Status().Patch(ctx, scaledObject, patch)
if err != nil {
logger.Error(err, "Failed to patch ScaledObjects Status")
logger.Error(err, "Failed to patch Objects")
}
return err
}

0 comments on commit 7a59733

Please sign in to comment.