Skip to content

Commit

Permalink
Removed references from struct methods since they want the ownership …
Browse files Browse the repository at this point in the history
…of objects anyway
  • Loading branch information
spicavigo committed May 14, 2015
1 parent 126bc01 commit edec13c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 118 deletions.
59 changes: 30 additions & 29 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl KafkaClient {
/// ```no_run
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
/// ```
pub fn new(hosts: &Vec<String>) -> KafkaClient {
KafkaClient { hosts: hosts.to_vec(), clientid: CLIENTID.to_string(),
pub fn new(hosts: Vec<String>) -> KafkaClient {
KafkaClient { hosts: hosts, clientid: CLIENTID.to_string(),
timeout: DEFAULT_TIMEOUT, ..KafkaClient::default()}
}

Expand All @@ -73,13 +73,13 @@ impl KafkaClient {
/// Resets and loads metadata for all topics.
pub fn load_metadata_all(&mut self) -> Result<()>{
self.reset_metadata();
self.load_metadata(&vec!())
self.load_metadata(vec!())
}

/// Reloads metadata for a list of supplied topics
///
/// returns Result<(), error::Error>
pub fn load_metadata (&mut self, topics: &Vec<String>) -> Result<()>{
pub fn load_metadata (&mut self, topics: Vec<String>) -> Result<()>{
let resp = try!(self.get_metadata(topics));

let mut brokers: HashMap<i32, String> = HashMap::new();
Expand Down Expand Up @@ -113,10 +113,10 @@ impl KafkaClient {
self.topic_brokers.clear();
}

fn get_metadata(&mut self, topics: &Vec<String>) -> Result<protocol::MetadataResponse> {
fn get_metadata(&mut self, topics: Vec<String>) -> Result<protocol::MetadataResponse> {
let correlation = self.next_id();
for host in self.hosts.to_vec() {
let req = protocol::MetadataRequest::new(correlation, &self.clientid, topics);
let req = protocol::MetadataRequest::new(correlation, self.clientid.clone(), topics.to_vec());
match self.get_conn(&host) {
Ok(mut conn) => if self.send_request(&mut conn, req).is_ok() {
return self.get_response::<protocol::MetadataResponse>(&mut conn);
Expand Down Expand Up @@ -147,10 +147,10 @@ impl KafkaClient {
/// ```
/// Returns a vector of (topic, partition offset data).
/// PartitionOffset will contain parition and offset info Or Error code as returned by Kafka.
pub fn fetch_topic_offset(&mut self, topic: &String) -> Result<Vec<(String, Vec<utils::PartitionOffset>)>> {
pub fn fetch_topic_offset(&mut self, topic: String) -> Result<Vec<(String, Vec<utils::PartitionOffset>)>> {
// Doing it like this because HashMap will not return borrow of self otherwise
let partitions = self.topic_partitions
.get(topic)
.get(&topic)
.unwrap_or(&vec!())
.clone();

Expand All @@ -173,25 +173,25 @@ impl KafkaClient {
topic: topic.clone(),
partitions: partitions.to_vec()
});
for tpo in try!(self.fetch_offset(&v, host)) {
for tpo in try!(self.fetch_offset(v, host.clone())) {
res.push(utils::PartitionOffset{partition: tpo.partition, offset: tpo.offset});
}
}
Ok(vec!((topic.clone(), res)))
}

fn fetch_offset(&mut self, topic_partitions: &Vec<utils::TopicPartitions>, host: &String)
fn fetch_offset(&mut self, topic_partitions: Vec<utils::TopicPartitions>, host: String)
-> Result<Vec<utils::TopicPartitionOffset>> {
let correlation = self.next_id();
let req = protocol::OffsetRequest::new_latest(topic_partitions, correlation, &self.clientid);
let req = protocol::OffsetRequest::new_latest(topic_partitions, correlation, self.clientid.clone());

let resp = try!(self.send_receive::<protocol::OffsetRequest, protocol::OffsetResponse>(&host, req));

Ok(resp.get_offsets())

}

fn get_broker(&mut self, topic: &String, partition: i32) -> Option<String> {
fn get_broker(&mut self, topic: &String, partition: &i32) -> Option<String> {
let key = format!("{}-{}", topic, partition);
match self.topic_brokers.get(&key) {
Some(broker) => {
Expand All @@ -215,12 +215,12 @@ impl KafkaClient {
/// let res = client.load_metadata_all();
/// let msgs = client.fetch_messages(&"my-topic".to_string(), 0, 0);
/// ```
pub fn fetch_messages(&mut self, topic: &String, partition: i32, offset: i64) -> Result<Vec<utils::OffsetMessage>>{
pub fn fetch_messages(&mut self, topic: String, partition: i32, offset: i64) -> Result<Vec<utils::OffsetMessage>>{

let host = self.get_broker(topic, partition).unwrap();
let host = self.get_broker(&topic, &partition).unwrap();

let correlation = self.next_id();
let req = protocol::FetchRequest::new_single(topic, partition, offset, correlation, &self.clientid);
let req = protocol::FetchRequest::new_single(topic, partition, offset, correlation, self.clientid.clone());

let resp = try!(self.send_receive::<protocol::FetchRequest, protocol::FetchResponse>(&host, req));
Ok(resp.get_messages()
Expand Down Expand Up @@ -259,48 +259,49 @@ impl KafkaClient {
/// ```
/// The return value will contain topic, partition, offset and error if any
/// OR error:Error
pub fn send_message(&mut self, topic: &String, partition: i32, required_acks: i16,
timeout: i32, message: &Vec<u8>) -> Result<Vec<utils::TopicPartitionOffset>> {
pub fn send_message(&mut self, topic: String, partition: i32, required_acks: i16,
timeout: i32, message: Vec<u8>) -> Result<Vec<utils::TopicPartitionOffset>> {

let host = self.get_broker(topic, partition).unwrap();
let host = self.get_broker(&topic, &partition).unwrap();

let correlation = self.next_id();
let req = protocol::ProduceRequest::new_single(topic, partition, required_acks,
timeout, message, correlation, &self.clientid);
timeout, message, correlation,
self.clientid.clone());

let resp = try!(self.send_receive
::<protocol::ProduceRequest, protocol::ProduceResponse>(&host, req));
Ok(resp.get_response())

}

pub fn commit_offset(&mut self, group: &String, topic: &String,
partition: i32, offset: i64) -> Result<()>{
let host = self.get_broker(topic, partition).unwrap();
pub fn commit_offset(&mut self, group: String, topic: String,
partition: i32, offset: i64) -> Result<()>{
let host = self.get_broker(&topic, &partition).unwrap();

let correlation = self.next_id();


let req = protocol::OffsetCommitRequest::new(group, topic,
&partition, &offset, &"".to_string(), correlation, &self.clientid);
let req = protocol::OffsetCommitRequest::new(group, topic, partition, offset,
"".to_string(), correlation, self.clientid.clone());

try!(self.send_receive
::<protocol::OffsetCommitRequest, protocol::OffsetCommitResponse>(&host, req));

Ok(())
}

pub fn fetch_group_topic_offset(&mut self, group: &String, topic: &String, partition: i32) -> Result<i64> {
let host = self.get_broker(topic, partition).unwrap();
pub fn fetch_group_topic_offset(&mut self, group: String, topic: String, partition: i32) -> Result<i64> {
let host = self.get_broker(&topic, &partition).unwrap();

let correlation = self.next_id();
let req = protocol::OffsetFetchRequest::new(group, &vec!(topic.clone()),
&vec!(partition), correlation, &self.clientid);
let req = protocol::OffsetFetchRequest::new(group, vec!(topic.clone()),
vec!(partition), correlation, self.clientid.clone());

let resp = try!(self.send_receive
::<protocol::OffsetFetchRequest, protocol::OffsetFetchResponse>(&host, req));

Ok(resp.get_offset_partition(topic, &partition))
Ok(resp.get_offset_partition(topic, partition))

}

Expand Down

0 comments on commit edec13c

Please sign in to comment.