Skip to content

Commit

Permalink
Propagate contexts down scaler call stacks (#2202)
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron <aaron@ecomaz.net>
  • Loading branch information
arschles committed Oct 26, 2021
1 parent 04aac84 commit d1bed54
Show file tree
Hide file tree
Showing 92 changed files with 406 additions and 365 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -22,7 +22,6 @@

### New

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

Expand Down Expand Up @@ -54,6 +53,7 @@

### Other

- Ensure that `context.Context` values are passed down the stack from all scaler gRPC handler implementation to scaler implementation code ([#2202](https://github.com/kedacore/keda/pull/2202))
- Migrate to Kubebuilder v3 ([#2082](https://github.com/kedacore/keda/pull/2082))
- API path has been changed: `github.com/kedacore/keda/v2/api/v1alpha1` -> `github.com/kedacore/keda/v2/apis/keda/v1alpha1`
- Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))
Expand Down
1 change: 0 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions controllers/keda/hpa.go
Expand Up @@ -40,10 +40,10 @@ const (
)

// createAndDeployNewHPA creates and deploy HPA in the cluster for specified ScaledObject
func (r *ScaledObjectReconciler) createAndDeployNewHPA(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) error {
func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpaName := getHPAName(scaledObject)
logger.Info("Creating a new HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
hpa, err := r.newHPAForScaledObject(ctx, logger, scaledObject, gvkr)
if err != nil {
logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
return err
Expand All @@ -59,8 +59,8 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(logger logr.Logger, scale
}

// newHPAForScaledObject returns HPA as it is specified in ScaledObject
func (r *ScaledObjectReconciler) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
func (r *ScaledObjectReconciler) newHPAForScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(ctx, logger, scaledObject)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,8 +120,8 @@ func (r *ScaledObjectReconciler) newHPAForScaledObject(logger logr.Logger, scale
}

// updateHPAIfNeeded checks whether update of HPA is needed
func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpa, err := r.newHPAForScaledObject(ctx, logger, scaledObject, gvkr)
if err != nil {
logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", getHPAName(scaledObject))
return err
Expand Down Expand Up @@ -155,19 +155,19 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObj
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec
var externalMetricNames []string
var resourceMetricNames []string

scalers, err := r.scaleHandler.GetScalers(scaledObject)
scalers, err := r.scaleHandler.GetScalers(ctx, scaledObject)
if err != nil {
logger.Error(err, "Error getting scalers")
return nil, err
}

for _, scaler := range scalers {
metricSpecs := scaler.GetMetricSpecForScaling()
metricSpecs := scaler.GetMetricSpecForScaling(ctx)

for _, metricSpec := range metricSpecs {
if metricSpec.Resource != nil {
Expand All @@ -187,7 +187,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
}
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
scaler.Close()
scaler.Close(ctx)
}

// sort metrics in ScaledObject, this way we always check the same resource in Reconcile loop and we can prevent unnecessary HPA updates,
Expand Down
13 changes: 8 additions & 5 deletions controllers/keda/hpa_test.go
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package keda

import (
"context"

"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -75,7 +77,7 @@ var _ = Describe("hpa", func() {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)
_, err := reconciler.getScaledObjectMetricSpecs(context.Background(), logger, scaledObject)

Expect(err).ToNot(HaveOccurred())
Expect(capturedScaledObject.Status.Health).To(BeEmpty())
Expand All @@ -102,7 +104,7 @@ var _ = Describe("hpa", func() {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)
_, err := reconciler.getScaledObjectMetricSpecs(context.Background(), logger, scaledObject)

expectedHealth := make(map[string]v1alpha1.HealthStatus)
expectedHealth["some metric name"] = v1alpha1.HealthStatus{
Expand Down Expand Up @@ -136,9 +138,10 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
},
}
metricSpecs := []v2beta2.MetricSpec{metricSpec}
scaler.EXPECT().GetMetricSpecForScaling().Return(metricSpecs)
scaler.EXPECT().Close()
scaleHandler.EXPECT().GetScalers(gomock.Eq(scaledObject)).Return(scalers, nil)
ctx := context.Background()
scaler.EXPECT().GetMetricSpecForScaling(ctx).Return(metricSpecs)
scaler.EXPECT().Close(ctx)
scaleHandler.EXPECT().GetScalers(context.Background(), gomock.Eq(scaledObject)).Return(scalers, nil)

return scaledObject
}
6 changes: 3 additions & 3 deletions controllers/keda/scaledjob_controller.go
Expand Up @@ -110,7 +110,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
Expand All @@ -133,14 +133,14 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// Check ScaledJob is Ready or not
_, err = r.scaleHandler.GetScalers(scaledJob)
_, err = r.scaleHandler.GetScalers(ctx, scaledJob)
if err != nil {
logger.Error(err, "Error getting scalers")
return "Failed to ensure ScaledJob is correctly created", err
Expand Down
12 changes: 6 additions & 6 deletions controllers/keda/scaledobject_controller.go
Expand Up @@ -175,7 +175,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// reconcile ScaledObject and set status appropriately
msg, err := r.reconcileScaledObject(reqLogger, scaledObject)
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject)
conditions := scaledObject.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
Expand All @@ -199,7 +199,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// reconcileScaledObject implements reconciler logic for ScaleObject
func (r *ScaledObjectReconciler) reconcileScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
// Check scale target Name is specified
if scaledObject.Spec.ScaleTargetRef.Name == "" {
err := fmt.Errorf("ScaledObject.spec.scaleTargetRef.name is missing")
Expand All @@ -224,7 +224,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(logger logr.Logger, scale
}

// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(logger, scaledObject, &gvkr)
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
if err != nil {
return "Failed to ensure HPA is correctly created for ScaledObject", err
}
Expand Down Expand Up @@ -349,14 +349,14 @@ func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *k
}

// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
// HPA wasn't found -> let's create a new one
err = r.createAndDeployNewHPA(logger, scaledObject, gvkr)
err = r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
if err != nil {
return false, err
}
Expand All @@ -372,7 +372,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(logger logr.Logg
}

// HPA was found -> let's check if we need to update it
err = r.updateHPAIfNeeded(logger, scaledObject, foundHpa, gvkr)
err = r.updateHPAIfNeeded(ctx, logger, scaledObject, foundHpa, gvkr)
if err != nil {
logger.Error(err, "Failed to check HPA for possible update")
return false, err
Expand Down
16 changes: 8 additions & 8 deletions controllers/keda/scaledobject_controller_test.go
Expand Up @@ -100,20 +100,20 @@ var _ = Describe("ScaledObjectController", func() {
}

testScalers = append(testScalers, s)
for _, metricSpec := range s.GetMetricSpecForScaling() {
for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return(testScalers, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return(testScalers, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, uniquelyNamedScaledObject)

// Test that the status was updated with metric names
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))
Expand All @@ -139,19 +139,19 @@ var _ = Describe("ScaledObjectController", func() {
if err != nil {
Fail(err.Error())
}
for _, metricSpec := range s.GetMetricSpecForScaling() {
for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, uniquelyNamedScaledObject)

// Test that the status was updated
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))
Expand Down Expand Up @@ -186,10 +186,10 @@ var _ = Describe("ScaledObjectController", func() {
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(duplicateNamedScaledObject).Return(testScalers, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), duplicateNamedScaledObject).Return(testScalers, nil)

// Call function tobe tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, duplicateNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, duplicateNamedScaledObject)

// Test that the status was not updated
Ω(duplicateNamedScaledObject.Status.ExternalMetricNames).Should(BeNil())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mock_client/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mock/mock_scale/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions pkg/mock/mock_scaler/mock_scaler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/mock/mock_scaling/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d1bed54

Please sign in to comment.