Skip to content

Commit

Permalink
feat: Provide support for explicitly pausing autoscaling of ScaledJobs
Browse files Browse the repository at this point in the history
--- Accreditation ---

Original PR: #3828

Borrows from the work originally implemented by: https://github.com/keegancwinchester

Note: I would have loved to have pulled the commit from the original branch, but I could not be able to.

Documentation MR by original implementor: kedacore/keda-docs#932

--- Fixes ---

Fixes # #3656

--- PR Notes ---

Introduce annotation to pause ScaledJobs.

Signed-off-by: BenJ <benjamin.jessop@fluxfederation.com>
  • Loading branch information
flux-benj committed May 23, 2023
1 parent 4019401 commit a676efb
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **General:** Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledJobs to pause autoscaling ([#3303](https://github.com/kedacore/keda/issues/3303))

### Improvements

Expand Down
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type ScaledJobStatus struct {
LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Paused string `json:"Paused,omitempty"`
}

// ScaledJobList contains a list of ScaledJob
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8220,6 +8220,8 @@ spec:
status:
description: ScaledJobStatus defines the observed state of ScaledJob
properties:
Paused:
type: string
conditions:
description: Conditions an array representation to store multiple
Conditions
Expand Down
21 changes: 20 additions & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand Down Expand Up @@ -84,7 +85,11 @@ func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options control
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
predicate.GenerationChangedPredicate{},
))).
Complete(r)
}

Expand Down Expand Up @@ -129,6 +134,12 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

// Check if scaledJob.Status.Paused constant is not an empty string, if not an empty string then stopScaleLoop
if scaledJob.Status.Paused != "" {
reqLogger.Info("ScaledJob is paused, stopping scale loop")
return ctrl.Result{}, r.Pause(ctx, reqLogger, scaledJob)
}

// Check jobTargetRef is specified
if scaledJob.Spec.JobTargetRef == nil {
errMsg := "scaledJob.spec.jobTargetRef is not set"
Expand Down Expand Up @@ -313,3 +324,11 @@ func (r *ScaledJobReconciler) updatePromMetricsOnDelete(namespacedName string) {

delete(scaledJobPromMetricsMap, namespacedName)
}

func (r *ScaledJobReconciler) Pause(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
if scaledJob.Annotations["autoscaling.keda.sh/paused"] == "true" {
logger.V(1).Info("Stopping a ScaleLoop when scaledJob is paused")
return r.stopScaleLoop(ctx, logger, scaledJob)
}
return nil
}
34 changes: 34 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const Paused = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand Down Expand Up @@ -61,3 +63,35 @@ func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool {

return false
}

type PausedPredicate struct {
predicate.Funcs
}

func (PausedPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

newAnnotations := e.ObjectNew.GetAnnotations()
oldAnnotations := e.ObjectOld.GetAnnotations()

newPausedValue, newPausedPresent := "", false
oldPausedValue, oldPausedPresent := "", false

if newAnnotations != nil {
newPausedValue, newPausedPresent = newAnnotations[Paused]
}

if oldAnnotations != nil {
oldPausedValue, oldPausedPresent = oldAnnotations[Paused]
}

if newPausedPresent {
if oldPausedPresent {
return newPausedValue != oldPausedValue
}
return true
}
return false
}
3 changes: 3 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (
// KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
KEDAScalersStopped = "KEDAScalersStopped"

// KEDAScalersPaused is for event when scalers watch was paused for ScaledJob
KEDAScalersPaused = "KEDAScalersPaused"

// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

Expand Down
20 changes: 20 additions & 0 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

isPaused := e.Paused(ctx, logger, scaledJob)

if isPaused {
logger.Info("ScaledJob is paused")
return
}

runningJobCount := e.getRunningJobCount(ctx, scaledJob)
pendingJobCount := e.getPendingJobCount(ctx, scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
Expand Down Expand Up @@ -424,3 +431,16 @@ func min(x, y int64) int64 {
}
return x
}

// Check if scaledJob has Paused annotation
func (e *scaleExecutor) Paused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) bool {
if scaledJob.Annotations["autoscaling.keda.sh/paused"] == "true" {
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAScalersPaused, "Autoscaler is paused via annotation")
err := e.client.Status().Update(ctx, scaledJob)
if err != nil {
logger.Error(err, "Failed to update scaledjob status")
}
return true
}
return false
}
182 changes: 182 additions & 0 deletions tests/internals/pause_scaledjob/pause_scaledjob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//go:build e2e
// +build e2e

