Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client_mut function to Consumer #160

Merged
merged 1 commit into from
Aug 1, 2017
Merged

Add client_mut function to Consumer #160

merged 1 commit into from
Aug 1, 2017

Conversation

sevagh
Copy link
Contributor

@sevagh sevagh commented Aug 1, 2017

Lets us get a mut client. This helped me when doing reset_metadata(). The doc string might need to change, whatever you like.

Copy link
Collaborator

@dead10ck dead10ck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, thanks! I'll get it merged when I get a chance.

@dead10ck dead10ck merged commit b4bc444 into kafka-rust:master Aug 1, 2017
@dead10ck
Copy link
Collaborator

dead10ck commented Aug 1, 2017

@sevagh published on crates.io as v0.6.2. Thanks again!

@xitep
Copy link
Collaborator

xitep commented Aug 16, 2017

sorry for the long pause from my side; what exactly is the use-case you are after? in general i believe this change is not "safe". it allows the following (which doesn't really make sense to me):

1. c = Consumer::with_topic("topic-a")...
2. c.poll(); ...
3. { let mut cm = c.client_mut(); cm.reset_metadata(); cm.load_metadata("topic-b"); }
4. c.poll();

The latter (and any following) call to .poll will result in no message - it'll not fail. but the consumer will continue (needlessly) maintaining offsets for "topic-a".

maybe you're use-case/required behaviour can be implemented directly in the consumer (i guess it's about reloading partition re-assignments for the consumed topics) or be exposed directly in the consumer's API. generally, i do not favor client code working with the consumer (or producer) and KafkaClient at the same time. they represent different levels of abstraction that we don't want to couple tightly.

@sevagh
Copy link
Contributor Author

sevagh commented Aug 16, 2017

This is the code:

fn ...(&mut self) -> {
    loop {
            let msgsets = match self.kafka_consumer.poll() {
                Ok(result) => result,
                Err(kafka::error::Error(kafka::error::ErrorKind::Kafka(KafkaCode::NotLeaderForPartition), _)) => {
                    let mut client = self.kafka_consumer.client_mut();
                    client.load_metadata_all()?;
                    continue
                }
                Err(ref e) => {
                    error!("kafka poll error: {}", e);
                    continue;
                }
            };

There's no real way to do this without client_mut(), or without re-creating the consumer in the loop if the topic leader changes.

@xitep
Copy link
Collaborator

xitep commented Aug 22, 2017

@sevagh many thanks for the example! i think this is exactly what #81 is meant to address. until then we can of course keep the client_mut method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants