Fetching group topic offsets does not work #95

Closed
dead10ck opened this issue Jun 23, 2016 · 13 comments

@dead10ck
Collaborator

dead10ck commented Jun 23, 2016

I'm trying to use this lib to calculate the offset lag for a particular group ID. It seems able to fetch topic offsets through KafkaClient::fetch_offsets just fine, but when specifying a group with KafkaClient::fetch_group_topic_offsets, it returns an UnknownTopicOrPartition error in the resulting offset entry.

fn main() {
    ...
    let kafka_config = config.kafka_config;

    let mut client = KafkaClient::new(kafka_config.broker_list.clone());
    client.load_metadata(&kafka_config.topics).unwrap();

    let offsets = client.fetch_offsets(&kafka_config.topics, FetchOffset::Latest)
        .unwrap();
    println!("{:#?}", offsets);

    for topic in kafka_config.topics {
        let group_offsets = client.fetch_group_topic_offsets(&kafka_config.group, &topic)
            .unwrap();
        println!("{:#?}", group_offsets);
    }
}

This prints out:

{
    "foo": [
        PartitionOffset {
            offset: Ok(
                299905
            ),
            partition: 3
        },
        PartitionOffset {
            offset: Ok(
                299905
            ),
            partition: 6
        },
    ...
}
[
    TopicPartitionOffset {
        offset: Err(
            Kafka(
                UnknownTopicOrPartition
            )
        ),
        topic: "foo",
        partition: 3
    },
    TopicPartitionOffset {
        offset: Err(
            Kafka(
                UnknownTopicOrPartition
            )
        ),
        topic: "foo",
        partition: 6
    },
    ...
]

These are active Kafka (v0.9) topics that are known to work with the official Java library and have committed offsets. It's also worth mentioning that when I create a Consumer and poll messages on the same group ID, it does not find a committed offset for the group, even though one exists; it falls back to the offset specified by the fallback option.
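For reference, the lag computation I'm ultimately after is just the latest (log end) offset minus the committed group offset, per partition. A minimal sketch, assuming the shapes implied by the debug output above (the structs here are simplified stand-ins, not the library's exact types):

use std::collections::HashMap;

// simplified stand-ins mirroring the debug output above; not the library's exact types
struct PartitionOffset { partition: i32, offset: i64 }
struct TopicPartitionOffset { topic: String, partition: i32, offset: i64 }

// lag = latest (log end) offset - committed group offset, per partition
fn lag_per_partition(
    latest: &HashMap<String, Vec<PartitionOffset>>,
    committed: &[TopicPartitionOffset],
) -> Vec<(String, i32, i64)> {
    committed.iter().filter_map(|c| {
        latest.get(&c.topic)
            .and_then(|ps| ps.iter().find(|p| p.partition == c.partition))
            .map(|p| (c.topic.clone(), c.partition, p.offset - c.offset))
    }).collect()
}

fn main() {
    let mut latest = HashMap::new();
    latest.insert("foo".to_owned(), vec![PartitionOffset { partition: 3, offset: 299905 }]);
    let committed = vec![TopicPartitionOffset { topic: "foo".to_owned(), partition: 3, offset: 299800 }];
    for (topic, partition, lag) in lag_per_partition(&latest, &committed) {
        // prints: foo[3] lag = 105
        println!("{}[{}] lag = {}", topic, partition, lag);
    }
}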

@xitep
Collaborator

xitep commented Jun 24, 2016

It seems to me you're hitting a compatibility issue between different protocol versions: this library uses v0 while your Java clients are probably using v1. The protocol says that v0 offsets are stored in ZooKeeper (so that is where kafka-rust fetches the offsets from), while your Java clients (using v1) store the offsets in Kafka itself (the __consumer_offsets topic).

Since v0 and v1 are identical on the wire, you could try merely replacing API_VERSION with the value 1.

Update: hm, it seems this won't work so easily. Using v1, my 0.9 brokers now return NotCoordinatorForConsumerCode. Please keep this ticket open; I might find some time over the weekend to look into it.

@xitep
Collaborator

xitep commented Jun 24, 2016

Indeed, kafka-rust's implementation for fetching group offsets is not correct. It just happens to work with the ZooKeeper-based strategy for offset persistence, but breaks when trying to speak the v1 protocol to a Kafka 0.9 broker. I'll have a look at how best to fix this and try to provide a way for client code to opt into the v1 protocol.

@xitep xitep added the bug label Jun 24, 2016
@dead10ck
Collaborator Author

Does this mean committing won't work either?

@xitep
Collaborator

xitep commented Jun 26, 2016

Well, committing offsets from within kafka-rust (and fetching them) currently works on protocol version v0, which means the offsets end up in ZooKeeper. A Rust program using kafka-rust will therefore consistently store and load those offsets from the ZooKeeper instance behind the Kafka brokers, and things should work as expected.

However, if your Java clients are using v1 or v2 (which they obviously are), they will not use the ZooKeeper-based storage but Kafka's own storage, i.e. the __consumer_offsets topic, and hence will not see offsets committed by kafka-rust. This is similar to running two Java programs where one uses a new and the other an old version of KafkaConsumer; one program will likely not see the offsets of the other (even though both use the same "groupId"). Once I fix this issue, we should have support for the v0, v1, and v2 offset fetch/commit protocols, and you should be able to successfully read offsets committed by a Java client based on the 0.9 Kafka client library.

However, the story around offset management got a bit more complicated with Kafka 0.9's group membership API (are your Java clients using KafkaConsumer#subscribe?), which kafka-rust does not support at the moment. This is something we'll probably address only as part of #92.

BTW: I already had a chance to do some proof-of-concept work on this issue and learned that Kafka 0.9 will sometimes not provide "old" offsets. It appears to keep offsets only for a defined period of time. If you had a consumer which was inactive for, say, 2 days, it might then tell you there are no committed offsets for that consumer (even though it did commit some; I'm still not 100% sure where the problem lies). If you want to build a resilient lag monitor for a Kafka 0.9 instance, it'd probably be better to read the __consumer_offsets topic directly, the way projects like KafkaOffsetMonitor or kafka-manager do.

@dead10ck
Collaborator Author

Ah, I see. Currently, we do not use the subscribe API, though we planned on switching to it at some point.

Reading from the __consumer_offsets topic might work, although I've noticed it's a binary format. Do you know if the format has documentation anywhere, and if it would be difficult to put together a parser?

@xitep
Collaborator

xitep commented Jun 27, 2016

I'm sorry, I haven't seen the format documented (but I haven't searched extensively). However, I'm sure you can find it in one of the two projects mentioned; I'm not very familiar with Scala, but this seems to be the place where kafka-manager starts reading/processing the topic.

@xitep
Collaborator

xitep commented Jun 27, 2016

@dead10ck I just pushed my partial efforts to the branch pete/group-offsets; there's still some work left to do, but it'd be great if you could test the early result on your setup and give me some feedback if things don't work out for you.

You'd need to check out the branch pete/group-offsets-v1 locally (it hard-codes the v1 protocol version for group offset management) and reference it from within your project, for example:

[dependencies.kafka]
path = "../kafka-rust"

@dead10ck
Collaborator Author

@xitep awesome, thank you! I am currently on vacation, so I won't be able to test this until next week, but I will definitely give it a go then!

@xitep
Collaborator

xitep commented Jun 30, 2016

Just a minor update: I'm dropping the branch pete/group-offsets-v1 in favor of a newly available option in the branch pete/group-offsets that lets clients choose the offset storage themselves, as in the following example:

    use kafka::client::{KafkaClient, GroupOffsetStorage};
    ...
    let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    // ~ this is new and will switch the fetch/commit group offset functionality
    // to protocol v1
    // ~ the default is GroupOffsetStorage::Zookeeper for backwards compatibility
    client.set_group_offset_storage(GroupOffsetStorage::Kafka);
    try!(client.load_metadata_all());
    ...

That said, the public API should be settled in this branch, though there's still some work left to do. Nevertheless, I'd be glad if you had a chance to try it out and provide some feedback.
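
For completeness, here is a rough sketch of how the new option might combine with the example from the top of the thread (the kafka_config fields are placeholders from that example, and error handling is kept to unwrap()):

    use kafka::client::{KafkaClient, GroupOffsetStorage};
    ...
    let mut client = KafkaClient::new(kafka_config.broker_list.clone());
    // ~ use Kafka-based offset storage (protocol v1) instead of ZooKeeper
    client.set_group_offset_storage(GroupOffsetStorage::Kafka);
    client.load_metadata(&kafka_config.topics).unwrap();

    // ~ committed group offsets should now be read from Kafka (__consumer_offsets)
    for topic in &kafka_config.topics {
        let group_offsets = client.fetch_group_topic_offsets(&kafka_config.group, topic)
            .unwrap();
        println!("{:#?}", group_offsets);
    }
    ...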

@dead10ck
Collaborator Author

dead10ck commented Jul 1, 2016

@xitep that branch appears to be working perfectly :) it is now fetching offsets correctly from Kafka itself on my 0.9 broker

@xitep
Collaborator

xitep commented Jul 1, 2016

That sounds good :) Thank you for testing! I still need to handle two more error cases which can occur and can/should be handled transparently by KafkaClient. I'll tackle this over the weekend, and after some more testing I'll merge and release it. Feel free to shout in case you spot any trouble.

@xitep xitep added the 0.4.x label Jul 2, 2016
@xitep
Collaborator

xitep commented Jul 2, 2016

Available in master now; it will be released as 0.4 on crates.io later. I'll close the ticket then, assuming everything is fine until that point.

@xitep
Collaborator

xitep commented Jul 12, 2016

Published on crates.io.

@xitep xitep closed this as completed Jul 12, 2016