fix: Prevented stuck status due to timeouts during scalers generation
JorTurFer authored and zroubalik committed Nov 27, 2023
1 parent 4467bb0 commit 27bf91e
Showing 3 changed files with 203 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -63,7 +63,7 @@ Here is an overview of all new **experimental** features:

### Fixes

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083))

### Deprecations

32 changes: 16 additions & 16 deletions pkg/scaling/scale_handler.go
@@ -291,36 +291,25 @@ func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scale
// performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods
func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) {
h.scalerCachesLock.RLock()
regenerateCache := false
if cache, ok := h.scalerCaches[key]; ok {
// generation was specified -> let's include it in the check as well
if scalableObjectGeneration != nil {
if cache.ScalableObjectGeneration == *scalableObjectGeneration {
h.scalerCachesLock.RUnlock()
return cache, nil
}
// object was found in cache, but the generation is not correct,
// we'll need to close scalers in the cache and
// proceed further to recreate the cache
regenerateCache = true
} else {
h.scalerCachesLock.RUnlock()
return cache, nil
}
}
h.scalerCachesLock.RUnlock()

h.scalerCachesLock.Lock()
defer h.scalerCachesLock.Unlock()
if cache, ok := h.scalerCaches[key]; ok {
// generation was specified -> let's include it in the check as well
if scalableObjectGeneration != nil {
if cache.ScalableObjectGeneration == *scalableObjectGeneration {
return cache, nil
}
// object was found in cache, but the generation is not correct,
// let's close scalers in the cache and proceed further to recreate the cache
cache.Close(ctx)
} else {
return cache, nil
}
}

if scalableObject == nil {
switch scalableObjectKind {
case "ScaledObject":
@@ -388,6 +377,17 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
default:
}

// Scalers Close() can be affected by timeouts, blocking the mutex
// until the timeout expires. Instead of closing the old cache item
// while holding the mutex, we take it and close it in another
// goroutine, leaving the cache unlocked: https://github.com/kedacore/keda/issues/5083
if regenerateCache {
oldCache := h.scalerCaches[key]
go oldCache.Close(ctx)
}

h.scalerCachesLock.Lock()
defer h.scalerCachesLock.Unlock()
h.scalerCaches[key] = newCache
return h.scalerCaches[key], nil
}
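
The essence of the change above is the locking pattern: the old cache entry is handed to a separate goroutine for Close(), and the write lock is only taken for the short map update, so a scaler whose Close() hangs until a network timeout can no longer hold scalerCachesLock and stall every other ScaledObject. Below is a minimal, self-contained sketch of that pattern, with hypothetical names and types rather than KEDA's actual code (the sketch also guards the lookup with a read lock):

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry stands in for a scalers cache item whose Close() may hang until a
// network timeout fires (hypothetical type, for illustration only).
type entry struct{ id int }

func (e *entry) Close() {
	time.Sleep(2 * time.Second) // simulate a scaler blocking until its timeout
	fmt.Printf("closed entry %d\n", e.id)
}

type store struct {
	mu      sync.RWMutex
	entries map[string]*entry
}

// replace swaps in a new entry for key. The old entry is closed in its own
// goroutine, so the mutex is only held for the map lookup and the update,
// never for the potentially slow Close().
func (s *store) replace(key string, next *entry) {
	s.mu.RLock()
	old := s.entries[key]
	s.mu.RUnlock()

	if old != nil {
		go old.Close() // slow close happens off the lock
	}

	s.mu.Lock()
	s.entries[key] = next
	s.mu.Unlock()
}

func main() {
	s := &store{entries: map[string]*entry{"scaledobject": {id: 1}}}
	s.replace("scaledobject", &entry{id: 2}) // returns immediately
	time.Sleep(3 * time.Second)              // only so the demo waits for the background Close()
}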
@@ -0,0 +1,186 @@
//go:build e2e
// +build e2e

package broken_scaledobject_tolerancy_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
)

const (
testName = "broken-scaledobject-tolerancy-test"
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
)

type templateData struct {
TestNamespace string
DeploymentName string
MonitoredDeploymentName string
ScaledObjectName string
}

const (
monitoredDeploymentTemplate = `apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.MonitoredDeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: workload-test
spec:
replicas: 0
selector:
matchLabels:
pod: workload-test
template:
metadata:
labels:
pod: workload-test
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'`

deploymentTemplate = `apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: workload-sut
spec:
replicas: 0
selector:
matchLabels:
pod: workload-sut
template:
metadata:
labels:
pod: workload-sut
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'`

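// The ScaledObject below intentionally points to an unreachable Kafka
// bootstrap server (1.2.3.4:9092), so generating and closing its scalers
// always ends in network timeouts.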
brokenScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}-broken
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.MonitoredDeploymentName}}
minReplicaCount: 0
maxReplicaCount: 1
triggers:
- metadata:
activationLagThreshold: '1'
bootstrapServers: 1.2.3.4:9092
consumerGroup: earliest
lagThreshold: '1'
offsetResetPolicy: earliest
topic: kafka-topic
type: kafka
`

scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 1
cooldownPeriod: 0
minReplicaCount: 0
maxReplicaCount: 10
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 5
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'pod=workload-test'
value: '1'
`
)

// As we need to ensure that a broken ScaledObject doesn't impact
// other ScaledObjects (https://github.com/kedacore/keda/issues/5083),
// this test deploys a broken ScaledObject pointing to a missing endpoint,
// which produces timeouts. In the meantime, we deploy another ScaledObject
// and validate that it keeps working even though the broken ScaledObject
// produces timeouts all the time. This guards against introducing deadlocks
// in the internal scalers cache.
func TestBrokenScaledObjectTolerance(t *testing.T) {
// setup
t.Log("--- setting up ---")
// Create kubernetes resources
kc := GetKubernetesClient(t)
data, templates := getTemplateData()

CreateKubernetesResources(t, kc, testNamespace, data, templates)

testScaleOut(t, kc)
testScaleIn(t, kc)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
MonitoredDeploymentName: monitoredDeploymentName,
}, []Template{
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
{Name: "brokenScaledObjectTemplate", Config: brokenScaledObjectTemplate},
}
}

func testScaleOut(t *testing.T, kc *kubernetes.Clientset) {
// scale monitored deployment to 2 replicas
replicas := 2
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))

// scale monitored deployment to 4 replicas
replicas = 4
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))
}

func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
// scale monitored deployment to 2 replicas
replicas := 2
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))

// scale monitored deployment to 0 replicas
replicas = 0
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))
}
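
The scale-out and scale-in helpers above repeat the same scale-then-wait step with different replica counts; a table-driven variant could collapse them into one loop. This is only a sketch reusing the helpers already imported by this test, not part of the commit:

// Sketch only: an equivalent, table-driven form of testScaleOut/testScaleIn.
func testScaleSteps(t *testing.T, kc *kubernetes.Clientset) {
	// The kubernetes-workload trigger (value: '1') should keep the scaled
	// deployment at one replica per monitored pod, despite the broken
	// ScaledObject timing out in the background.
	for _, replicas := range []int{2, 4, 2, 0} {
		KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
		assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
			fmt.Sprintf("replica count should be %d after 1 minute", replicas))
	}
}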
