Skip to content

Commit

Permalink
cluster_client: use ClusterParams struct to pass params
Browse files Browse the repository at this point in the history
- this is to simplify passing multiple params to & inside ClusterConnection impl
  • Loading branch information
utkarshgupta137 committed Aug 31, 2022
1 parent 53e6c4b commit e723f98
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 79 deletions.
19 changes: 9 additions & 10 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use super::{
Cmd, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, ErrorKind, IntoConnectionInfo,
RedisError, RedisResult, Value,
};
use crate::cluster_client::ClusterParams;

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
use crate::cluster_pipeline::UNROUTABLE_ERROR;
Expand Down Expand Up @@ -95,25 +96,23 @@ impl TlsMode {

impl ClusterConnection {
pub(crate) fn new(
cluster_params: ClusterParams,
initial_nodes: Vec<ConnectionInfo>,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<ClusterConnection> {
let connections = Self::create_initial_connections(
&initial_nodes,
read_from_replicas,
username.clone(),
password.clone(),
cluster_params.read_from_replicas,
cluster_params.username.clone(),
cluster_params.password.clone(),
)?;

let connection = ClusterConnection {
connections: RefCell::new(connections),
slots: RefCell::new(SlotMap::new()),
auto_reconnect: RefCell::new(true),
read_from_replicas,
username,
password,
read_from_replicas: cluster_params.read_from_replicas,
username: cluster_params.username,
password: cluster_params.password,
read_timeout: RefCell::new(None),
write_timeout: RefCell::new(None),
#[cfg(feature = "tls")]
Expand All @@ -135,7 +134,7 @@ impl ClusterConnection {
},
#[cfg(not(feature = "tls"))]
tls: None,
initial_nodes,
initial_nodes: initial_nodes.to_vec(),
};
connection.refresh_slots()?;

Expand Down
128 changes: 59 additions & 69 deletions redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::cluster::ClusterConnection;
use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
use crate::types::{ErrorKind, RedisError, RedisResult};

use super::{
ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisResult,
};
/// Redis cluster specific parameters.
#[derive(Default, Clone)]
pub(crate) struct ClusterParams {
pub(crate) password: Option<String>,
pub(crate) username: Option<String>,
pub(crate) read_from_replicas: bool,
}

/// Used to configure and build a [`ClusterClient`].
pub struct ClusterClientBuilder {
initial_nodes: RedisResult<Vec<ConnectionInfo>>,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
cluster_params: ClusterParams,
}

impl ClusterClientBuilder {
Expand All @@ -22,9 +26,7 @@ impl ClusterClientBuilder {
.into_iter()
.map(|x| x.into_connection_info())
.collect(),
read_from_replicas: false,
username: None,
password: None,
cluster_params: ClusterParams::default(),
}
}

Expand All @@ -39,68 +41,69 @@ impl ClusterClientBuilder {
/// usernames, an error is returned.
pub fn build(self) -> RedisResult<ClusterClient> {
let initial_nodes = self.initial_nodes?;

let mut nodes = Vec::with_capacity(initial_nodes.len());
let mut connection_info_password = None::<String>;
let mut connection_info_username = None::<String>;

for (index, info) in initial_nodes.into_iter().enumerate() {
if let ConnectionAddr::Unix(_) = info.addr {
let mut cluster_params = self.cluster_params;
let password = if cluster_params.password.is_none() {
cluster_params.password = initial_nodes[0].redis.password.clone();
&cluster_params.password
} else {
&None
};
let username = if cluster_params.username.is_none() {
cluster_params.username = initial_nodes[0].redis.username.clone();
&cluster_params.username
} else {
&None
};

for node in initial_nodes {
if let ConnectionAddr::Unix(_) = node.addr {
return Err(RedisError::from((ErrorKind::InvalidClientConfig,
"This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
"This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
}

if self.password.is_none() {
if index == 0 {
connection_info_password = info.redis.password.clone();
} else if connection_info_password != info.redis.password {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different password among initial nodes.",
)));
}
if password.is_some() && node.redis.password != *password {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different password among initial nodes.",
)));
}

if self.username.is_none() {
if index == 0 {
connection_info_username = info.redis.username.clone();
} else if connection_info_username != info.redis.username {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different username among initial nodes.",
)));
}
if username.is_some() && node.redis.username != *username {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different username among initial nodes.",
)));
}

nodes.push(info);
nodes.push(node);
}

Ok(ClusterClient {
initial_nodes: nodes,
read_from_replicas: self.read_from_replicas,
username: self.username.or(connection_info_username),
password: self.password.or(connection_info_password),
cluster_params,
})
}

