Skip to content

Commit

Permalink
Clarify combine_and_sort_array_results.
Browse files Browse the repository at this point in the history
Add documentation, add a test, and change `multi_shard` to use
0-indexing on the result.
  • Loading branch information
nihohit committed Sep 9, 2023
1 parent 02a88e4 commit 2032d0e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 19 deletions.
3 changes: 2 additions & 1 deletion redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,11 @@ where
let cmd = match routing {
MultipleNodeRoutingInfo::MultiSlot(vec) => {
let mut new_cmd = Cmd::new();
let command_length = 1; // TODO - the +1 should change if we have multi-slot commands with 2 command words.
new_cmd.arg(cmd.arg_idx(0));
let (_, indices) = vec.get(index).unwrap();
for index in indices {
new_cmd.arg(cmd.arg_idx(*index));
new_cmd.arg(cmd.arg_idx(*index + command_length));
}
Arc::new(new_cmd)
}
Expand Down
82 changes: 64 additions & 18 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,30 @@ pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
Ok(Value::Bulk(results))
}

/// Combines multiple call results in the `values` field, each assume to be an array of results,
/// into a single array. `sorting_order` defines the order of the results in the returned array -
/// for each array of results, `sorting_order` should contain a matching array with the indices of
/// the results in the final array.
pub(crate) fn combine_and_sort_array_results<'a>(
values: Vec<Value>,
sorting_order: impl IntoIterator<Item = &'a Vec<usize>> + ExactSizeIterator,
sorting_order: impl Iterator<Item = &'a Vec<usize>> + ExactSizeIterator,
) -> RedisResult<Value> {
let mut results = Vec::new();
results.resize(values.len(), Value::Nil);
results.resize(
values.iter().fold(0, |acc, value| match value {
Value::Bulk(values) => values.len() + acc,
_ => 0,
}),
Value::Nil,
);
assert_eq!(values.len(), sorting_order.len());

for (key_indices, value) in sorting_order.into_iter().zip(values) {
match value {
Value::Bulk(values) => {
assert_eq!(values.len(), key_indices.len());
for (index, value) in key_indices.iter().zip(values) {
results[*index - 1] = value;
results[*index] = value;
}
}
_ => {
Expand Down Expand Up @@ -198,8 +208,18 @@ fn get_route(is_readonly: bool, key: &[u8]) -> Route {
}
}

/// Takes the given `routable` and creates a multi-slot routing info.
/// This is used for commands like MSET & MGET, where if the command's keys
/// are hashed to multiple slots, the command should be split into sub-commands,
/// each targetting a single slot. The results of these sub-commands are then
/// usually reassembled using `combine_and_sort_array_results`. In order to do this,
/// `MultipleNodeRoutingInfo::MultiSlot` contains the routes for each sub-command, and
/// the indices in the final combined result for each result from the sub-command.
///
/// If all keys are routed to the same slot, there's no need to split the command,
/// so a single node routing info will be returned.
fn multi_shard<R>(
r: &R,
routable: &R,
cmd: &[u8],
first_key_index: usize,
has_values: bool,
Expand All @@ -209,19 +229,19 @@ where
{
let is_readonly = is_readonly_cmd(cmd);
let mut routes = HashMap::new();
let mut index = first_key_index;
while let Some(key) = r.arg_idx(index) {
let mut key_index = 0;
while let Some(key) = routable.arg_idx(first_key_index + key_index) {
let route = get_route(is_readonly, key);
let entry = routes.entry(route);
let keys = entry.or_insert(Vec::new());
keys.push(index);
keys.push(key_index);

if has_values {
index += 1;
r.arg_idx(index)?; // check that there's a value for the key
keys.push(index);
key_index += 1;
routable.arg_idx(first_key_index + key_index)?; // check that there's a value for the key
keys.push(key_index);
}
index += 1
key_index += 1;
}

let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
Expand Down Expand Up @@ -618,7 +638,7 @@ mod tests {
get_hashtag, slot, MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo,
Slot, SlotAddr, SlotMap,
};
use crate::{cmd, parser::parse_redis_value};
use crate::{cmd, parser::parse_redis_value, Value};

#[test]
fn test_get_hashtag() {
Expand Down Expand Up @@ -846,9 +866,9 @@ mod tests {
cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
let routing = RoutingInfo::for_routable(&cmd);
let mut expected = std::collections::HashMap::new();
expected.insert(Route(4813, SlotAddr::Master), vec![3]);
expected.insert(Route(5061, SlotAddr::Master), vec![2, 4]);
expected.insert(Route(12182, SlotAddr::Master), vec![1]);
expected.insert(Route(4813, SlotAddr::Master), vec![2]);
expected.insert(Route(5061, SlotAddr::Master), vec![1, 3]);
expected.insert(Route(12182, SlotAddr::Master), vec![0]);

assert!(
matches!(routing.clone(), Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(vec))) if {
Expand All @@ -862,9 +882,9 @@ mod tests {
cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
let routing = RoutingInfo::for_routable(&cmd);
let mut expected = std::collections::HashMap::new();
expected.insert(Route(4813, SlotAddr::Replica), vec![3]);
expected.insert(Route(5061, SlotAddr::Replica), vec![2, 4]);
expected.insert(Route(12182, SlotAddr::Replica), vec![1]);
expected.insert(Route(4813, SlotAddr::Replica), vec![2]);
expected.insert(Route(5061, SlotAddr::Replica), vec![1, 3]);
expected.insert(Route(12182, SlotAddr::Replica), vec![0]);

assert!(
matches!(routing.clone(), Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(vec))) if {
Expand Down Expand Up @@ -958,4 +978,30 @@ mod tests {
.slot_addr_for_route(&Route::new(2001, SlotAddr::Master))
.is_none());
}

#[test]
fn test_combining_results_into_single_array() {
let res1 = Value::Bulk(vec![Value::Nil, Value::Okay]);
let res2 = Value::Bulk(vec![
Value::Data("1".as_bytes().to_vec()),
Value::Data("4".as_bytes().to_vec()),
]);
let res3 = Value::Bulk(vec![Value::Status("2".to_string()), Value::Int(3)]);
let results = super::combine_and_sort_array_results(
vec![res1, res2, res3],
[vec![0, 5], vec![1, 4], vec![2, 3]].iter(),
);

assert_eq!(
results.unwrap(),
Value::Bulk(vec![
Value::Nil,
Value::Data("1".as_bytes().to_vec()),
Value::Status("2".to_string()),
Value::Int(3),
Value::Data("4".as_bytes().to_vec()),
Value::Okay,
])
);
}
}

0 comments on commit 2032d0e

Please sign in to comment.