Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing routing information to cluster. #899

Merged
merged 9 commits into from
Sep 18, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,109 @@ fn test_async_cluster_basic_script() {
.unwrap();
}

#[test]
fn test_async_route_flush_to_specific_node() {
let cluster = TestClusterContext::new(3, 0);

block_on_all(async move {
let mut connection = cluster.async_connection().await;
let _: () = connection.set("foo", "bar").await.unwrap();
let _: () = connection.set("bar", "foo").await.unwrap();

let route = redis::cluster_routing::Route::new(1, redis::cluster_routing::SlotAddr::Master);
let single_node_route = redis::cluster_routing::SingleNodeRoutingInfo::SpecificNode(route);
let routing = RoutingInfo::SingleNode(single_node_route);
assert_eq!(
connection
.route_command(&redis::cmd("FLUSHALL"), routing, None)
.await
.unwrap(),
Value::Okay
);
let res: String = connection.get("foo").await.unwrap();
assert_eq!(res, "bar".to_string());
let res2: Option<String> = connection.get("bar").await.unwrap();
nihohit marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(res2, None);
Ok::<_, RedisError>(())
})
.unwrap();
}

#[test]
fn test_async_route_info_to_nodes() {
let cluster = TestClusterContext::new(12, 1);

let split_to_addresses_and_info = |res| -> (Vec<String>, Vec<String>) {
if let Value::Bulk(values) = res {
let mut pairs: Vec<_> = values
.into_iter()
.map(|value| redis::from_redis_value::<(String, String)>(&value).unwrap())
.collect();
pairs.sort_by(|(address1, _), (address2, _)| address1.cmp(address2));
pairs.into_iter().unzip()
} else {
unreachable!("{:?}", res);
}
};

block_on_all(async move {
let cluster_addresses: Vec<_> = cluster
.cluster
.servers
.iter()
.map(|server| server.connection_info())
.collect();
let client = ClusterClient::builder(cluster_addresses.clone())
.read_from_replicas()
.build()?;
let mut connection = client.get_async_connection().await?;

let route_to_all_nodes = redis::cluster_routing::MultipleNodeRoutingInfo::AllNodes;
let routing = RoutingInfo::MultiNode(route_to_all_nodes);
let res = connection
.route_command(&redis::cmd("INFO"), routing, None)
.await
.unwrap();
let (addresses, infos) = split_to_addresses_and_info(res);

let mut cluster_addresses: Vec<_> = cluster_addresses
.into_iter()
.map(|info| info.addr.to_string())
.collect();
cluster_addresses.sort();

assert_eq!(addresses.len(), 12);
assert_eq!(addresses, cluster_addresses);
assert_eq!(infos.len(), 12);
for i in 0..12 {
let split: Vec<_> = addresses[i].split(":").collect();
assert!(infos[i].contains(&format!("bind={}", split[0])));
assert!(infos[i].contains(&format!("port={}", split[1])));
}

let route_to_all_primaries = redis::cluster_routing::MultipleNodeRoutingInfo::AllMasters;
let routing = RoutingInfo::MultiNode(route_to_all_primaries);
let res = connection
.route_command(&redis::cmd("INFO"), routing, None)
.await
.unwrap();
let (addresses, infos) = split_to_addresses_and_info(res);
assert_eq!(addresses.len(), 6);
assert_eq!(infos.len(), 6);
// verify that all primaries have the correct port & host, and are marked as primaries.
for i in 0..6 {
assert!(cluster_addresses.contains(&addresses[i]));
let split: Vec<_> = addresses[i].split(":").collect();
assert!(infos[i].contains(&format!("bind={}", split[0])));
assert!(infos[i].contains(&format!("port={}", split[1])));
assert!(infos[i].contains("role:primary") || infos[i].contains("role:master"));
}

Ok::<_, RedisError>(())
})
.unwrap();
}

#[ignore] // TODO Handle pipe where the keys do not all go to the same node
#[test]
fn test_async_cluster_basic_pipe() {
Expand Down
Loading