Consumer commits offset regardless of handler failures #59

Closed
kkogan opened this issue May 12, 2016 · 4 comments

@kkogan

kkogan commented May 12, 2016

s.handler(p.messageSet, p.topic, p.partition, p.highwaterMarkOffset)
    .catch(function (err) {
        self.client.warn('Handler for', p.topic + ':' + p.partition, 'failed with', err);
    })
    .finally(function () {
        s.paused = false;
        s.offset = _.last(p.messageSet).offset + 1; // advance offset position
    });

It seems that the finally handler here advances the offset regardless of whether the handler succeeded. If the handler chooses not to commit offsets to Kafka because of a failure in the handler itself, the consumer will still continue to consume messages, so the persisted offsets and the processed offsets can diverge.
What is the recommended way of not advancing the offset on failure?

Moreover, if a batch of payloads fails some of the way through, a handler can commit the last successful offset. In this scenario, shouldn't the consumer continue from the offset after this last successful one until the handler commits a greater offset?

@oleksiyk
Owner

First of all, this increment is not a Kafka offset commit; it is an increment of the requested offset position.

In case of a handler failure, the other option would be to stop fetching, because fetching with the same offset and hitting the same handler error again and again is pointless.

The suggested behaviour is to handle handler failures in the handler (your application) itself. The batch should be processed in order, not in parallel, if you want a guarantee in case of a failure.
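
A minimal sketch of the in-order approach described above, assuming the handler receives `messageSet`, `topic` and `partition` as in the snippet from the issue, and that each entry in `messageSet` carries an `offset`. The names `makeSequentialHandler`, `processMessage` and `onFailure` are hypothetical and not part of the library. The handler walks the batch one message at a time and stops at the first failure, so the last successfully processed offset is always known:

```javascript
// Hypothetical helper (not a library API): wraps a per-message function
// into a batch handler that processes messages strictly in order.
function makeSequentialHandler(processMessage, onFailure) {
    return async function handler(messageSet, topic, partition) {
        let lastGoodOffset = null;
        for (const m of messageSet) {
            try {
                // Wait for each message before touching the next one
                await processMessage(m, topic, partition);
                lastGoodOffset = m.offset;
            } catch (err) {
                // Handle the failure inside the handler instead of rejecting,
                // then stop so later messages are not processed out of order
                await onFailure(err, m, lastGoodOffset);
                break;
            }
        }
        // Returned for illustration only; the application decides what
        // to commit based on this value
        return lastGoodOffset;
    };
}
```

What `onFailure` does (log, commit `lastGoodOffset`, alert) is left to the application; the sketch only guarantees that nothing past the failed message has been processed.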

@kkogan
Author

kkogan commented May 13, 2016

So I'm on board with the local position advancing automatically, as the Java API does the same.
However, the Java API provides a seek method that allows overriding the local position.
Sounds like you would recommend just retrying errors inside the handler, blocking its resolution, and thus pausing consumption for that topic+partition combination?

@oleksiyk
Owner

A seek method is a good idea.

At present you can just call subscribe again (without unsubscribing) with the required offset:

subscribe('topic', [0], { offset: 300 }, handler)
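
As a rough illustration, the re-subscribe workaround could be wrapped in a small seek-like helper. This is only a sketch: `rewindTo` is a hypothetical name, and `consumer` is assumed to be whatever consumer instance the application already holds, exposing `subscribe` with the argument order shown above.

```javascript
// Hypothetical helper (not a library API) built on the subscribe call above.
// Assumes consumer.subscribe(topic, partitions, options, handler).
function rewindTo(consumer, topic, partition, offset, handler) {
    // Subscribing again, without unsubscribing first, is what the
    // comment above suggests for moving the fetch position
    return consumer.subscribe(topic, [partition], { offset: offset }, handler);
}
```

A handler that failed partway through a batch could call something like `rewindTo(consumer, topic, partition, lastGoodOffset + 1, handler)` to resume from the message after the last one it processed successfully.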

@oleksiyk
Owner

> Sounds like you would recommend just retrying errors inside the handlers, blocking its resolution, and thus pausing consumption for that topic+partition combination?

Actually, I can't think of any other way. The application has a choice: log the error and proceed, or pause consumption of subsequent messages (for this topic/partition pair) and retry processing the faulty one. I'm open to ideas.
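
The two choices can be sketched as small helpers that a handler might call per message. Both names (`logAndProceed`, `retryWithDelay`) and their parameters are hypothetical, and the sketch assumes, as discussed in this thread, that fetching for the topic/partition pair stays paused while the handler's promise is unresolved.

```javascript
// Hypothetical policy helpers; neither is part of the library.

// Choice 1: log the error and move on to the next message.
async function logAndProceed(fn, message, log) {
    try {
        await fn(message);
        return true;
    } catch (err) {
        log('processing failed at offset ' + message.offset, err);
        return false; // the caller continues with the next message
    }
}

// Choice 2: retry the same message. The returned promise stays pending
// between attempts, which holds back the handler's resolution.
// Pass Infinity as maxAttempts to retry without a limit.
async function retryWithDelay(fn, message, delayMs, maxAttempts) {
    for (let attempt = 1; ; attempt++) {
        try {
            return await fn(message);
        } catch (err) {
            if (attempt >= maxAttempts) throw err; // give up after the last attempt
            await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
    }
}
```

Retrying only makes sense for failures that can clear up on their own (a downstream service being unavailable, for example); a message that can never be processed would block the partition indefinitely under the second policy unless `maxAttempts` is bounded.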
