assign() then seek() with implicit consumption #46

Closed
tombentley opened this Issue May 23, 2017 · 14 comments

4 participants
@tombentley
Contributor

tombentley commented May 23, 2017

Some use cases want to assign() and then, before consuming any records, seek() to where they want to start reading from. The native Kafka API allows this because assign() and seek() only take effect on the next call to poll(), so the sequence assign(), seek(), poll() gives the desired semantics.
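For reference, a minimal sketch of that sequence with the native client (broker address, topic, partition and offset are just examples):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // example broker
props.put("group.id", "example-group");             // example group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);   // example topic/partition

// Nothing is fetched until poll(), so the seek() takes effect before any records arrive.
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, 42L);                                  // example offset
ConsumerRecords<String, String> records = consumer.poll(100L);
```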

The vertx-kafka-client API differs from this because assign() implicitly calls poll(), starting consumption from the topic. If you call seek() in the handler for assign() it's already too late because you're already fetching records. This can be rather confusing.

This behaviour can be worked around by pause()ing the KafkaConsumer before the call to assign() and resume()ing it after the call to seek().
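A minimal sketch of that workaround with the Vert.x client (config values, topic, partition and offset are made up; error handling omitted):

```java
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.Map;

Vertx vertx = Vertx.vertx();

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");   // example broker
config.put("group.id", "example-group");             // example group
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
consumer.handler(record -> System.out.println(record.value()));

TopicPartition tp = new TopicPartition().setTopic("my-topic").setPartition(0);

// Pause the stream so the poll started implicitly by assign() can't deliver
// records before seek() has taken effect, then resume once the seek is done.
consumer.pause();
consumer.assign(tp, assignResult -> {
  consumer.seek(tp, 42L, seekResult -> {   // example offset
    consumer.resume();
  });
});
```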

Either the current behaviour should be explained more clearly in the Javadoc, or the API should not implicitly start consumption on assign() and subscribe() and instead have an explicit start() method to indicate that the API client is ready to start consuming records. Perhaps this method is really the existing resume() method, and assign() and subscribe() simply don't have the auto-resume() behaviour.

Does @vietj have an opinion on this?

@ppatierno

Member

ppatierno commented May 23, 2017

We are wrapping the native Kafka API, so the poll() problem comes from there. I'd like the Vert.x client to expose the same API as the native Kafka one (people who move from the native client to Vert.x should already be familiar with the API). Of course, we can't expose something like poll() itself, because we model the consumer side as a stream, in the reactive way Vert.x exposes its APIs. I agree on having something like open() (as we already have close()), i.e. the start() that @tombentley is proposing. In any case, I wouldn't remove resume() in favour of having start() instead.

tombentley added a commit to strimzi/strimzi-kafka-bridge that referenced this issue May 23, 2017

@vietj

Contributor

vietj commented May 23, 2017

I think one option would be for there to be a handler on the stream that starts the poll, i.e.:

  • set handler, call assign: start to poll
  • call assign, call seek, set handler: start to poll

That's what we do in other cases, and it is actually more compatible with the RxJava extension's behaviour.
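In other words, under this proposal something like the following would be enough, assuming the `consumer` and `tp` from the sketches above; polling would only begin once both an assignment and a handler are in place:

```java
// Neither assign() nor seek() triggers a fetch yet, because no handler is set.
consumer.assign(tp, assignResult -> {
  consumer.seek(tp, 42L, seekResult -> {   // example offset
    // Setting the handler is what starts the poll loop, so the first records
    // delivered are those from the seeked-to position.
    consumer.handler(record -> System.out.println(record.value()));
  });
});
```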

@ppatierno

Member

ppatierno commented May 23, 2017

@vietj can you provide an example of where this is used? Does it mean that the state of the handler (set or not) is the trigger for starting to poll?

@vietj

Contributor

vietj commented May 23, 2017

Yes, that's how it would be used.

ReadStream doesn't have a notion of subscription, so this is what is used for cold streams.

You can look at AsyncFile, for instance.
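For comparison, a small sketch of the AsyncFile pattern being referred to (the file path is just an example): the file is opened, but bytes only start flowing once a handler is set on the stream.

```java
import io.vertx.core.Vertx;
import io.vertx.core.file.OpenOptions;

Vertx vertx = Vertx.vertx();
vertx.fileSystem().open("/tmp/example.txt", new OpenOptions(), ar -> {
  if (ar.succeeded()) {
    // AsyncFile is a ReadStream<Buffer>: reading starts when the handler is set.
    ar.result().handler(buffer -> System.out.println(buffer.length()));
  }
});
```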

@doernemt

Contributor

doernemt commented May 23, 2017

I agree with @tombentley's problem description. We have exactly that use case and work around it by first assigning a 'dev/null' handler, then calling subscribe. The partitionsAssignedHandler code then does seekToEnd, and only after that do we replace the handler with the actual logic we want executed on new records (a rough sketch follows below).

So @vietj's proposal fits our case perfectly.
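A rough sketch of that workaround, assuming the `consumer` from the earlier sketches and a hypothetical `process()` method holding the real logic:

```java
// 1. Install a 'dev/null' handler so any records fetched before the seek are discarded.
consumer.handler(record -> { /* discard */ });

// 2. Once partitions are assigned, seek to the end of each of them,
//    and only then install the real record handler.
consumer.partitionsAssignedHandler(partitions -> {
  consumer.seekToEnd(partitions, seekResult -> {
    consumer.handler(record -> process(record));   // process() is hypothetical
  });
});

// 3. Finally subscribe, which triggers the assignment.
consumer.subscribe("my-topic");                    // example topic
```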

tombentley added a commit to tombentley/vertx-kafka-client that referenced this issue May 24, 2017

Fix vert-x3#46
Allow `subscribe()` and `assign()` to be called with a null `recordHandler()`, but check in `schedule()` that we have a non-null handler before doing the work.

When setting a handler we now have to call `schedule()`.
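Purely as an illustration of the idea in that commit message (this is not the actual implementation; the class and member names here are made up):

```java
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

// Illustrative skeleton only -- not the real KafkaConsumer/KafkaReadStream code.
class ConsumerStreamSketch<K, V> {
  private final Vertx vertx;
  private Handler<Object> recordHandler;   // stands in for the record handler
  private boolean consuming = true;        // set by assign()/subscribe() in the real code

  ConsumerStreamSketch(Vertx vertx) {
    this.vertx = vertx;
  }

  // Only do the polling work when a record handler has actually been set.
  private void schedule(long delay) {
    if (recordHandler != null && consuming) {
      vertx.setTimer(Math.max(delay, 1L), id -> pollRecords());
    }
  }

  // Setting the handler now has to (re)trigger scheduling, so that
  // assign()/subscribe() followed later by handler() still starts consumption.
  ConsumerStreamSketch<K, V> handler(Handler<Object> handler) {
    this.recordHandler = handler;
    schedule(0);
    return this;
  }

  private void pollRecords() { /* fetch records and dispatch them to recordHandler */ }
}
```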

tombentley added a commit to tombentley/vertx-kafka-client that referenced this issue May 24, 2017

Test for vert-x3#46
It shouldn't matter which way around we call `assign()` and `handler()`; we only consume messages once both have been called.

tombentley added a commit to tombentley/vertx-kafka-client that referenced this issue May 24, 2017

@tombentley tombentley referenced this issue May 24, 2017

Merged

Issue 46 #47

@tombentley

Contributor

tombentley commented May 24, 2017

I may be wrong @doernemt, but FTR I think that workaround might not strictly work. The reason is that the consumer has an internal buffer of messages fetched from Kafka, so I think the following sequence is possible, at least in theory:

  1. Fetch messages from Kafka; let's say we get 100 messages, which are added to the internal buffer.
  2. Part way through processing those messages the seekToEnd happens and a new record handler is installed, but the buffer still has messages.
  3. So the newly installed record handler sees the last 50 messages from the initial fetch, before the second fetch (where the seekToEnd takes effect) happens.

@doernemt

Contributor

doernemt commented May 24, 2017

You are absolutely right, @tombentley! We do indeed see that effect. We're a few months away from releasing the functionality, which is why that workaround is acceptable for now; for the launch of the feature I'd like to have it fixed. It seems like you're currently fixing that issue, so thanks ;) The sequence assign -> seek -> setHandler would suit our use case.

@ppatierno

Member

ppatierno commented May 24, 2017

I have a couple of doubts/questions here, because maybe I misunderstood the phrase "a new record handler is installed" in point 2.
Let's imagine that a consumer calls assign() and then setHandler(): it starts to consume records from Kafka, from offset 0 for example. Let's imagine that the stream has 1000 records but the first fetch returns 100 records. Based on some logic in the application, when record 50 is reached the application decides to call seekToEnd(). Now:

  • I don't need to set a new handler, right? I already have a handler installed for processing data ...
  • Can we purge the internal buffer when a seek operation is executed, to avoid the effect you are talking about? (For sure I'm missing something, because that solution seems too trivial ;))

@tombentley maybe you can elaborate on your example?

@tombentley

Contributor

tombentley commented May 24, 2017

@ppatierno using your example:

  • you wouldn't need to set a new handler,
  • because of the internal buffer (with or without PR #47) the handler would see the remaining 49 records before it saw the ones it was expecting to see (those from the seek()ed-to offset),
  • when enable.auto.commit=false, purging the buffer when seek() is called should work. But the situation is not clear when the consumer is configured with enable.auto.commit=true.

The documentation for enable.auto.commit is:

If true the consumer's offset will be periodically committed in the background.

This doesn't sound like the offset commit happens at the same time as poll() (nor that it is affected by seek()). So in this situation I'm not sure it's really possible to provide well-defined semantics. But then, if you were using enable.auto.commit=true you wouldn't have those well-defined semantics with the native Consumer either, so I don't think we've lost anything that the native Consumer offers.

@tombentley

Contributor

tombentley commented May 24, 2017

Of course it's not simple to purge the buffer given that we can be subscribed/assigned to >1 topic, but they're all in the same buffer.

@tombentley

Contributor

tombentley commented May 25, 2017

Let's list the API methods which should have this logic applied (a rough sketch of the filtering idea follows below):

  • seek(TopicPartition) -- we need to filter out the buffered messages in the given TopicPartition until the next call to poll()
  • seekToBeginning(Set<TopicPartition>) -- we need to filter out the buffered messages in the given TopicPartitions until the next call to poll()
  • seekToEnd(Set<TopicPartition>) -- we need to filter out the buffered messages in the given TopicPartitions until the next call to poll()
  • assign(Set<TopicPartition>) -- we need to filter out messages in TopicPartitions in the old assignment but not in the new assignment, until the next call to poll()
  • subscribe(Set<String>) -- we need to filter out messages in TopicPartitions in the old subscription but not in the new subscription, until the next call to poll()

pause(Set<TopicPartition>) and resume(Set<TopicPartition>) are hard, though, because these aren't simply about filtering. The client might later resume() either before the next call to poll() or after it. In either case the messages in the buffer for that partition can't just be filtered out; they need to be kept. So we need per-partition Iterators. I think it would be complicated to provide an API that guarantees that once the pause() handler has been called, the recordHandler won't see any messages from the paused partitions. It's certainly not impossible, but it's worth asking whether that guarantee is worth the complexity cost.
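A sketch of the filtering idea for the methods above, assuming the client keeps fetched records in a single buffer and marks the affected partitions as "dirty" until the next poll() (all names here are illustrative, not the actual implementation):

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper only -- not the actual vertx-kafka-client code.
class BufferFilterSketch<K, V> {

  // Partitions touched by seek*/assign/subscribe since the last poll().
  private final Set<TopicPartition> dirtyPartitions = new HashSet<>();

  void markDirty(Set<TopicPartition> partitions) {
    dirtyPartitions.addAll(partitions);
  }

  // Called before delivering buffered records to the record handler:
  // drop anything belonging to a dirty partition.
  void filter(List<ConsumerRecord<K, V>> buffer) {
    Iterator<ConsumerRecord<K, V>> it = buffer.iterator();
    while (it.hasNext()) {
      ConsumerRecord<K, V> rec = it.next();
      if (dirtyPartitions.contains(new TopicPartition(rec.topic(), rec.partition()))) {
        it.remove();
      }
    }
  }

  // Called when the next poll() completes: the new fetch already reflects the
  // seek/assign/subscribe, so the partitions are clean again.
  void onPollCompleted() {
    dirtyPartitions.clear();
  }
}
```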

tombentley added a commit to tombentley/vertx-kafka-client that referenced this issue May 25, 2017

@tombentley

Contributor

tombentley commented May 25, 2017

I've pushed further changes (and tests) which provide the guarantee that once the handler of the above-mentioned methods has been called, the record handler won't see messages in the affected partitions.

The buffer can thus be accessed from two threads: the event loop thread and the consumer thread. I believe there is currently a race condition between the thread consuming messages and threads calling flush, but the patches give an idea of what's involved in providing this guarantee.

Is this guarantee desirable?

@tombentley

Contributor

tombentley commented May 26, 2017

I'm going to open a separate issue for this further problem raised by @ppatierno, and amend the existing PR #47 to just address the initial problem described in this issue.

@tombentley

Contributor

tombentley commented May 26, 2017

The PR #47 now addresses this issue only.

ppatierno added a commit that referenced this issue May 26, 2017

Merge pull request #47 from tombentley/issue-46
[#46] assign() then seek() with implicit consumption

tombentley added a commit to strimzi/strimzi-kafka-bridge that referenced this issue May 26, 2017

tombentley added a commit to strimzi/strimzi-kafka-bridge that referenced this issue May 26, 2017
