Skip to content

Commit

Permalink
cluster: add Async MultiplexedClusterConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarshgupta137 committed Aug 23, 2022
1 parent 7d5aca7 commit 9968153
Show file tree
Hide file tree
Showing 6 changed files with 530 additions and 31 deletions.
144 changes: 142 additions & 2 deletions redis/benches/bench_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![allow(clippy::unit_arg)] // want to allow this for `black_box()`
#![cfg(feature = "cluster")]
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::{prelude::*, stream};
use redis::cluster::cluster_pipe;
use redis::RedisError;

use support::*;

#[path = "../tests/support/mod.rs"]
mod support;

const PIPELINE_QUERIES: usize = 100;
const PIPELINE_QUERIES: usize = 1_000;

fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection) {
let key = "test_key";
Expand Down Expand Up @@ -76,6 +78,123 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection
group.finish();
}

fn bench_set_get_and_del_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) {
let key = "test_key";

let mut group = c.benchmark_group("cluster_async_basic");
let runtime = current_thread_runtime();
let con = client.get_async_connection();
let mut con = runtime.block_on(con).unwrap();

group.bench_function("set", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.bench_function("get", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("GET").arg(key).query_async(&mut con).await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.bench_function("set_and_del", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.await?;
redis::cmd("DEL").arg(key).query_async(&mut con).await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.finish();
}

fn bench_pipeline_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) {
let mut group = c.benchmark_group("cluster_async_pipeline");
let runtime = current_thread_runtime();
let con = client.get_async_connection();
let mut con = runtime.block_on(con).unwrap();

group.throughput(Throughput::Elements(PIPELINE_QUERIES as u64));

let mut queries = Vec::new();
for i in 0..PIPELINE_QUERIES {
queries.push(format!("foo{}", i));
}

let build_pipeline = || {
let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
};
group.bench_function("build_pipeline", |b| {
b.iter(|| {
build_pipeline();
black_box(())
})
});

let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
group.bench_function("query_pipeline", |b| {
b.iter(|| {
runtime
.block_on(async { pipe.query_async::<_, ()>(&mut con).await })
.unwrap();
})
});

let cmds: Vec<_> = (0..PIPELINE_QUERIES)
.map(|i| redis::cmd("SET").arg(format!("foo{}", i)).arg(i).clone())
.collect();

let mut connections = (0..PIPELINE_QUERIES)
.map(|_| con.clone())
.collect::<Vec<_>>();

group.bench_function("query_implicit_pipeline", |b| {
b.iter(|| {
runtime
.block_on(async {
cmds.iter()
.zip(&mut connections)
.map(|(cmd, con)| cmd.query_async(con))
.collect::<stream::FuturesUnordered<_>>()
.try_for_each(|()| async { Ok(()) })
.await
})
.unwrap();
})
});

group.finish();
}

fn bench_cluster_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new(6, 1);
cluster.wait_for_cluster_up();
Expand All @@ -85,6 +204,14 @@ fn bench_cluster_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

fn bench_cluster_async_setup(c: &mut Criterion) {
let mut cluster = TestClusterContext::new(6, 1);
cluster.wait_for_cluster_up();

bench_set_get_and_del_async(c, &mut cluster.client);
bench_pipeline_async(c, &mut cluster.client);
}

#[allow(dead_code)]
fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
Expand All @@ -97,9 +224,22 @@ fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

#[allow(dead_code)]
fn bench_cluster_async_read_from_replicas_setup(c: &mut Criterion) {
let mut cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
builder.read_from_replicas()
});
cluster.wait_for_cluster_up();

bench_set_get_and_del_async(c, &mut cluster.client);
bench_pipeline_async(c, &mut cluster.client);
}

criterion_group!(
cluster_bench,
bench_cluster_setup,
// bench_cluster_read_from_replicas_setup
// bench_cluster_read_from_replicas_setup,
bench_cluster_async_setup,
// bench_cluster_async_read_from_replicas_setup,
);
criterion_main!(cluster_bench);
51 changes: 31 additions & 20 deletions redis/src/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,20 @@ pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub(crate) enum Runtime {
#[cfg(feature = "tokio-comp")]
Tokio,
#[cfg(feature = "async-std-comp")]
AsyncStd,
}

impl Default for Runtime {
fn default() -> Self {
Self::locate()
}
}

