diff --git a/CHANGELOG.md b/CHANGELOG.md index 076f0035835..da19e7796fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ - Add `KEDA_HTTP_DEFAULT_TIMEOUT` support in operator ([#1548](https://github.com/kedacore/keda/issues/1548)) - Removed `MIN field` for scaledjob.([#1553](https://github.com/kedacore/keda/pull/1553)) +- Fix a memory leak in kafka client and close push scalers ([#1565](https://github.com/kedacore/keda/issues/1565)) ### Breaking Changes diff --git a/go.mod b/go.mod index 2261b616400..e6b50207a9a 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/onsi/gomega v1.10.4 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.9.0 + github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 github.com/robfig/cron/v3 v3.0.1 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.6.1 diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index 10889a70376..efc0714b2fe 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -9,8 +9,15 @@ import ( "k8s.io/metrics/pkg/apis/external_metrics" kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + metrics "github.com/rcrowley/go-metrics" ) +func init() { + // Disable metrics for kafka client (sarama) + // https://github.com/Shopify/sarama/issues/1321 + metrics.UseNilMetrics = true +} + // Scaler interface type Scaler interface { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index fc18381f2f6..4a71a3c84e5 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -154,12 +154,14 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav for _, s := range ss { scaler, ok := s.(scalers.PushScaler) if !ok { + s.Close() continue } go func() { activeCh := make(chan bool) go scaler.Run(ctx, activeCh) + defer scaler.Close() for { select { case <-ctx.Done():