Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5/5] Add Async MultiplexedClusterConnection #640

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
84349bb
cluster_client: implement proper builder pattern
utkarshgupta137 Jul 24, 2022
b5ec28e
cluster_client: use ClusterParams struct to pass params
utkarshgupta137 Jul 24, 2022
79fe0ff
cluster: reorganise imports by module
utkarshgupta137 Aug 23, 2022
44a3de6
cluster: simplify TlsMode logic and add tls_insecure param
utkarshgupta137 Aug 23, 2022
77025aa
cluster: refactor connect & create_initial_connections
utkarshgupta137 Aug 23, 2022
dee0504
cluster: keep all params in ClusterParams struct & use it to pass all…
utkarshgupta137 Aug 23, 2022
a12d245
cluster: add read/write timeout params to builder
utkarshgupta137 Aug 23, 2022
24544d4
cluster: add auto_reconnect param to builder
utkarshgupta137 Aug 23, 2022
34a8613
cluster: add configurable connect timeout
utkarshgupta137 Aug 23, 2022
cb94250
cluster: add configurable retries
utkarshgupta137 Aug 23, 2022
ad326cb
cluster: move slots response parsing logic to get_slots function
utkarshgupta137 Aug 23, 2022
0033457
cluster: remove random connection logic
utkarshgupta137 Jul 24, 2022
5d14992
cluster: remove RefCell for connections/slot
utkarshgupta137 Jul 24, 2022
e943189
cluster: consolidate routing logic into Routable trait
utkarshgupta137 Jul 24, 2022
9d744d3
cluster: fix replica routing for execute_on_all_nodes
utkarshgupta137 Jul 24, 2022
1088805
cluster: consolidate logic for create_initial_connections & refresh_s…
utkarshgupta137 Jul 24, 2022
0b9cb6b
cluster: directly expose Slot fields instead of bare methods
utkarshgupta137 Jul 24, 2022
14f6d33
cluster: merge send_all_commands & recv_all_commands
utkarshgupta137 Jul 24, 2022
5bfa0a6
aio: remove unnecessary futures dependency
utkarshgupta137 Jul 28, 2022
de8a514
aio: move connection creation logic from client to Runtime
utkarshgupta137 Jul 28, 2022
d4608c3
aio: fix ConnectionLike trait documentation & add supports_pipelining…
utkarshgupta137 Jul 28, 2022
0884896
sio: improve ConnectionLike trait documentation & add req_pipeline/su…
utkarshgupta137 Jul 28, 2022
7d5aca7
cluster: use common pipeline instead of separate ClusterPipeline
utkarshgupta137 Jul 28, 2022
9968153
cluster: add Async MultiplexedClusterConnection
utkarshgupta137 Jul 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ tokio = { version = "1", features = ["rt", "net"], optional = true }

# Only needed for the connection manager
arc-swap = { version = "1.1.0", optional = true }
futures = { version = "0.3.3", optional = true }

# Only needed for the r2d2 feature
r2d2 = { version = "0.8.8", optional = true }

# Only needed for cluster
crc16 = { version = "0.4", optional = true }
rand = { version = "0.8", optional = true }

# Only needed for async_std support
async-std = { version = "1.8.0", optional = true}
async-trait = { version = "0.1.24", optional = true }
Expand All @@ -73,7 +73,7 @@ async-std-comp = ["aio", "async-std"]
async-std-tls-comp = ["async-std-comp", "async-native-tls", "tls"]
tokio-comp = ["aio", "tokio", "tokio/net"]
tokio-native-tls-comp = ["tls", "tokio-native-tls"]
connection-manager = ["arc-swap", "futures", "aio"]
connection-manager = ["arc-swap", "aio"]
streams = []


Expand Down
144 changes: 142 additions & 2 deletions redis/benches/bench_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![allow(clippy::unit_arg)] // want to allow this for `black_box()`
#![cfg(feature = "cluster")]
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::{prelude::*, stream};
use redis::cluster::cluster_pipe;
use redis::RedisError;

use support::*;

#[path = "../tests/support/mod.rs"]
mod support;

const PIPELINE_QUERIES: usize = 100;
const PIPELINE_QUERIES: usize = 1_000;

fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection) {
let key = "test_key";
Expand Down Expand Up @@ -76,6 +78,123 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection
group.finish();
}

fn bench_set_get_and_del_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) {
let key = "test_key";

let mut group = c.benchmark_group("cluster_async_basic");
let runtime = current_thread_runtime();
let con = client.get_async_connection();
let mut con = runtime.block_on(con).unwrap();

group.bench_function("set", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.bench_function("get", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("GET").arg(key).query_async(&mut con).await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.bench_function("set_and_del", |b| {
b.iter(|| {
runtime
.block_on(async {
redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.await?;
redis::cmd("DEL").arg(key).query_async(&mut con).await?;
Ok::<(), RedisError>(())
})
.unwrap()
})
});

group.finish();
}

fn bench_pipeline_async(c: &mut Criterion, client: &mut redis::cluster::ClusterClient) {
let mut group = c.benchmark_group("cluster_async_pipeline");
let runtime = current_thread_runtime();
let con = client.get_async_connection();
let mut con = runtime.block_on(con).unwrap();

group.throughput(Throughput::Elements(PIPELINE_QUERIES as u64));

let mut queries = Vec::new();
for i in 0..PIPELINE_QUERIES {
queries.push(format!("foo{}", i));
}

let build_pipeline = || {
let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
};
group.bench_function("build_pipeline", |b| {
b.iter(|| {
build_pipeline();
black_box(())
})
});

let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
group.bench_function("query_pipeline", |b| {
b.iter(|| {
runtime
.block_on(async { pipe.query_async::<_, ()>(&mut con).await })
.unwrap();
})
});

let cmds: Vec<_> = (0..PIPELINE_QUERIES)
.map(|i| redis::cmd("SET").arg(format!("foo{}", i)).arg(i).clone())
.collect();

let mut connections = (0..PIPELINE_QUERIES)
.map(|_| con.clone())
.collect::<Vec<_>>();

group.bench_function("query_implicit_pipeline", |b| {
b.iter(|| {
runtime
.block_on(async {
cmds.iter()
.zip(&mut connections)
.map(|(cmd, con)| cmd.query_async(con))
.collect::<stream::FuturesUnordered<_>>()
.try_for_each(|()| async { Ok(()) })
.await
})
.unwrap();
})
});

group.finish();
}

fn bench_cluster_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new(6, 1);
cluster.wait_for_cluster_up();
Expand All @@ -85,6 +204,14 @@ fn bench_cluster_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

fn bench_cluster_async_setup(c: &mut Criterion) {
let mut cluster = TestClusterContext::new(6, 1);
cluster.wait_for_cluster_up();

bench_set_get_and_del_async(c, &mut cluster.client);
bench_pipeline_async(c, &mut cluster.client);
}

#[allow(dead_code)]
fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
Expand All @@ -97,9 +224,22 @@ fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

#[allow(dead_code)]
fn bench_cluster_async_read_from_replicas_setup(c: &mut Criterion) {
let mut cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
builder.read_from_replicas()
});
cluster.wait_for_cluster_up();

bench_set_get_and_del_async(c, &mut cluster.client);
bench_pipeline_async(c, &mut cluster.client);
}

criterion_group!(
cluster_bench,
bench_cluster_setup,
// bench_cluster_read_from_replicas_setup
// bench_cluster_read_from_replicas_setup,
bench_cluster_async_setup,
// bench_cluster_async_read_from_replicas_setup,
);
criterion_main!(cluster_bench);
Loading