Skip to content

Commit

Permalink
Allow specifying a Leader
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 9, 2023
1 parent ad862e8 commit 0f6f169
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 12 deletions.
5 changes: 5 additions & 0 deletions rmqtt-plugins/rmqtt-cluster-raft.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ message_type = 198
node_grpc_addrs = ["1@127.0.0.1:5363", "2@127.0.0.1:5364", "3@127.0.0.1:5365"]
#Raft peer address list
raft_peer_addrs = ["1@127.0.0.1:6003", "2@127.0.0.1:6004", "3@127.0.0.1:6005"]

#Specify a leader id, when the value is 0 or not specified, the first node
#will be designated as the Leader. Default value: 0
leader_id = 0

#Handshake lock timeout
try_lock_timeout = "10s"
task_exec_queue_workers = 500
Expand Down
26 changes: 25 additions & 1 deletion rmqtt-plugins/rmqtt-cluster-raft/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde::Serialize;

use rmqtt::grpc::MessageType;
use rmqtt::settings::{deserialize_duration, deserialize_duration_option, NodeAddr, Options};
use rmqtt::Result;
use rmqtt::{MqttError, NodeId, Result};
use rmqtt::{once_cell::sync::Lazy, serde_json};

pub(crate) static BACKOFF_STRATEGY: Lazy<ExponentialBackoff> = Lazy::new(|| {
Expand All @@ -21,22 +21,46 @@ pub(crate) static BACKOFF_STRATEGY: Lazy<ExponentialBackoff> = Lazy::new(|| {

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PluginConfig {

#[serde(default = "PluginConfig::message_type_default")]
pub message_type: MessageType,

pub node_grpc_addrs: Vec<NodeAddr>,

pub raft_peer_addrs: Vec<NodeAddr>,

#[serde(default)]
pub leader_id: NodeId,

#[serde(default = "PluginConfig::try_lock_timeout_default", deserialize_with = "deserialize_duration")]
pub try_lock_timeout: Duration, //Message::HandshakeTryLock

#[serde(default = "PluginConfig::task_exec_queue_workers_default")]
pub task_exec_queue_workers: usize,

#[serde(default = "PluginConfig::task_exec_queue_max_default")]
pub task_exec_queue_max: usize,

#[serde(default = "PluginConfig::raft_default")]
pub raft: RaftConfig,
}

impl PluginConfig {

#[inline]
pub fn leader(&self) -> Result<Option<&NodeAddr>> {
if self.leader_id == 0 {
Ok(None)
}else{
let leader = self.raft_peer_addrs
.iter()
.find(|leader| leader.id == self.leader_id)
.map(|leader| leader )
.ok_or_else(|| MqttError::from("Leader does not exist"))?;
Ok(Some(leader))
}
}

#[inline]
pub fn to_json(&self) -> Result<serde_json::Value> {
Ok(serde_json::to_value(self)?)
Expand Down
50 changes: 39 additions & 11 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use rmqtt::{
log, rand,
serde_json::{self, json},
tokio,
tokio::sync::RwLock,
};
use rmqtt::{
broker::{
Expand Down Expand Up @@ -70,7 +69,7 @@ struct ClusterPlugin {
name: String,
descr: String,
register: Box<dyn Register>,
cfg: Arc<RwLock<PluginConfig>>,
cfg: Arc<PluginConfig>,
grpc_clients: GrpcClients,
shared: &'static ClusterShared,
retainer: &'static ClusterRetainer,
Expand Down Expand Up @@ -109,7 +108,7 @@ impl ClusterPlugin {
let shared = ClusterShared::get_or_init(router, grpc_clients.clone(), node_names, cfg.message_type);
let retainer = ClusterRetainer::get_or_init(grpc_clients.clone(), cfg.message_type);
let raft_mailbox = None;
let cfg = Arc::new(RwLock::new(cfg));
let cfg = Arc::new(cfg);
Ok(Self {
runtime,
name,
Expand All @@ -125,8 +124,8 @@ impl ClusterPlugin {
}

//raft init ...
async fn start_raft(cfg: Arc<RwLock<PluginConfig>>, router: &'static ClusterRouter) -> Result<Mailbox> {
let raft_peer_addrs = cfg.read().await.raft_peer_addrs.clone();
async fn start_raft(cfg: Arc<PluginConfig>, router: &'static ClusterRouter) -> Result<Mailbox> {
let raft_peer_addrs = cfg.raft_peer_addrs.clone();

let id = Runtime::instance().node.id();
let raft_laddr = raft_peer_addrs
Expand All @@ -140,7 +139,7 @@ impl ClusterPlugin {
//verify the listening address
parse_addr(&raft_laddr).await?;

let raft = Raft::new(raft_laddr, router, logger, cfg.read().await.raft.to_raft_config())
let raft = Raft::new(raft_laddr, router, logger, cfg.raft.to_raft_config())
.map_err(|e| MqttError::StdError(Box::new(e)))?;
let mailbox = raft.mailbox();

Expand All @@ -152,10 +151,40 @@ impl ClusterPlugin {
}
log::info!("peer_addrs: {:?}", peer_addrs);

let leader_info =
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::StdError(Box::new(e)))?;
let leader_info = match cfg.leader()? {
Some(leader_info) => {
log::info!("Specify a leader: {:?}", leader_info);
if id == leader_info.id {
//This node is the leader.
None
}else{
//The other nodes are leader.
let mut actual_leader_info = None;
for i in 0..30 {
actual_leader_info = raft.find_leader_info(peer_addrs.clone())
.await.map_err(|e| MqttError::StdError(Box::new(e)))?;
if actual_leader_info.is_some() {
break
}
log::info!("Leader not found, rounds: {}", i);
sleep(Duration::from_millis(500)).await;
}
let (actual_leader_id, actual_leader_addr) = actual_leader_info.ok_or_else(|| MqttError::from("Leader does not exist"))?;
if actual_leader_id != leader_info.id {
return Err(MqttError::from(format!("Not the expected Leader, the expected one is {:?}", leader_info)));
}
Some((actual_leader_id, actual_leader_addr))
}
},
None => {
log::info!("Search for the existing leader ... ");
let leader_info = raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::StdError(Box::new(e)))?;
log::info!("The information about the located leader: {:?}", leader_info);
leader_info
}
};

// let (status_tx, status_rx) = futures::channel::oneshot::channel::<Result<Status>>();
//let (status_tx, status_rx) = futures::channel::oneshot::channel::<Result<Status>>();
let _child = std::thread::Builder::new().name("cluster-raft".to_string()).spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -166,7 +195,6 @@ impl ClusterPlugin {
.unwrap();

let runner = async move {
let id = Runtime::instance().node.id();
log::info!("leader_info: {:?}", leader_info);
let raft_handle = match leader_info {
Some((leader_id, leader_addr)) => {
Expand Down Expand Up @@ -252,7 +280,7 @@ impl Plugin for ClusterPlugin {

#[inline]
async fn get_config(&self) -> Result<serde_json::Value> {
self.cfg.read().await.to_json()
self.cfg.to_json()
}

#[inline]
Expand Down

0 comments on commit 0f6f169

Please sign in to comment.