Skip to content

Commit

Permalink
Limit the number of concurrent Kick operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Oct 24, 2023
1 parent 138de7d commit 0d3a3d3
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use std::collections::HashSet;
use std::time::Duration;

use futures::future::FutureExt;
use once_cell::sync::OnceCell;

use rmqtt::anyhow::Error;
use rmqtt::broker::Router;
use rmqtt::grpc::MessageBroadcaster;
use rmqtt::serde_json::json;
use rmqtt::{anyhow, async_trait::async_trait, futures, log, once_cell, serde_json};
use rmqtt::{
anyhow, anyhow::Error, async_trait::async_trait, futures, futures::future::FutureExt, log,
once_cell::sync::OnceCell, rust_box::task_exec_queue::SpawnExt, serde_json, serde_json::json,
};
use rmqtt::{
broker::{
default::DefaultShared,
Expand All @@ -18,8 +14,9 @@ use rmqtt::{
SubRelationsMap, SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn,
SubscriptionSize, To, Tx, Unsubscribe,
},
Entry, Shared,
Entry, Router, Shared,
},
grpc::MessageBroadcaster,
grpc::{Message, MessageReply, MessageType},
MqttError, Result, Runtime,
};
Expand All @@ -28,7 +25,7 @@ use super::message::{
get_client_node_id, Message as RaftMessage, MessageReply as RaftMessageReply, RaftGrpcMessage,
RaftGrpcMessageReply,
};
use super::{ClusterRouter, GrpcClients, HashMap, MessageSender, NodeGrpcClient};
use super::{task_exec_queue, ClusterRouter, GrpcClients, HashMap, MessageSender, NodeGrpcClient};

pub struct ClusterLockEntry {
inner: Box<dyn Entry>,
Expand Down Expand Up @@ -148,38 +145,48 @@ impl Entry for ClusterLockEntry {
} else {
//kicked from other node
if let Some(client) = self.cluster_shared.grpc_client(prev_node_id) {
let mut msg_sender = MessageSender {
client,
msg_type: self.cluster_shared.message_type,
msg: Message::Kick(id.clone(), clean_start, true, is_admin), //clear_subscriptions
max_retries: 0,
retry_interval: Duration::from_millis(500),
};
match msg_sender.send().await {
Ok(reply) => {
if let MessageReply::Kick(Some(kicked)) = reply {
log::debug!("{:?} kicked: {:?}", id, kicked);
Ok(Some(kicked))
} else {
log::info!(
"{:?} Message::Kick from other node, prev_node_id: {:?}, reply: {:?}",
let message_type = self.cluster_shared.message_type;
let kick_fut = async move {
let mut msg_sender = MessageSender {
client,
msg_type: message_type,
msg: Message::Kick(id.clone(), clean_start, true, is_admin), //clear_subscriptions
max_retries: 0,
retry_interval: Duration::from_millis(500),
};
match msg_sender.send().await {
Ok(reply) => {
if let MessageReply::Kick(Some(kicked)) = reply {
log::debug!("{:?} kicked: {:?}", id, kicked);
Some(kicked)
} else {
log::info!(
"{:?} Message::Kick from other node, prev_node_id: {:?}, reply: {:?}",
id,
prev_node_id,
reply
);
None
}
}
Err(e) => {
log::error!(
"{:?} Message::Kick from other node, prev_node_id: {:?}, error: {:?}",
id,
prev_node_id,
reply
e
);
Ok(None)
None
}
}
Err(e) => {
log::error!(
"{:?} Message::Kick from other node, prev_node_id: {:?}, error: {:?}",
id,
prev_node_id,
e
);
Ok(None)
}
}
};

let reply = kick_fut
.spawn(task_exec_queue())
.result()
.await
.map_err(|e| MqttError::from(e.to_string()))?;
Ok(reply)
} else {
return Err(MqttError::Msg(format!(
"kick error, grpc_client is not exist, prev_node_id: {:?}",
Expand Down

0 comments on commit 0d3a3d3

Please sign in to comment.