Skip to content

Commit

Permalink
Avoid allocations while choosing partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr committed Sep 30, 2015
1 parent e0ab5db commit 62535d6
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,23 @@ impl KafkaClient {
}
}

fn choose_partition(&mut self, topic: &String) -> Option<i32> {
fn choose_partition(&mut self, topic: &str) -> Option<i32> {
// XXX why doing the lookup twice if once suffices? we should
// choose a different structure to hold our data to allow us
// doing only one lookup (and avoiding the allocation upon a
// not-yet-existing entry)

match self.topic_partitions.get(topic) {
None => None,
Some(partitions) if partitions.is_empty() => None,
Some(partitions) => {
let plen = partitions.len();
if plen == 0 {
return None;
if let Some(curr) = self.topic_partition_curr.get_mut(topic) {
*curr = (*curr+1) % partitions.len() as i32;
return Some(*curr);
}

let curr = self.topic_partition_curr.entry(topic.clone()).or_insert(0);
*curr = (*curr+1) % plen as i32;
Some(*curr)
},
None => None
self.topic_partition_curr.insert(topic.to_owned(), 1);
Some(1)
}
}

}
Expand Down Expand Up @@ -356,17 +360,14 @@ impl KafkaClient {

// Map topic and partition to the corresponding broker
for pm in input {
let partition = self.choose_partition(&pm.topic);
if partition.is_none() {
continue
if let Some(p) = self.choose_partition(&pm.topic) {
self.get_broker(&pm.topic, &p).and_then(|broker| {
let entry = reqs.entry(broker.clone()).or_insert(
protocol::ProduceRequest::new(required_acks, timeout, correlation, self.clientid.clone()));
entry.add(pm.topic.clone(), p.clone(), pm.message.clone());
Some(())
});
}
let p = partition.unwrap();
self.get_broker(&pm.topic, &p).and_then(|broker| {
let entry = reqs.entry(broker.clone()).or_insert(
protocol::ProduceRequest::new(required_acks, timeout, correlation, self.clientid.clone()));
entry.add(pm.topic.clone(), p.clone(), pm.message.clone());
Some(())
});
}

// Call each broker with the request formed earlier
Expand Down

0 comments on commit 62535d6

Please sign in to comment.