-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
send_envelope.rs
44 lines (40 loc) · 1.71 KB
/
send_envelope.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use cassandra_protocol::error;
use cassandra_protocol::frame::Envelope;
use std::sync::Arc;
use crate::cluster::topology::Node;
use crate::cluster::ConnectionManager;
use crate::retry::{QueryInfo, RetryDecision, RetrySession};
use crate::transport::CdrsTransport;
/// Mid-level interface for sending envelopes to the cluster. Uses a query plan to route envelope to
/// appropriate node, and retry policy for error handling. Returns `None` if no nodes were present
/// in the query plan.
pub async fn send_envelope<T: CdrsTransport + 'static, CM: ConnectionManager<T> + 'static>(
query_plan: impl Iterator<Item = Arc<Node<T, CM>>>,
envelope: &Envelope,
is_idempotent: bool,
mut retry_session: Box<dyn RetrySession + Send + Sync>,
) -> Option<error::Result<Envelope>> {
'next_node: for node in query_plan {
loop {
let transport = node.persistent_connection().await;
match transport {
Ok(transport) => match transport.write_envelope(envelope, false).await {
Ok(envelope) => return Some(Ok(envelope)),
Err(error) => {
let query_info = QueryInfo {
error: &error,
is_idempotent,
};
match retry_session.decide(query_info) {
RetryDecision::RetrySameNode => continue,
RetryDecision::RetryNextNode => continue 'next_node,
RetryDecision::DontRetry => return Some(Err(error)),
}
}
},
Err(error) => return Some(Err(error)),
}
}
}
None
}