Skip to content

Commit

Permalink
Ensure we always flush regardless of result of process_message
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 8, 2022
1 parent b8ac62e commit 1e21263
Showing 1 changed file with 40 additions and 25 deletions.
65 changes: 40 additions & 25 deletions shotover-proxy/src/server.rs
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, Context, Result};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use metrics::{register_gauge, Gauge};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
Expand Down Expand Up @@ -445,17 +446,12 @@ impl<C: Codec + 'static> Handler<C> {
pub async fn run(
&mut self,
stream: TcpStream,
mut pushed_messages_rx: UnboundedReceiver<Messages>,
pushed_messages_rx: UnboundedReceiver<Messages>,
) -> Result<()> {
debug!("Handler run() started");
// As long as the shutdown signal has not been received, try to read a
// new request frame.
let mut idle_time_seconds: u64 = 1;

let (terminate_tx, terminate_rx) = watch::channel::<()>(());
self.terminate_tasks = Some(terminate_tx);

let (in_tx, mut in_rx) = mpsc::unbounded_channel::<Messages>();
let (in_tx, in_rx) = mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<Messages>();

let local_addr = stream.local_addr()?;
Expand Down Expand Up @@ -485,6 +481,43 @@ impl<C: Codec + 'static> Handler<C> {
);
};

let result = self
.process_messages(local_addr, in_rx, out_tx, pushed_messages_rx)
.await;

// Flush messages regardless of if we are shutting down due to a failure or due to application shutdown
match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
self.client_details.clone(),
)
.await
{
Ok(_) => {}
Err(e) => error!(
"{:?}",
e.context(format!(
"encountered an error when flushing the chain {} for shutdown",
self.chain.name,
))
),
}

result
}

async fn process_messages(
&mut self,
local_addr: SocketAddr,
mut in_rx: mpsc::UnboundedReceiver<Messages>,
out_tx: mpsc::UnboundedSender<Messages>,
mut pushed_messages_rx: UnboundedReceiver<Messages>,
) -> Result<()> {
// As long as the shutdown signal has not been received, try to read a
// new request frame.
let mut idle_time_seconds: u64 = 1;

while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
debug!("Waiting for message");
Expand Down Expand Up @@ -549,24 +582,6 @@ impl<C: Codec + 'static> Handler<C> {
out_tx.send(modified_messages)?;
}

match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
self.client_details.clone(),
)
.await
{
Ok(_) => {}
Err(e) => error!(
"{:?}",
e.context(format!(
"encountered an error when flushing the chain {} for shutdown",
self.chain.name,
))
),
}

Ok(())
}
}
Expand Down

0 comments on commit 1e21263

Please sign in to comment.