Skip to content

Commit

Permalink
Upgrade deps to rmqtt-raft = "0.3.4"
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Oct 14, 2023
1 parent 49fad0d commit a6d5c8b
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-cluster-raft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"

[dependencies]
rmqtt = "0.2"
rmqtt-raft = { version = "0.3.3", features = ["reuse"] }
rmqtt-raft = { version = "0.3.4", features = ["reuse"] }
#rmqtt-raft = { path = "../../../rmqtt-raft", features = ["reuse"] }
serde = { version = "1.0", features = ["derive"] }
backoff = { version = "0.4", features = ["futures", "tokio"] }
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-raft/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Handler for HookHandler {
if let Err(e) = retry(BACKOFF_STRATEGY.clone(), || async {
let msg = msg.clone();
let mailbox = raft_mailbox.clone();
let res = async move { mailbox.send(msg).await }
let res = async move { mailbox.send_proposal(msg).await }
.spawn(task_exec_queue())
.result()
.await
Expand Down Expand Up @@ -73,7 +73,7 @@ impl Handler for HookHandler {
if let Err(e) = retry(BACKOFF_STRATEGY.clone(), || async {
let msg = msg.clone();
let mailbox = raft_mailbox.clone();
let res = async move { mailbox.send(msg).await }
let res = async move { mailbox.send_proposal(msg).await }
.spawn(task_exec_queue())
.result()
.await
Expand Down
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-raft/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Router for &'static ClusterRouter {

let msg = Message::Add { topic_filter, id, opts }.encode()?;
let mailbox = self.raft_mailbox().await;
let _ = async move { mailbox.send(msg).await.map_err(anyhow::Error::new) }
let _ = async move { mailbox.send_proposal(msg).await.map_err(anyhow::Error::new) }
.spawn(task_exec_queue())
.result()
.await
Expand All @@ -136,7 +136,7 @@ impl Router for &'static ClusterRouter {
if let Err(e) = retry(BACKOFF_STRATEGY.clone(), || async {
let msg = msg.clone();
let mailbox = raft_mailbox.clone();
let res = async move { mailbox.send(msg).await }
let res = async move { mailbox.send_proposal(msg).await }
.spawn(task_exec_queue())
.result()
.await
Expand Down
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Entry for ClusterLockEntry {
async fn try_lock(&self) -> Result<Box<dyn Entry>> {
let msg = RaftMessage::HandshakeTryLock { id: self.id() }.encode()?;
let raft_mailbox = self.cluster_shared.router.raft_mailbox().await;
let reply = raft_mailbox.send(msg).await.map_err(anyhow::Error::new)?;
let reply = raft_mailbox.send_proposal(msg).await.map_err(anyhow::Error::new)?;
let mut prev_node_id = None;
if !reply.is_empty() {
match RaftMessageReply::decode(&reply)? {
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Entry for ClusterLockEntry {
async fn set(&mut self, session: Session, tx: Tx, conn: ClientInfo) -> Result<()> {
let msg = RaftMessage::Connected { id: session.id.clone() }.encode()?;
let raft_mailbox = self.cluster_shared.router.raft_mailbox().await;
let reply = raft_mailbox.send(msg).await.map_err(anyhow::Error::new)?;
let reply = raft_mailbox.send_proposal(msg).await.map_err(anyhow::Error::new)?;
if !reply.is_empty() {
let reply = RaftMessageReply::decode(&reply)?;
match reply {
Expand Down

0 comments on commit a6d5c8b

Please sign in to comment.