Skip to content

Commit

Permalink
document cassandra connection rx_process task (#867)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 20, 2022
1 parent 0e83fec commit b6edc8f
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions shotover-proxy/src/transforms/cassandra/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,25 @@ async fn rx_process_fallible<T: AsyncRead>(
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
) -> Result<()> {
let mut reader = FramedRead::new(read, codec);
let mut return_channel_map: HashMap<i16, (oneshot::Sender<Response>, Message)> = HashMap::new();

let mut return_message_map: HashMap<i16, Message> = HashMap::new();
// Invariants:
// * client must not reuse a stream_id until the client has received a response with that stream_id (not required by the protocol)
// * every response from the server must match the stream_id of the request it is responding to (required by the protocol)
// + events are not responses and so dont follow this invariant
//
// Implementation:
// To process a message we need to receive things from two different sources:
// 1. the response from the cassandra server
// 2. the oneshot::Sender and original message from the tx_process task
//
// We can receive these in any order.
// In order to handle that we have two seperate maps.
//
// We store the sender + original message here if we receive from the tx_process task first
let mut from_tx_process: HashMap<i16, (oneshot::Sender<Response>, Message)> = HashMap::new();

// We store the response message here if we receive from the server first.
let mut from_server: HashMap<i16, Message> = HashMap::new();

loop {
tokio::select! {
Expand All @@ -208,9 +224,9 @@ async fn rx_process_fallible<T: AsyncRead>(
pushed_messages_tx.send(vec![m]).unwrap();
}
} else if let Some(stream_id) = m.stream_id() {
match return_channel_map.remove(&stream_id) {
match from_tx_process.remove(&stream_id) {
None => {
return_message_map.insert(stream_id, m);
from_server.insert(stream_id, m);
},
Some((return_tx, original)) => {
return_tx.send(Response { original, response: Ok(m) })
Expand All @@ -232,9 +248,9 @@ async fn rx_process_fallible<T: AsyncRead>(
},
original_request = return_rx.recv() => {
if let Some(Request { message, return_chan, message_id }) = original_request {
match return_message_map.remove(&message_id) {
match from_server.remove(&message_id) {
None => {
return_channel_map.insert(message_id, (return_chan, message));
from_tx_process.insert(message_id, (return_chan, message));
}
Some(m) => {
return_chan.send(Response { original: message, response: Ok(m) })
Expand Down

0 comments on commit b6edc8f

Please sign in to comment.