Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redis/src/cluster_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn is_illegal_cmd(cmd: &str) -> bool {
"KEYS" |
"LASTSAVE" |
"MGET" | "MOVE" | "MSET" | "MSETNX" |
"PFMERGE" | "PFCOUNT" | "PING" | "PUBLISH" |
"PING" | "PUBLISH" |
"RANDOMKEY" | "RENAME" | "RENAMENX" | "RPOPLPUSH" |
"SAVE" | "SCAN" |
// All commands that start with "SCRIPT"
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct ClusterPipeline {
/// KEYS
/// LASTSAVE
/// MGET, MOVE, MSET, MSETNX
/// PFMERGE, PFCOUNT, PING, PUBLISH
/// PING, PUBLISH
/// RANDOMKEY, RENAME, RENAMENX, RPOPLPUSH
/// SAVE, SCAN, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, SDIFF, SDIFFSTORE,
/// SENTINEL GET MASTER ADDR BY NAME, SENTINEL MASTER, SENTINEL MASTERS, SENTINEL MONITOR,
Expand Down
105 changes: 105 additions & 0 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,45 @@ where
})
}

/// Takes the given `routable` with possibly multiple keys and creates a single-slot routing info.
/// This is used for commands like PFCOUNT or PFMERGE, where it is required that the command's keys
/// are hashed to the same slots and there is no way how the command might be split on the client.
///
/// If all keys are not routed to the same slot, `None` variant is returned and invoking of such
/// command fails with UNROUTABLE_ERROR.
fn multiple_keys_same_slot<R>(
routable: &R,
cmd: &[u8],
first_key_index: usize,
) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
{
let is_readonly = is_readonly_cmd(cmd);
let mut slots = HashSet::new();
let mut key_index = 0;
while let Some(key) = routable.arg_idx(first_key_index + key_index) {
slots.insert(get_slot(key));
key_index += 1;
}

if slots.len() != 1 {
return None;
}

let slot = slots.into_iter().next().unwrap();
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
slot,
if is_readonly {
SlotAddr::ReplicaOptional
} else {
SlotAddr::Master
},
)),
))
}

impl ResponsePolicy {
/// Parse the command for the matching response policy.
pub fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
Expand Down Expand Up @@ -473,6 +512,7 @@ impl RoutingInfo {

b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" => multi_shard(r, cmd, 1, false),
b"MSET" => multi_shard(r, cmd, 1, true),
b"PFCOUNT" | b"PFMERGE" => multiple_keys_same_slot(r, cmd, 1),
// TODO - special handling - b"SCAN"
b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" | b"MOVE" | b"BITOP" => None,
b"EVALSHA" | b"EVAL" => {
Expand Down Expand Up @@ -1107,6 +1147,71 @@ mod tests {
);
}

#[test]
fn test_multiple_keys_same_slot() {
// single key
let mut cmd = crate::cmd("PFCOUNT");
cmd.arg("hll-1");
let routing = RoutingInfo::for_routable(&cmd);
assert!(matches!(
routing,
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route(_, SlotAddr::ReplicaOptional))
))
));

let mut cmd = crate::cmd("PFMERGE");
cmd.arg("hll-1");
let routing = RoutingInfo::for_routable(&cmd);
assert!(matches!(
routing,
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route(_, SlotAddr::Master))
))
));

// multiple keys
let mut cmd = crate::cmd("PFCOUNT");
cmd.arg("{hll}-1").arg("{hll}-2");
let routing = RoutingInfo::for_routable(&cmd);
assert!(matches!(
routing,
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route(_, SlotAddr::ReplicaOptional))
))
));

let mut cmd = crate::cmd("PFMERGE");
cmd.arg("{hll}-1").arg("{hll}-2");
let routing = RoutingInfo::for_routable(&cmd);
assert!(matches!(
routing,
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route(_, SlotAddr::Master))
))
));

// same-slot violation
let mut cmd = crate::cmd("PFCOUNT");
cmd.arg("hll-1").arg("hll-2");
let routing = RoutingInfo::for_routable(&cmd);
assert!(routing.is_none());

let mut cmd = crate::cmd("PFMERGE");
cmd.arg("hll-1").arg("hll-2");
let routing = RoutingInfo::for_routable(&cmd);
assert!(routing.is_none());

// missing keys
let cmd = crate::cmd("PFCOUNT");
let routing = RoutingInfo::for_routable(&cmd);
assert!(routing.is_none());

let cmd = crate::cmd("PFMERGE");
let routing = RoutingInfo::for_routable(&cmd);
assert!(routing.is_none());
}

#[test]
fn test_command_creation_for_multi_shard() {
let mut original_cmd = cmd("DEL");
Expand Down
Loading