/// Set password for new ClusterClient.
/// Sets password for new ClusterClient.
pub fn password(mut self, password: String) -> ClusterClientBuilder {
self.password = Some(password);
self.cluster_params.password = Some(password);
self
}

/// Set username for new ClusterClient.
/// Sets username for new ClusterClient.
pub fn username(mut self, username: String) -> ClusterClientBuilder {
self.username = Some(username);
self.cluster_params.username = Some(username);
self
}

/// Enable read from replicas for new ClusterClient (default is false).
/// Enables read from replicas for new ClusterClient (default is false).
///
/// If True, then read queries will go to the replica nodes & write queries will go to the
/// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes.
pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
self.read_from_replicas = true;
self.cluster_params.read_from_replicas = true;
self
}

Expand All @@ -113,17 +116,16 @@ impl ClusterClientBuilder {
/// Use `read_from_replicas()`.
#[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
self.read_from_replicas = read_from_replicas;
self.cluster_params.read_from_replicas = read_from_replicas;
self
}
}

/// This is a Redis cluster client.
#[derive(Clone)]
pub struct ClusterClient {
initial_nodes: Vec<ConnectionInfo>,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
cluster_params: ClusterParams,
}

impl ClusterClient {
Expand All @@ -145,19 +147,14 @@ impl ClusterClient {
ClusterClientBuilder::new(initial_nodes)
}

/// Opens connections to Redis Cluster nodes and returns a
/// Creates new connections to Redis Cluster nodes and return a
/// [`ClusterConnection`].
///
/// # Errors
///
/// An error is returned if there is a failure to open connections or to create slots.
/// An error is returned if there is a failure while creating connections or slots.
pub fn get_connection(&self) -> RedisResult<ClusterConnection> {
ClusterConnection::new(
self.initial_nodes.clone(),
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
)
ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
}

/// Use `new()`.
Expand All @@ -167,16 +164,9 @@ impl ClusterClient {
}
}

impl Clone for ClusterClient {
fn clone(&self) -> ClusterClient {
ClusterClient::new(self.initial_nodes.clone()).unwrap()
}
}

#[cfg(test)]
mod tests {
use super::{ClusterClient, ClusterClientBuilder};
use super::{ConnectionInfo, IntoConnectionInfo};
use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};

fn get_connection_data() -> Vec<ConnectionInfo> {
vec![
Expand Down Expand Up @@ -217,20 +207,20 @@ mod tests {
#[test]
fn give_no_password() {
let client = ClusterClient::new(get_connection_data()).unwrap();
assert_eq!(client.password, None);
assert_eq!(client.cluster_params.password, None);
}

#[test]
fn give_password_by_initial_nodes() {
let client = ClusterClient::new(get_connection_data_with_password()).unwrap();
assert_eq!(client.password, Some("password".to_string()));
assert_eq!(client.cluster_params.password, Some("password".to_string()));
}

#[test]
fn give_username_and_password_by_initial_nodes() {
let client = ClusterClient::new(get_connection_data_with_username_and_password()).unwrap();
assert_eq!(client.password, Some("password".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
assert_eq!(client.cluster_params.password, Some("password".to_string()));
assert_eq!(client.cluster_params.username, Some("user1".to_string()));
}

#[test]
Expand Down Expand Up @@ -260,7 +250,7 @@ mod tests {
.username("user1".to_string())
.build()
.unwrap();
assert_eq!(client.password, Some("pass".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
assert_eq!(client.cluster_params.password, Some("pass".to_string()));
assert_eq!(client.cluster_params.username, Some("user1".to_string()));
}
}

0 comments on commit e723f98

Please sign in to comment.