
Add way to commit offsets with Consumer #436

Merged
9 commits merged into tulios:master on Jul 23, 2019

Conversation

JaapRood
Collaborator

Closes #378.

In various other clients, like the official KafkaConsumer, arbitrary offsets can be committed through an interface like consumer.commit. So far, the only way to do this in KafkaJS is through the commitOffsetsIfNecessary function exposed as part of the eachBatch mechanism.

This limitation prevents committing offsets before a message has been consumed, something that is at times desirable in stream processing tasks. While in most cases a consumer.seek operation is the right tool, there are situations where you just want to record the progress made without consuming any messages at all.

@JaapRood
Collaborator Author

JaapRood commented Jul 16, 2019

With the first steps made to explore how this would look, there's a couple of things to mention:

  • Rather than going with the shape of consumerGroup.commitOffsets, I defined the type as Array<TopicPartitionOffsetAndMetadata>, not unlike the KafkaConsumer does. I changed it because the signature of { topics: [{ topic, partitions: [{ partition, offset, metadata }] }] } was just really cumbersome, as instantly shown by writing the basic specs. Instead, we accept [{ topic, partition, offset, metadata }] and massage it into the right shape afterwards.
  • I haven't yet found a way to express the use case this is meant to support, given the current constraints of consumer.subscribe and consumer.run. I found myself having to set up a consumption loop, while the point of this PR is showing that that shouldn't really be necessary. Sometimes you just want to set the point of progress, such as during an application reset or when using some custom batching logic. Requiring a consumption loop basically makes this similar to consumer.seek (especially as seek currently commits offsets, see #395 "Consumer.seek commits an offset, causing possible data and metadata loss"). It also ties into #373 "Add manual assignment of TopicPartitions to Consumer", which might be a better time to address this. So far, it's just used instead of auto commit (even though there are other ways of doing that).
  • We don't actually assert in the test yet that the committing works properly; it's just a draft to explore so far 😬. commitOffsets is tested more deeply in the specs for the offset manager, so I wasn't concerned with being overly exhaustive.
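The "massage it into the right shape" step from the first bullet can be sketched as a plain grouping function. This is a hypothetical illustration, not the PR's actual code; the helper name groupByTopic and the topic/offset values are made up:

```javascript
// Hypothetical sketch: the public API accepts a flat
// Array<TopicPartitionOffsetAndMetadata> and groups it into the
// nested { topics: [{ topic, partitions }] } payload shape.
const groupByTopic = (topicPartitionOffsets) => {
  const byTopic = new Map()
  for (const { topic, partition, offset, metadata } of topicPartitionOffsets) {
    if (!byTopic.has(topic)) byTopic.set(topic, [])
    byTopic.get(topic).push({ partition, offset, metadata })
  }
  return {
    topics: Array.from(byTopic, ([topic, partitions]) => ({ topic, partitions })),
  }
}

const payload = groupByTopic([
  { topic: 'events', partition: 0, offset: '42', metadata: null },
  { topic: 'events', partition: 1, offset: '13', metadata: null },
  { topic: 'audit', partition: 0, offset: '7', metadata: null },
])
// payload.topics → [{ topic: 'events', partitions: [...] }, { topic: 'audit', partitions: [...] }]
```

The flat shape keeps the call site simple, while the nested shape is only built where the protocol request needs it.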

@JaapRood
Collaborator Author

JaapRood commented Jul 16, 2019

Looking at the code, one thing I'm worried about with the current implementation is that a commitOffsets call would basically cause a seek to be performed, where I'm pretty convinced it shouldn't. Committing an offset is essentially just producing a message (commits have their own request in the protocol, but they are stored on a Kafka partition, which is why they work transactionally as well) and should be independent of the consumption process (even though the two are obviously used in conjunction in practice). A committed offset should only be consulted when deciding where to resume processing at the start of an assignment, like after a rebalance or a manual assignment. In other words, a stored committedOffset should basically be the seed of the local consumedOffset when determining where to start fetching and consuming the partition from. As highlighted by #395, seeking should change that consumedOffset, but not touch the committedOffset.

I'll have to dig deeper to see whether that's the effect it's having, and see if I can write a test to verify it either way. You could argue that's a separate issue and PR, but as this concerns a public API, changing its behaviour would incur a breaking change, so it's probably something we want to figure out right from the beginning.
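The committed/consumed distinction argued for above can be captured in a toy model. This is not KafkaJS internals, just a minimal sketch of the intended semantics; the class and field names are illustrative:

```javascript
// Toy model of the semantics described above: committing stores an
// offset without moving the consumption position, and the committed
// offset only seeds the consumed offset when an assignment (re)starts.
class PartitionPosition {
  constructor() {
    this.committedOffset = '-1'
    this.consumedOffset = '-1'
  }
  startAssignment() {
    // the committed offset is the seed for where to resume fetching
    this.consumedOffset = this.committedOffset
  }
  commit(offset) {
    // committing is independent of the consumption position
    this.committedOffset = offset
  }
  seek(offset) {
    // seeking moves the consumption position, but (per #395) should
    // not touch the committed offset
    this.consumedOffset = offset
  }
}

const position = new PartitionPosition()
position.commit('10')
// committing did not perform a seek: the consumption position is unchanged
console.log(position.consumedOffset) // '-1'
position.startAssignment() // e.g. after a rebalance
console.log(position.consumedOffset) // '10'
```

Under this model, commitOffsets only ever influences where a future assignment starts, never where the current one fetches from.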

@JaapRood
Collaborator Author

Having looked closer at how the OffsetManager, ConsumerGroup and Cluster interact, I'm happy to say that I don't believe calling commitOffsets performs a seek; consumption continues where it should 😬. While the opposite is true, keeping #395 a valid issue, it shouldn't impact these changes. I'll add a test to verify as much, just to be sure we don't conflate committing and seeking behaviours now or in the future!

@JaapRood JaapRood marked this pull request as ready for review July 16, 2019 14:25
@JaapRood
Collaborator Author

Some work remains to be done on the documentation side, but apart from that, I think this is looking pretty good and ready for review.

@tulios
Owner

tulios commented Jul 18, 2019

This PR looks good, but I think it is still missing something. What happens if you try to commit outside of the run loop and you receive a protocol error, for example REBALANCE_IN_PROGRESS? In the current setup, the consumer will catch the error and execute the rebalance, but since the error occurred outside of the loop, you have to "notify" the consumer that a rebalance is needed. This is also the case for other errors, so maybe some errors have to trigger the "recover" phase of the consumer.

The same applies to the isolated heartbeat method.

WDYT?

@tulios tulios self-requested a review July 18, 2019 08:34
@JaapRood
Collaborator Author

JaapRood commented Jul 18, 2019

You make a good point, something I hadn't considered thus far. Thinking out loud for a bit:

Technically speaking, you'd say that both calls to commit offsets (or send heartbeats) happen in the context of the ConsumerGroup, which is the entity that should be able to reason about these kinds of semantics (at least from a conceptual point of view). Given that, why should it matter who called the method when deciding how to recover from errors?

Looking at where these protocol errors are handled now, it seems to be at the Runner level. I guess that has the benefit that any protocol error thrown inside an eachBatch or eachMessage handler is also caught. However, if we only want to respond to protocol errors that originate from our ConsumerGroup in the first place, shouldn't those already have been dealt with before they even reach that message processing context?

@tulios not having really looked at the reality of it in code, my recommendation would be to handle protocol errors that concern the consumer group inside ConsumerGroup, regardless of where the calls for heartbeat or commitOffsets come from. Would be great to hear what you think about that rationale.

@JaapRood
Collaborator Author

Looking at the actual code, things seem a bit more complicated, especially if we want to avoid an all-out refactor. The Runner dealing with run-time state, errors and retries seems like a good idea, so I'm going in the direction of trying to run the commit offsets in the context of it.

@JaapRood
Collaborator Author

I took the approach of adding a commitOffsets to Runner and calling that from the public consumer API. This allows the runner to handle the errors around rebalancing or lost members (yet to be verified by test coverage) and the operation to be retried. It still needs some love, but this is mostly to discuss whether this is the right approach.

From the perspective of code organisation, I can imagine the same working for consumer.heartbeat, although the retry logic would be somewhat different (there would no longer be a need to send a heartbeat after the group has just rebalanced or we've lost our membership).

The thing I'm wondering is whether retrying after a rebalance is actually the behaviour we want from committing offsets. A commit is sensitive to the assignment context it's made in, so if that context changes, isn't the correct thing to bail on the attempt instead? A new assignment would then pick up the previously committed offset and "retry" by processing the messages again. To achieve exactly-once, transactions should be used.
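The bail-on-rebalance behaviour described above can be sketched as a small wrapper. This is a hypothetical illustration, not the PR's Runner code; the function names, the error's type field, and the retry count are assumptions:

```javascript
// Hypothetical sketch: transient errors are retried, but a rebalance
// invalidates the assignment context the commit was made in, so we
// trigger a rejoin and surface the error rather than retry the commit.
const REBALANCE_IN_PROGRESS = 'REBALANCE_IN_PROGRESS'

const commitWithRecovery = async (commitFn, rejoinFn, retries = 3) => {
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await commitFn()
    } catch (error) {
      if (error.type === REBALANCE_IN_PROGRESS) {
        // bail: the new assignment picks up from the previously
        // committed offsets, effectively "retrying" by reprocessing
        rejoinFn()
        throw error
      }
      if (attempt === retries) throw error
      // otherwise fall through and retry the transient error
    }
  }
}
```

Under this shape, the caller sees the rebalance error immediately instead of a commit silently landing in a stale assignment.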

@JaapRood
Collaborator Author

@tulios I think that covers it, ready for another look from you!

@tulios
Owner

tulios commented Jul 22, 2019

@JaapRood sorry for the delay, the changes should work. Final question 😄
Let's say I have my consumer running and I'm not committing with the new method within the run loop. What should the expectation be when using the new commitOffsets? If I commit offset 3, should the consumer fetch from offset 4, or is it acceptable that the two things might not be in sync, and the consumer will actually fetch from offset 2 because the fetch was triggered before commitOffsets actually finished?

@JaapRood
Collaborator Author

@JaapRood sorry for the delay

@tulios no worries, that's what async working is for 👍 .

Fetching and committing do not have to be in sync, as far as I've been able to gather through experience, my exploration above in this PR, and #395; they just happen to be in sync most of the time. It should be through consumer.seek that a user keeps them in sync, by seeking after the commit. Consumer.seek, in turn, should not commit an offset, merely set the run-time 'consumed' offset to produce the right 'next' offset to fetch from (see #395). The 'committed' offset only interacts with the 'consumed' offset when determining where to start fetching a new assignment from, when that assignment hasn't explicitly been seeked to.

Hopefully that holds water, happy to further discuss those semantics if need be!
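That contract, answering the "commit 3, fetch from 4?" question, can be shown with a toy consumer stub. This is an illustration only, not the kafkajs implementation; the stub's method and field names are made up:

```javascript
// Toy consumer stub: fetching continues from the consumed offset
// regardless of commits, and only seek changes where the next fetch
// starts. Offsets are strings, as in the kafkajs API.
const makeConsumer = () => {
  const state = { committed: '0', consumed: '0' }
  return {
    commitOffsets: ([{ offset }]) => { state.committed = offset },
    seek: ({ offset }) => { state.consumed = offset },
    nextFetchOffset: () => String(Number(state.consumed) + 1),
    state,
  }
}

const consumer = makeConsumer()
consumer.commitOffsets([{ topic: 'events', partition: 0, offset: '3' }])
// the fetch position is not moved by the commit...
console.log(consumer.nextFetchOffset()) // '1'
// ...until the user keeps them in sync with an explicit seek
consumer.seek({ topic: 'events', partition: 0, offset: '3' })
console.log(consumer.nextFetchOffset()) // '4'
```

So committing 3 alone does not make the consumer fetch from 4; the explicit seek is what brings the two positions back in line.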

@tulios
Owner

tulios commented Jul 23, 2019

Nice, that's also my understanding. I just wanted to make sure we were aligned.

@tulios tulios merged commit 2d5d830 into tulios:master Jul 23, 2019
@JaapRood JaapRood deleted the feature/consumer-commit branch July 23, 2019 08:07
@ThisIsMissEm

What's the timeline for this reaching a release?

(I'd been looking at the documentation & thought it was already released)

@tulios
Owner

tulios commented Jul 24, 2019

Hi @ThisIsMissEm, we usually run all the changes in production before we release to the public. We merged the feature recently, so it might take at least a week for us to release; in the meantime, you can depend on the commit hash (npm install https://github.com/tulios/kafkajs.git#2d5d83097ed347d466f7a97969dbfe1f03189db4)

The current setup is suboptimal, and we are already addressing the situation: once #441 is merged, every merge to master will create a pre-release version on GitHub with the change.

@ThisIsMissEm

@tulios okay, thanks!
