Skip to content

Commit

Permalink
Log cassandra connection errors (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 29, 2022
1 parent 4104d95 commit 9ac2931
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions shotover-proxy/src/transforms/cassandra/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::{info, Instrument};
use tracing::{error, Instrument};

/// Represents a `Request` to a `CassandraConnection`
#[derive(Debug)]
Expand Down Expand Up @@ -85,6 +85,17 @@ async fn tx_process<T: AsyncWrite>(
out_rx: mpsc::UnboundedReceiver<Request>,
return_tx: mpsc::UnboundedSender<Request>,
codec: CassandraCodec,
) {
if let Err(err) = tx_process_fallible(write, out_rx, return_tx, codec).await {
error!("{:?}", err.context("tx_process task terminated"));
}
}

async fn tx_process_fallible<T: AsyncWrite>(
write: WriteHalf<T>,
out_rx: mpsc::UnboundedReceiver<Request>,
return_tx: mpsc::UnboundedSender<Request>,
codec: CassandraCodec,
) -> Result<()> {
let in_w = FramedWrite::new(write, codec);
let rx_stream = UnboundedReceiverStream::new(out_rx).map(|x| {
Expand All @@ -97,22 +108,33 @@ async fn tx_process<T: AsyncWrite>(
}

async fn rx_process<T: AsyncRead>(
read: ReadHalf<T>,
return_rx: mpsc::UnboundedReceiver<Request>,
codec: CassandraCodec,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
) {
if let Err(err) = rx_process_fallible(read, return_rx, codec, pushed_messages_tx).await {
error!("{:?}", err.context("rx_process task terminated"));
}
}

async fn rx_process_fallible<T: AsyncRead>(
read: ReadHalf<T>,
mut return_rx: mpsc::UnboundedReceiver<Request>,
codec: CassandraCodec,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
) -> Result<()> {
let mut in_r = FramedRead::new(read, codec);
let mut reader = FramedRead::new(read, codec);
let mut return_channel_map: HashMap<i16, (oneshot::Sender<Response>, Message)> = HashMap::new();

let mut return_message_map: HashMap<i16, Message> = HashMap::new();

loop {
tokio::select! {
Some(maybe_req) = in_r.next() => {
match maybe_req {
Ok(req) => {
for m in req {
Some(response) = reader.next() => {
match response {
Ok(response) => {
for m in response {
if let Ok(Opcode::Event) = cassandra::raw_frame::get_opcode(m.as_raw_bytes().unwrap()) {
if let Some(ref pushed_messages_tx) = pushed_messages_tx {
pushed_messages_tx.send(vec![m]).unwrap();
Expand All @@ -123,21 +145,20 @@ async fn rx_process<T: AsyncRead>(
return_message_map.insert(stream_id, m);
},
Some((return_tx, original)) => {
return_tx.send(Response {original, response: Ok(m) })
return_tx.send(Response { original, response: Ok(m) })
.map_err(|_| anyhow!("couldn't send message"))?;
}
}
}
}
}
Err(e) => {
info!("Couldn't decode message from upstream host {:?}", e);
return Err(anyhow!("Couldn't decode message from upstream host {:?}", e));
return Err(e.context("Encountered error while communicating with destination cassandra node"));
}
}
},
Some(original_request) = return_rx.recv() => {
let Request { message, return_chan, message_id} = original_request;
let Request { message, return_chan, message_id } = original_request;
match return_message_map.remove(&message_id) {
None => {
return_channel_map.insert(message_id, (return_chan, message));
Expand All @@ -149,11 +170,10 @@ async fn rx_process<T: AsyncRead>(
};
},
else => {
break
return Ok(())
}
}
}
Ok(())
}

pub async fn receive(
Expand Down

0 comments on commit 9ac2931

Please sign in to comment.