Skip to content

Commit

Permalink
Ensure we always flush regardless of result of process_message
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 8, 2022
1 parent 1af4c96 commit 8ac2736
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
65 changes: 40 additions & 25 deletions shotover-proxy/src/server.rs
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, Context, Result};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use metrics::{register_gauge, Gauge};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
Expand Down Expand Up @@ -445,17 +446,12 @@ impl<C: Codec + 'static> Handler<C> {
pub async fn run(
&mut self,
stream: TcpStream,
mut pushed_messages_rx: UnboundedReceiver<Messages>,
pushed_messages_rx: UnboundedReceiver<Messages>,
) -> Result<()> {
debug!("Handler run() started");
// As long as the shutdown signal has not been received, try to read a
// new request frame.
let mut idle_time_seconds: u64 = 1;

let (terminate_tx, terminate_rx) = watch::channel::<()>(());
self.terminate_tasks = Some(terminate_tx);

let (in_tx, mut in_rx) = mpsc::unbounded_channel::<Messages>();
let (in_tx, in_rx) = mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<Messages>();

let local_addr = stream.local_addr()?;
Expand Down Expand Up @@ -485,6 +481,43 @@ impl<C: Codec + 'static> Handler<C> {
);
};

let result = self
.process_messages(local_addr, in_rx, out_tx, pushed_messages_rx)
.await;

// Flush messages regardless of if we are shutting down due to a failure or due to application shutdown
match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
self.client_details.clone(),
)
.await
{
Ok(_) => {}
Err(e) => error!(
"{:?}",
e.context(format!(
"encountered an error when flushing the chain {} for shutdown",
self.chain.name,
))
),
}

result
}

async fn process_messages(
&mut self,
local_addr: SocketAddr,
mut in_rx: mpsc::UnboundedReceiver<Messages>,
out_tx: mpsc::UnboundedSender<Messages>,
mut pushed_messages_rx: UnboundedReceiver<Messages>,
) -> Result<()> {
// As long as the shutdown signal has not been received, try to read a
// new request frame.
let mut idle_time_seconds: u64 = 1;

while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
debug!("Waiting for message");
Expand Down Expand Up @@ -549,24 +582,6 @@ impl<C: Codec + 'static> Handler<C> {
out_tx.send(modified_messages)?;
}

match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
self.client_details.clone(),
)
.await
{
Ok(_) => {}
Err(e) => error!(
"{:?}",
e.context(format!(
"encountered an error when flushing the chain {} for shutdown",
self.chain.name,
))
),
}

Ok(())
}
}
Expand Down
9 changes: 9 additions & 0 deletions shotover-proxy/tests/runner/observability_int_tests.rs
Expand Up @@ -81,8 +81,17 @@ async fn test_metrics() {
query_count{name="redis-chain"}
shotover_available_connections{source="RedisSource"}
shotover_chain_failures{chain="redis_chain"}
shotover_chain_latency_count{chain="redis_chain",client_details="127.0.0.1"}
shotover_chain_latency_count{chain="redis_chain"}
shotover_chain_latency_sum{chain="redis_chain",client_details="127.0.0.1"}
shotover_chain_latency_sum{chain="redis_chain"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.5"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.9"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.95"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.99"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="0.999"}
shotover_chain_latency{chain="redis_chain",client_details="127.0.0.1",quantile="1"}
shotover_chain_latency{chain="redis_chain",quantile="0"}
shotover_chain_latency{chain="redis_chain",quantile="0.5"}
shotover_chain_latency{chain="redis_chain",quantile="0.9"}
Expand Down

0 comments on commit 8ac2736

Please sign in to comment.