From 99681535b6113f9cb39b342da9c0ba631618683d Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Thu, 28 Jul 2022 07:09:32 +0530 Subject: [PATCH] cluster: add Async MultiplexedClusterConnection --- redis/benches/bench_cluster.rs | 144 +++++++++++++++- redis/src/aio.rs | 51 +++--- redis/src/cluster.rs | 20 ++- redis/src/cluster_aio.rs | 295 +++++++++++++++++++++++++++++++++ redis/src/cluster_client.rs | 48 ++++++ redis/src/lib.rs | 3 + 6 files changed, 530 insertions(+), 31 deletions(-) create mode 100644 redis/src/cluster_aio.rs diff --git a/redis/benches/bench_cluster.rs b/redis/benches/bench_cluster.rs index 9717f8366..e78cf951a 100644 --- a/redis/benches/bench_cluster.rs +++ b/redis/benches/bench_cluster.rs @@ -1,14 +1,16 @@ #![allow(clippy::unit_arg)] // want to allow this for `black_box()` #![cfg(feature = "cluster")] use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; +use futures::{prelude::*, stream}; use redis::cluster::cluster_pipe; +use redis::RedisError; use support::*; #[path = "../tests/support/mod.rs"] mod support; -const PIPELINE_QUERIES: usize = 100; +const PIPELINE_QUERIES: usize = 1_000; fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection) { let key = "test_key"; @@ -76,6 +78,123 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection group.finish(); } +fn bench_set_get_and_del_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) { + let key = "test_key"; + + let mut group = c.benchmark_group("cluster_async_basic"); + let runtime = current_thread_runtime(); + let con = client.get_async_connection(); + let mut con = runtime.block_on(con).unwrap(); + + group.bench_function("set", |b| { + b.iter(|| { + runtime + .block_on(async { + redis::cmd("SET") + .arg(key) + .arg(42) + .query_async(&mut con) + .await?; + Ok::<(), RedisError>(()) + }) + .unwrap() + }) + }); + + group.bench_function("get", |b| { + b.iter(|| { + runtime + .block_on(async { + redis::cmd("GET").arg(key).query_async(&mut con).await?; + Ok::<(), RedisError>(()) + }) + .unwrap() + }) + }); + + group.bench_function("set_and_del", |b| { + b.iter(|| { + runtime + .block_on(async { + redis::cmd("SET") + .arg(key) + .arg(42) + .query_async(&mut con) + .await?; + redis::cmd("DEL").arg(key).query_async(&mut con).await?; + Ok::<(), RedisError>(()) + }) + .unwrap() + }) + }); + + group.finish(); +} + +fn bench_pipeline_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) { + let mut group = c.benchmark_group("cluster_async_pipeline"); + let runtime = current_thread_runtime(); + let con = client.get_async_connection(); + let mut con = runtime.block_on(con).unwrap(); + + group.throughput(Throughput::Elements(PIPELINE_QUERIES as u64)); + + let mut queries = Vec::new(); + for i in 0..PIPELINE_QUERIES { + queries.push(format!("foo{}", i)); + } + + let build_pipeline = || { + let mut pipe = cluster_pipe(); + for q in &queries { + pipe.set(q, "bar").ignore(); + } + }; + group.bench_function("build_pipeline", |b| { + b.iter(|| { + build_pipeline(); + black_box(()) + }) + }); + + let mut pipe = cluster_pipe(); + for q in &queries { + pipe.set(q, "bar").ignore(); + } + group.bench_function("query_pipeline", |b| { + b.iter(|| { + runtime + .block_on(async { pipe.query_async::<_, ()>(&mut con).await }) + .unwrap(); + }) + }); + + let cmds: Vec<_> = (0..PIPELINE_QUERIES) + .map(|i| redis::cmd("SET").arg(format!("foo{}", i)).arg(i).clone()) + .collect(); + + let mut connections = (0..PIPELINE_QUERIES) + 
       .map(|_| con.clone())
+        .collect::<Vec<_>>();
+
+    group.bench_function("query_implicit_pipeline", |b| {
+        b.iter(|| {
+            runtime
+                .block_on(async {
+                    cmds.iter()
+                        .zip(&mut connections)
+                        .map(|(cmd, con)| cmd.query_async(con))
+                        .collect::<stream::FuturesUnordered<_>>()
+                        .try_for_each(|()| async { Ok(()) })
+                        .await
+                })
+                .unwrap();
+        })
+    });
+
+    group.finish();
+}
+
 fn bench_cluster_setup(c: &mut Criterion) {
     let cluster = TestClusterContext::new(6, 1);
     cluster.wait_for_cluster_up();
@@ -85,6 +204,14 @@ fn bench_cluster_setup(c: &mut Criterion) {
     bench_pipeline(c, &mut con);
 }
 
+fn bench_cluster_async_setup(c: &mut Criterion) {
+    let mut cluster = TestClusterContext::new(6, 1);
+    cluster.wait_for_cluster_up();
+
+    bench_set_get_and_del_async(c, &mut cluster.client);
+    bench_pipeline_async(c, &mut cluster.client);
+}
+
 #[allow(dead_code)]
 fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
     let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
@@ -97,9 +224,22 @@ fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
     bench_pipeline(c, &mut con);
 }
 
+#[allow(dead_code)]
+fn bench_cluster_async_read_from_replicas_setup(c: &mut Criterion) {
+    let mut cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
+        builder.read_from_replicas()
+    });
+    cluster.wait_for_cluster_up();
+
+    bench_set_get_and_del_async(c, &mut cluster.client);
+    bench_pipeline_async(c, &mut cluster.client);
+}
+
 criterion_group!(
     cluster_bench,
     bench_cluster_setup,
-    // bench_cluster_read_from_replicas_setup
+    // bench_cluster_read_from_replicas_setup,
+    bench_cluster_async_setup,
+    // bench_cluster_async_read_from_replicas_setup,
 );
 criterion_main!(cluster_bench);
diff --git a/redis/src/aio.rs b/redis/src/aio.rs
index bae113019..3e1eaf780 100644
--- a/redis/src/aio.rs
+++ b/redis/src/aio.rs
@@ -77,7 +77,7 @@ pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
 pub(crate) enum Runtime {
     #[cfg(feature = "tokio-comp")]
     Tokio,
@@ -85,6 +85,12 @@ pub(crate) enum Runtime {
     AsyncStd,
 }
 
+impl Default for Runtime {
+    fn default() -> Self {
+        Self::locate()
+    }
+}
+
 impl Runtime {
     pub(crate) fn locate() -> Self {
         #[cfg(all(feature = "tokio-comp", not(feature = "async-std-comp")))]
@@ -112,8 +118,7 @@ impl Runtime {
         }
     }
 
-    #[allow(dead_code)]
-    fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
+    pub(crate) fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
         match self {
             #[cfg(feature = "tokio-comp")]
             Runtime::Tokio => tokio::Tokio::spawn(f),
@@ -645,22 +650,10 @@ struct InFlight<O, E> {
 }
 
 // A single message sent through the pipeline
-struct PipelineMessage<S, I, E> {
-    input: S,
-    output: PipelineOutput<I, E>,
-    response_count: usize,
-}
-
-/// Wrapper around a `Stream + Sink` where each item sent through the `Sink` results in one or more
-/// items being output by the `Stream` (the number is specified at time of sending). With the
-/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
-/// and `Sink`.
-struct Pipeline<SinkItem, I, E>(mpsc::Sender<PipelineMessage<SinkItem, I, E>>);
-
-impl<SinkItem, I, E> Clone for Pipeline<SinkItem, I, E> {
-    fn clone(&self) -> Self {
-        Pipeline(self.0.clone())
-    }
+pub(crate) struct PipelineMessage<S, I, E> {
+    pub(crate) input: S,
+    pub(crate) output: PipelineOutput<I, E>,
+    pub(crate) response_count: usize,
 }
 
 impl<SinkItem, I, E> Debug for Pipeline<SinkItem, I, E>
 where
@@ -833,6 +826,20 @@ where
     }
 }
 
+/// Wrapper around a `Stream + Sink` where each item sent through the `Sink` results in one or more
+/// items being output by the `Stream` (the number is specified at time of sending). With the
+/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
+/// and `Sink`.
+pub(crate) struct Pipeline<SinkItem, I, E>(
+    pub(crate) mpsc::Sender<PipelineMessage<SinkItem, I, E>>,
+);
+
+impl<SinkItem, I, E> Clone for Pipeline<SinkItem, I, E> {
+    fn clone(&self) -> Self {
+        Pipeline(self.0.clone())
+    }
+}
+
 impl<SinkItem, I, E> Pipeline<SinkItem, I, E>
 where
     SinkItem: Send + 'static,
@@ -894,7 +901,7 @@ where
 /// on the same underlying connection (tcp/unix socket).
 #[derive(Clone)]
 pub struct MultiplexedConnection {
-    pipeline: Pipeline<Vec<u8>, Value, RedisError>,
+    pub(crate) pipeline: Pipeline<Vec<u8>, Value, RedisError>,
     db: i64,
 }
 
@@ -951,6 +958,10 @@ impl MultiplexedConnection {
         };
         Ok((con, driver))
     }
+
+    pub(crate) async fn check_connection(&mut self) -> bool {
+        cmd("PING").query_async::<_, String>(self).await.is_ok()
+    }
 }
 
 impl ConnectionLike for MultiplexedConnection {
diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs
index 797f38306..2e10f4983 100644
--- a/redis/src/cluster.rs
+++ b/redis/src/cluster.rs
@@ -82,13 +82,15 @@ use crate::parser::parse_redis_value;
 use crate::pipeline::Pipeline;
 use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value};
 
+#[cfg(feature = "aio")]
+pub use crate::cluster_aio::MultiplexedClusterConnection;
 pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
 #[doc(no_inline)]
 pub use crate::cmd::pipe as cluster_pipe;
 #[doc(no_inline)]
 pub use crate::pipeline::Pipeline as ClusterPipeline;
 
-type SlotMap = BTreeMap<u16, Vec<String>>;
+pub(crate) type SlotMap = BTreeMap<u16, Vec<String>>;
 
 /// This is a connection of Redis cluster.
 #[derive(Default)]
@@ -510,7 +512,7 @@ impl ConnectionLike for ClusterConnection {
 }
 
 // MergeResults trait.
-trait MergeResults {
+pub(crate) trait MergeResults {
     fn merge_results(values: HashMap<&str, Self>) -> Self
     where
         Self: Sized;
@@ -536,15 +538,15 @@ impl MergeResults for Vec<Value> {
 }
 
 // NodeCmd struct.
-struct NodeCmd {
-    node: String,
+pub(crate) struct NodeCmd {
+    pub(crate) node: String,
     // The original command indexes
-    indexes: Vec<usize>,
-    pipe: Vec<u8>,
+    pub(crate) indexes: Vec<usize>,
+    pub(crate) pipe: Vec<u8>,
 }
 
 impl NodeCmd {
-    fn new(node: &str) -> NodeCmd {
+    pub(crate) fn new(node: &str) -> NodeCmd {
         NodeCmd {
             node: node.to_string(),
             indexes: vec![],
@@ -554,7 +556,7 @@ impl NodeCmd {
 }
 
 // Parse `CLUSTER SLOTS` response into SlotMap.
-fn parse_slots_response(
+pub(crate) fn parse_slots_response(
     response: Vec<Value>,
     cluster_params: &ClusterParams,
 ) -> RedisResult<SlotMap> {
@@ -665,7 +667,7 @@ fn parse_slots_response(
         .collect())
 }
 
-fn get_connection_info(node: &str, cluster_params: &ClusterParams) -> ConnectionInfo {
+pub(crate) fn get_connection_info(node: &str, cluster_params: &ClusterParams) -> ConnectionInfo {
     ConnectionInfo {
         addr: get_connection_addr(parse_node_str(node), cluster_params.tls_insecure),
         redis: RedisConnectionInfo {
diff --git a/redis/src/cluster_aio.rs b/redis/src/cluster_aio.rs
new file mode 100644
index 000000000..e0f703c0c
--- /dev/null
+++ b/redis/src/cluster_aio.rs
@@ -0,0 +1,295 @@
+use std::io;
+use std::iter::Iterator;
+use std::sync::Arc;
+
+use futures_util::future::{self, FutureExt};
+use futures_util::stream::{FuturesUnordered, StreamExt};
+use rand::seq::IteratorRandom;
+use rand::thread_rng;
+use tokio::sync::oneshot;
+
+use crate::aio::{ConnectionLike, MultiplexedConnection, PipelineMessage, Runtime};
+use crate::cluster::{get_connection_info, parse_slots_response, NodeCmd, SlotMap};
+use crate::cluster_client::ClusterParams;
+use crate::cluster_routing::{is_illegal_cluster_pipeline_cmd, Routable, UNROUTABLE_ERROR};
+use crate::cmd::{cmd, Cmd};
+use crate::connection::ConnectionInfo;
+use crate::pipeline::Pipeline;
+use crate::types::{ErrorKind, HashMap, RedisError, RedisFuture, RedisResult, Value};
+
+// Cluster related logic.
+#[derive(Default)]
+struct NodesManager {
+    runtime: Runtime,
+    cluster_params: ClusterParams,
+    initial_nodes: Vec<ConnectionInfo>,
+
+    connections: HashMap<String, MultiplexedConnection>,
+    slots: SlotMap,
+}
+
+impl NodesManager {
+    pub(crate) async fn new(
+        runtime: Runtime,
+        cluster_params: &ClusterParams,
+        initial_nodes: &[ConnectionInfo],
+    ) -> RedisResult<Self> {
+        let mut nodes_manager = NodesManager {
+            runtime,
+            cluster_params: cluster_params.clone(),
+            initial_nodes: initial_nodes.to_vec(),
+            ..Default::default()
+        };
+        nodes_manager.create_initial_connections().await?;
+
+        Ok(nodes_manager)
+    }
+
+    async fn create_initial_connections(&mut self) -> RedisResult<()> {
+        let nodes = self
+            .initial_nodes
+            .iter()
+            .map(|info| info.addr.to_string())
+            .collect::<Vec<_>>();
+
+        self.refresh_connections(nodes).await;
+
+        if self.connections.is_empty() {
+            return Err(RedisError::from((
+                ErrorKind::IoError,
+                "It failed to check startup nodes.",
+            )));
+        }
+
+        self.refresh_slots().await?;
+        Ok(())
+    }
+
+    async fn refresh_connections(&mut self, nodes: Vec<String>) {
+        for node in nodes {
+            let conn = if let Some(conn) = self.connections.get_mut(&node) {
+                conn
+            } else if self.connect(&node).await.is_ok() {
+                self.connections.get_mut(&node).unwrap()
+            } else {
+                continue;
+            };
+
+            if !conn.check_connection().await {
+                self.connections.remove(&node);
+            }
+        }
+    }
+
+    // Query a node to discover slot -> master mappings.
+    async fn refresh_slots(&mut self) -> RedisResult<()> {
+        let mut cmd = Cmd::new();
+        cmd.arg("CLUSTER").arg("SLOTS");
+
+        let len = self.connections.len();
+        let samples = self
+            .connections
+            .values_mut()
+            .choose_multiple(&mut thread_rng(), len);
+        for conn in samples {
+            if let Ok(Value::Bulk(response)) = cmd.query_async(conn).await {
+                let slots = parse_slots_response(response, &self.cluster_params)?;
+
+                let mut nodes = slots.values().flatten().cloned().collect::<Vec<_>>();
+                nodes.sort_unstable();
+                nodes.dedup();
+                self.refresh_connections(nodes).await;
+
+                self.slots = slots;
+                return Ok(());
+            }
+        }
+
+        Err(RedisError::from((
+            ErrorKind::ResponseError,
+            "Slot refresh error.",
+            "didn't get any slots from server".to_string(),
+        )))
+    }
+
+    async fn connect(&mut self, node: &str) -> RedisResult<()> {
+        let cluster_params = &self.cluster_params;
+        let info = get_connection_info(node, cluster_params);
+
+        let mut conn = self.runtime.get_multiplexed_connection(&info).await?;
+
+        if cluster_params.read_from_replicas {
+            // If READONLY is sent to primary nodes, it will have no effect
+            cmd("READONLY").query_async(&mut conn).await?;
+        }
+        self.connections.insert(node.to_string(), conn);
+        Ok(())
+    }
+
+    fn get_connection_for_node(&self, node: &str) -> RedisResult<&MultiplexedConnection> {
+        self.connections
+            .get(node)
+            .ok_or_else(|| RedisError::from(UNROUTABLE_ERROR))
+    }
+
+    #[allow(dead_code)]
+    fn get_connection_for_route(&self, route: (u16, usize)) -> RedisResult<&MultiplexedConnection> {
+        let node = self.get_node_for_route(route);
+        self.get_connection_for_node(node)
+    }
+
+    fn get_node_for_route(&self, route: (u16, usize)) -> &str {
+        let (slot, idx) = route;
+        &self.slots.range(&slot..).next().unwrap().1[idx]
+    }
+
+    fn map_cmds_to_nodes(&self, cmds: &[Cmd]) -> RedisResult<Vec<NodeCmd>> {
+        let mut cmd_map: HashMap<String, NodeCmd> = HashMap::with_capacity(self.slots.len());
+
+        for (idx, cmd) in cmds.iter().enumerate() {
+            let node = match cmd.route()? {
+                (Some(slot), idx) => self.get_node_for_route((slot, idx)),
+                (None, _idx) => unreachable!(),
+            };
+
+            let nc = if let Some(nc) = cmd_map.get_mut(node) {
+                nc
+            } else {
+                cmd_map
+                    .entry(node.to_string())
+                    .or_insert_with(|| NodeCmd::new(node))
+            };
+
+            nc.indexes.push(idx);
+            cmd.write_packed_command(&mut nc.pipe);
+        }
+
+        // Workaround for https://github.com/tkaitchuck/aHash/issues/118
+        Ok(cmd_map.into_iter().map(|(_k, v)| v).collect())
+    }
+}
+
+/// A multiplexed Redis cluster connection, which can be cloned, allowing multiple requests to be
+/// sent concurrently over the same underlying [connections](MultiplexedConnection).
+#[derive(Clone)]
+pub struct MultiplexedClusterConnection {
+    nodes_manager: Arc<NodesManager>,
+}
+
+impl MultiplexedClusterConnection {
+    pub(crate) async fn new(
+        runtime: Runtime,
+        cluster_params: &ClusterParams,
+        initial_nodes: &[ConnectionInfo],
+    ) -> RedisResult<Self> {
+        let nodes_manager = NodesManager::new(runtime, cluster_params, initial_nodes).await?;
+
+        Ok(MultiplexedClusterConnection {
+            nodes_manager: Arc::new(nodes_manager),
+        })
+    }
+
+    async fn execute_pipeline(&self, nc: NodeCmd) -> RedisResult<Vec<(usize, Value)>> {
+        let conn = self.nodes_manager.get_connection_for_node(&nc.node)?;
+
+        let (sender, receiver) = oneshot::channel();
+
+        conn.pipeline
+            .0
+            .send(PipelineMessage {
+                input: nc.pipe,
+                output: sender,
+                response_count: nc.indexes.len(),
+            })
+            .await
+            .map_err(|_| broken_pipe())?;
+
+        Ok(nc
+            .indexes
+            .into_iter()
+            .zip(receiver.await.map_err(|_| broken_pipe())??.into_iter())
+            .collect())
+    }
+}
+
+// ConnectionLike impl for MultiplexedClusterConnection.
+impl ConnectionLike for MultiplexedClusterConnection {
+    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
+        let route = match cmd.route() {
+            Ok(route) => match route {
+                (Some(slot), idx) => (slot, idx),
+                (None, _idx) => return future::err(RedisError::from(UNROUTABLE_ERROR)).boxed(),
+            },
+            Err(err) => return future::err(err).boxed(),
+        };
+
+        let node = self.nodes_manager.get_node_for_route(route);
+        let mut nc = NodeCmd::new(node);
+        nc.indexes.push(0);
+        cmd.write_packed_command(&mut nc.pipe);
+
+        self.execute_pipeline(nc)
+            .map(|res| res.map(|mut val| val.pop().unwrap().1))
+            .boxed()
+    }
+
+    fn req_packed_commands<'a>(
+        &'a mut self,
+        pipeline: &'a Pipeline,
+        _offset: usize,
+        _count: usize,
+    ) -> RedisFuture<'a, Vec<Value>> {
+        for cmd in pipeline.commands() {
+            let cmd_name = std::str::from_utf8(cmd.arg_idx(0).unwrap_or(b""))
+                .unwrap_or("")
+                .trim()
+                .to_ascii_uppercase();
+
+            if is_illegal_cluster_pipeline_cmd(&cmd_name) {
+                return future::err(RedisError::from((
+                    UNROUTABLE_ERROR.0,
+                    UNROUTABLE_ERROR.1,
+                    format!(
+                        "Command '{}' can't be executed in a cluster pipeline.",
+                        cmd_name
+                    ),
+                )))
+                .boxed();
+            }
+        }
+
+        if let Ok(ncs) = self.nodes_manager.map_cmds_to_nodes(pipeline.commands()) {
+            (async move {
+                let mut futs = ncs
+                    .into_iter()
+                    .map(|nc| self.execute_pipeline(nc))
+                    .collect::<FuturesUnordered<_>>();
+
+                let mut ret = vec![Value::Nil; pipeline.commands().len()];
+                while let Some(res) = futs.next().await {
+                    if let Ok(items) = res {
+                        for (idx, val) in items {
+                            ret[idx] = val;
+                        }
+                    }
+                }
+                Ok(ret)
+            })
+            .boxed()
+        } else {
+            future::err(broken_pipe()).boxed()
+        }
+    }
+
+    fn get_db(&self) -> i64 {
+        0
+    }
+
+    fn supports_transactions(&self) -> bool {
+        false
+    }
+}
+
+fn broken_pipe() -> RedisError {
+    RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))
+}
diff --git a/redis/src/cluster_client.rs b/redis/src/cluster_client.rs
index 03212a573..ac19dbe0d 100644
--- a/redis/src/cluster_client.rs
+++ b/redis/src/cluster_client.rs
@@ -3,6 +3,8 @@ use std::time::Duration;
 use crate::cluster::ClusterConnection;
 use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
 use crate::types::{ErrorKind, RedisError, RedisResult};
+#[cfg(feature = "aio")]
+use crate::{aio::Runtime, cluster_aio::MultiplexedClusterConnection};
 
 /// Redis cluster specific parameters.
 #[derive(Default, Clone)]
@@ -146,6 +148,8 @@ impl ClusterClientBuilder {
     ///
     /// If the value is `None`, then `get_connection` call may block indefinitely.
     /// Passing Some(Duration::ZERO) to this method will result in an error.
+    ///
+    /// Note: this param isn't used for async connections.
     pub fn connect_timeout(mut self, dur: Option<Duration>) -> RedisResult<ClusterClientBuilder> {
         // Check if duration is valid before updating local value.
         if dur.is_some() && dur.unwrap().is_zero() {
@@ -163,6 +167,8 @@ impl ClusterClientBuilder {
     ///
     /// If the value is `None`, then `send_packed_command` call may block indefinitely.
     /// Passing Some(Duration::ZERO) to this method will result in an error.
+    ///
+    /// Note: this param isn't used for async connections.
     pub fn write_timeout(mut self, dur: Option<Duration>) -> RedisResult<ClusterClientBuilder> {
         // Check if duration is valid before updating local value.
         if dur.is_some() && dur.unwrap().is_zero() {
@@ -180,6 +186,8 @@ impl ClusterClientBuilder {
     ///
     /// If the value is `None`, then `recv_response` call may block indefinitely.
     /// Passing Some(Duration::ZERO) to this method will result in an error.
+    ///
+    /// Note: this param isn't used for async connections.
     pub fn read_timeout(mut self, dur: Option<Duration>) -> RedisResult<ClusterClientBuilder> {
         // Check if duration is valid before updating local value.
         if dur.is_some() && dur.unwrap().is_zero() {
@@ -281,6 +289,46 @@ impl ClusterClient {
     }
 }
 
+#[cfg(feature = "aio")]
+#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
+impl ClusterClient {
+    /// Return an async multiplexed cluster connection from the client.
+    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
+    #[cfg_attr(
+        docsrs,
+        doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
+    )]
+    pub async fn get_async_connection(&self) -> RedisResult<MultiplexedClusterConnection> {
+        match Runtime::locate() {
+            #[cfg(feature = "tokio-comp")]
+            Runtime::Tokio => self.get_tokio_connection().await,
+            #[cfg(feature = "async-std-comp")]
+            Runtime::AsyncStd => self.get_async_std_connection().await,
+        }
+    }
+
+    /// Return a tokio multiplexed cluster connection from the client.
+    #[cfg(feature = "tokio-comp")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
+    pub async fn get_tokio_connection(&self) -> RedisResult<MultiplexedClusterConnection> {
+        self.get_async_connection_inner(Runtime::Tokio).await
+    }
+
+    /// Return an async-std multiplexed cluster connection from the client.
+    #[cfg(feature = "async-std-comp")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
+    pub async fn get_async_std_connection(&self) -> RedisResult<MultiplexedClusterConnection> {
+        self.get_async_connection_inner(Runtime::AsyncStd).await
+    }
+
+    async fn get_async_connection_inner(
+        &self,
+        runtime: Runtime,
+    ) -> RedisResult<MultiplexedClusterConnection> {
+        MultiplexedClusterConnection::new(runtime, &self.cluster_params, &self.initial_nodes).await
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};
diff --git a/redis/src/lib.rs b/redis/src/lib.rs
index d2fe606b3..27c96d5ae 100644
--- a/redis/src/lib.rs
+++ b/redis/src/lib.rs
@@ -431,6 +431,9 @@ mod cluster_client;
 #[cfg(feature = "cluster")]
 mod cluster_routing;
 
+#[cfg(all(feature = "cluster", feature = "aio"))]
+mod cluster_aio;
+
 #[cfg(feature = "r2d2")]
 #[cfg_attr(docsrs, doc(cfg(feature = "r2d2")))]
 mod r2d2;
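
Usage sketch (not part of the patch): a minimal example of the API this change adds, assuming the `cluster`, `aio`, and `tokio-comp` features are enabled, a `tokio` dependency with the `macros`/`rt` features for `#[tokio::main]`, and a cluster reachable at the placeholder addresses below. `ClusterClient::open` and `query_async` already exist in the crate; only `get_async_connection` and the returned `MultiplexedClusterConnection` are new here.

    use redis::cluster::ClusterClient;
    use redis::RedisResult;

    #[tokio::main]
    async fn main() -> RedisResult<()> {
        // Placeholder initial nodes; any reachable subset of the cluster works.
        let nodes = vec!["redis://127.0.0.1:7000/", "redis://127.0.0.1:7001/"];
        let client = ClusterClient::open(nodes)?;

        // New in this patch: a clonable, multiplexed async cluster connection.
        let mut con = client.get_async_connection().await?;

        // Commands are routed to the node that owns the key's slot.
        redis::cmd("SET")
            .arg("hello")
            .arg("world")
            .query_async::<_, ()>(&mut con)
            .await?;
        let value: String = redis::cmd("GET").arg("hello").query_async(&mut con).await?;
        assert_eq!(value, "world");
        Ok(())
    }

Because the connection is `Clone`, independent clones can issue commands concurrently over the same underlying node connections, which is what the `query_implicit_pipeline` benchmark above exercises.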