From 02db3cf4e824f6c1c5c93e70fc889a3642701bfa Mon Sep 17 00:00:00 2001 From: Adam Kloboucnik Date: Thu, 6 Oct 2022 10:56:25 +0200 Subject: [PATCH 1/3] Respond to server's heartbeat --- src/client/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/client/mod.rs b/src/client/mod.rs index 93f47c3..2368375 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -25,6 +25,7 @@ use rabbitmq_stream_protocol::{ delete::Delete, delete_publisher::DeletePublisherCommand, generic::GenericResponse, + heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::{OpenCommand, OpenResponse}, peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse}, @@ -77,6 +78,7 @@ impl MessageHandler for Client { match &item { Some(Ok(response)) => match response.kind_ref() { ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await, + ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await, _ => { if let Some(handler) = self.state.read().await.handler.as_ref() { let handler = handler.clone(); @@ -484,4 +486,9 @@ impl Client { self.tune_notifier.notify_one(); } + + async fn handle_heart_beat_command(&self) { + trace!("handling heartbeat"); + let _ = self.channel.send(HeartBeatCommand::default().into()).await; + } } From fb44cccc666eaa878474614ceb00ea01743232fb Mon Sep 17 00:00:00 2001 From: Adam Kloboucnik Date: Thu, 6 Oct 2022 11:38:57 +0200 Subject: [PATCH 2/3] Start sending heartbeat from client after tune command received. --- src/client/mod.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 2368375..27fb93b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -55,10 +55,11 @@ use self::{ use std::{ collections::HashMap, sync::{atomic::AtomicU64, Arc}, + time::Duration, }; use std::{future::Future, sync::atomic::Ordering}; use tokio::sync::RwLock; -use tokio::{net::TcpStream, sync::Notify}; +use tokio::{net::TcpStream, sync::Notify, task::JoinHandle}; use tokio_util::codec::Framed; type SinkConnection = SplitSink, Request>; @@ -70,6 +71,7 @@ pub struct ClientState { handler: Option>, heartbeat: u32, max_frame_size: u32, + heartbeat_task: Option>>, } #[async_trait::async_trait] @@ -134,6 +136,7 @@ impl Client { handler: None, heartbeat: broker.heartbeat, max_frame_size: broker.max_frame_size, + heartbeat_task: None, }; let mut client = Client { dispatcher, @@ -166,6 +169,12 @@ impl Client { } pub async fn close(&self) -> RabbitMQStreamResult<()> { + let mut state = self.state.write().await; + if let Some(ref heartbeat_task) = state.heartbeat_task { + heartbeat_task.abort(); + state.heartbeat_task = None + } + if self.channel.is_closed() { return Err(ClientError::AlreadyClosed); } @@ -472,18 +481,37 @@ impl Client { async fn handle_tune_command(&self, tunes: &TunesCommand) { let mut state = self.state.write().await; + let old_heartbeat = state.heartbeat; state.heartbeat = self.max_value(self.opts.heartbeat, tunes.heartbeat); state.max_frame_size = self.max_value(self.opts.max_frame_size, tunes.max_frame_size); let heart_beat = state.heartbeat; let max_frame_size = state.max_frame_size; - drop(state); let _ = self .channel .send(TunesCommand::new(max_frame_size, heart_beat).into()) .await; + // if hearbeat interval changed, abort the old heartbeat task and start a new one + if old_heartbeat != heart_beat { + if let Some(ref heartbeat_task) = state.heartbeat_task { + heartbeat_task.abort(); + state.heartbeat_task = None + } + let c = self.channel.clone(); + let heartbeat_interval = (heart_beat / 2).max(1); + let handle = tokio::task::spawn(async move { + loop { + trace!("sending heartbeat"); + let _ = c.send(HeartBeatCommand::default().into()).await; + tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await; + } + }); + state.heartbeat_task = Some(Arc::new(handle)); + } + drop(state); + self.tune_notifier.notify_one(); } From 3a7499d6f15433ade55c3d26b32d10dce8f074c8 Mon Sep 17 00:00:00 2001 From: Adam Kloboucnik Date: Tue, 11 Oct 2022 14:02:29 +0200 Subject: [PATCH 3/3] Only start heartbeat task if heart_beat is not disabled (!= 0) --- src/client/mod.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 27fb93b..7b8755c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -499,16 +499,18 @@ impl Client { heartbeat_task.abort(); state.heartbeat_task = None } - let c = self.channel.clone(); - let heartbeat_interval = (heart_beat / 2).max(1); - let handle = tokio::task::spawn(async move { - loop { - trace!("sending heartbeat"); - let _ = c.send(HeartBeatCommand::default().into()).await; - tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await; - } - }); - state.heartbeat_task = Some(Arc::new(handle)); + if heart_beat != 0 { + let c = self.channel.clone(); + let heartbeat_interval = (heart_beat / 2).max(1); + let handle = tokio::task::spawn(async move { + loop { + trace!("sending heartbeat"); + let _ = c.send(HeartBeatCommand::default().into()).await; + tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await; + } + }); + state.heartbeat_task = Some(Arc::new(handle)); + } } drop(state);