This repository has been archived by the owner on May 13, 2019. It is now read-only.

Initial stab at Zookeeper-managed topic consumer. #1

Merged 8 commits on May 26, 2014

Conversation

wvanbergen
Owner

This needs some work, but is ready for a first look. It implements the Zookeeper logic needed for a nice consumer group:

  • It discovers the Kafka cluster as it is defined in Zookeeper.
  • It load balances based on the number of running consumers.
  • It commits the offset for every partition to Zookeeper every x seconds (x is currently hardcoded to 1 second).
  • It automatically resumes from the latest committed offset.
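
To illustrate the load-balancing bullet: the PR text does not spell out how partitions are divided, so the following is only a minimal sketch of one common approach, a range-style split over sorted consumer IDs. The function name `assignPartitions` and the consumer IDs are hypothetical and are not taken from this PR.

```go
package main

import (
	"fmt"
	"sort"
)

// assignPartitions is a hypothetical helper (not the PR's code). It sorts the
// consumer IDs and partition IDs, then gives each consumer a contiguous range.
// When the partitions do not divide evenly, the first consumers get one extra.
func assignPartitions(consumers []string, partitions []int32, self string) []int32 {
	sorted := append([]string(nil), consumers...)
	sort.Strings(sorted)

	idx := sort.SearchStrings(sorted, self)
	if idx == len(sorted) || sorted[idx] != self {
		return nil // this consumer is not registered in the group
	}

	parts := append([]int32(nil), partitions...)
	sort.Slice(parts, func(i, j int) bool { return parts[i] < parts[j] })

	per := len(parts) / len(sorted)   // partitions every consumer gets
	extra := len(parts) % len(sorted) // leftovers, one each for the first consumers

	start := idx * per
	if idx < extra {
		start += idx
	} else {
		start += extra
	}
	count := per
	if idx < extra {
		count++
	}
	return parts[start : start+count]
}

func main() {
	consumers := []string{"consumer-b", "consumer-a", "consumer-c"}
	partitions := []int32{0, 1, 2, 3, 4, 5, 6, 7}
	for _, c := range consumers {
		fmt.Println(c, "claims", assignPartitions(consumers, partitions, c))
	}
}
```

Because every consumer sorts the same inputs, each one can compute its own share independently whenever the set of running consumers changes.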

This is heavily based on https://github.com/bsm/sarama/tree/master/cluster, but large parts are rewritten because it wasn't working for me. The Zookeeper-backed consumer group management should be compatible with the JVM library, as documented here: https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management

TODO

  • Error handling needs a close look.
  • Make some hardcoded values configurable (e.g. the commit interval of 1 second)
  • Remove a bunch of logging statements.
  • KafkaConsumer and ConsumerGroup should probably be merged into one type.
  • Some stuff in PartitionConsumer should move to Sarama to clean up the implementation.
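
As a rough idea of what the "make hardcoded values configurable" item could look like, here is a small sketch of a commit loop driven by a configurable interval. `Config`, `CommitInterval`, `withDefaults`, and `commitLoop` are hypothetical names chosen for illustration; the commit itself is passed in as a callback, so no Zookeeper calls are assumed.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a hypothetical settings struct; the PR currently hardcodes these values.
type Config struct {
	CommitInterval time.Duration // how often offsets are committed
}

// withDefaults falls back to the 1 second interval mentioned in the PR.
func (c Config) withDefaults() Config {
	if c.CommitInterval <= 0 {
		c.CommitInterval = time.Second
	}
	return c
}

// commitLoop calls commit on every tick until stop is closed. It then commits
// one last time so a clean shutdown does not lose progress, and closes done.
func commitLoop(cfg Config, commit func() error, stop <-chan struct{}, done chan<- struct{}) {
	cfg = cfg.withDefaults()
	ticker := time.NewTicker(cfg.CommitInterval)
	defer ticker.Stop()
	defer close(done)

	for {
		select {
		case <-ticker.C:
			if err := commit(); err != nil {
				fmt.Println("commit failed:", err)
			}
		case <-stop:
			if err := commit(); err != nil {
				fmt.Println("final commit failed:", err)
			}
			return
		}
	}
}

func main() {
	commits := 0
	commit := func() error { commits++; return nil }

	stop := make(chan struct{})
	done := make(chan struct{})
	go commitLoop(Config{CommitInterval: 10 * time.Millisecond}, commit, stop, done)

	time.Sleep(35 * time.Millisecond)
	close(stop)
	<-done // after this, commits is safe to read
	fmt.Println("commits:", commits)
}
```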

@jnormore @berkcaputcu @manygrams /cc @eapache @bsm

@ontarionick
Collaborator

My Go knowledge is far too basic to offer any real help on that, but this looks / sounds awesome to me! 👍

@wvanbergen
Owner Author

@manygrams You should check out topic_consumer.go (https://github.com/wvanbergen/kafkacluster/pull/1/files#diff-6). It's a good example of how to use this API.

@wvanbergen
Owner Author

@snormore @boourns Some feedback on my use of channels/goroutines would be appreciated, because I am not really sure about any best practices or guidelines when dealing with those.

for {
	event, ok := <-stream
	if !ok {
		log.Println("Consumer is done")

Is the consumer ever done without it being an error? I'm thinking this should exit with a non-zero exit code.

Owner Author

consumer.Close() above will end up closing the stream channel, so the consumer is done (without an error) when you press Ctrl+C.
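
The shutdown path described in this reply can be sketched roughly as follows. This is not the PR's code: `fakeConsumer`, `newFakeConsumer`, and `consume` are hypothetical stand-ins, and the 100 ms timer exists only so the demo ends even if nobody presses Ctrl+C. The point is that the goroutine that sends on the stream is also the one that closes it, and a closed stream means a clean exit.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"time"
)

// Event is a stand-in for the PR's event type.
type Event struct{ Offset int64 }

// fakeConsumer is a hypothetical stand-in for the PR's consumer.
type fakeConsumer struct {
	stream chan *Event
	stop   chan struct{}
	once   sync.Once
	wg     sync.WaitGroup
}

func newFakeConsumer() *fakeConsumer {
	c := &fakeConsumer{stream: make(chan *Event), stop: make(chan struct{})}
	c.wg.Add(1)
	go c.produce()
	return c
}

// produce is the only sender on stream, so it is also the one that closes it.
func (c *fakeConsumer) produce() {
	defer c.wg.Done()
	defer close(c.stream)
	for offset := int64(0); ; offset++ {
		select {
		case c.stream <- &Event{Offset: offset}:
		case <-c.stop:
			return
		}
	}
}

// Close asks the producer to stop and waits until the stream is closed.
// It is safe to call more than once.
func (c *fakeConsumer) Close() {
	c.once.Do(func() { close(c.stop) })
	c.wg.Wait()
}

// consume reads events until the stream is closed and returns how many it saw.
func consume(stream <-chan *Event) int {
	n := 0
	for range stream {
		n++
	}
	return n
}

func main() {
	consumer := newFakeConsumer()

	// Ctrl+C triggers Close, which ends up closing the stream.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		<-interrupt
		consumer.Close()
	}()

	// Demo only: also stop after a short while without Ctrl+C.
	time.AfterFunc(100*time.Millisecond, consumer.Close)

	n := consume(consumer.stream)
	fmt.Println("Consumer is done after", n, "events")
}
```

With this shape, reaching the end of the loop is the expected result of an interrupt, so returning normally (exit code 0) is reasonable; a non-zero exit would be reserved for a stream that ended because of an error.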


topicChannel := make(chan *Event)

go func() {
Collaborator

This func is sufficiently large that I would prefer it be named.
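
A sketch of what that suggestion could look like: the body of the `go func() { ... }()` literal is pulled out into named functions that are started with `go`. The body of the original closure is not shown in this conversation, so `forwardEvents` and `forwardPartition` are hypothetical names, and the fan-in logic here is an assumption made only to have something to extract.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a stand-in for the PR's event type.
type Event struct {
	Partition int32
	Offset    int64
}

// forwardEvents is a hypothetical named replacement for a large anonymous
// goroutine. It copies events from every partition channel onto topicChannel
// and closes topicChannel once all partition channels are drained.
func forwardEvents(partitions []<-chan *Event, topicChannel chan<- *Event) {
	var wg sync.WaitGroup
	wg.Add(len(partitions))
	for _, p := range partitions {
		go forwardPartition(p, topicChannel, &wg)
	}
	wg.Wait()
	close(topicChannel)
}

// forwardPartition forwards one partition's events until its channel is closed.
func forwardPartition(in <-chan *Event, out chan<- *Event, wg *sync.WaitGroup) {
	defer wg.Done()
	for ev := range in {
		out <- ev
	}
}

func main() {
	p0 := make(chan *Event, 2)
	p0 <- &Event{Partition: 0, Offset: 1}
	p0 <- &Event{Partition: 0, Offset: 2}
	close(p0)

	p1 := make(chan *Event, 1)
	p1 <- &Event{Partition: 1, Offset: 7}
	close(p1)

	topicChannel := make(chan *Event)
	go forwardEvents([]<-chan *Event{p0, p1}, topicChannel)

	for ev := range topicChannel {
		fmt.Printf("partition %d offset %d\n", ev.Partition, ev.Offset)
	}
}
```

Named functions also show up by name in stack traces and can be tested directly, which is harder with a closure buried inside a constructor.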

wvanbergen added a commit that referenced this pull request May 26, 2014
Initial stab at Zookeeper-managed topic consumer.
@wvanbergen wvanbergen merged commit 6b2ae6e into master May 26, 2014
@wvanbergen wvanbergen deleted the initial branch May 26, 2014 15:52
@wvanbergen
Owner Author

Going to merge this so we can start using it. It will probably need some more work over time once we start using it for real workloads.

@ontarionick
Collaborator

Can we pair on using this from my Rails app?

@wvanbergen
Owner Author

Yes!

