Skip to content

Commit

Permalink
Optimize the codes
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jun 29, 2023
1 parent c23d568 commit 29b3be1
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use config::PluginConfig;
use handler::HookHandler;
use retainer::ClusterRetainer;
use rmqtt::{
ahash,
ahash, anyhow,
async_trait::async_trait,
log, rand,
serde_json::{self, json},
Expand Down Expand Up @@ -44,7 +44,6 @@ mod router;
mod shared;

type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
// pub(crate) type GrpcClients = Arc<DashMap<NodeId, NodeGrpcClient>>;

#[inline]
pub async fn register(
Expand Down Expand Up @@ -144,7 +143,8 @@ impl ClusterPlugin {
//verify the listening address
parse_addr(&raft_laddr).await?;

let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config());
let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config())
.map_err(|e| MqttError::Error(Box::new(e)))?;
let mailbox = raft.mailbox();

let mut peer_addrs = Vec::new();
Expand All @@ -158,6 +158,7 @@ impl ClusterPlugin {
let leader_info =
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::Error(Box::new(e)))?;

// let (status_tx, status_rx) = futures::channel::oneshot::channel::<Result<Status>>();
let _child = std::thread::Builder::new().name("cluster-raft".to_string()).spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -169,7 +170,6 @@ impl ClusterPlugin {

let runner = async move {
let id = Runtime::instance().node.id();

log::info!("leader_info: {:?}", leader_info);
let raft_handle = match leader_info {
Some((leader_id, leader_addr)) => {
Expand Down Expand Up @@ -223,13 +223,13 @@ impl Plugin for ClusterPlugin {

let raft_mailbox = Self::start_raft(self.cfg.clone(), self.router).await?;

for _ in 0..30 {
for i in 0..30 {
match raft_mailbox.status().await {
Ok(status) => {
if status.is_started() {
break;
}
log::info!("{} Initializing cluster", self.name);
log::info!("{} Initializing cluster, raft status({}): {:?}", self.name, i, status);
}
Err(e) => {
log::info!("{} init error, {:?}", self.name, e);
Expand Down Expand Up @@ -261,10 +261,17 @@ impl Plugin for ClusterPlugin {
#[inline]
async fn start(&mut self) -> Result<()> {
log::info!("{} start", self.name);
let raft_mailbox = self.raft_mailbox();
*self.runtime.extends.router_mut().await = Box::new(self.router);
*self.runtime.extends.shared_mut().await = Box::new(self.shared);
self.register.start().await;
Ok(())
let status = raft_mailbox.status().await.map_err(anyhow::Error::new)?;
log::info!("raft status: {:?}", status);
if status.is_started() {
Ok(())
} else {
Err(MqttError::from("Raft cluster status is abnormal"))
}
}

#[inline]
Expand Down Expand Up @@ -326,10 +333,10 @@ async fn parse_addr(addr: &str) -> Result<SocketAddr> {
match addr.to_socket_addrs() {
Ok(mut to_socket_addrs) => {
if let Some(a) = to_socket_addrs.next() {
log::info!("Round: {}, parse_addr, addr is {:?}", i, a);
log::info!("Round: {}, parse_addr({:?}), addr is {:?}", i, addr, a);
return Ok(a);
} else {
log::warn!("Round: {}, parse_addr, next is None", i);
log::warn!("Round: {}, parse_addr({:?}), next is None", i, addr);
}
}
Err(e) => {
Expand All @@ -338,7 +345,7 @@ async fn parse_addr(addr: &str) -> Result<SocketAddr> {
}
tokio::time::sleep(Duration::from_millis((rand::random::<u64>() % 300) + 500)).await;
}
Err(MqttError::from("Parsing address error"))
Err(MqttError::from(format!("Parsing address{:?} error", addr)))
}

pub(crate) struct MessageSender {
Expand Down

0 comments on commit 29b3be1

Please sign in to comment.