diff --git a/shotover-proxy/src/transforms/chain.rs b/shotover-proxy/src/transforms/chain.rs index 2dd245e29..290564e65 100644 --- a/shotover-proxy/src/transforms/chain.rs +++ b/shotover-proxy/src/transforms/chain.rs @@ -17,6 +17,7 @@ type InnerChain = Vec; pub struct BufferedChainMessages { pub local_addr: SocketAddr, pub messages: Messages, + pub flush: bool, pub return_chan: Option>, } @@ -25,6 +26,7 @@ impl BufferedChainMessages { BufferedChainMessages { local_addr, messages: m, + flush: false, return_chan: None, } } @@ -32,11 +34,13 @@ impl BufferedChainMessages { pub fn new( m: Messages, local_addr: SocketAddr, + flush: bool, return_chan: oneshot::Sender, ) -> Self { BufferedChainMessages { local_addr, messages: m, + flush, return_chan: Some(return_chan), } } @@ -98,6 +102,7 @@ impl BufferedChain { .send(BufferedChainMessages::new( wrapper.messages, wrapper.local_addr, + wrapper.flush, one_tx, )) .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) @@ -106,7 +111,12 @@ impl BufferedChain { Some(timeout) => { self.send_handle .send_timeout( - BufferedChainMessages::new(wrapper.messages, wrapper.local_addr, one_tx), + BufferedChainMessages::new( + wrapper.messages, + wrapper.local_addr, + wrapper.flush, + one_tx, + ), Duration::from_micros(timeout), ) .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) @@ -122,27 +132,34 @@ impl BufferedChain { wrapper: Wrapper<'_>, buffer_timeout_micros: Option, ) -> Result<()> { - match buffer_timeout_micros { - None => { - self.send_handle - .send(BufferedChainMessages::new_with_no_return( - wrapper.messages, - wrapper.local_addr, - )) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) - .await? - } - Some(timeout) => { - self.send_handle - .send_timeout( - BufferedChainMessages::new_with_no_return( + if wrapper.flush { + // To obey flush request we need to ensure messages have completed sending before returning. + // In order to achieve that we need to use the regular process_request method. + self.process_request(wrapper, buffer_timeout_micros).await?; + } else { + // When there is no flush we can return much earlier by not waiting for a response. + match buffer_timeout_micros { + None => { + self.send_handle + .send(BufferedChainMessages::new_with_no_return( wrapper.messages, wrapper.local_addr, - ), - Duration::from_micros(timeout), - ) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) - .await? + )) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } + Some(timeout) => { + self.send_handle + .send_timeout( + BufferedChainMessages::new_with_no_return( + wrapper.messages, + wrapper.local_addr, + ), + Duration::from_micros(timeout), + ) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } } } Ok(()) @@ -280,6 +297,7 @@ impl TransformChainBuilder { local_addr, return_chan, messages, + flush, }) = rx.recv().await { #[cfg(test)] @@ -287,12 +305,9 @@ impl TransformChainBuilder { count_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - let chain_response = chain - .process_request( - Wrapper::new_with_chain_name(messages, chain.name.clone(), local_addr), - chain.name.clone(), - ) - .await; + let mut wrapper = Wrapper::new_with_chain_name(messages, chain.name.clone(), local_addr); + wrapper.flush = flush; + let chain_response = chain.process_request(wrapper, chain.name.clone()).await; if let Err(e) = &chain_response { error!("Internal error in buffered chain: {e:?}"); diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 96e47a08b..1d6d54c24 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -474,7 +474,7 @@ impl<'a> Clone for Wrapper<'a> { client_details: self.client_details.clone(), chain_name: self.chain_name.clone(), local_addr: self.local_addr, - flush: false, + flush: self.flush, } } }