// go test -v -tags e2e ./internals/pause_scaledjob/pause_scaledjob_test.go

package pause_scaledjob_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
)

// Load environment variables from .env file

const (
testName = "pause-scaledjob-test"
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
serviceName = fmt.Sprintf("%s-service", testName)
scalerName = fmt.Sprintf("%s-scaler", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
minReplicaCount = 0
maxReplicaCount = 10
iterationCountInitial = 15
iterationCountLatter = 30
)

type templateData struct {
TestNamespace string
ServiceName string
ScalerName string
ScaledJobName string
MinReplicaCount, MaxReplicaCount int
MetricThreshold, MetricValue int
}

const (
serviceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{.ServiceName}}
namespace: {{.TestNamespace}}
spec:
ports:
- port: 6000
targetPort: 6000
selector:
app: {{.ScalerName}}
`

scalerTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.ScalerName}}
namespace: {{.TestNamespace}}
labels:
app: {{.ScalerName}}
spec:
replicas: 1
selector:
matchLabels:
app: {{.ScalerName}}
template:
metadata:
labels:
app: {{.ScalerName}}
spec:
containers:
- name: scaler
image: ghcr.io/kedacore/tests-external-scaler-e2e:latest
imagePullPolicy: Always
ports:
- containerPort: 6000
`

scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
pollingInterval: 5
maxReplicaCount: {{.MaxReplicaCount}}
minReplicaCount: {{.MinReplicaCount}}
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "15"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
triggers:
- type: external
metadata:
scalerAddress: {{.ServiceName}}.{{.TestNamespace}}:6000
metricThreshold: "{{.MetricThreshold}}"
metricValue: "{{.MetricValue}}"
`
)

func TestScaler(t *testing.T) {
// setup
t.Log("--- setting up ---")

// Create kubernetes resources
kc := GetKubernetesClient(t)
metricValue := 1

data, templates := getTemplateData(metricValue)

CreateKubernetesResources(t, kc, testNamespace, data, templates)

assert.True(t, WaitForJobCountUntilIteration(t, kc, testNamespace, data.MetricThreshold, iterationCountInitial, 1),
"job count should be %d after %d iterations", data.MetricThreshold, iterationCountInitial)

// test scaling
testPause(t, kc)
testUnpause(t, kc, data)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func getTemplateData(metricValue int) (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
ScaledJobName: scaledJobName,
ScalerName: scalerName,
ServiceName: serviceName,
MinReplicaCount: minReplicaCount,
MaxReplicaCount: maxReplicaCount,
MetricThreshold: 1,
MetricValue: metricValue,
}, []Template{
{Name: "scalerTemplate", Config: scalerTemplate},
{Name: "serviceTemplate", Config: serviceTemplate},
{Name: "scaledJobTemplate", Config: scaledJobTemplate},
}
}

func testPause(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing Paused annotation ---")

_, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused=true", scaledJobName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)

t.Log("job count does not change as job is paused")

expectedTarget := 0
assert.True(t, WaitForJobCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1),
"job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}

func testUnpause(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing removing Paused annotation ---")

_, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused-", scaledJobName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)

t.Log("job count increases from zero as job is no longer paused")

expectedTarget := data.MetricThreshold
assert.True(t, WaitForJobCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1),
"job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:build e2e
// +build e2e

package pause_scaling_test
package pause_scaledobject_test

import (
"fmt"
Expand All @@ -16,7 +16,7 @@ import (
// Load environment variables from .env file

const (
testName = "pause-scaling-test"
testName = "pause-scaledobject-test"
)

var (
Expand Down

0 comments on commit a676efb

Please sign in to comment.