Skip to content

Commit

Permalink
Disable sarama metrics and close scalers (kedacore#1572)
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>
  • Loading branch information
ahmelsayed authored and ycabrer committed Mar 1, 2021
1 parent c61c140 commit faefe77
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

- Add `KEDA_HTTP_DEFAULT_TIMEOUT` support in operator ([#1548](https://github.com/kedacore/keda/issues/1548))
- Removed `MIN field` for scaledjob.([#1553](https://github.com/kedacore/keda/pull/1553))
- Fix a memory leak in kafka client and close push scalers ([#1565](https://github.com/kedacore/keda/issues/1565))

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/onsi/gomega v1.10.4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.9.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/robfig/cron/v3 v3.0.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
Expand Down
7 changes: 7 additions & 0 deletions pkg/scalers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
metrics "github.com/rcrowley/go-metrics"
)

func init() {
// Disable metrics for kafka client (sarama)
// https://github.com/Shopify/sarama/issues/1321
metrics.UseNilMetrics = true
}

// Scaler interface
type Scaler interface {

Expand Down
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,14 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav
for _, s := range ss {
scaler, ok := s.(scalers.PushScaler)
if !ok {
s.Close()
continue
}

go func() {
activeCh := make(chan bool)
go scaler.Run(ctx, activeCh)
defer scaler.Close()
for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit faefe77

Please sign in to comment.