From 36fa7e35b619278275dbe83c10f13bfd88f5d6fe Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Fri, 8 Jul 2022 21:45:55 +0530 Subject: [PATCH] Cluster: Create read_from_replicas option (#635) Send write queries to primaries & read queries to replicas in `read_from_replicas` mode; deprecate the `readonly` param in favour of the new `read_from_replicas` param. --- benches/bench_cluster.rs | 42 +++++++++-- src/cluster.rs | 154 ++++++++++++++++++++++----------------- src/cluster_client.rs | 28 ++++--- src/cluster_routing.rs | 30 +++++--- src/commands.rs | 33 +++++++++ tests/test_cluster.rs | 11 +-- 6 files changed, 201 insertions(+), 97 deletions(-) diff --git a/benches/bench_cluster.rs b/benches/bench_cluster.rs index 6570d8092..9717f8366 100644 --- a/benches/bench_cluster.rs +++ b/benches/bench_cluster.rs @@ -16,7 +16,10 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon let mut group = c.benchmark_group("cluster_basic"); group.bench_function("set", |b| { - b.iter(|| black_box(redis::cmd("SET").arg(key).arg(42).execute(con))) + b.iter(|| { + redis::cmd("SET").arg(key).arg(42).execute(con); + black_box(()) + }) }); group.bench_function("get", |b| { @@ -27,7 +30,12 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon redis::cmd("SET").arg(key).arg(42).execute(con); redis::cmd("DEL").arg(key).execute(con); }; - group.bench_function("set_and_del", |b| b.iter(|| black_box(set_and_del()))); + group.bench_function("set_and_del", |b| { + b.iter(|| { + set_and_del(); + black_box(()) + }) + }); group.finish(); } @@ -47,14 +55,22 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection pipe.set(q, "bar").ignore(); } }; - group.bench_function("build_pipeline", |b| b.iter(|| black_box(build_pipeline()))); + group.bench_function("build_pipeline", |b| { + b.iter(|| { + build_pipeline(); + black_box(()) + }) + }); let mut pipe = cluster_pipe(); for q in &queries { pipe.set(q, "bar").ignore(); } group.bench_function("query_pipeline", |b| { - b.iter(|| black_box(pipe.query::<()>(con).unwrap())) + b.iter(|| { + pipe.query::<()>(con).unwrap(); + black_box(()) + }) }); group.finish(); @@ -69,5 +85,21 @@ fn bench_cluster_setup(c: &mut Criterion) { bench_pipeline(c, &mut con); } -criterion_group!(cluster_bench, bench_cluster_setup); +#[allow(dead_code)] +fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) { + let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| { + builder.read_from_replicas() + }); + cluster.wait_for_cluster_up(); + + let mut con = cluster.connection(); + bench_set_get_and_del(c, &mut con); + bench_pipeline(c, &mut con); +} + +criterion_group!( + cluster_bench, + bench_cluster_setup, + // bench_cluster_read_from_replicas_setup +); criterion_main!(cluster_bench); diff --git a/src/cluster.rs b/src/cluster.rs index 6241c73b2..37a263786 100644 --- a/src/cluster.rs +++ b/src/cluster.rs @@ -61,7 +61,7 @@ use crate::cluster_pipeline::UNROUTABLE_ERROR; pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline}; use crate::cluster_routing::{Routable, RoutingInfo, Slot, SLOT_SIZE}; -type SlotMap = BTreeMap; +type SlotMap = BTreeMap; /// This is a connection of Redis cluster. pub struct ClusterConnection { @@ -69,7 +69,7 @@ pub struct ClusterConnection { connections: RefCell>, slots: RefCell, auto_reconnect: RefCell, - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, read_timeout: RefCell>, @@ -96,13 +96,13 @@ impl TlsMode { impl ClusterConnection { pub(crate) fn new( initial_nodes: Vec, - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, ) -> RedisResult { let connections = Self::create_initial_connections( &initial_nodes, - readonly, + read_from_replicas, username.clone(), password.clone(), )?; @@ -111,7 +111,7 @@ impl ClusterConnection { connections: RefCell::new(connections), slots: RefCell::new(SlotMap::new()), auto_reconnect: RefCell::new(true), - readonly, + read_from_replicas, username, password, read_timeout: RefCell::new(None), @@ -155,6 +155,14 @@ impl ClusterConnection { /// block indefinitely. It is an error to pass the zero `Duration` to this /// method. pub fn set_write_timeout(&self, dur: Option) -> RedisResult<()> { + // Check if duration is valid before updating local value. + if dur.is_some() && dur.unwrap().is_zero() { + return Err(RedisError::from(( + ErrorKind::InvalidClientConfig, + "Duration should be None or non-zero.", + ))); + } + let mut t = self.write_timeout.borrow_mut(); *t = dur; let connections = self.connections.borrow(); @@ -170,6 +178,14 @@ impl ClusterConnection { /// block indefinitely. It is an error to pass the zero `Duration` to this /// method. pub fn set_read_timeout(&self, dur: Option) -> RedisResult<()> { + // Check if duration is valid before updating local value. + if dur.is_some() && dur.unwrap().is_zero() { + return Err(RedisError::from(( + ErrorKind::InvalidClientConfig, + "Duration should be None or non-zero.", + ))); + } + let mut t = self.read_timeout.borrow_mut(); *t = dur; let connections = self.connections.borrow(); @@ -203,7 +219,7 @@ impl ClusterConnection { /// `BrokenPipe` error. fn create_initial_connections( initial_nodes: &[ConnectionInfo], - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, ) -> RedisResult> { @@ -223,9 +239,12 @@ impl ClusterConnection { _ => panic!("No reach."), }; - if let Ok(mut conn) = - connect(info.clone(), readonly, username.clone(), password.clone()) - { + if let Ok(mut conn) = connect( + info.clone(), + read_from_replicas, + username.clone(), + password.clone(), + ) { if conn.check_connection() { connections.insert(addr, conn); break; @@ -245,58 +264,59 @@ impl ClusterConnection { // Query a node to discover slot-> master mappings. fn refresh_slots(&self) -> RedisResult<()> { let mut slots = self.slots.borrow_mut(); - *slots = if self.readonly { - let mut rng = thread_rng(); - self.create_new_slots(|slot_data| { - let replicas = slot_data.replicas(); - if replicas.is_empty() { - slot_data.master().to_string() - } else { - replicas.choose(&mut rng).unwrap().to_string() - } - })? - } else { - self.create_new_slots(|slot_data| slot_data.master().to_string())? - }; + *slots = self.create_new_slots(|slot_data| { + let replica = if !self.read_from_replicas || slot_data.replicas().is_empty() { + slot_data.master().to_string() + } else { + slot_data + .replicas() + .choose(&mut thread_rng()) + .unwrap() + .to_string() + }; + + [slot_data.master().to_string(), replica] + })?; + + let mut nodes = slots.values().flatten().collect::>(); + nodes.sort_unstable(); + nodes.dedup(); let mut connections = self.connections.borrow_mut(); - *connections = { - // Remove dead connections and connect to new nodes if necessary - let mut new_connections = HashMap::with_capacity(connections.len()); - - for addr in slots.values() { - if !new_connections.contains_key(addr) { - if connections.contains_key(addr) { - let mut conn = connections.remove(addr).unwrap(); - if conn.check_connection() { - new_connections.insert(addr.to_string(), conn); - continue; - } + *connections = nodes + .into_iter() + .filter_map(|addr| { + if connections.contains_key(addr) { + let mut conn = connections.remove(addr).unwrap(); + if conn.check_connection() { + return Some((addr.to_string(), conn)); } + } - if let Ok(mut conn) = connect( - addr.as_ref(), - self.readonly, - self.username.clone(), - self.password.clone(), - ) { - if conn.check_connection() { - conn.set_read_timeout(*self.read_timeout.borrow())?; - conn.set_write_timeout(*self.write_timeout.borrow())?; - new_connections.insert(addr.to_string(), conn); - } + if let Ok(mut conn) = connect( + addr.as_ref(), + self.read_from_replicas, + self.username.clone(), + self.password.clone(), + ) { + if conn.check_connection() { + conn.set_read_timeout(*self.read_timeout.borrow()).unwrap(); + conn.set_write_timeout(*self.write_timeout.borrow()) + .unwrap(); + return Some((addr.to_string(), conn)); } } - } - new_connections - }; + + None + }) + .collect(); Ok(()) } fn create_new_slots(&self, mut get_addr: F) -> RedisResult where - F: FnMut(&Slot) -> String, + F: FnMut(&Slot) -> [String; 2], { let mut connections = self.connections.borrow_mut(); let mut new_slots = None; @@ -323,7 +343,7 @@ impl ClusterConnection { Ok(slot_data.end() + 1) })?; - if usize::from(last_slot) != SLOT_SIZE { + if last_slot != SLOT_SIZE { return Err(RedisError::from(( ErrorKind::ResponseError, "Slot refresh error.", @@ -354,13 +374,14 @@ impl ClusterConnection { fn get_connection<'a>( &self, connections: &'a mut HashMap, - slot: u16, + route: (u16, usize), ) -> RedisResult<(String, &'a mut Connection)> { + let (slot, idx) = route; let slots = self.slots.borrow(); if let Some((_, addr)) = slots.range(&slot..).next() { Ok(( - addr.to_string(), - self.get_connection_by_addr(connections, addr)?, + addr[idx].clone(), + self.get_connection_by_addr(connections, &addr[idx])?, )) } else { // try a random node next. This is safe if slots are involved @@ -381,7 +402,7 @@ impl ClusterConnection { // TODO: error handling let conn = connect( addr, - self.readonly, + self.read_from_replicas, self.username.clone(), self.password.clone(), )?; @@ -412,9 +433,10 @@ impl ClusterConnection { T: MergeResults + std::fmt::Debug, F: FnMut(&mut Connection) -> RedisResult, { - let slot = match RoutingInfo::for_routable(cmd) { + let route = match RoutingInfo::for_routable(cmd) { Some(RoutingInfo::Random) => None, - Some(RoutingInfo::Slot(slot)) => Some(slot), + Some(RoutingInfo::MasterSlot(slot)) => Some((slot, 0)), + Some(RoutingInfo::ReplicaSlot(slot)) => Some((slot, 1)), Some(RoutingInfo::AllNodes) | Some(RoutingInfo::AllMasters) => { return self.execute_on_all_nodes(func); } @@ -439,10 +461,10 @@ impl ClusterConnection { is_asking = false; } (addr.to_string(), conn) - } else if !excludes.is_empty() || slot.is_none() { + } else if !excludes.is_empty() || route.is_none() { get_random_connection(&mut *connections, Some(&excludes)) } else { - self.get_connection(&mut *connections, slot.unwrap())? + self.get_connection(&mut *connections, route.unwrap())? }; (addr, func(conn)) }; @@ -484,7 +506,7 @@ impl ClusterConnection { } else if *self.auto_reconnect.borrow() && err.is_io_error() { let new_connections = Self::create_initial_connections( &self.initial_nodes, - self.readonly, + self.read_from_replicas, self.username.clone(), self.password.clone(), )?; @@ -552,20 +574,21 @@ impl ClusterConnection { fn get_addr_for_cmd(&self, cmd: &Cmd) -> RedisResult { let slots = self.slots.borrow(); - let addr_for_slot = |slot: u16| -> RedisResult { + let addr_for_slot = |slot: u16, idx: usize| -> RedisResult { let (_, addr) = slots .range(&slot..) .next() .ok_or((ErrorKind::ClusterDown, "Missing slot coverage"))?; - Ok(addr.to_string()) + Ok(addr[idx].clone()) }; match RoutingInfo::for_routable(cmd) { Some(RoutingInfo::Random) => { let mut rng = thread_rng(); - Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16)?) + Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16, 0)?) } - Some(RoutingInfo::Slot(slot)) => Ok(addr_for_slot(slot)?), + Some(RoutingInfo::MasterSlot(slot)) => Ok(addr_for_slot(slot, 0)?), + Some(RoutingInfo::ReplicaSlot(slot)) => Ok(addr_for_slot(slot, 1)?), _ => fail!(UNROUTABLE_ERROR), } } @@ -714,7 +737,7 @@ impl ConnectionLike for ClusterConnection { fn connect( info: T, - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, ) -> RedisResult @@ -727,7 +750,8 @@ where let client = super::Client::open(connection_info)?; let mut con = client.get_connection()?; - if readonly { + if read_from_replicas { + // If READONLY is sent to primary nodes, it will have no effect cmd("READONLY").query(&mut con)?; } Ok(con) diff --git a/src/cluster_client.rs b/src/cluster_client.rs index d56cbff68..a6962f205 100644 --- a/src/cluster_client.rs +++ b/src/cluster_client.rs @@ -7,7 +7,7 @@ use super::{ /// Used to configure and build a [ClusterClient](ClusterClient). pub struct ClusterClientBuilder { initial_nodes: RedisResult>, - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, } @@ -20,7 +20,7 @@ impl ClusterClientBuilder { .into_iter() .map(|x| x.into_connection_info()) .collect(), - readonly: false, + read_from_replicas: false, username: None, password: None, } @@ -50,11 +50,19 @@ impl ClusterClientBuilder { self } - /// Set read only mode for new ClusterClient (default is false). - /// If readonly is true, all queries will go to replica nodes. If there are no replica nodes, - /// queries will be issued to the primary nodes. - pub fn readonly(mut self, readonly: bool) -> ClusterClientBuilder { - self.readonly = readonly; + /// Enable read from replicas for new ClusterClient (default is false). + /// + /// If True, then read queries will go to the replica nodes & write queries will go to the + /// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes. + pub fn read_from_replicas(mut self) -> ClusterClientBuilder { + self.read_from_replicas = true; + self + } + + /// Use `read_from_replicas()`. + #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")] + pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder { + self.read_from_replicas = read_from_replicas; self } } @@ -62,7 +70,7 @@ impl ClusterClientBuilder { /// This is a Redis cluster client. pub struct ClusterClient { initial_nodes: Vec, - readonly: bool, + read_from_replicas: bool, username: Option, password: Option, } @@ -89,7 +97,7 @@ impl ClusterClient { pub fn get_connection(&self) -> RedisResult { ClusterConnection::new( self.initial_nodes.clone(), - self.readonly, + self.read_from_replicas, self.username.clone(), self.password.clone(), ) @@ -134,7 +142,7 @@ impl ClusterClient { Ok(ClusterClient { initial_nodes: nodes, - readonly: builder.readonly, + read_from_replicas: builder.read_from_replicas, username: builder.username.or(connection_info_username), password: builder.password.or(connection_info_password), }) diff --git a/src/cluster_routing.rs b/src/cluster_routing.rs index 504ba6ff2..c8a9c59b2 100644 --- a/src/cluster_routing.rs +++ b/src/cluster_routing.rs @@ -1,16 +1,18 @@ use std::iter::Iterator; use crate::cmd::{Arg, Cmd}; +use crate::commands::is_readonly_cmd; use crate::types::Value; -pub(crate) const SLOT_SIZE: usize = 16384; +pub(crate) const SLOT_SIZE: u16 = 16384; #[derive(Debug, Clone, Copy, PartialEq)] pub(crate) enum RoutingInfo { AllNodes, AllMasters, Random, - Slot(u16), + MasterSlot(u16), + ReplicaSlot(u16), } impl RoutingInfo { @@ -18,7 +20,8 @@ impl RoutingInfo { where R: Routable + ?Sized, { - match &r.command()?[..] { + let cmd = &r.command()?[..]; + match cmd { b"FLUSHALL" | b"FLUSHDB" | b"SCRIPT" => Some(RoutingInfo::AllMasters), b"ECHO" | b"CONFIG" | b"CLIENT" | b"SLOWLOG" | b"DBSIZE" | b"LASTSAVE" | b"PING" | b"INFO" | b"BGREWRITEAOF" | b"BGSAVE" | b"CLIENT LIST" | b"SAVE" | b"TIME" @@ -33,30 +36,34 @@ impl RoutingInfo { if key_count == 0 { Some(RoutingInfo::Random) } else { - r.arg_idx(3).and_then(RoutingInfo::for_key) + r.arg_idx(3).and_then(|key| RoutingInfo::for_key(cmd, key)) } } - b"XGROUP" | b"XINFO" => r.arg_idx(2).and_then(RoutingInfo::for_key), + b"XGROUP" | b"XINFO" => r.arg_idx(2).and_then(|key| RoutingInfo::for_key(cmd, key)), b"XREAD" | b"XREADGROUP" => { let streams_position = r.position(b"STREAMS")?; r.arg_idx(streams_position + 1) - .and_then(RoutingInfo::for_key) + .and_then(|key| RoutingInfo::for_key(cmd, key)) } _ => match r.arg_idx(1) { - Some(key) => RoutingInfo::for_key(key), + Some(key) => RoutingInfo::for_key(cmd, key), None => Some(RoutingInfo::Random), }, } } - pub fn for_key(key: &[u8]) -> Option { + pub fn for_key(cmd: &[u8], key: &[u8]) -> Option { let key = match get_hashtag(key) { Some(tag) => tag, None => key, }; - Some(RoutingInfo::Slot( - crc16::State::::calculate(key) % SLOT_SIZE as u16, - )) + + let slot = crc16::State::::calculate(key) % SLOT_SIZE; + if is_readonly_cmd(cmd) { + Some(RoutingInfo::ReplicaSlot(slot)) + } else { + Some(RoutingInfo::MasterSlot(slot)) + } } } @@ -139,7 +146,6 @@ impl Slot { &self.master } - #[allow(dead_code)] pub fn replicas(&self) -> &Vec { &self.replicas } diff --git a/src/commands.rs b/src/commands.rs index e4bd051c9..8311614a2 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -17,6 +17,39 @@ use crate::streams; #[cfg(feature = "acl")] use crate::acl; +#[cfg(feature = "cluster")] +pub(crate) fn is_readonly_cmd(cmd: &[u8]) -> bool { + matches!( + cmd, + // @admin + b"LASTSAVE" | + // @bitmap + b"BITCOUNT" | b"BITFIELD_RO" | b"BITPOS" | b"GETBIT" | + // @connection + b"CLIENT" | b"ECHO" | + // @geo + b"GEODIST" | b"GEOHASH" | b"GEOPOS" | b"GEORADIUSBYMEMBER_RO" | b"GEORADIUS_RO" | b"GEOSEARCH" | + // @hash + b"HEXISTS" | b"HGET" | b"HGETALL" | b"HKEYS" | b"HLEN" | b"HMGET" | b"HRANDFIELD" | b"HSCAN" | b"HSTRLEN" | b"HVALS" | + // @hyperloglog + b"PFCOUNT" | + // @keyspace + b"DBSIZE" | b"DUMP" | b"EXISTS" | b"EXPIRETIME" | b"KEYS" | b"OBJECT" | b"PEXPIRETIME" | b"PTTL" | b"RANDOMKEY" | b"SCAN" | b"TOUCH" | b"TTL" | b"TYPE" | + // @list + b"LINDEX" | b"LLEN" | b"LPOS" | b"LRANGE" | b"SORT_RO" | + // @scripting + b"EVALSHA_RO" | b"EVAL_RO" | b"FCALL_RO" | + // @set + b"SCARD" | b"SDIFF" | b"SINTER" | b"SINTERCARD" | b"SISMEMBER" | b"SMEMBERS" | b"SMISMEMBER" | b"SRANDMEMBER" | b"SSCAN" | b"SUNION" | + // @sortedset + b"ZCARD" | b"ZCOUNT" | b"ZDIFF" | b"ZINTER" | b"ZINTERCARD" | b"ZLEXCOUNT" | b"ZMSCORE" | b"ZRANDMEMBER" | b"ZRANGE" | b"ZRANGEBYLEX" | b"ZRANGEBYSCORE" | b"ZRANK" | b"ZREVRANGE" | b"ZREVRANGEBYLEX" | b"ZREVRANGEBYSCORE" | b"ZREVRANK" | b"ZSCAN" | b"ZSCORE" | b"ZUNION" | + // @stream + b"XINFO" | b"XLEN" | b"XPENDING" | b"XRANGE" | b"XREAD" | b"XREVRANGE" | + // @string + b"GET" | b"GETRANGE" | b"LCS" | b"MGET" | b"STRALGO" | b"STRLEN" | b"SUBSTR" + ) +} + macro_rules! implement_commands { ( $lifetime: lifetime diff --git a/tests/test_cluster.rs b/tests/test_cluster.rs index 2d00eaf31..6704ec4d8 100644 --- a/tests/test_cluster.rs +++ b/tests/test_cluster.rs @@ -56,19 +56,20 @@ fn test_cluster_with_bad_password() { } #[test] -fn test_cluster_readonly() { - let cluster = - TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| builder.readonly(true)); +fn test_cluster_read_from_replicas() { + let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| { + builder.read_from_replicas() + }); let mut con = cluster.connection(); - // con is a READONLY replica, so we'll get the MOVED response and will be redirected - // to the master + // Write commands would go to the primary nodes redis::cmd("SET") .arg("{x}key1") .arg(b"foo") .execute(&mut con); redis::cmd("SET").arg(&["{x}key2", "bar"]).execute(&mut con); + // Read commands would go to the replica nodes assert_eq!( redis::cmd("MGET") .arg(&["{x}key1", "{x}key2"])