-
Notifications
You must be signed in to change notification settings - Fork 798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add a groups reaper to remove non existing groups #657
Conversation
burrow currently keeps reporting lag for non existing consumers. The only way to remove groups from burrow automatically is configuring expire-group, which is not ideal as it can conflict with consumer with no members. This PR introduces a go routine to get the existing consumer groups from Kafka, and compare it against burrow consumers to reap the non existing ones.
calling `.Stop` on a ticker will not close the channel, so consumers won't get triggered
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@@ -98,6 +103,19 @@ func (module *KafkaCluster) Start() error { | |||
// Start main loop that has a timer for offset and topic fetches | |||
module.offsetTicker = time.NewTicker(time.Duration(module.offsetRefresh) * time.Second) | |||
module.metadataTicker = time.NewTicker(time.Duration(module.topicRefresh) * time.Second) | |||
|
|||
if module.groupsReaperRefresh != 0 { | |||
module.groupsReaperTicker = time.NewTicker(time.Duration(module.groupsReaperRefresh) * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
groupsReaperRefreshInSeconds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the current code conventions, offsetRefresh
and topicRefresh
don't have seconds as part of the naming
|
||
if module.groupsReaperRefresh != 0 { | ||
module.groupsReaperTicker = time.NewTicker(time.Duration(module.groupsReaperRefresh) * time.Second) | ||
if !module.saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there not a variable like sarama.MIN_VERSION
. I feel we may forget to update this value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to update this value ever; this only checks that the current client can use the new API added in the protocol v.0.11.0
module.groupsReaperTicker = time.NewTicker(time.Duration(module.groupsReaperRefresh) * time.Second) | ||
if !module.saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) { | ||
module.groupsReaperTicker.Stop() | ||
module.Log.Warn("groups reaper disabled, it needs at least kafka v0.11.0.0 to get the list of consumer groups") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel purge or cleanup would sound more intuitive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
req := &protocol.StorageRequest{ | ||
RequestType: protocol.StorageFetchConsumers, | ||
Reply: make(chan interface{}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why no type here? interface{}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's the type of Reply
, we cannot use a concrete type here
// TODO: find how to get reportedConsumerGroup from KafkaClient | ||
burrowIgnoreGroupName := "burrow-" + module.name | ||
burrowGroups, _ := res.([]string) | ||
for _, g := range burrowGroups { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single letter variable name is hard to read.
g
-> burrowGroup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a single letter, easy to read :D
variable is local and scope is limited, there is not need to use longer variables names, this is usually the go practice
Cluster: module.name, | ||
Group: g, | ||
} | ||
helpers.TimeoutSendStorageRequest(module.App.StorageChannel, request, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it processed in another routine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it'll eventually be processed by the storage channel reader
assert.Equalf(t, protocol.StorageFetchConsumers, request.RequestType, "Expected request sent with type StorageFetchConsumers, not %v", request.RequestType) | ||
assert.Equalf(t, "test", request.Cluster, "Expected request sent with cluster test, not %v", request.Cluster) | ||
|
||
request.Reply <- []string{"group1", "group2"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel you have more than one test in the same test method. Split 🙏
@@ -205,6 +205,9 @@ type SaramaClient interface { | |||
// NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the | |||
// underlying client when shutting down this consumer. | |||
NewConsumerFromClient() (sarama.Consumer, error) | |||
|
|||
// List the consumer groups available in the cluster. | |||
ListConsumerGroups() (map[string]string, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the value type of the map is string
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something related the return type of Sarama, as it returns a Map with the consumer group and the consumer group type; here, we only care about if the consumer group exists, a Set
would be great, but go doesn't have it, so we still need a Map
, but we don't care about the value, so I could convert the type to map[string]struct{}
, I decided not to do it as there is no performance gains, and it'll be more wasteful to create another map just to get rid of the undesired value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No blocking comments but please address some of it 😸
@bai How about this feature? |
@bai could you please 👀 |
Apologies for delay, this fell through the cracks on my side. PR LGTM. |
burrow currently keeps reporting lag for non-existing consumers.
The only way to remove groups from burrow automatically is configuring
expire-group
, which is not ideal as it can conflict with consumer withno members.
This PR introduces a go routine to get the existing consumer groups from
Kafka, and compare it against burrow consumers to reap the non-existing ones.
🎩
add a consumer group:
check after some seconds burrow:
stop consumer and delete consumer group or wait until the consumer offsets retention expires:
kafka reports the deleted consumer group
then the groups reaper deletes this consumer:
burrow stops reporting lag as the group is removed:
it also fixes:
#589