Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provide support for explicitly pausing autoscaling of ScaledJobs #4558

Merged
merged 2 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **Redis Scalers**: Allow scaling using consumer group lag ([#3127](https://github.com/kedacore/keda/issues/3127))
- **General**: Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledJobs to pause autoscaling ([#3303](https://github.com/kedacore/keda/issues/3303))
- **Admission Webhooks**: Support ScaledObject taking over existing HPAs with the same name while they are not managed by other ScaledObject ([#4457](https://github.com/kedacore/keda/issues/4457))

### Improvements
Expand Down
11 changes: 11 additions & 0 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ const (
ScaledObjectConditionPausedMessage = "ScaledObject is paused"
)

const (
// ScaledJobConditionPausedReason defines the default Reason for paused ScaledJob
ScaledJobConditionPausedReason = "ScaledJobPaused"
// ScaledJobConditionUnpausedReason defines the default Reason for unpaused ScaledJob
ScaledJobConditionUnpausedReason = "ScaledJobUnpaused"
// ScaledJobConditionPausedMessage defines the default Message for paused ScaledJob
ScaledJobConditionPausedMessage = "ScaledJob is paused"
// ScaledJobConditionUnpausedMessage defines the default Message for unpaused ScaledJob
ScaledJobConditionUnpausedMessage = "ScaledJob is unpaused"
)

// Condition to store the condition state
type Condition struct {
// Type of condition
Expand Down
3 changes: 3 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// +kubebuilder:printcolumn:name="Authentication",type="string",JSONPath=".spec.triggers[*].authenticationRef.name"
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
// +kubebuilder:printcolumn:name="Paused",type="string",JSONPath=".status.conditions[?(@.type==\"Paused\")].status"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// ScaledJob is the Schema for the scaledjobs API
Expand Down Expand Up @@ -78,6 +79,8 @@ type ScaledJobStatus struct {
LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Paused string `json:"Paused,omitempty"`
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
}

// ScaledJobList contains a list of ScaledJob
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ spec:
- jsonPath: .status.conditions[?(@.type=="Active")].status
name: Active
type: string
- jsonPath: .status.conditions[?(@.type=="Paused")].status
name: Paused
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down Expand Up @@ -8264,6 +8267,8 @@ spec:
status:
description: ScaledJobStatus defines the observed state of ScaledJob
properties:
Paused:
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved
type: string
conditions:
description: Conditions an array representation to store multiple
Conditions
Expand Down
44 changes: 41 additions & 3 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand Down Expand Up @@ -84,7 +85,11 @@ func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options control
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
predicate.GenerationChangedPredicate{},
))).
Complete(r)
}

Expand Down Expand Up @@ -136,8 +141,8 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob, &conditions)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
Expand All @@ -159,7 +164,15 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (string, error) {
isPaused, err := r.checkIfPaused(ctx, logger, scaledJob, conditions)
if err != nil {
return "Failed to check if ScaledJob was paused", err
}
if isPaused {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
return "ScaledJob is paused, skipping reconcile loop", err
}

// nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable
msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob)
if err != nil {
Expand Down Expand Up @@ -193,6 +206,31 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
return "ScaledJob is defined correctly and is ready to scaling", nil
}

// checkIfPaused reports whether the scaledJob carries the "autoscaling.keda.sh/paused"
// annotation and keeps the Paused condition held in conditions in step with it.
//
// When the annotation is present but the condition is not yet True, the scale loop is
// stopped and the condition is set to True. If stopping the loop fails, the condition is
// set to False with reason "ScaledJobStopScaleLoopFailed" and the error is returned
// together with false. When the annotation is absent but the condition is still True,
// the condition is flipped back to False (unpaused).
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
	// Only the presence of the annotation key matters here, not its value.
	_, annotated := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation]
	markedPaused := conditions.GetPausedCondition().Status == metav1.ConditionTrue

	switch {
	case annotated && markedPaused:
		// Already paused and recorded as such: nothing to update.
		return true, nil

	case annotated:
		// Transition into the paused state.
		logger.Info("ScaledJob is paused, stopping scaling loop.")
		if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil {
			conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledJobStopScaleLoopFailed", "failed to stop the scale loop for paused ScaledJob")
			return false, err
		}
		conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledJobConditionPausedReason, kedav1alpha1.ScaledJobConditionPausedMessage)
		return true, nil

	case markedPaused:
		// Annotation was removed while the condition still says paused.
		logger.Info("Unpausing ScaledJob.")
		conditions.SetPausedCondition(metav1.ConditionFalse, kedav1alpha1.ScaledJobConditionUnpausedReason, kedav1alpha1.ScaledJobConditionUnpausedMessage)
	}

	return false, nil
}

