Skip to content

Commit

Permalink
Fix the issue where, when restarting a node with a specified LeaderId…
Browse files Browse the repository at this point in the history
…, it attempts to forcefully set itself as the Leader (even though there is already an existing Leader in the cluster), instead of simply obtaining Leader information from other nodes.
  • Loading branch information
rmqtt committed Oct 23, 2023
1 parent e000efe commit 443bc16
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@ use std::time::Duration;
use config::PluginConfig;
use handler::HookHandler;
use retainer::ClusterRetainer;
use rmqtt::{
ahash, anyhow,
async_trait::async_trait,
log, rand,
serde_json::{self, json},
tokio,
};
use rmqtt::{ahash, anyhow, async_trait::async_trait, log, NodeId, rand, serde_json::{self, json}, tokio};
use rmqtt::{
broker::{
error::MqttError,
Expand Down Expand Up @@ -162,22 +156,15 @@ impl ClusterPlugin {
Some(leader_info) => {
log::info!("Specify a leader: {:?}", leader_info);
if id == leader_info.id {
//This node is the leader.
None
//First, check if the Leader exists.
let actual_leader_info = find_actual_leader(&raft, peer_addrs, 3).await?;
if actual_leader_info.is_some() {
log::info!("Leader already exists, {:?}", actual_leader_info);
}
actual_leader_info
} else {
//The other nodes are leader.
let mut actual_leader_info = None;
for i in 0..30 {
actual_leader_info = raft
.find_leader_info(peer_addrs.clone())
.await
.map_err(|e| MqttError::StdError(Box::new(e)))?;
if actual_leader_info.is_some() {
break;
}
log::info!("Leader not found, rounds: {}", i);
sleep(Duration::from_millis(500)).await;
}
let actual_leader_info = find_actual_leader(&raft, peer_addrs, 60).await?;
let (actual_leader_id, actual_leader_addr) =
actual_leader_info.ok_or_else(|| MqttError::from("Leader does not exist"))?;
if actual_leader_id != leader_info.id {
Expand Down Expand Up @@ -262,7 +249,7 @@ impl Plugin for ClusterPlugin {

let raft_mailbox = Self::start_raft(self.cfg.clone(), self.router).await?;

for i in 0..30 {
for i in 0..60 {
match raft_mailbox.status().await {
Ok(status) => {
if status.is_started() {
Expand Down Expand Up @@ -387,6 +374,24 @@ async fn parse_addr(addr: &str) -> Result<SocketAddr> {
Err(MqttError::from(format!("Parsing address{:?} error", addr)))
}

async fn find_actual_leader(
raft: &Raft<&ClusterRouter>,
peer_addrs: Vec<String>,
rounds: usize,
) -> Result<Option<(NodeId, String)>> {
let mut actual_leader_info = None;
for i in 0..rounds {
actual_leader_info =
raft.find_leader_info(peer_addrs.clone()).await.map_err(|e| MqttError::StdError(Box::new(e)))?;
if actual_leader_info.is_some() {
break;
}
log::info!("Leader not found, rounds: {}", i);
sleep(Duration::from_millis(500)).await;
}
Ok(actual_leader_info)
}

pub(crate) struct MessageSender {
client: NodeGrpcClient,
msg_type: MessageType,
Expand Down

0 comments on commit 443bc16

Please sign in to comment.