cluster: small performance optimisations #780

Merged · 7 commits · Aug 22, 2023
97 changes: 51 additions & 46 deletions scylla/src/transport/cluster.rs
@@ -237,29 +237,6 @@ impl Cluster {

response_receiver.await.unwrap() // ClusterWorker always responds
}

-    /// Returns nonempty list of working connections to all shards
-    pub(crate) async fn get_working_connections(&self) -> Result<Vec<Arc<Connection>>, QueryError> {
-        let cluster_data: Arc<ClusterData> = self.get_data();
-        let peers = &cluster_data.known_peers;
-
-        let mut result: Vec<Arc<Connection>> = Vec::with_capacity(peers.len());
-
-        let mut last_error: Option<QueryError> = None;
-
-        for node in peers.values() {
-            match node.get_working_connections() {
-                Ok(conns) => result.extend(conns),
-                Err(e) => last_error = Some(e),
-            }
-        }
-
-        if result.is_empty() {
-            return Err(last_error.unwrap()); // By invariant peers is nonempty
-        }
-
-        Ok(result)
-    }
}
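
The method removed above eagerly collected every connection into a single Vec before returning. For contrast, here is a minimal self-contained sketch of the lazy shape this PR moves the API toward (`Node` and `Connection` are hypothetical stand-ins, not the driver's types, and error handling is simplified):

```rust
use std::sync::Arc;

struct Connection;

struct Node {
    conns: Vec<Arc<Connection>>,
}

impl Node {
    fn get_working_connections(&self) -> Result<Vec<Arc<Connection>>, String> {
        if self.conns.is_empty() {
            Err("pool broken".to_owned())
        } else {
            Ok(self.conns.clone())
        }
    }
}

// Lazy variant: no Vec spanning all nodes is allocated; connections are
// produced on demand as the caller iterates. (Simplification: broken pools
// are silently skipped here, unlike in the merged code.)
fn connections_lazy(nodes: &[Node]) -> impl Iterator<Item = Arc<Connection>> + '_ {
    nodes
        .iter()
        .filter_map(|node| node.get_working_connections().ok())
        .flatten()
}

fn main() {
    let nodes = vec![
        Node { conns: vec![Arc::new(Connection), Arc::new(Connection)] },
        Node { conns: vec![] }, // broken pool, skipped by the lazy variant
    ];
    assert_eq!(connections_lazy(&nodes).count(), 2);
}
```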

impl ClusterData {
@@ -269,7 +246,7 @@ impl ClusterData {
datacenter.rack_count = datacenter
.nodes
.iter()
-            .filter_map(|node| node.rack.clone())
+            .filter_map(|node| node.rack.as_ref())
.unique()
.count();
}
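
The `clone()` → `as_ref()` change above avoids allocating a fresh String per node just to count distinct racks. A small sketch of the same idea with stand-in data (not the driver's types):

```rust
use itertools::Itertools;

// Counting distinct racks only needs to compare the strings, so borrowing
// (`as_ref`) avoids the per-node String allocation that `clone()` paid for.
fn rack_count(racks: &[Option<String>]) -> usize {
    racks
        .iter()
        .filter_map(|rack| rack.as_ref()) // yields &String; no allocation
        .unique() // hashes the borrowed &String values
        .count()
}

fn main() {
    let racks = vec![
        Some("r1".to_owned()),
        None,
        Some("r1".to_owned()),
        Some("r2".to_owned()),
    ];
    assert_eq!(rack_count(&racks), 2);
}
```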
@@ -301,27 +278,35 @@ impl ClusterData {
            // Take existing Arc<Node> if possible; otherwise create a new one.
            // Changing rack/datacenter but not IP address seems improbable,
            // so we can just create a new node and connections then.
-            let node: Arc<Node> = match known_peers.get(&peer.host_id) {
+            let peer_host_id = peer.host_id;
+            let peer_address = peer.address;
+            let peer_tokens;
+
+            let node: Arc<Node> = match known_peers.get(&peer_host_id) {
                Some(node) if node.datacenter == peer.datacenter && node.rack == peer.rack => {
-                    if node.address == peer.address {
+                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
+                    peer_tokens = tokens;
+                    if node.address == peer_address {
                        node.clone()
                    } else {
                        // If the IP changes, the Node struct is recreated, but the underlying pool is preserved and notified about the IP change.
-                        Arc::new(Node::inherit_with_ip_changed(node, peer.to_peer_endpoint()))
+                        Arc::new(Node::inherit_with_ip_changed(node, peer_endpoint))
                    }
                }
                _ => {
                    let is_enabled = host_filter.map_or(true, |f| f.accept(&peer));
+                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
+                    peer_tokens = tokens;
                    Arc::new(Node::new(
-                        peer.to_peer_endpoint(),
+                        peer_endpoint,
                        pool_config.clone(),
                        used_keyspace.clone(),
                        is_enabled,
                    ))
                }
            };

-            new_known_peers.insert(peer.host_id, node.clone());
+            new_known_peers.insert(peer_host_id, node.clone());

if let Some(dc) = &node.datacenter {
match datacenters.get_mut(dc) {
@@ -336,7 +321,7 @@ impl ClusterData {
}
}

-            for token in peer.tokens {
+            for token in peer_tokens {
ring.push((token, node.clone()));
}

@@ -345,21 +330,18 @@ impl ClusterData {

Self::update_rack_count(&mut datacenters);

-        let keyspace_strategies: Vec<Strategy> = metadata
-            .keyspaces
-            .values()
-            .map(|ks| ks.strategy.clone())
-            .collect();
-
-        let locator = tokio::task::spawn_blocking(move || {
-            ReplicaLocator::new(ring.into_iter(), keyspace_strategies.iter())
+        let keyspaces = metadata.keyspaces;
+        let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
+            let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
+            let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies);
+            (locator, keyspaces)
        })
        .await
        .unwrap();

ClusterData {
known_peers: new_known_peers,
-            keyspaces: metadata.keyspaces,
+            keyspaces,
locator,
}
}
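
The reworked `spawn_blocking` call moves `keyspaces` into the blocking task, borrows from it to build the locator, and returns ownership back in a tuple, so the old `Vec<Strategy>` of cloned strategies is no longer needed. A simplified sketch of that move-in/move-out pattern, assuming the tokio crate (`build_locator` is a hypothetical stand-in for `ReplicaLocator::new`):

```rust
use std::collections::HashMap;

// Move the owned map into the blocking task, borrow from it for the
// CPU-bound work, then hand ownership back through the return value.
async fn compute(keyspaces: HashMap<String, String>) -> (usize, HashMap<String, String>) {
    tokio::task::spawn_blocking(move || {
        // Borrow from the moved-in map instead of cloning its contents...
        let locator = build_locator(keyspaces.values().map(String::as_str));
        // ...then return the map alongside the result.
        (locator, keyspaces)
    })
    .await
    .expect("blocking task panicked")
}

fn build_locator<'a>(strategies: impl Iterator<Item = &'a str>) -> usize {
    strategies.count() // placeholder for the real CPU-bound construction
}

#[tokio::main]
async fn main() {
    let mut keyspaces = HashMap::new();
    keyspaces.insert("ks1".to_owned(), "SimpleStrategy".to_owned());
    let (count, keyspaces) = compute(keyspaces).await;
    assert_eq!(count, 1);
    assert!(keyspaces.contains_key("ks1")); // ownership came back, no clone
}
```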
@@ -476,6 +458,32 @@ impl ClusterData {
pub fn replica_locator(&self) -> &ReplicaLocator {
&self.locator
}

+    /// Returns a nonempty iterator of working connections to all shards.
+    pub(crate) fn iter_working_connections(
+        &self,
+    ) -> Result<impl Iterator<Item = Arc<Connection>> + '_, QueryError> {
+        // The returned iterator is nonempty by the nonemptiness invariant of `self.known_peers`.
+        assert!(!self.known_peers.is_empty());
+        let mut peers_iter = self.known_peers.values();
+
+        // First, we try to find the first working pool of connections.
+        // If none is found, return an error.
+        let first_working_pool = peers_iter
+            .by_ref()
+            .map(|node| node.get_working_connections())
+            .find_or_first(Result::is_ok)
+            .expect("impossible: known_peers was asserted to be nonempty")?;
+
+        let remaining_pools_iter = peers_iter
+            .map(|node| node.get_working_connections())
+            .flatten_ok()
+            .flatten();
+
+        Ok(first_working_pool.into_iter().chain(remaining_pools_iter))
+        // By an invariant, `self.known_peers` is nonempty, so the returned iterator
+        // is nonempty, too.
Review thread:

Contributor: Why is that? If all node pools are in the MaybePoolConnections::Broken state, every call to node.get_working_connections() will fail.

Collaborator (author): That's true, and hence the resulting iterator will consist of only Err variants. flatten_ok() does not remove any Err variants; what it does is flatten the Ok variants; that is, Ok(Ok(x)) becomes Ok(x).

Collaborator: That's quite a large change in semantics. Previously, if all nodes had broken connection pools, the function returned an error; otherwise you got a list of current connections. Now you are putting the responsibility for handling errors on the caller. Despite this change, the code seems to work because of how the current callers handle the errors from the iterator.

I'd rather see the old semantics, i.e. the function should either fail immediately if there are no connections or return an iterator that yields plain Arc<Connection>. Alternatively (but not preferably), you could document what the iterator really returns.
+    }
}
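
To make the review exchange above concrete, here is a self-contained demo (toy types, not the driver's) of how `flatten_ok()` from itertools preserves Err variants while flattening the Ok ones, and how the extra `.flatten()` in the merged code then drops them:

```rust
use itertools::Itertools;

fn main() {
    let pools: Vec<Result<Vec<u32>, &str>> =
        vec![Ok(vec![1, 2]), Err("pool broken"), Ok(vec![3])];

    // `flatten_ok` flattens Ok(vec![...]) into individual Ok items and
    // passes every Err through untouched.
    let flattened: Vec<Result<u32, &str>> = pools.into_iter().flatten_ok().collect();
    assert_eq!(flattened, vec![Ok(1), Ok(2), Err("pool broken"), Ok(3)]);

    // The merged code additionally appends `.flatten()`; because `Result`
    // iterates over zero or one item, that step silently drops the Errs.
    let pools: Vec<Result<Vec<u32>, &str>> = vec![Err("pool broken"), Ok(vec![3])];
    let survivors: Vec<u32> = pools.into_iter().flatten_ok().flatten().collect();
    assert_eq!(survivors, vec![3]);
}
```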

impl ClusterWorker {
@@ -591,13 +599,10 @@ impl ClusterWorker {
cluster_data: Arc<ClusterData>,
keyspace_name: &VerifiedKeyspaceName,
) -> Result<(), QueryError> {
-        let mut use_keyspace_futures = Vec::new();
-
-        for node in cluster_data.known_peers.values() {
-            let fut = node.use_keyspace(keyspace_name.clone());
-            use_keyspace_futures.push(fut);
-        }
-
+        let use_keyspace_futures = cluster_data
+            .known_peers
+            .values()
+            .map(|node| node.use_keyspace(keyspace_name.clone()));
        let use_keyspace_results: Vec<Result<(), QueryError>> =
            join_all(use_keyspace_futures).await;

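The `use_keyspace` refactor above replaces a Vec-push loop with a lazy `map` feeding `join_all`. A minimal sketch of that pattern (toy future, not the driver's `use_keyspace`):

```rust
use futures::future::join_all;

async fn set_keyspace(node_id: u32, keyspace: &str) -> Result<u32, String> {
    let _ = keyspace; // placeholder for a real round-trip to the node
    Ok(node_id)
}

async fn set_keyspace_on_all(nodes: &[u32], keyspace: &str) -> Vec<Result<u32, String>> {
    // `map` builds the futures lazily; `join_all` drives them concurrently,
    // so no explicit Vec-push loop is needed.
    let futures = nodes.iter().map(|&id| set_keyspace(id, keyspace));
    join_all(futures).await
}

fn main() {
    let results = futures::executor::block_on(set_keyspace_on_all(&[1, 2, 3], "ks"));
    assert_eq!(results.len(), 3);
}
```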
39 changes: 17 additions & 22 deletions scylla/src/transport/session.rs
@@ -16,7 +16,7 @@ use bytes::Bytes;
use bytes::BytesMut;
use futures::future::join_all;
use futures::future::try_join_all;
-use itertools::Either;
+use itertools::{Either, Itertools};
pub use scylla_cql::errors::TranslationError;
use scylla_cql::frame::response::result::{PreparedMetadata, Rows};
use scylla_cql::frame::response::NonErrorResponse;
@@ -860,32 +860,26 @@ impl Session {
/// ```
pub async fn prepare(&self, query: impl Into<Query>) -> Result<PreparedStatement, QueryError> {
let query = query.into();
+        let query_ref = &query;

-        let connections = self.cluster.get_working_connections().await?;
+        let cluster_data = self.get_cluster_data();
+        let connections_iter = cluster_data.iter_working_connections()?;

        // Prepare statements on all connections concurrently
-        let handles = connections.iter().map(|c| c.prepare(&query));
-        let mut results = join_all(handles).await;
+        let handles = connections_iter.map(|c| async move { c.prepare(query_ref).await });
+        let mut results = join_all(handles).await.into_iter();

-        // If at least one prepare was successful prepare returns Ok
-
-        // Find first result that is Ok, or Err if all failed
-        let mut first_ok: Option<Result<PreparedStatement, QueryError>> = None;
-
-        while let Some(res) = results.pop() {
-            let is_ok: bool = res.is_ok();
-
-            first_ok = Some(res);
-
-            if is_ok {
-                break;
-            }
-        }
-
-        let mut prepared: PreparedStatement = first_ok.unwrap()?;
+        // If at least one prepare was successful, `prepare()` returns Ok.
+        // Find the first result that is Ok, or Err if all failed.
+        // Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()`
+        // returns either an error or an iterator with at least one connection, so there will be at least one result.
+        let first_ok: Result<PreparedStatement, QueryError> =
+            results.by_ref().find_or_first(Result::is_ok).unwrap();
+        let mut prepared: PreparedStatement = first_ok?;

// Validate prepared ids equality
-        for statement in results.into_iter().flatten() {
+        for statement in results.flatten() {
if prepared.get_id() != statement.get_id() {
return Err(QueryError::ProtocolError(
"Prepared statement Ids differ, all should be equal",
@@ -1870,9 +1864,10 @@ impl Session {
}

pub async fn check_schema_agreement(&self) -> Result<bool, QueryError> {
-        let connections = self.cluster.get_working_connections().await?;
+        let cluster_data = self.get_cluster_data();
+        let connections_iter = cluster_data.iter_working_connections()?;

-        let handles = connections.iter().map(|c| c.fetch_schema_version());
+        let handles = connections_iter.map(|c| async move { c.fetch_schema_version().await });
        let versions = try_join_all(handles).await?;

let local_version: Uuid = versions[0];
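`check_schema_agreement` relies on `try_join_all`, which resolves to Ok(Vec<_>) only when every future succeeds and short-circuits on the first error. A simplified sketch (hypothetical `fetch_version`, not the driver's `fetch_schema_version`):

```rust
use futures::future::try_join_all;

async fn fetch_version(node_id: u32) -> Result<u32, String> {
    if node_id == 0 {
        Err("node unreachable".to_owned())
    } else {
        Ok(42) // pretend every reachable node reports schema version 42
    }
}

async fn schema_in_agreement(nodes: &[u32]) -> Result<bool, String> {
    // `try_join_all` yields Ok(Vec<_>) only if every fetch succeeds.
    let versions = try_join_all(nodes.iter().map(|&id| fetch_version(id))).await?;
    // Assumes at least one node, mirroring the driver's nonemptiness invariant.
    Ok(versions.iter().all(|&v| v == versions[0]))
}

fn main() {
    assert_eq!(
        futures::executor::block_on(schema_in_agreement(&[1, 2, 3])),
        Ok(true)
    );
    assert!(futures::executor::block_on(schema_in_agreement(&[0, 1])).is_err());
}
```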
12 changes: 12 additions & 0 deletions scylla/src/transport/topology.rs
@@ -113,6 +113,18 @@ impl Peer {
rack: self.rack.clone(),
}
}

+    pub(crate) fn into_peer_endpoint_and_tokens(self) -> (PeerEndpoint, Vec<Token>) {
+        (
+            PeerEndpoint {
+                host_id: self.host_id,
+                address: self.address,
+                datacenter: self.datacenter,
+                rack: self.rack,
+            },
+            self.tokens,
+        )
+    }
}
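
The new consuming accessor complements `to_peer_endpoint`: taking `self` by value lets the fields be moved out instead of cloned. A reduced sketch of the pattern (simplified struct, not the driver's `Peer`):

```rust
struct Peer {
    address: String,
    tokens: Vec<i64>,
}

struct PeerEndpoint {
    address: String,
}

impl Peer {
    // Borrowing accessor: must clone, because `self` stays alive.
    fn to_endpoint(&self) -> PeerEndpoint {
        PeerEndpoint { address: self.address.clone() }
    }

    // Consuming accessor: moves the fields out, so no allocation happens.
    fn into_endpoint_and_tokens(self) -> (PeerEndpoint, Vec<i64>) {
        (PeerEndpoint { address: self.address }, self.tokens)
    }
}

fn main() {
    let peer = Peer { address: "10.0.0.1:9042".to_owned(), tokens: vec![1, 2, 3] };
    let _cloned = peer.to_endpoint(); // clones; `peer` is still usable
    let (endpoint, tokens) = peer.into_endpoint_and_tokens(); // no clones
    assert_eq!(endpoint.address, "10.0.0.1:9042");
    assert_eq!(tokens, vec![1, 2, 3]);
}
```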

/// Data used to issue connections to a node that is possibly subject to address translation.