Skip to content

Commit

Permalink
Remove Retainer
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Feb 23, 2024
1 parent 2fca0fe commit 635d748
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 110 deletions.
19 changes: 4 additions & 15 deletions rmqtt-plugins/rmqtt-cluster-raft/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@ use rmqtt::{

use super::config::{retry, BACKOFF_STRATEGY};
use super::message::{Message, RaftGrpcMessage, RaftGrpcMessageReply};
use super::{hook_message_dropped, retainer::ClusterRetainer, shared::ClusterShared, task_exec_queue};
use super::{hook_message_dropped, shared::ClusterShared, task_exec_queue};

pub(crate) struct HookHandler {
shared: &'static ClusterShared,
retainer: &'static ClusterRetainer,
raft_mailbox: Mailbox,
}

impl HookHandler {
pub(crate) fn new(
shared: &'static ClusterShared,
retainer: &'static ClusterRetainer,
raft_mailbox: Mailbox,
) -> Self {
Self { shared, retainer, raft_mailbox }
pub(crate) fn new(shared: &'static ClusterShared, raft_mailbox: Mailbox) -> Self {
Self { shared, raft_mailbox }
}
}

Expand Down Expand Up @@ -120,13 +115,7 @@ impl Handler for HookHandler {
}
GrpcMessage::GetRetains(topic_filter) => {
log::debug!("[GrpcMessage::GetRetains] topic_filter: {:?}", topic_filter);
let new_acc = match self.retainer.inner().get(topic_filter).await {
Ok(retains) => {
HookResult::GrpcMessageReply(Ok(MessageReply::GetRetains(retains)))
}
Err(e) => HookResult::GrpcMessageReply(Err(e)),
};
return (false, Some(new_acc));
unreachable!()
}
GrpcMessage::SubscriptionsGet(clientid) => {
let id = Id::from(Runtime::instance().node.id(), clientid.clone());
Expand Down
10 changes: 2 additions & 8 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::Duration;

use config::PluginConfig;
use handler::HookHandler;
use retainer::ClusterRetainer;

use rmqtt::anyhow::anyhow;
use rmqtt::{
Expand Down Expand Up @@ -45,7 +44,6 @@ use shared::ClusterShared;
mod config;
mod handler;
mod message;
mod retainer;
mod router;
mod shared;

Expand All @@ -60,7 +58,6 @@ struct ClusterPlugin {
cfg: Arc<PluginConfig>,
grpc_clients: GrpcClients,
shared: &'static ClusterShared,
retainer: &'static ClusterRetainer,

router: &'static ClusterRouter,
raft_mailbox: Option<Mailbox>,
Expand Down Expand Up @@ -95,10 +92,9 @@ impl ClusterPlugin {
let grpc_clients = Arc::new(grpc_clients);
let router = ClusterRouter::get_or_init(cfg.try_lock_timeout);
let shared = ClusterShared::get_or_init(router, grpc_clients.clone(), node_names, cfg.message_type);
let retainer = ClusterRetainer::get_or_init(grpc_clients.clone(), cfg.message_type);
let raft_mailbox = None;
let cfg = Arc::new(cfg);
Ok(Self { runtime, register, cfg, grpc_clients, shared, retainer, router, raft_mailbox })
Ok(Self { runtime, register, cfg, grpc_clients, shared, router, raft_mailbox })
}

//raft init ...
Expand Down Expand Up @@ -210,9 +206,7 @@ impl ClusterPlugin {

#[inline]
async fn hook_register(&self, typ: Type) {
self.register
.add(typ, Box::new(HookHandler::new(self.shared, self.retainer, self.raft_mailbox())))
.await;
self.register.add(typ, Box::new(HookHandler::new(self.shared, self.raft_mailbox()))).await;
}

fn raft_mailbox(&self) -> Mailbox {
Expand Down
87 changes: 0 additions & 87 deletions rmqtt-plugins/rmqtt-cluster-raft/src/retainer.rs

This file was deleted.

0 comments on commit 635d748

Please sign in to comment.