Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async-Cluster use same routing as Sync-Cluster #789

Merged
merged 2 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 17 additions & 53 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ use std::{

use crate::{
aio::{ConnectionLike, MultiplexedConnection},
parse_redis_url, Arg, Cmd, ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo,
cluster_routing::RoutingInfo,
parse_redis_url, Cmd, ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo,
RedisError, RedisFuture, RedisResult, Value,
};
use crc16::*;
use futures::{
future::{self, BoxFuture},
prelude::*,
Expand Down Expand Up @@ -214,61 +214,29 @@ impl<C> CmdArg<C> {
}
}

// TODO -- return offset for master/replica to support replica reads:
fn slot(&self) -> Option<u16> {
fn get_cmd_arg(cmd: &Cmd, arg_num: usize) -> Option<&[u8]> {
cmd.args_iter().nth(arg_num).and_then(|arg| match arg {
Arg::Simple(arg) => Some(arg),
Arg::Cursor => None,
})
}

fn position(cmd: &Cmd, candidate: &[u8]) -> Option<usize> {
cmd.args_iter().position(|arg| match arg {
Arg::Simple(arg) => arg.eq_ignore_ascii_case(candidate),
_ => false,
})
}

fn slot_for_command(cmd: &Cmd) -> Option<u16> {
match get_cmd_arg(cmd, 0) {
Some(b"EVAL") | Some(b"EVALSHA") => {
get_cmd_arg(cmd, 2).and_then(|key_count_bytes| {
let key_count_res = std::str::from_utf8(key_count_bytes)
.ok()
.and_then(|key_count_str| key_count_str.parse::<usize>().ok());
key_count_res.and_then(|key_count| {
if key_count > 0 {
get_cmd_arg(cmd, 3).map(slot_for_key)
} else {
// TODO need to handle sending to all masters
None
}
})
})
}
Some(b"XGROUP") => get_cmd_arg(cmd, 2).map(slot_for_key),
Some(b"XREAD") | Some(b"XREADGROUP") => {
let pos = position(cmd, b"STREAMS")?;
get_cmd_arg(cmd, pos + 1).map(slot_for_key)
}
Some(b"SCRIPT") => {
// TODO need to handle sending to all masters
None
}
_ => get_cmd_arg(cmd, 1).map(slot_for_key),
fn slot_for_command(cmd: &Cmd) -> Option<(u16, u16)> {
match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::Random) => None,
Some(RoutingInfo::MasterSlot(slot)) => Some((slot, 0)),
Some(RoutingInfo::ReplicaSlot(slot)) => Some((slot, 1)),
Some(RoutingInfo::AllNodes) | Some(RoutingInfo::AllMasters) => None,
_ => None,
}
}

match self {
Self::Cmd { cmd, .. } => slot_for_command(cmd),
Self::Pipeline { pipeline, .. } => {
Self::Cmd { ref cmd, .. } => slot_for_command(cmd).map(|x| x.0),
Self::Pipeline { ref pipeline, .. } => {
let mut iter = pipeline.cmd_iter();
let slot = iter.next().map(slot_for_command)?;
for cmd in iter {
if slot != slot_for_command(cmd) {
return None;
}
}
slot
slot.map(|x| x.0)
}
}
}
Expand Down Expand Up @@ -1044,13 +1012,9 @@ where
(addr.to_string(), connections.get(addr).unwrap().clone())
}

fn slot_for_key(key: &[u8]) -> u16 {
let key = sub_key(key);
State::<XMODEM>::calculate(key) % SLOT_SIZE as u16
}

// If a key contains `{` and `}`, everything between the first occurence is the only thing that
// determines the hash slot
#[allow(dead_code)]
fn sub_key(key: &[u8]) -> &[u8] {
key.iter()
.position(|b| *b == b'{')
Expand Down Expand Up @@ -1184,9 +1148,9 @@ fn get_password(addr: &str) -> Option<String> {

#[cfg(test)]
mod tests {
use crate::parse_redis_value;

use super::*;
use crate::parse_redis_value;
use crc16::{State, XMODEM};

fn slot_for_packed_command(cmd: &[u8]) -> Option<u16> {
command_key(cmd).map(|key| {
Expand Down
101 changes: 99 additions & 2 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use crate::types::Value;

pub(crate) const SLOT_SIZE: u16 = 16384;

fn slot(key: &[u8]) -> u16 {
crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum RoutingInfo {
AllNodes,
Expand Down Expand Up @@ -58,7 +62,7 @@ impl RoutingInfo {
None => key,
};

let slot = crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE;
let slot = slot(key);
if is_readonly_cmd(cmd) {
RoutingInfo::ReplicaSlot(slot)
} else {
Expand Down Expand Up @@ -174,7 +178,7 @@ fn get_hashtag(key: &[u8]) -> Option<&[u8]> {

#[cfg(test)]
mod tests {
use super::{get_hashtag, RoutingInfo};
use super::{get_hashtag, slot, RoutingInfo};
use crate::{cmd, parser::parse_redis_value};

#[test]
Expand Down Expand Up @@ -257,5 +261,98 @@ mod tests {
RoutingInfo::for_routable(&cmd).unwrap(),
);
}

// Assert expected RoutingInfo explicitly:

for cmd in vec![cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("SCRIPT")] {
assert_eq!(
RoutingInfo::for_routable(&cmd),
Some(RoutingInfo::AllMasters)
);
}

for cmd in vec![
cmd("ECHO"),
cmd("CONFIG"),
cmd("CLIENT"),
cmd("SLOWLOG"),
cmd("DBSIZE"),
cmd("LASTSAVE"),
cmd("PING"),
cmd("INFO"),
cmd("BGREWRITEAOF"),
cmd("BGSAVE"),
cmd("CLIENT LIST"),
cmd("SAVE"),
cmd("TIME"),
cmd("KEYS"),
] {
assert_eq!(RoutingInfo::for_routable(&cmd), Some(RoutingInfo::AllNodes));
}

for cmd in vec![
cmd("SCAN"),
cmd("CLIENT SETNAME"),
cmd("SHUTDOWN"),
cmd("SLAVEOF"),
cmd("REPLICAOF"),
cmd("SCRIPT KILL"),
cmd("MOVE"),
cmd("BITOP"),
] {
assert_eq!(RoutingInfo::for_routable(&cmd), None,);
}

for cmd in vec![
cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
] {
assert_eq!(RoutingInfo::for_routable(cmd), Some(RoutingInfo::Random));
}

for (cmd, expected) in vec![
(
cmd("EVAL")
.arg(r#"redis.call("GET, KEYS[1]");"#)
.arg(1)
.arg("foo"),
Some(RoutingInfo::MasterSlot(slot(b"foo"))),
),
(
cmd("XGROUP")
.arg("CREATE")
.arg("mystream")
.arg("workers")
.arg("$")
.arg("MKSTREAM"),
Some(RoutingInfo::MasterSlot(slot(b"mystream"))),
),
(
cmd("XINFO").arg("GROUPS").arg("foo"),
Some(RoutingInfo::ReplicaSlot(slot(b"foo"))),
),
(
cmd("XREADGROUP")
.arg("GROUP")
.arg("wkrs")
.arg("consmrs")
.arg("STREAMS")
.arg("mystream"),
Some(RoutingInfo::MasterSlot(slot(b"mystream"))),
),
(
cmd("XREAD")
.arg("COUNT")
.arg("2")
.arg("STREAMS")
.arg("mystream")
.arg("writers")
.arg("0-0")
.arg("0-0"),
Some(RoutingInfo::ReplicaSlot(slot(b"mystream"))),
),
] {
assert_eq!(RoutingInfo::for_routable(cmd), expected,);
}
}
}