Skip to content

Commit

Permalink
Session transfer does not send LastWill messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 28, 2023
1 parent 49fde1e commit 02828fb
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 58 deletions.
16 changes: 9 additions & 7 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ impl Handler for HookHandler {
}
return (false, acc);
}
Message::Kick(id, clear_subscriptions, is_admin) => {
Message::Kick(id, clean_start, clear_subscriptions, is_admin) => {
let entry = self.shared.inner().entry(id.clone());
log::debug!("{:?}", id);
let new_acc = match entry.try_lock().await {
Ok(mut entry) => match entry.kick(*clear_subscriptions, *is_admin).await {
Ok(o) => {
log::debug!("{:?} offline info: {:?}", id, o);
HookResult::GrpcMessageReply(Ok(MessageReply::Kick(o)))
Ok(mut entry) => {
match entry.kick(*clean_start, *clear_subscriptions, *is_admin).await {
Ok(o) => {
log::debug!("{:?} offline info: {:?}", id, o);
HookResult::GrpcMessageReply(Ok(MessageReply::Kick(o)))
}
Err(e) => HookResult::GrpcMessageReply(Err(e)),
}
Err(e) => HookResult::GrpcMessageReply(Err(e)),
},
}
Err(e) => {
log::warn!("{:?}, try_lock error, {:?}", id, e);
HookResult::GrpcMessageReply(Err(e))
Expand Down
5 changes: 3 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,20 @@ impl Entry for ClusterLockEntry {
#[inline]
async fn kick(
&mut self,
clean_start: bool,
clear_subscriptions: bool,
is_admin: IsAdmin,
) -> Result<Option<SessionOfflineInfo>> {
log::debug!("{:?} ClusterLockEntry kick 1 ...", self.client().map(|c| c.id.clone()));
if let Some(kicked) = self.inner.kick(clear_subscriptions, is_admin).await? {
if let Some(kicked) = self.inner.kick(clean_start, clear_subscriptions, is_admin).await? {
log::debug!("{:?} broadcast kick reply kicked: {:?}", self.id(), kicked);
return Ok(Some(kicked));
}

match kick(
self.cluster_shared.grpc_clients.clone(),
self.cluster_shared.message_type,
Message::Kick(self.id(), true, is_admin),
Message::Kick(self.id(), clean_start, true, is_admin),
)
.await
{
Expand Down
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-raft/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ impl Handler for HookHandler {
}
return (false, acc);
}
GrpcMessage::Kick(id, clear_subscriptions, is_admin) => {
GrpcMessage::Kick(id, clean_start, clear_subscriptions, is_admin) => {
let mut entry = self.shared.inner().entry(id.clone());
let new_acc = match entry.kick(*clear_subscriptions, *is_admin).await {
let new_acc = match entry.kick(*clean_start, *clear_subscriptions, *is_admin).await {
Ok(Some(o)) => HookResult::GrpcMessageReply(Ok(MessageReply::Kick(Some(o)))),
Ok(None) => HookResult::GrpcMessageReply(Ok(MessageReply::Kick(None))),
Err(e) => HookResult::GrpcMessageReply(Err(e)),
Expand Down
8 changes: 5 additions & 3 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ impl Entry for ClusterLockEntry {
#[inline]
async fn kick(
&mut self,
clean_start: bool,
clear_subscriptions: bool,
is_admin: IsAdmin,
) -> Result<Option<SessionOfflineInfo>> {
log::debug!(
"{:?} ClusterLockEntry kick ..., clear_subscriptions: {}, is_admin: {}",
"{:?} ClusterLockEntry kick ..., clean_start: {}, clear_subscriptions: {}, is_admin: {}",
self.client().map(|c| c.id.clone()),
clean_start,
clear_subscriptions,
is_admin
);
Expand All @@ -141,14 +143,14 @@ impl Entry for ClusterLockEntry {
log::debug!("{:?} kick, prev_node_id: {:?}, is_admin: {}", id, self.prev_node_id, is_admin);
if prev_node_id == id.node_id {
//kicked from local
self.inner.kick(clear_subscriptions, is_admin).await
self.inner.kick(clean_start, clear_subscriptions, is_admin).await
} else {
//kicked from other node
if let Some(client) = self.cluster_shared.grpc_client(prev_node_id) {
let mut msg_sender = MessageSender {
client,
msg_type: self.cluster_shared.message_type,
msg: Message::Kick(id.clone(), true, is_admin), //clear_subscriptions
msg: Message::Kick(id.clone(), clean_start, true, is_admin), //clear_subscriptions
max_retries: 0,
retry_interval: Duration::from_millis(500),
};
Expand Down
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-http-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ async fn kick_client(req: &mut Request, res: &mut Response) {
.await
.entry(Id::from(Runtime::instance().node.id(), ClientId::from(clientid)));

match entry.kick(true, true).await {
match entry.kick(true, true, true).await {
Err(e) => res.set_status_error(StatusError::service_unavailable().with_detail(e.to_string())),
Ok(None) => res.set_status_code(StatusCode::NOT_FOUND),
Ok(Some(offline_info)) => res.render(Text::Plain(offline_info.id.to_string())),
Expand Down
7 changes: 5 additions & 2 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,22 @@ impl super::Entry for LockEntry {
#[inline]
async fn kick(
&mut self,
clean_start: bool,
clear_subscriptions: bool,
is_admin: IsAdmin,
) -> Result<Option<SessionOfflineInfo>> {
log::debug!(
"{:?} LockEntry kick ..., clear_subscriptions: {}, is_admin: {}",
"{:?} LockEntry kick ..., clean_start: {}, clear_subscriptions: {}, is_admin: {}",
self.client().map(|c| c.id.clone()),
clean_start,
clear_subscriptions,
is_admin
);

if let Some(peer_tx) = self.tx().and_then(|tx| if tx.is_closed() { None } else { Some(tx) }) {
let (tx, rx) = oneshot::channel();
if let Ok(()) = peer_tx.unbounded_send(Message::Kick(tx, self.id.clone(), is_admin)) {
if let Ok(()) = peer_tx.unbounded_send(Message::Kick(tx, self.id.clone(), clean_start, is_admin))
{
match tokio::time::timeout(Duration::from_secs(5), rx).await {
Ok(Ok(())) => {
log::debug!("{:?} kicked, from {:?}", self.id, self.client().map(|c| c.id.clone()));
Expand Down
1 change: 1 addition & 0 deletions rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub trait Entry: Sync + Send {
async fn remove_with(&mut self, id: &Id) -> Result<Option<(Session, Tx, ClientInfo)>>;
async fn kick(
&mut self,
clean_start: bool,
clear_subscriptions: bool,
is_admin: IsAdmin,
) -> Result<Option<SessionOfflineInfo>>;
Expand Down
38 changes: 28 additions & 10 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ impl SessionState {
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::MessageQueueFull).await;
}
},
Message::Kick(sender, by_id, is_admin) => {
log::debug!("{:?} Message::Kick, send kick result, to {:?}, is_admin: {}", state.id, by_id, is_admin);
Message::Kick(sender, by_id, clean_start, is_admin) => {
log::debug!("{:?} Message::Kick, send kick result, to {:?}, clean_start: {}, is_admin: {}", state.id, by_id, clean_start, is_admin);
if !sender.is_closed() {
if sender.send(()).is_err() {
log::warn!("{:?} Message::Kick, send response error, sender is closed", state.id);
Expand All @@ -137,6 +137,9 @@ impl SessionState {
if is_admin {
flags.insert(StateFlags::ByAdminKick);
}
if clean_start {
flags.insert(StateFlags::CleanStart);
}
state.client.add_disconnected_reason(Reason::ConnectKicked(is_admin)).await;
break
}else{
Expand Down Expand Up @@ -216,22 +219,27 @@ impl SessionState {
}
}

let clean_session = state.clean_session().await;

log::debug!(
"{:?} exit online worker, flags: {:?}, clean_session: {}",
"{:?} exit online worker, flags: {:?}, clean_session: {} {}",
state.id,
flags,
state.clean_session().await
clean_session,
flags.contains(StateFlags::CleanStart)
);

Runtime::instance().stats.connections.dec();

//Setting the disconnected state
state.client.set_disconnected(None).await;
if !flags.contains(StateFlags::DisconnectReceived) {

if state.last_will_enable(flags, clean_session) {
if let Err(e) = state.process_last_will().await {
log::error!("{:?} process last will error, {:?}", state.id, e);
}
}

state.sink.close();

//hook, client_disconnected
Expand All @@ -247,7 +255,7 @@ impl SessionState {
if flags.contains(StateFlags::ByAdminKick) {
state.clean(state.client.take_disconnected_reason().await).await;
}
} else if state.clean_session().await {
} else if clean_session {
state.clean(state.client.take_disconnected_reason().await).await;
} else {
//Start offline event loop
Expand Down Expand Up @@ -287,19 +295,22 @@ impl SessionState {
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::MessageQueueFull).await;
}
},
Message::Kick(sender, by_id, is_admin) => {
log::debug!("{:?} offline Kicked, send kick result, to: {:?}, is_admin: {}", state.id, by_id, is_admin);
Message::Kick(sender, by_id, clean_start, is_admin) => {
log::debug!("{:?} offline Kicked, send kick result, to: {:?}, clean_start: {}, is_admin: {}", state.id, by_id, clean_start, is_admin);
if !sender.is_closed() {
if let Err(e) = sender.send(()) {
log::warn!("{:?} offline Kick send response error, to: {:?}, is_admin: {}, {:?}", state.id, by_id, is_admin, e);
log::warn!("{:?} offline Kick send response error, to: {:?}, clean_start: {}, is_admin: {}, {:?}", state.id, by_id, clean_start, is_admin, e);
}
flags.insert(StateFlags::Kicked);
if is_admin {
flags.insert(StateFlags::ByAdminKick);
}
if clean_start {
flags.insert(StateFlags::CleanStart);
}
break
}else{
log::warn!("{:?} offline Kick sender is closed, to {:?}, is_admin: {}", state.id, by_id, is_admin);
log::warn!("{:?} offline Kick sender is closed, to {:?}, clean_start: {}, is_admin: {}", state.id, by_id, clean_start, is_admin);
}
},
_ => {
Expand Down Expand Up @@ -358,6 +369,13 @@ impl SessionState {
}
}

#[inline]
fn last_will_enable(&self, flags: StateFlags, clean_session: bool) -> bool {
let session_present =
flags.contains(StateFlags::Kicked) && !flags.contains(StateFlags::CleanStart) && !clean_session;
!(flags.contains(StateFlags::DisconnectReceived) || session_present)
}

#[inline]
async fn process_last_will(&self) -> Result<()> {
if let Some(lw) = self.client.last_will() {
Expand Down
6 changes: 4 additions & 2 deletions rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub type Timestamp = i64;
pub type IsOnline = bool;
pub type IsAdmin = bool;
pub type LimiterName = u16;
pub type CleanStart = bool;

pub type Tx = futures::channel::mpsc::UnboundedSender<Message>;
pub type Rx = futures::channel::mpsc::UnboundedReceiver<Message>;
Expand Down Expand Up @@ -1034,7 +1035,7 @@ pub struct Retain {
#[derive(Debug)]
pub enum Message {
Forward(From, Publish),
Kick(oneshot::Sender<()>, Id, IsAdmin),
Kick(oneshot::Sender<()>, Id, CleanStart, IsAdmin),
Disconnect(Disconnect),
Closed(Reason),
Keepalive,
Expand Down Expand Up @@ -1292,6 +1293,7 @@ bitflags! {
const Kicked = 0b00000001;
const ByAdminKick = 0b00000010;
const DisconnectReceived = 0b00000100;
const CleanStart = 0b00001000;
}
}

Expand Down Expand Up @@ -1434,7 +1436,7 @@ impl Display for Reason {
Reason::Error(r) => r,
Reason::ProtocolError(r) => return write!(f, "ProtocolError({})", r),
Reason::Reasons(reasons) => match reasons.len() {
0 => "Unknown",
0 => "",
1 => return write!(f, "{}", reasons.get(0).map(|r| r.to_string()).unwrap_or_default()),
_ => return write!(f, "{}", reasons.iter().map(|r| r.to_string()).join(",")),
},
Expand Down
27 changes: 14 additions & 13 deletions rmqtt/src/broker/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ async fn _handshake<Io: 'static>(
};

// Kick out the current session, if it exists
let (session_present, offline_info) = match entry.kick(packet.clean_session, false).await {
Err(e) => {
return Ok(refused_ack(
handshake,
&connect_info,
ConnectAckReasonV3::ServiceUnavailable,
format!("{:?}", e),
)
.await);
}
Ok(Some(offline_info)) => (!packet.clean_session, Some(offline_info)),
Ok(None) => (false, None),
};
let (session_present, offline_info) =
match entry.kick(packet.clean_session, packet.clean_session, false).await {
Err(e) => {
return Ok(refused_ack(
handshake,
&connect_info,
ConnectAckReasonV3::ServiceUnavailable,
format!("{:?}", e),
)
.await);
}
Ok(Some(offline_info)) => (!packet.clean_session, Some(offline_info)),
Ok(None) => (false, None),
};

let connected_at = chrono::Local::now().timestamp_millis();
let client = ClientInfo::new(connect_info, session_present, superuser, connected_at);
Expand Down
27 changes: 14 additions & 13 deletions rmqtt/src/broker/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,20 @@ pub async fn _handshake<Io: 'static>(
};

// Kick out the current session, if it exists
let (session_present, offline_info) = match entry.kick(packet.clean_start, false).await {
Err(e) => {
return Ok(refused_ack(
handshake,
&connect_info,
ConnectAckReasonV5::ServerUnavailable,
format!("{:?}", e),
)
.await);
}
Ok(Some(offline_info)) => (!packet.clean_start, Some(offline_info)),
Ok(None) => (false, None),
};
let (session_present, offline_info) =
match entry.kick(packet.clean_start, packet.clean_start, false).await {
Err(e) => {
return Ok(refused_ack(
handshake,
&connect_info,
ConnectAckReasonV5::ServerUnavailable,
format!("{:?}", e),
)
.await);
}
Ok(Some(offline_info)) => (!packet.clean_start, Some(offline_info)),
Ok(None) => (false, None),
};

let connected_at = chrono::Local::now().timestamp_millis();
let client = ClientInfo::new(connect_info, session_present, superuser, connected_at);
Expand Down
6 changes: 3 additions & 3 deletions rmqtt/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use client::NodeGrpcClient;

use crate::broker::session::SessionOfflineInfo;
use crate::broker::types::{
From, Id, IsAdmin, NodeId, Publish, Retain, Route, SessionStatus, SubsSearchParams, SubsSearchResult,
TopicFilter, TopicName,
CleanStart, From, Id, IsAdmin, NodeId, Publish, Retain, Route, SessionStatus, SubsSearchParams,
SubsSearchResult, TopicFilter, TopicName,
};
use crate::broker::{ClearSubscriptions, SubRelations, SubRelationsMap};
use crate::{Addr, ClientId, Result};
Expand All @@ -29,7 +29,7 @@ pub type MessageType = u64;
pub enum Message {
Forwards(From, Publish),
ForwardsTo(From, Publish, SubRelations),
Kick(Id, ClearSubscriptions, IsAdmin),
Kick(Id, CleanStart, ClearSubscriptions, IsAdmin),
GetRetains(TopicFilter),
SubscriptionsSearch(SubsSearchParams),
SubscriptionsGet(ClientId),
Expand Down

0 comments on commit 02828fb

Please sign in to comment.