Conversation
@horkhe @nemothekid @aaronkavlie-wf @kvs: your input on this is very welcome!
Also pinging @eapache as always :)
```go
instances, instancesChanged, err := cm.group.WatchInstances()
if err != nil {
```
TODO: retry this on error.
Dunno how similar the ZK version is, but this seems like an ideal place for a swappable interface.
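For illustration, the retry could be wrapped in a small helper along these lines. This is only a sketch: the helper name, signature, and backoff values are assumptions, not anything in this PR.

```go
package example

import (
	"log"
	"time"
)

// retryWithBackoff keeps calling fn with exponential backoff until it succeeds
// or the attempt budget runs out. Purely illustrative.
func retryWithBackoff(attempts int, initialBackoff time.Duration, fn func() error) error {
	backoff := initialBackoff
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		log.Printf("attempt %d failed: %v; retrying in %s", i+1, err, backoff)
		time.Sleep(backoff)
		backoff *= 2
	}
	return err
}

// At the call site quoted above, usage could then look something like:
//
//	err := retryWithBackoff(5, 250*time.Millisecond, func() error {
//		var werr error
//		instances, instancesChanged, werr = cm.group.WatchInstances()
//		return werr
//	})
```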
I will go ahead and try to implement it with sarama's OffsetManager. I can rework the ZK version to match the sarama interface.
I implemented offset management using sarama's WIP offset managers. It appears to work well.
I also added Whitelist and Blacklist subscription types. This means you can subscribe to topics that match (or don't match) a regular expression. Any new topics that are created in the cluster will automatically be consumed.
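For context, here is a rough sketch of what driving sarama's offset managers looks like from the caller's side: resume from NextOffset() and mark offsets as messages are processed. It targets the offset manager API as it eventually shipped (using the current IBM/sarama import path); the broker address, group, topic, and partition are placeholders.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// One offset manager per consumer group, one partition offset manager per
	// claimed partition.
	om, err := sarama.NewOffsetManagerFromClient("my-group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("my-topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close()

	// Where to resume consuming this partition from.
	nextOffset, _ := pom.NextOffset()
	log.Printf("resuming at offset %d", nextOffset)

	// After successfully processing a message at msg.Offset, mark the next
	// offset to be consumed so it gets committed on the next flush:
	// pom.MarkOffset(msg.Offset+1, "")
}
```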
```go
eventCount += 1
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
```
This should be unnecessary (sarama does these checks), and it's wrong anyway for compacted topics, where the offsets might not be monotonic.
This was copy-pasted from the previous example app and is shitty. It also fails when you start consuming a partition, then another instance takes it over, and later the partition gets assigned back to you.
Will remove.
Quick skim looks good 👍 When this is fully ready I'll do a deep-dive review.
This is getting pretty close to being feature complete. Let's work to get IBM/sarama#461 merged so we can pick this up.
@wvanbergen good job! I only wish you guys had all of this production-ready a couple of months ago, then I would not have needed to implement it myself :)
```go
// Initialize sarama consumer
if consumer, err := sarama.NewConsumerFromClient(cm.client); err != nil {
```
Consumer and OffsetManager share a client instance but consumer requests can stay blocked doing long polling. Maybe it makes sense to use separate clients so that OffsetManager would never be affected by long polls?
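A minimal sketch of that suggestion, assuming sarama's post-#461 constructors: two independent clients over the same broker list, one feeding the consumer and one feeding the offset manager, so long-polling fetches cannot hold up offset commits. Broker addresses and the group name are placeholders.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	brokers := []string{"localhost:9092"}
	config := sarama.NewConfig()

	// Client dedicated to fetch requests, which may block in long polls.
	consumerClient, err := sarama.NewClient(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumerClient.Close()

	// Separate client for offset commits/fetches, unaffected by long polls.
	offsetClient, err := sarama.NewClient(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer offsetClient.Close()

	consumer, err := sarama.NewConsumerFromClient(consumerClient)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	offsetManager, err := sarama.NewOffsetManagerFromClient("my-group", offsetClient)
	if err != nil {
		log.Fatal(err)
	}
	defer offsetManager.Close()

	_, _ = consumer, offsetManager
}
```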
```go
// partition consumer resumes consuming from this partition later.
func (pm *partitionManager) waitForProcessing() {
	nextOffset, _ := pm.offsetManager.NextOffset()
	lastProcessedOffset := nextOffset - 1
```
Not really super happy about this implementation, but not sure how else to do this.
We only want to wait for offsets if a) we actually consumed any messages at all, and b) we haven't already processed all consumed offsets. If either of those isn't the case, `pm.processingDone` will never be closed.
seems reasonable to me... early returns would make it much prettier though, e.g.

```go
if lastConsumedOffset == -1 {
	return
}
// ...
```
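Spelled out, the whole method with those early returns might look roughly like this. It is a compiling sketch only: the offsetTracker interface and every field except offsetManager and processingDone are stand-ins, not the PR's actual partitionManager.

```go
package example

import "time"

// offsetTracker is a stand-in for the part of sarama's offset manager used here.
type offsetTracker interface {
	NextOffset() (int64, string)
}

type partitionManager struct {
	offsetManager      offsetTracker
	lastConsumedOffset int64
	processingDone     chan struct{}
	processingTimeout  time.Duration
}

// waitForProcessing rewritten with the early returns suggested above.
func (pm *partitionManager) waitForProcessing() {
	nextOffset, _ := pm.offsetManager.NextOffset()
	lastProcessedOffset := nextOffset - 1

	// a) nothing was consumed on this partition, so there is nothing to wait for.
	if pm.lastConsumedOffset == -1 {
		return
	}

	// b) everything that was consumed has already been processed and committed.
	if lastProcessedOffset >= pm.lastConsumedOffset {
		return
	}

	// Otherwise wait for the processor to catch up, bounded by the processing timeout.
	select {
	case <-pm.processingDone:
	case <-time.After(pm.processingTimeout):
	}
}
```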
```go
ts.offsetTotal += offset - 1

request.AddBlock(topic, partition, offset-1, 0, "")
```
Timestamp `0` means the beginning of the Unix epoch. As a result, all committed offsets are expired immediately (in my setup Kafka kept them around for 1 minute). So you need to use `ReceiveTime` (-1) instead.
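A sketch of the suggested fix, building the commit request with sarama.ReceiveTime (-1) instead of a zero timestamp, so the broker stamps the commit with its own receive time and does not expire the offset immediately. The surrounding function, field values, and request version are illustrative.

```go
package example

import "github.com/IBM/sarama"

// buildCommitRequest is a stand-in for wherever the commit request is built.
func buildCommitRequest(group, topic string, partition int32, offset int64) *sarama.OffsetCommitRequest {
	request := &sarama.OffsetCommitRequest{
		ConsumerGroup: group,
		Version:       1, // the timestamp field is only honoured from version 1 on
	}
	// ReceiveTime (-1) tells the broker to use the time it received the request.
	request.AddBlock(topic, partition, offset, sarama.ReceiveTime, "")
	return request
}
```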
Any update on this one?
It's mostly done; I need to get the functional tests to work on Travis (they work fine on my machine). Any input on that is welcome.
@wvanbergen Hi, any news about this PR? Do you need any help? The last commit is from Oct 7, so I'm not sure whether I should wait or rather choose a different approach for consumer groups.
Hey. It looks like I will not have a lot of time available to maintain this library or finish this PR. If anybody is interested in taking it over, I will be glad to help out and get you started.
And does it still make sense once IBM/sarama@66d77e1 is merged? Or only as support for clusters still running on 0.8.x?
Yeah, this will be primarily for people that are stuck on 0.8 for the time being.
This is a complementary fix for wvanbergen#68 (issue: wvanbergen#62), to tide things over until the re-implementation (wvanbergen#72) is ready. In my use case the message-consuming logic is sometimes time-consuming, and even with the 3 retries from the fix in #68 it is still easy to hit #62. Further checking the current logic in consumer_group.go:partitionConsumer(), it may take as long as cg.config.Offsets.ProcessingTimeout before ReleasePartition runs, so only then can the partition be claimed by a new consumer during a rebalance. So simply set the maximum retry time to the same value as cg.config.Offsets.ProcessingTimeout, which is 60s by default. I verified a system including this fix with frequent rebalance operations, and the issue does not occur again.
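The idea, in a self-contained sketch: keep retrying the partition claim for as long as the configured processing timeout, instead of a fixed handful of attempts. claimWithTimeout, the 1-second interval, and the fake claim function are illustrative stand-ins for the consumergroup internals the fix actually touches.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// claimWithTimeout retries claiming a partition until it succeeds or the
// processing timeout has elapsed, so a rebalance can wait out the previous
// owner's ProcessingTimeout window.
func claimWithTimeout(processingTimeout time.Duration, claimPartition func() error) error {
	const retryInterval = 1 * time.Second
	deadline := time.Now().Add(processingTimeout)
	for {
		err := claimPartition()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("giving up after %s: %w", processingTimeout, err)
		}
		time.Sleep(retryInterval)
	}
}

func main() {
	attempts := 0
	err := claimWithTimeout(5*time.Second, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("partition still claimed by previous owner")
		}
		return nil
	})
	fmt.Println(err, "after", attempts, "attempts")
}
```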
This reimplements the Kafka high-level consumer. It addresses a couple of limitations of the old implementation, and also makes the code, and especially the multithreading model, easier to grok (I was young and inexperienced when I wrote the initial implementation ;).

Changes

- A `Subscription` interface to describe what topics to consume (see the sketch below). This would allow us to implement a regular-expression-based blacklist or whitelist approach, as well as a static list of topics.
- A `Consumer` type, so unit testing apps that use this library is now possible using dependency injection.

Implementation notes

- `consumerManager`: runs a goroutine that figures out what partitions to consume, and starts/stops partition managers for them. Afterwards it waits for changes in the subscription or changes in the list of running instances to do it again. Implements the `Consumer` interface.
- `partitionManager`: runs a goroutine that manages a single `sarama.PartitionConsumer`, claiming the partition in Zookeeper and managing offsets.
- `Subscription`: describes what partitions the entire group should be consuming, and watches Zookeeper for potential changes.

TODO

- Move the `Subscription` type to Kazoo?
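To make the Subscription idea above more concrete, here is a purely illustrative sketch of what such an interface and its static and whitelist implementations could look like. The method and type names are guesses for illustration, not the PR's actual API.

```go
package example

import "regexp"

// Subscription decides which of the cluster's topics the group should consume.
type Subscription interface {
	Topics(clusterTopics []string) []string
}

// StaticSubscription consumes a fixed list of topics.
type StaticSubscription struct{ TopicNames []string }

func (s StaticSubscription) Topics(clusterTopics []string) []string {
	return s.TopicNames
}

// WhitelistSubscription consumes every cluster topic matching a regular
// expression, so newly created matching topics are picked up automatically.
type WhitelistSubscription struct{ Pattern *regexp.Regexp }

func (s WhitelistSubscription) Topics(clusterTopics []string) []string {
	var selected []string
	for _, topic := range clusterTopics {
		if s.Pattern.MatchString(topic) {
			selected = append(selected, topic)
		}
	}
	return selected
}
```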