Skip to content

Commit

Permalink
Verify the listening address before starting the Raft service
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jun 21, 2023
1 parent 5f93b3c commit 10c8d95
Showing 1 changed file with 48 additions and 63 deletions.
111 changes: 48 additions & 63 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
#[macro_use]
extern crate serde;

use rmqtt_raft::{Mailbox, Raft};
use std::convert::From as _f;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;

use rmqtt_raft::{Mailbox, Raft};

use config::PluginConfig;
use handler::HookHandler;
use retainer::ClusterRetainer;
use rmqtt::{
ahash, anyhow,
ahash,
async_trait::async_trait,
log,
log, rand,
serde_json::{self, json},
tokio, RwLock,
};
Expand Down Expand Up @@ -129,30 +129,32 @@ impl ClusterPlugin {
}

//raft init ...
async fn start_raft(cfg: Arc<RwLock<PluginConfig>>, router: &'static ClusterRouter) -> Mailbox {
async fn start_raft(cfg: Arc<RwLock<PluginConfig>>, router: &'static ClusterRouter) -> Result<Mailbox> {
let raft_peer_addrs = cfg.read().raft_peer_addrs.clone();

let id = Runtime::instance().node.id();
let raft_laddr = raft_peer_addrs
.iter()
.find(|peer| peer.id == id)
.map(|peer| peer.addr.to_string())
.expect("raft listening address does not exist");
.ok_or(MqttError::from("raft listening address does not exist"))?;
let logger = Runtime::instance().logger.clone();
log::info!("raft_laddr: {:?}", raft_laddr);

//verify the listening address
parse_addr(&raft_laddr).await?;

let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config());
let mailbox = raft.mailbox();

let peer_addrs = raft_peer_addrs
.iter()
.filter_map(|peer| if peer.id != id { Some(peer.addr.to_string()) } else { None })
.collect();

.collect::<Vec<String>>();
log::info!("peer_addrs: {:?}", peer_addrs);

let leader_info =
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::Error(Box::new(e))).unwrap();
log::info!("leader_info: {:?}", leader_info);
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::Error(Box::new(e)))?;

let _child = std::thread::Builder::new().name("cluster-raft".to_string()).spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -165,28 +167,34 @@ impl ClusterPlugin {

let runner = async move {
let id = Runtime::instance().node.id();

log::info!("leader_info: {:?}", leader_info);
let raft_handle = match leader_info {
Some((leader_id, leader_addr)) => {
log::info!(
"running in follower mode, leader_id: {}, leader_addr: {}",
leader_id,
leader_addr
);
tokio::spawn(raft.join(id, Some(leader_id), leader_addr))
tokio::spawn(raft.join(id, Some(leader_id), leader_addr)).await
}
None => {
log::info!("running in leader mode");
tokio::spawn(raft.lead(id))
tokio::spawn(raft.lead(id)).await
}
};
raft_handle.await.unwrap().unwrap();

if let Err(_) | Ok(Err(_)) = raft_handle {
log::error!("Raft service startup failed, {:?}", raft_handle);
tokio::time::sleep(Duration::from_millis(500)).await;
std::process::exit(-1);
}
};

rt.block_on(runner);
log::info!("exit cluster raft worker.");
});

mailbox
log::info!("exit cluster raft worker");
})?;
Ok(mailbox)
}

#[inline]
Expand All @@ -211,9 +219,9 @@ impl Plugin for ClusterPlugin {
async fn init(&mut self) -> Result<()> {
log::info!("{} init", self.name);

let raft_mailbox = Self::start_raft(self.cfg.clone(), self.router).await;
let raft_mailbox = Self::start_raft(self.cfg.clone(), self.router).await?;

for _ in 0..20 {
for _ in 0..30 {
match raft_mailbox.status().await {
Ok(status) => {
if status.is_started() {
Expand All @@ -225,7 +233,7 @@ impl Plugin for ClusterPlugin {
log::info!("{} init error, {:?}", self.name, e);
}
}
sleep(Duration::from_millis(300)).await;
sleep(Duration::from_millis(500)).await;
}

self.raft_mailbox.replace(raft_mailbox.clone());
Expand All @@ -251,19 +259,9 @@ impl Plugin for ClusterPlugin {
#[inline]
async fn start(&mut self) -> Result<()> {
log::info!("{} start", self.name);
let raft_mailbox = self.raft_mailbox();
*self.runtime.extends.router_mut().await = Box::new(self.router);
*self.runtime.extends.shared_mut().await = Box::new(self.shared);
self.register.start().await;

for i in 0..10 {
let status = raft_mailbox.status().await.map_err(anyhow::Error::new)?;
log::info!("raft status({}): {:?}", i, status);
if status.is_started() {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
Ok(())
}

Expand Down Expand Up @@ -321,6 +319,26 @@ impl Plugin for ClusterPlugin {
}
}

async fn parse_addr(addr: &str) -> Result<SocketAddr> {
for i in 0..10 {
match addr.to_socket_addrs() {
Ok(mut to_socket_addrs) => {
if let Some(a) = to_socket_addrs.next() {
log::info!("Round: {}, parse_addr, addr is {:?}", i, a);
return Ok(a);
} else {
log::warn!("Round: {}, parse_addr, next is None", i);
}
}
Err(e) => {
log::warn!("Round: {}, {:?}", i, e);
}
}
tokio::time::sleep(Duration::from_millis((rand::random::<u64>() % 300) + 500)).await;
}
return Err(MqttError::from("Parsing address error"));
}

pub(crate) struct MessageSender {
client: NodeGrpcClient,
msg_type: MessageType,
Expand Down Expand Up @@ -352,39 +370,6 @@ impl MessageSender {
}
}

//pub struct MessageBroadcaster {
// grpc_clients: GrpcClients,
// msg_type: MessageType,
// msg: Option<Message>,
//}
//
//impl MessageBroadcaster {
// pub fn new(grpc_clients: GrpcClients, msg_type: MessageType, msg: Message) -> Self {
// Self { grpc_clients, msg_type, msg: Some(msg) }
// }
//
// #[inline]
// pub async fn join_all(&mut self) -> Vec<Result<MessageReply>> {
// let msg_type = self.msg_type;
// let msg = self.msg.take().unwrap();
// let mut senders = Vec::new();
// let max_idx = self.grpc_clients.len() - 1;
// for (i, (_, (_, grpc_client))) in self.grpc_clients.iter().enumerate() {
// let grpc_client = grpc_client.clone();
// if i == max_idx {
// let fut = async move { grpc_client.send_message(msg_type, msg).await };
// senders.push(fut.boxed());
// break;
// } else {
// let msg = msg.clone();
// let fut = async move { grpc_client.send_message(msg_type, msg).await };
// senders.push(fut.boxed());
// }
// }
// futures::future::join_all(senders).await
// }
//}

#[inline]
pub(crate) async fn hook_message_dropped(droppeds: Vec<(To, From, Publish, Reason)>) {
for (to, from, publish, reason) in droppeds {
Expand Down

0 comments on commit 10c8d95

Please sign in to comment.