diff --git a/rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs b/rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs index 5dbdf5a5..33a2ce6e 100644 --- a/rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs +++ b/rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs @@ -12,7 +12,7 @@ use config::PluginConfig; use handler::HookHandler; use retainer::ClusterRetainer; use rmqtt::{ - ahash, + ahash, anyhow, async_trait::async_trait, log, rand, serde_json::{self, json}, @@ -44,7 +44,6 @@ mod router; mod shared; type HashMap = std::collections::HashMap; -// pub(crate) type GrpcClients = Arc>; #[inline] pub async fn register( @@ -144,7 +143,8 @@ impl ClusterPlugin { //verify the listening address parse_addr(&raft_laddr).await?; - let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config()); + let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config()) + .map_err(|e| MqttError::Error(Box::new(e)))?; let mailbox = raft.mailbox(); let mut peer_addrs = Vec::new(); @@ -158,6 +158,7 @@ impl ClusterPlugin { let leader_info = raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::Error(Box::new(e)))?; + // let (status_tx, status_rx) = futures::channel::oneshot::channel::>(); let _child = std::thread::Builder::new().name("cluster-raft".to_string()).spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -169,7 +170,6 @@ impl ClusterPlugin { let runner = async move { let id = Runtime::instance().node.id(); - log::info!("leader_info: {:?}", leader_info); let raft_handle = match leader_info { Some((leader_id, leader_addr)) => { @@ -223,13 +223,13 @@ impl Plugin for ClusterPlugin { let raft_mailbox = Self::start_raft(self.cfg.clone(), self.router).await?; - for _ in 0..30 { + for i in 0..30 { match raft_mailbox.status().await { Ok(status) => { if status.is_started() { break; } - log::info!("{} Initializing cluster", self.name); + log::info!("{} Initializing cluster, raft status({}): {:?}", self.name, i, status); } Err(e) => { log::info!("{} init error, {:?}", self.name, e); @@ -261,10 +261,17 @@ impl Plugin for ClusterPlugin { #[inline] async fn start(&mut self) -> Result<()> { log::info!("{} start", self.name); + let raft_mailbox = self.raft_mailbox(); *self.runtime.extends.router_mut().await = Box::new(self.router); *self.runtime.extends.shared_mut().await = Box::new(self.shared); self.register.start().await; - Ok(()) + let status = raft_mailbox.status().await.map_err(anyhow::Error::new)?; + log::info!("raft status: {:?}", status); + if status.is_started() { + Ok(()) + } else { + Err(MqttError::from("Raft cluster status is abnormal")) + } } #[inline] @@ -326,10 +333,10 @@ async fn parse_addr(addr: &str) -> Result { match addr.to_socket_addrs() { Ok(mut to_socket_addrs) => { if let Some(a) = to_socket_addrs.next() { - log::info!("Round: {}, parse_addr, addr is {:?}", i, a); + log::info!("Round: {}, parse_addr({:?}), addr is {:?}", i, addr, a); return Ok(a); } else { - log::warn!("Round: {}, parse_addr, next is None", i); + log::warn!("Round: {}, parse_addr({:?}), next is None", i, addr); } } Err(e) => { @@ -338,7 +345,7 @@ async fn parse_addr(addr: &str) -> Result { } tokio::time::sleep(Duration::from_millis((rand::random::() % 300) + 500)).await; } - Err(MqttError::from("Parsing address error")) + Err(MqttError::from(format!("Parsing address{:?} error", addr))) } pub(crate) struct MessageSender {