
Possible for consumer to seek to a given offset in the topic? #1

Closed
varunpatro opened this issue Jul 5, 2017 · 12 comments

Comments

@varunpatro

Something like this in the Java API:

https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

and in librdkafka:

https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.c#L1735

@mfontanini
Owner

That function is listed in the "legacy consumer" part of librdkafka's code. Do you know if it works with the high level consumer in librdkafka?

Otherwise, you can always use Consumer::assign, passing the desired offset for each partition.
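For reference, a minimal sketch of that approach (the broker address, topic name and the offset 42 below are just placeholders):

#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>

using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }, // placeholder broker
        { "group.id", "example-group" }               // placeholder group
    };
    Consumer consumer(config);

    // Manually assign partition 0 of "test", starting at an explicit offset
    consumer.assign({ TopicPartition("test", 0, 42) });

    while (true) {
        Message msg = consumer.poll();
        if (msg && !msg.get_error()) {
            // process msg.get_payload() here
        }
    }
}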

@varunpatro
Author

Sorry, I'm not sure if that works in the high level consumer.

I did try using Consumer::assign but it fails.

Here's what I did:

auto tp = TopicPartition("test", 0);
tp.set_offset(TopicPartition::OFFSET_END);

cout << tp.get_partition() << " " << tp.get_offset() << endl; // this gives me 0 and -1

consumer.assign({tp});

auto tpl = consumer.get_assignment();
auto offset = consumer.get_offsets(tpl[0]);
cout << get<0>(offset) << " " << get<1>(offset) << endl; // this gives me -1001 and -1001

Questions:

  1. I need to get the offset of the latest message in a given topic. How do I do this?
  2. I need to seek to a given position in a topic. How do I do this?

@mfontanini
Owner

Can't you just subscribe to the topic and set auto.offset.reset to latest? Using assign with a specific offset should work; cppkafka isn't doing anything besides forwarding that call to librdkafka.

Also, what do you mean by "it fails"?
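If you go the subscribe route, a minimal sketch would be something like this (broker/topic/group names are placeholders; depending on your librdkafka version, auto.offset.reset may need to be set on the default topic configuration instead):

Configuration config = {
    { "metadata.broker.list", "127.0.0.1:9092" },
    { "group.id", "example-group" },
    { "auto.offset.reset", "latest" }
};
Consumer consumer(config);
consumer.subscribe({ "test" });
// With no committed offsets for this group, consumption starts from the end of the log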

@mfontanini
Owner

I've used this in an application I wrote and it works as expected. Did you manage to make it work? Just assign and you'll start getting messages from that offset.

@mfontanini
Owner

mfontanini commented Sep 15, 2017

Closing as this should work as expected. If you couldn't make it work, please re-open.

azat added a commit to azat-archive/cppkafka that referenced this issue May 19, 2018
…ck())

Configuration should exist for logging, since it contains the copy of
the callback.

Stack traces:

  (gdb)
  #0  0x000055697b8f2140 in ?? ()
  #1  0x00007f10a393a514 in cppkafka::log_callback_proxy(rd_kafka_s const*, int, char const*, char const*) () from /usr/lib/libcppkafka.so.0.1

  (gdb) bt
  #0  0x00007f10a0d44a0d in __GI___pthread_timedjoin_ex (threadid=139697179305728, thread_return=0x7ffc7f574f70, abstime=0x0, block=<optimized out>) at pthread_join_common.c:89
  #1  0x00007f10a36d1640 in ?? () from /usr/lib/x86_64-linux-gnu/librdkafka.so.1
  #2  0x00007f10a36718ff in rd_kafka_destroy () from /usr/lib/x86_64-linux-gnu/librdkafka.so.1
  #3  0x00007f10a47f79af in __base_dtor  () at /usr/include/c++/7/bits/unique_ptr.h:268
  #4  __base_dtor  (this=0x55697b8ed260) at /usr/include/cppkafka/kafka_handle_base.h:63
@EvilBeaver
Contributor

Same issue.

cppkafka::TopicPartitionList assignments;
int64_t offset = cppkafka::TopicPartition::OFFSET_END;
assignments.push_back({topicName, partition, offset});
consumer->assign(assignments);

auto offsets = consumer->get_offsets(consumer->get_assignment()[0]);
cout << get<0>(offsets) << " " << get<1>(offsets) << endl;

This always gives INVALID_OFFSET, and the consumer doesn't consume anything.

@mfontanini
Owner

If this were to be an issue, it would be on rdkafka's side. cppkafka is just a very thin layer on top of it.

Have you checked the documentation for that call on rdkafka's side? Quoting it:

If there is no cached offset (either low or high, or both) then RD_KAFKA_OFFSET_INVALID will be returned for the respective offset.

It is possible you're querying the offset too quickly and it's still unknown to the library.

And the consumer doesn't consume anything.

Are you producing messages on that topic/partition? Your consumer will be pointing to the end of the log, so you need to produce something after you subscribe in order for the consumer to receive anything.
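To illustrate that last point, a rough sketch (broker and topic names are placeholders, and the consumer is assumed to already be assigned at OFFSET_END as in your snippet):

// Produce something *after* the consumer is positioned at the end of the log
Configuration producer_config = {
    { "metadata.broker.list", "127.0.0.1:9092" }
};
Producer producer(producer_config);

std::string payload = "hello";
producer.produce(MessageBuilder("test").partition(0).payload(payload));
producer.flush();

// Only messages produced from this point on will be delivered to that consumer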

@EvilBeaver
Contributor

Of course I'm producing on this topic/partition, and the bundled Kafka console consumer can read the messages. My consumer can't.

@mfontanini
Owner

I've just tried this locally using rdkafka v0.11.4 and it works as expected. I do get INVALID_OFFSET when querying right after calling assign, but then I get messages, and if I query the offsets again after receiving a message, the query does return the (high) offset correctly.

Are you using the latest version of rdkafka? Again, cppkafka is a super thin layer on top of rdkafka, so odds are that if there's actually an issue, it's on rdkafka's side and you should create a ticket on their repo.
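The sequence I tried looks roughly like this (topic/partition are placeholders):

TopicPartition tp("test", 0, TopicPartition::OFFSET_END);
consumer.assign({ tp });

// Right after assign, the cached watermarks are usually not populated yet
auto before = consumer.get_offsets(tp); // typically both values are still invalid here

Message msg = consumer.poll(std::chrono::milliseconds(5000));
if (msg && !msg.get_error()) {
    // Once a message has been received, the cached high watermark is available
    auto after = consumer.get_offsets(tp);
    std::cout << std::get<0>(after) << " " << std::get<1>(after) << std::endl;
}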

@EvilBeaver
Contributor

Okay, thank you very much! I'll keep digging.

@shurazs

shurazs commented Jan 20, 2020

As far as I understand, the recommendation is to use assign instead of seek.
But assign is quite heavy, since its implementation allocates (new/delete).
Would it be possible to implement a seek function?
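For context, the assign-based workaround I mean looks roughly like this (topic, partition and target_offset are placeholders):

// "Seek" by re-assigning the partition with a new starting offset
int64_t target_offset = 42; // placeholder
consumer.unassign();
consumer.assign({ TopicPartition("test", 0, target_offset) });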

@filimonov
Contributor

filimonov commented May 25, 2020

Do you know if it works with the high level consumer in librdkafka?

confluentinc/librdkafka#1925
