Skip to content

Commit

Permalink
cluster: move slots response parsing logic to get_slots function
Browse files Browse the repository at this point in the history
- rename get_slots to parse_slots_response
- move coverage checks & Slot creation logic into this fn
- move CLUSTER SLOTS query logic out of this fn

- this to easily reuse this function in the Async impl
  • Loading branch information
utkarshgupta137 committed Aug 31, 2022
1 parent bf8217f commit e98553b
Showing 1 changed file with 107 additions and 116 deletions.
223 changes: 107 additions & 116 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,7 @@ impl ClusterConnection {
// Query a node to discover slot-> master mappings.
fn refresh_slots(&self) -> RedisResult<()> {
let mut slots = self.slots.borrow_mut();
*slots = self.create_new_slots(|slot_data| {
let replica =
if !self.cluster_params.read_from_replicas || slot_data.replicas().is_empty() {
slot_data.master().to_string()
} else {
slot_data
.replicas()
.choose(&mut thread_rng())
.unwrap()
.to_string()
};

[slot_data.master().to_string(), replica]
})?;
*slots = self.create_new_slots()?;

let mut nodes = slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
Expand Down Expand Up @@ -235,49 +222,19 @@ impl ClusterConnection {
Ok(())
}

fn create_new_slots<F>(&self, mut get_addr: F) -> RedisResult<SlotMap>
where
F: FnMut(&Slot) -> [String; 2],
{
fn create_new_slots(&self) -> RedisResult<SlotMap> {
let mut cmd = Cmd::new();
cmd.arg("CLUSTER").arg("SLOTS");

let mut connections = self.connections.borrow_mut();
let mut new_slots = None;
let mut rng = thread_rng();
let len = connections.len();
let mut samples = connections.values_mut().choose_multiple(&mut rng, len);

for conn in samples.iter_mut() {
if let Ok(mut slots_data) = get_slots(conn, &self.cluster_params) {
slots_data.sort_by_key(|s| s.start());
let last_slot = slots_data.iter().try_fold(0, |prev_end, slot_data| {
if prev_end != slot_data.start() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!(
"Received overlapping slots {} and {}..{}",
prev_end,
slot_data.start(),
slot_data.end()
),
)));
}
Ok(slot_data.end() + 1)
})?;

if last_slot != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!("Lacks the slots >= {}", last_slot),
)));
}

new_slots = Some(
slots_data
.iter()
.map(|slot_data| (slot_data.end(), get_addr(slot_data)))
.collect(),
);
if let Ok(Value::Bulk(response)) = cmd.query(conn) {
new_slots = Some(parse_slots_response(response, &self.cluster_params)?);
break;
}
}
Expand Down Expand Up @@ -667,83 +624,117 @@ fn get_random_connection<'a>(
(addr, con)
}

// Get slot data from connection.
fn get_slots(
connection: &mut Connection,
// Parse `CLUSTER SLOTS` response into SlotMap.
fn parse_slots_response(
response: Vec<Value>,
cluster_params: &ClusterParams,
) -> RedisResult<Vec<Slot>> {
let mut cmd = Cmd::new();
cmd.arg("CLUSTER").arg("SLOTS");
let value = connection.req_command(&cmd)?;

// Parse response.
let mut result = Vec::with_capacity(2);

if let Value::Bulk(items) = value {
let mut iter = items.into_iter();
while let Some(Value::Bulk(item)) = iter.next() {
if item.len() < 3 {
continue;
}

let start = if let Value::Int(start) = item[0] {
start as u16
} else {
continue;
};

let end = if let Value::Int(end) = item[1] {
end as u16
} else {
continue;
};
) -> RedisResult<SlotMap> {
let mut slots = Vec::with_capacity(2);
let mut items = response.into_iter();
while let Some(Value::Bulk(item)) = items.next() {
if item.len() < 3 {
continue;
}

let mut nodes: Vec<String> = item
.into_iter()
.skip(2)
.filter_map(|node| {
if let Value::Bulk(node) = node {
if node.len() < 2 {
return None;
}
let start = if let Value::Int(start) = item[0] {
start as u16
} else {
continue;
};
let end = if let Value::Int(end) = item[1] {
end as u16
} else {
continue;
};

let ip = if let Value::Data(ref ip) = node[0] {
String::from_utf8_lossy(ip)
} else {
return None;
};
if ip.is_empty() {
return None;
}
let mut nodes: Vec<String> = item
.into_iter()
.skip(2)
.filter_map(|node| {
if let Value::Bulk(node) = node {
if node.len() < 2 {
return None;
}

let port = if let Value::Int(port) = node[1] {
port as u16
} else {
return None;
};
Some(
get_connection_addr(
(ip.into_owned(), port),
cluster_params.tls_insecure,
)
.to_string(),
)
let ip = if let Value::Data(ref ip) = node[0] {
String::from_utf8_lossy(ip)
} else {
None
return None;
};
if ip.is_empty() {
return None;
}
})
.collect();

if nodes.is_empty() {
continue;
}
let port = if let Value::Int(port) = node[1] {
port as u16
} else {
return None;
};
Some(
get_connection_addr((ip.into_owned(), port), cluster_params.tls_insecure)
.to_string(),
)
} else {
None
}
})
.collect();

let replicas = nodes.split_off(1);
result.push(Slot::new(start, end, nodes.pop().unwrap(), replicas));
if nodes.is_empty() {
continue;
}

let replicas = nodes.split_off(1);
slots.push(Slot::new(start, end, nodes.pop().unwrap(), replicas));
}

Ok(result)
slots.sort_unstable_by_key(|s| s.start());

let last_slot = slots.iter().try_fold(0, |prev_end, slot_data| {
if prev_end != slot_data.start() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!(
"Received overlapping slots {} and {}..{}",
prev_end,
slot_data.start(),
slot_data.end()
),
)));
}
Ok(slot_data.end() + 1)
})?;

if last_slot != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!("Lacks the slots >= {}", last_slot),
)));
}

Ok(slots
.iter()
.map(|slot_data| {
let nodes = {
let replica =
if !cluster_params.read_from_replicas || slot_data.replicas().is_empty() {
slot_data.master().to_string()
} else {
slot_data
.replicas()
.choose(&mut thread_rng())
.unwrap()
.to_string()
};

[slot_data.master().to_string(), replica]
};

(slot_data.end(), nodes)
})
.collect())
}

fn get_connection_info(node: &str, cluster_params: &ClusterParams) -> ConnectionInfo {
Expand Down

0 comments on commit e98553b

Please sign in to comment.