impl Runtime {
pub(crate) fn locate() -> Self {
#[cfg(all(feature = "tokio-comp", not(feature = "async-std-comp")))]
Expand Down Expand Up @@ -112,8 +118,7 @@ impl Runtime {
}
}

#[allow(dead_code)]
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
pub(crate) fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
match self {
#[cfg(feature = "tokio-comp")]
Runtime::Tokio => tokio::Tokio::spawn(f),
Expand Down Expand Up @@ -645,22 +650,10 @@ struct InFlight<O, E> {
}

// A single message sent through the pipeline
struct PipelineMessage<S, I, E> {
input: S,
output: PipelineOutput<I, E>,
response_count: usize,
}

/// Wrapper around a `Stream + Sink` where each item sent through the `Sink` results in one or more
/// items being output by the `Stream` (the number is specified at time of sending). With the
/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
/// and `Sink`.
struct Pipeline<SinkItem, I, E>(mpsc::Sender<PipelineMessage<SinkItem, I, E>>);

impl<SinkItem, I, E> Clone for Pipeline<SinkItem, I, E> {
fn clone(&self) -> Self {
Pipeline(self.0.clone())
}
pub(crate) struct PipelineMessage<S, I, E> {
pub(crate) input: S,
pub(crate) output: PipelineOutput<I, E>,
pub(crate) response_count: usize,
}

impl<SinkItem, I, E> Debug for Pipeline<SinkItem, I, E>
Expand Down Expand Up @@ -833,6 +826,20 @@ where
}
}

/// Wrapper around a `Stream + Sink` where each item sent through the `Sink` results in one or more
/// items being output by the `Stream` (the number is specified at time of sending). With the
/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
/// and `Sink`.
pub(crate) struct Pipeline<SinkItem, I, E>(
pub(crate) mpsc::Sender<PipelineMessage<SinkItem, I, E>>,
);

impl<SinkItem, I, E> Clone for Pipeline<SinkItem, I, E> {
fn clone(&self) -> Self {
Pipeline(self.0.clone())
}
}

impl<SinkItem, I, E> Pipeline<SinkItem, I, E>
where
SinkItem: Send + 'static,
Expand Down Expand Up @@ -894,7 +901,7 @@ where
/// on the same underlying connection (tcp/unix socket).
#[derive(Clone)]
pub struct MultiplexedConnection {
pipeline: Pipeline<Vec<u8>, Value, RedisError>,
pub(crate) pipeline: Pipeline<Vec<u8>, Value, RedisError>,
db: i64,
}

Expand Down Expand Up @@ -951,6 +958,10 @@ impl MultiplexedConnection {
};
Ok((con, driver))
}

pub(crate) async fn check_connection(&mut self) -> bool {
cmd("PING").query_async::<_, String>(self).await.is_ok()
}
}

impl ConnectionLike for MultiplexedConnection {
Expand Down
20 changes: 11 additions & 9 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ use crate::parser::parse_redis_value;
use crate::pipeline::Pipeline;
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value};

#[cfg(feature = "aio")]
pub use crate::cluster_aio::MultiplexedClusterConnection;
pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
#[doc(no_inline)]
pub use crate::cmd::pipe as cluster_pipe;
#[doc(no_inline)]
pub use crate::pipeline::Pipeline as ClusterPipeline;

type SlotMap = BTreeMap<u16, [String; 2]>;
pub(crate) type SlotMap = BTreeMap<u16, [String; 2]>;

/// This is a connection of Redis cluster.
#[derive(Default)]
Expand Down Expand Up @@ -510,7 +512,7 @@ impl ConnectionLike for ClusterConnection {
}

// MergeResults trait.
trait MergeResults {
pub(crate) trait MergeResults {
fn merge_results(values: HashMap<String, Self>) -> Self
where
Self: Sized;
Expand All @@ -536,15 +538,15 @@ impl MergeResults for Vec<Value> {
}

// NodeCmd struct.
struct NodeCmd {
node: String,
pub(crate) struct NodeCmd {
pub(crate) node: String,
// The original command indexes
indexes: Vec<usize>,
pipe: Vec<u8>,
pub(crate) indexes: Vec<usize>,
pub(crate) pipe: Vec<u8>,
}

impl NodeCmd {
fn new(node: &str) -> NodeCmd {
pub(crate) fn new(node: &str) -> NodeCmd {
NodeCmd {
node: node.to_string(),
indexes: vec![],
Expand All @@ -554,7 +556,7 @@ impl NodeCmd {
}

// Parse `CLUSTER SLOTS` response into SlotMap.
fn parse_slots_response(
pub(crate) fn parse_slots_response(
response: Vec<Value>,
cluster_params: &ClusterParams,
) -> RedisResult<SlotMap> {
Expand Down Expand Up @@ -665,7 +667,7 @@ fn parse_slots_response(
.collect())
}

fn get_connection_info(node: &str, cluster_params: &ClusterParams) -> ConnectionInfo {
pub(crate) fn get_connection_info(node: &str, cluster_params: &ClusterParams) -> ConnectionInfo {
ConnectionInfo {
addr: get_connection_addr(parse_node_str(node), cluster_params.tls_insecure),
redis: RedisConnectionInfo {
Expand Down

0 comments on commit 9968153

Please sign in to comment.