Skip to content

Commit

Permalink
add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Sep 17, 2023
1 parent bd00cf2 commit 8cc3d6c
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,109 @@ fn test_async_cluster_basic_script() {
.unwrap();
}

#[test]
fn test_async_route_flush_to_specific_node() {
let cluster = TestClusterContext::new(3, 0);

block_on_all(async move {
let mut connection = cluster.async_connection().await;
let _: () = connection.set("foo", "bar").await.unwrap();
let _: () = connection.set("bar", "foo").await.unwrap();

let route = redis::cluster_routing::Route::new(1, redis::cluster_routing::SlotAddr::Master);
let single_node_route = redis::cluster_routing::SingleNodeRoutingInfo::SpecificNode(route);
let routing = RoutingInfo::SingleNode(single_node_route);
assert_eq!(
connection
.route_command(&redis::cmd("FLUSHALL"), routing, None)
.await
.unwrap(),
Value::Okay
);
let res: String = connection.get("foo").await.unwrap();
assert_eq!(res, "bar".to_string());
let res2: Option<String> = connection.get("bar").await.unwrap();
assert_eq!(res2, None);
Ok::<_, RedisError>(())
})
.unwrap();
}

#[test]
fn test_async_route_info_to_nodes() {
let cluster = TestClusterContext::new(12, 1);

let split_to_addresses_and_info = |res| -> (Vec<String>, Vec<String>) {
if let Value::Bulk(values) = res {
let mut pairs: Vec<_> = values
.into_iter()
.map(|value| redis::from_redis_value::<(String, String)>(&value).unwrap())
.collect();
pairs.sort_by(|(address1, _), (address2, _)| address1.cmp(address2));
pairs.into_iter().unzip()
} else {
unreachable!("{:?}", res);
}
};

block_on_all(async move {
let cluster_addresses: Vec<_> = cluster
.cluster
.servers
.iter()
.map(|server| server.connection_info())
.collect();
let client = ClusterClient::builder(cluster_addresses.clone())
.read_from_replicas()
.build()?;
let mut connection = client.get_async_connection().await?;

let route_to_all_nodes = redis::cluster_routing::MultipleNodeRoutingInfo::AllNodes;
let routing = RoutingInfo::MultiNode(route_to_all_nodes);
let res = connection
.route_command(&redis::cmd("INFO"), routing, None)
.await
.unwrap();
let (addresses, infos) = split_to_addresses_and_info(res);

let mut cluster_addresses: Vec<_> = cluster_addresses
.into_iter()
.map(|info| info.addr.to_string())
.collect();
cluster_addresses.sort();

assert_eq!(addresses.len(), 12);
assert_eq!(addresses, cluster_addresses);
assert_eq!(infos.len(), 12);
for i in 0..12 {
let split: Vec<_> = addresses[i].split(":").collect();
assert!(infos[i].contains(&format!("bind={}", split[0])));
assert!(infos[i].contains(&format!("port={}", split[1])));
}

let route_to_all_primaries = redis::cluster_routing::MultipleNodeRoutingInfo::AllMasters;
let routing = RoutingInfo::MultiNode(route_to_all_primaries);
let res = connection
.route_command(&redis::cmd("INFO"), routing, None)
.await
.unwrap();
let (addresses, infos) = split_to_addresses_and_info(res);
assert_eq!(addresses.len(), 6);
assert_eq!(infos.len(), 6);
// verify that all primaries have the correct port & host, and are marked as primaries.
for i in 0..6 {
assert!(cluster_addresses.contains(&addresses[i]));
let split: Vec<_> = addresses[i].split(":").collect();
assert!(infos[i].contains(&format!("bind={}", split[0])));
assert!(infos[i].contains(&format!("port={}", split[1])));
assert!(infos[i].contains("role:primary") || infos[i].contains("role:master"));
}

Ok::<_, RedisError>(())
})
.unwrap();
}

#[ignore] // TODO Handle pipe where the keys do not all go to the same node
#[test]
fn test_async_cluster_basic_pipe() {
Expand Down

0 comments on commit 8cc3d6c

Please sign in to comment.