diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index 63bee20b..358dbae8 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -92,6 +92,11 @@ type Coordinator struct { clusters map[string]*clusterGroups clusterLock *sync.RWMutex + + // The value of the 'ShowAll' parameter when submitting a request to + // the evaluator. This flag is determined based on the notifier + // 'threshold' configuration. + ShowAll bool } // getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there @@ -162,6 +167,7 @@ func (nc *Coordinator) Configure() { nc.quitChannel = make(chan struct{}) nc.running = sync.WaitGroup{} nc.evaluatorResponse = make(chan *protocol.ConsumerGroupStatus) + nc.ShowAll = false // Set the function for parsing templates and calling module Notify (configurable to enable testing) if nc.templateParseFunc == nil { @@ -185,6 +191,13 @@ func (nc *Coordinator) Configure() { viper.SetDefault(configRoot+".send-interval", viper.GetInt64(configRoot+".interval")) viper.SetDefault(configRoot+".threshold", 2) + // if any of the notifier thresholds are 1, we need to fetch + // status of all consumer groups from the evaluator + threshold := viper.GetInt(configRoot + ".threshold") + if threshold == 1 { + nc.ShowAll = true + } + // Compile the whitelist for the consumer groups to notify for var groupWhitelist *regexp.Regexp whitelist := viper.GetString(configRoot + ".group-whitelist") @@ -375,6 +388,7 @@ func (nc *Coordinator) sendEvaluatorRequests() { Reply: nc.evaluatorResponse, Cluster: sendCluster, Group: sendConsumer, + ShowAll: nc.ShowAll, } }(cluster, consumer) groupInfo.LastEval = timeNow diff --git a/core/internal/notifier/coordinator_race_test.go b/core/internal/notifier/coordinator_race_test.go index c90b1820..b3c3d7bf 100644 --- a/core/internal/notifier/coordinator_race_test.go +++ b/core/internal/notifier/coordinator_race_test.go @@ -57,11 +57,11 @@ func TestCoordinator_sendEvaluatorRequests(t *testing.T) { case "testcluster": assert.Equalf(t, "testcluster", request.Cluster, "Expected request cluster to be testcluster, not %v", request.Cluster) assert.Equalf(t, "testgroup", request.Group, "Expected request group to be testgroup, not %v", request.Group) - assert.False(t, request.ShowAll, "Expected ShowAll to be false") + assert.True(t, request.ShowAll, "Expected ShowAll to be true") case "testcluster2": assert.Equalf(t, "testcluster2", request.Cluster, "Expected request cluster to be testcluster2, not %v", request.Cluster) assert.Equalf(t, "testgroup2", request.Group, "Expected request group to be testgroup2, not %v", request.Group) - assert.False(t, request.ShowAll, "Expected ShowAll to be false") + assert.True(t, request.ShowAll, "Expected ShowAll to be true") default: assert.Failf(t, "Received unexpected request for cluster %v, group %v", request.Cluster, request.Group) }