diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 3df4d4f90..0bbfae9f1 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -6,6 +6,7 @@ use anyhow::{anyhow, Context, Result}; use futures::future::join_all; use futures::{SinkExt, StreamExt}; use metrics::{register_gauge, Gauge}; +use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; @@ -445,17 +446,12 @@ impl Handler { pub async fn run( &mut self, stream: TcpStream, - mut pushed_messages_rx: UnboundedReceiver, + pushed_messages_rx: UnboundedReceiver, ) -> Result<()> { - debug!("Handler run() started"); - // As long as the shutdown signal has not been received, try to read a - // new request frame. - let mut idle_time_seconds: u64 = 1; - let (terminate_tx, terminate_rx) = watch::channel::<()>(()); self.terminate_tasks = Some(terminate_tx); - let (in_tx, mut in_rx) = mpsc::unbounded_channel::(); + let (in_tx, in_rx) = mpsc::unbounded_channel::(); let (out_tx, out_rx) = mpsc::unbounded_channel::(); let local_addr = stream.local_addr()?; @@ -485,6 +481,43 @@ impl Handler { ); }; + let result = self + .process_messages(local_addr, in_rx, out_tx, pushed_messages_rx) + .await; + + // Flush messages regardless of if we are shutting down due to a failure or due to application shutdown + match self + .chain + .process_request( + Wrapper::flush_with_chain_name(self.chain.name.clone()), + self.client_details.clone(), + ) + .await + { + Ok(_) => {} + Err(e) => error!( + "{:?}", + e.context(format!( + "encountered an error when flushing the chain {} for shutdown", + self.chain.name, + )) + ), + } + + result + } + + async fn process_messages( + &mut self, + local_addr: SocketAddr, + mut in_rx: mpsc::UnboundedReceiver, + out_tx: mpsc::UnboundedSender, + mut pushed_messages_rx: UnboundedReceiver, + ) -> Result<()> { + // As long as the shutdown signal has not been received, try to read a + // new request frame. + let mut idle_time_seconds: u64 = 1; + while !self.shutdown.is_shutdown() { // While reading a request frame, also listen for the shutdown signal debug!("Waiting for message"); @@ -549,24 +582,6 @@ impl Handler { out_tx.send(modified_messages)?; } - match self - .chain - .process_request( - Wrapper::flush_with_chain_name(self.chain.name.clone()), - self.client_details.clone(), - ) - .await - { - Ok(_) => {} - Err(e) => error!( - "{:?}", - e.context(format!( - "encountered an error when flushing the chain {} for shutdown", - self.chain.name, - )) - ), - } - Ok(()) } } diff --git a/shotover-proxy/tests/runner/observability_int_tests.rs b/shotover-proxy/tests/runner/observability_int_tests.rs index b1cbb9bfd..41bae6673 100644 --- a/shotover-proxy/tests/runner/observability_int_tests.rs +++ b/shotover-proxy/tests/runner/observability_int_tests.rs @@ -81,8 +81,17 @@ async fn test_metrics() { query_count{name="redis-chain"} shotover_available_connections{source="RedisSource"} shotover_chain_failures{chain="redis_chain"} +shotover_chain_latency_count{chain="redis_chain",client_details="127.0.0.1"} shotover_chain_latency_count{chain="redis_chain"} +shotover_chain_latency_sum{chain="redis_chain",client_details="127.0.0.1"} shotover_chain_latency_sum{chain="redis_chain"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.5"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.9"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.95"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.99"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.999"} +shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="1"} shotover_chain_latency{chain="redis_chain",quantile="0"} shotover_chain_latency{chain="redis_chain",quantile="0.5"} shotover_chain_latency{chain="redis_chain",quantile="0.9"}