-
Notifications
You must be signed in to change notification settings - Fork 144
Open
Description
When I execute the following code, an error occurs
fn dump_metadata() -> Result<(), String> {
let mut client = KafkaClient::new(vec!["172.16.1.118:9092".to_string()]);
client.load_metadata_all().map_err(|e| e.to_string())?;
let topics = {
let topics = client.topics();
let mut names = Vec::with_capacity(topics.len());
for topic in topics.names() {
names.push(topic.to_owned());
}
names.sort();
names
};
// ~ fetch the topics' earliest and latest offsets
let (topic_width, offsets) = {
let mut topic_width = 0;
let mut m = HashMap::with_capacity(topics.len());
let mut offsets = client
.fetch_offsets(&topics, FetchOffset::Latest)
.map_err(|e| e.to_string())?;
for (topic, offsets) in offsets {
topic_width = cmp::max(topic_width, topic.len());
let mut offs = Vec::with_capacity(offsets.len());
{
info!("topic:{}", topic);
let num_partitions = client
.topics()
.partitions(&topic)
.map(|ps| ps.len())
.unwrap_or(0);
for _ in 0..num_partitions {
offs.push(Offsets::default());
}
}
for offset in offsets {
offs[offset.partition as usize].latest = offset.offset;
}
m.insert(topic, offs);
}
offsets = client
.fetch_offsets(&topics, FetchOffset::Earliest)
.map_err(|e| e.to_string())?;
for (topic, offsets) in offsets {
let offs = m.get_mut(&topic).expect("unknown topic");
for offset in offsets {
offs[offset.partition as usize].earliest = offset.offset;
}
}
(topic_width + 2, m)
};
Ok(())
}The error message is:
failed to fill whole bufferMetadata
Metadata
Assignees
Labels
No labels