// Delete Jobs owned by the previous version of the scaledJob based on the rolloutStrategy given for this scaledJob, if any
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
var rolloutStrategy string
Expand Down
28 changes: 28 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const PausedAnnotation = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand Down Expand Up @@ -61,3 +63,29 @@ func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool {

return false
}

// PausedPredicate is an event filter built on predicate.Funcs. It overrides Update so
// that update events are let through when the PausedAnnotation annotation changes.
type PausedPredicate struct {
predicate.Funcs
}

// Update reports whether the PausedAnnotation annotation differs between the old and
// the new object of an update event. It returns false when either object is missing.
//
// Both the presence of the annotation key and its value are compared. Comparing values
// alone cannot tell a missing annotation from one that is set to the empty string, so
// adding or removing `autoscaling.keda.sh/paused=""` would go unnoticed even though
// consumers may decide on the key's presence only.
func (PausedPredicate) Update(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}

	// Indexing a nil map is safe in Go, so the annotation maps need no nil guard.
	newValue, newPresent := e.ObjectNew.GetAnnotations()[PausedAnnotation]
	oldValue, oldPresent := e.ObjectOld.GetAnnotations()[PausedAnnotation]

	return newPresent != oldPresent || newValue != oldValue
}
213 changes: 213 additions & 0 deletions tests/internals/pause_scaledjob/pause_scaledjob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
//go:build e2e
// +build e2e

// go test -v -tags e2e ./internals/pause_scaledjob/pause_scaledjob_test.go

package pause_scaledjob_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
)

// Load environment variables from .env file

// testName is the common prefix every resource name of this test is derived from.
const testName = "pause-scaledjob-test"

// Names of the Kubernetes objects used by the test, all derived from testName.
var (
	testNamespace = fmt.Sprintf("%s-ns", testName)
	serviceName   = fmt.Sprintf("%s-service", testName)
	scalerName    = fmt.Sprintf("%s-scaler", testName)
	scaledJobName = fmt.Sprintf("%s-sj", testName)
)

// Replica bounds rendered into the ScaledJob manifest.
var (
	minReplicaCount = 0
	maxReplicaCount = 3
)

// Number of polling iterations used when waiting for the job count:
// the initial scale-out check and the later pause/unpause checks.
var (
	iterationCountInitial = 15
	iterationCountLatter  = 30
)

// templateData carries the values substituted into the manifest templates of this test.
type templateData struct {
	// Resource names.
	TestNamespace string
	ServiceName   string
	ScalerName    string
	ScaledJobName string

	// Replica bounds of the ScaledJob.
	MinReplicaCount int
	MaxReplicaCount int

	// Settings rendered into the external trigger metadata.
	MetricThreshold int
	MetricValue     int
}

// Manifest templates rendered with templateData. The raw strings are used verbatim at
// runtime, so their content (including whitespace) must not be reformatted.
const (
// serviceTemplate is a Service that exposes port 6000 and selects the pods
// labelled app={{.ScalerName}}.
serviceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{.ServiceName}}
namespace: {{.TestNamespace}}
spec:
ports:
- port: 6000
targetPort: 6000
selector:
app: {{.ScalerName}}
`

// scalerTemplate is a single-replica Deployment running the external scaler e2e
// image, listening on container port 6000.
scalerTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.ScalerName}}
namespace: {{.TestNamespace}}
labels:
app: {{.ScalerName}}
spec:
replicas: 1
selector:
matchLabels:
app: {{.ScalerName}}
template:
metadata:
labels:
app: {{.ScalerName}}
spec:
containers:
- name: scaler
image: ghcr.io/kedacore/tests-external-scaler-e2e:latest
imagePullPolicy: Always
ports:
- containerPort: 6000
`

// scaledJobTemplate is the ScaledJob under test: a busybox job that sleeps 15
// seconds, driven by an external trigger that points at the scaler Service.
scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
pollingInterval: 5
maxReplicaCount: {{.MaxReplicaCount}}
minReplicaCount: {{.MinReplicaCount}}
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "15"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
triggers:
- type: external
metadata:
scalerAddress: {{.ServiceName}}.{{.TestNamespace}}:6000
metricThreshold: "{{.MetricThreshold}}"
metricValue: "{{.MetricValue}}"
`
)

// WaitForJobByFilterCountUntilIteration polls the jobs in namespace that match
// listOptions, once per iteration, sleeping intervalSeconds seconds after each poll.
//
// The loop always runs for the full number of iterations and never exits early: the
// returned value only tells whether the job count equalled target on the final poll.
// This lets callers verify that a count is still at its target at the end of the window.
//
// A failed List call is logged and counts as "target not achieved" for that poll.
// Otherwise an API error would be read as zero jobs and could falsely satisfy a
// target of 0.
func WaitForJobByFilterCountUntilIteration(t *testing.T, kc *kubernetes.Clientset, namespace string,
	target, iterations, intervalSeconds int, listOptions metav1.ListOptions) bool {
	isTargetAchieved := false

	for i := 0; i < iterations; i++ {
		jobList, err := kc.BatchV1().Jobs(namespace).List(context.Background(), listOptions)
		if err != nil || jobList == nil {
			t.Logf("Cannot list jobs while waiting for job count. Namespace - %s, Target - %d, Error - %v",
				namespace, target, err)
			isTargetAchieved = false
		} else {
			count := len(jobList.Items)

			t.Logf("Waiting for job count to hit target. Namespace - %s, Current - %d, Target - %d",
				namespace, count, target)

			isTargetAchieved = count == target
		}

		time.Sleep(time.Duration(intervalSeconds) * time.Second)
	}

	return isTargetAchieved
}

// TestScaler creates the scaler, its Service and the ScaledJob, waits for the job
// count to reach the metric threshold, and then checks that pausing and unpausing the
// ScaledJob through the paused annotation changes the job count.
func TestScaler(t *testing.T) {
	// setup
	t.Log("--- setting up ---")

	// Create kubernetes resources
	kc := GetKubernetesClient(t)
	metricValue := 1

	data, templates := getTemplateData(metricValue)

	listOptions := metav1.ListOptions{
		FieldSelector: "status.successful=0",
	}

	// cleanup: registered before creation so the resources are also removed when the
	// test aborts early (FailNow or panic) instead of only on a clean run to the end.
	t.Cleanup(func() {
		DeleteKubernetesResources(t, testNamespace, data, templates)
	})

	CreateKubernetesResources(t, kc, testNamespace, data, templates)

	assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, data.MetricThreshold, iterationCountInitial, 1, listOptions),
		"job count should be %d after %d iterations", data.MetricThreshold, iterationCountInitial)

	// test scaling
	testPause(t, kc, listOptions)
	testUnpause(t, kc, data, listOptions)
}

// getTemplateData builds the template values for this test, using metricValue for the
// trigger's metric value and a fixed metric threshold of 1, and returns them together
// with the manifests to apply: scaler Deployment, Service, then ScaledJob.
func getTemplateData(metricValue int) (templateData, []Template) {
	data := templateData{
		TestNamespace:   testNamespace,
		ServiceName:     serviceName,
		ScalerName:      scalerName,
		ScaledJobName:   scaledJobName,
		MinReplicaCount: minReplicaCount,
		MaxReplicaCount: maxReplicaCount,
		MetricThreshold: 1,
		MetricValue:     metricValue,
	}

	manifests := []Template{
		{Name: "scalerTemplate", Config: scalerTemplate},
		{Name: "serviceTemplate", Config: serviceTemplate},
		{Name: "scaledJobTemplate", Config: scaledJobTemplate},
	}

	return data, manifests
}

// testPause sets the autoscaling.keda.sh/paused=true annotation on the ScaledJob and
// asserts that no job matching listOptions exists at the end of the polling window.
func testPause(t *testing.T, kc *kubernetes.Clientset, listOptions metav1.ListOptions) {
	t.Log("--- testing Paused annotation ---")

	annotateCmd := fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused=true --namespace %s", scaledJobName, testNamespace)
	_, err := ExecuteCommand(annotateCmd)
	assert.NoErrorf(t, err, "cannot execute command - %s", err)

	t.Log("job count does not change as job is paused")

	expectedTarget := 0
	reached := WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions)
	assert.True(t, reached, "job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}

// testUnpause removes the autoscaling.keda.sh/paused annotation from the ScaledJob and
// asserts that the number of jobs matching listOptions equals data.MetricThreshold at
// the end of the polling window.
func testUnpause(t *testing.T, kc *kubernetes.Clientset, data templateData, listOptions metav1.ListOptions) {
	t.Log("--- testing removing Paused annotation ---")

	removeCmd := fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused- --namespace %s", scaledJobName, testNamespace)
	_, err := ExecuteCommand(removeCmd)
	assert.NoErrorf(t, err, "cannot execute command - %s", err)

	t.Log("job count increases from zero as job is no longer paused")

	expectedTarget := data.MetricThreshold
	reached := WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions)
	assert.True(t, reached, "job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}