
Allow manual offsets commit for a consumer #304

Closed · piavka opened this issue Dec 5, 2016 · 15 comments

piavka commented Dec 5, 2016

Currently, the library's consumer offset commits happen automatically, either according to configuration or when each each_batch block has been processed. We need to be able to commit offsets manually, but @offset_manager is not exposed by the Consumer class, nor is there an option to disable the automatic offset commits.

I can work on a PR if such a feature would be accepted.

dasch (Collaborator) commented Dec 6, 2016

What is your use case – why are the automatic commits not sufficient?

piavka (Author) commented Dec 6, 2016

I have multiple use cases. For example, we batch data from Kafka into files which, after a defined period of time, are closed and moved for upload to S3; only once the files have been moved for upload are the offsets committed.

dasch (Collaborator) commented Dec 6, 2016

I would recommend trying to align the files with fetched batches and using the batch API, e.g.

consumer.each_batch do |batch|
  filename = "files/#{batch.topic}-#{batch.partition}-#{batch.first_offset}"

  File.open(filename, "w") do |file|
    batch.messages.each do |message|
      file << message.value
    end
  end

  # Once this block succeeds, the entire batch is marked as consumed.
  move_to_s3(filename)
end

You can specify min_bytes and max_wait_time to tune the batch sizes. I would recommend starting there rather than implementing your own solution.
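For illustration, a minimal sketch of that tuning – the values are arbitrary, and process stands in for application code:

consumer.each_batch(min_bytes: 1024 * 1024, max_wait_time: 5) do |batch|
  # Wait up to 5 seconds, or until at least 1 MB of messages has
  # accumulated, before yielding a batch.
  batch.messages.each do |message|
    process(message) # hypothetical application method
  end
end

Larger min_bytes and max_wait_time values produce bigger, less frequent batches, which makes it easier to line batches up with file rollover.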

piavka (Author) commented Dec 6, 2016

I can't really align with batch size, since we are talking about intervals of minutes, and during this time the data needs to be processed before it is written to the file and the file is finally closed and moved to uploads. This is something I've been doing with manual offset commits all along; I just need to migrate from poseidon_cluster to this gem. I don't see what is wrong with giving manual offset commit control to the library user. The change itself seems minimal to support with the current code.

dasch (Collaborator) commented Dec 7, 2016

I'm simply trying to keep non-core features out – maintaining features is a lot more work than adding them.

What kind of API are you proposing?

dimas commented Jan 9, 2017

I would say that at least NOT committing offsets automatically is a core feature; at least the Java Kafka client allows you to do that.
I cannot see a way of doing it with ruby-kafka. Even if I set offset_commit_interval => 0 and offset_commit_threshold => 0 to avoid automatic commits, the consumer loop will still invoke

    ensure
      # In order to quickly have the consumer group re-balance itself, it's
      # important that members explicitly tell Kafka when they're leaving.
      @offset_manager.commit_offsets rescue nil
...
    end

committing offsets, and there seems to be no way of overriding that without some dirty hacks.

And the use case is consuming the __consumer_offsets topic itself. You always want to consume it in full from the very beginning, and you don't really need to track your progress, so there is no point in committing at all.

Should I raise it as a separate issue?

dasch (Collaborator) commented Jan 10, 2017

@dimas committing offsets is a requirement for the distributed consumer groups to be able to function – otherwise any hiccup in the group would cause partition processing to start over.

Do you need to distribute your workload? If not, you can simply use the non-distributed consumer API:

kafka.each_message(topic: "__consumer_offsets") do |message|
  puts message.offset, message.key, message.value
end

piavka (Author) commented Jan 10, 2017

What if I need to commit offsets only after some processing has happened – processing of a big chunk of data, a chunk that cannot fit into a single kafka.each_message(...) { |m| ... } iteration?

dasch (Collaborator) commented Jan 10, 2017

@piavka do you have an API proposal for ruby-kafka for dealing with manual offset commits?

piavka (Author) commented Jan 10, 2017

@dasch simply add an auto_commit_offset param to consumer.each_batch(...)
that, when set to false, stops the call to @offset_manager.commit_offsets on each iteration's completion,
and expose a commit_offsets method on the client.
Within each each_batch iteration we have the offsets, so the user can save them; once the relevant processing (outside of each_batch(...)) is done, the user commits those saved offsets, as sketched below.
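For concreteness, a minimal sketch of that proposal – auto_commit_offset and the client-level commit_offsets are the suggested additions, not the library's current API, and process stands in for application code:

saved_offsets = {}

consumer.each_batch(auto_commit_offset: false) do |batch|
  process(batch) # hypothetical application method

  # Remember how far each partition got, for a later manual commit.
  saved_offsets[[batch.topic, batch.partition]] = batch.last_offset
end

# Later, once the out-of-band processing has completed:
consumer.commit_offsets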

dasch (Collaborator) commented Jan 10, 2017

Hmm. I'll think a bit about it.

dimas commented Jan 10, 2017

@dasch, oh. You are right, that is probably what I should use. My bad.
No, I do not need to distribute the workload.

I think I have kind of a weird (or let's say special) use case – trying to do something similar to KafkaOffsetMonitor, but in Ruby, so my client needs some bits that normal clients should not care about. So I raised another issue about that, #311 – I hope you do not mind.

sidbits commented Jul 6, 2017

We have Samza tasks which read messages from a Kafka output stream, but if there is a retryable failure while processing a message, I would want my Samza task to read the same message again and reprocess it, and only acknowledge the message for checkpointing after it has been successfully processed, instead of relying on auto-commit.

Is there a way to manually control the checkpoint, just like the Kafka consumer's "Manual Offset Control" enabled by setting enable.auto.commit to false (https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)?

I came across this doc, https://samza.apache.org/learn/documentation/0.13/jobs/reprocessing.html, which talks about reprocessing previously processed data, but it does not offer any acknowledgement-based checkpoint control.

dasch (Collaborator) commented Jul 7, 2017

@sidbits I think you're in the wrong repo – this has nothing to do with Samza :)

dasch (Collaborator) commented Aug 4, 2017

Duplicate of #126.

dasch closed this as completed Aug 4, 2017