Skip to content

Commit

Permalink
Simplify plug-in implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Feb 21, 2024
1 parent 0264fa9 commit 2fca0fe
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 224 deletions.
20 changes: 3 additions & 17 deletions rmqtt-plugins/rmqtt-acl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,13 @@ use rmqtt::{
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::types::{AuthResult, PublishAclResult, SubscribeAckReason, SubscribeAclResult, Topic},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};

mod config;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(async move { AclPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) })
})
.await?;
Ok(())
}
register!(AclPlugin::new);

#[derive(Plugin)]
struct AclPlugin {
Expand Down
22 changes: 3 additions & 19 deletions rmqtt-plugins/rmqtt-auth-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use rmqtt::{
broker::types::{
AuthResult, Password, PublishAclResult, SubscribeAckReason, SubscribeAclResult, Superuser,
},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
MqttError, Result, Runtime, TopicName,
plugin::{PackageInfo, Plugin},
register, MqttError, Result, Runtime, TopicName,
};

mod config;
Expand Down Expand Up @@ -72,23 +72,7 @@ impl ACLType {
}
}

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { AuthHttpPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(AuthHttpPlugin::new);

#[derive(Plugin)]
struct AuthHttpPlugin {
Expand Down
22 changes: 3 additions & 19 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use rmqtt::{
types::{From, Publish, Reason, To},
},
grpc::{GrpcClients, Message, MessageReply, MessageType},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};
use router::ClusterRouter;
use shared::ClusterShared;
Expand All @@ -39,23 +39,7 @@ mod shared;

type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { ClusterPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(ClusterPlugin::new);

#[derive(Plugin)]
struct ClusterPlugin {
Expand Down
21 changes: 3 additions & 18 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use rmqtt::{
types::{From, Publish, Reason, To},
},
grpc::{client::NodeGrpcClient, GrpcClients, Message, MessageReply, MessageType},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
plugin::{PackageInfo, Plugin},
register,
tokio::time::sleep,
Result, Runtime,
};
Expand All @@ -50,23 +51,7 @@ mod shared;

type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { ClusterPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(ClusterPlugin::new);

#[derive(Plugin)]
struct ClusterPlugin {
Expand Down
22 changes: 4 additions & 18 deletions rmqtt-plugins/rmqtt-counter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,11 @@ use rmqtt::{async_trait::async_trait, log, FromType};
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::metrics::Metrics,
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(async move { CounterPlugin::new(runtime).await.map(|p| -> DynPlugin { Box::new(p) }) })
})
.await?;
Ok(())
}
register!(CounterPlugin::new);

#[derive(Plugin)]
struct CounterPlugin {
Expand All @@ -35,7 +21,7 @@ struct CounterPlugin {

impl CounterPlugin {
#[inline]
async fn new(runtime: &'static Runtime) -> Result<Self> {
async fn new<S: Into<String>>(runtime: &'static Runtime, _name: S) -> Result<Self> {
let register = runtime.extends.hook_mgr().await.register();
Ok(Self { register })
}
Expand Down
22 changes: 3 additions & 19 deletions rmqtt-plugins/rmqtt-http-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use rmqtt::{
};
use rmqtt::{
broker::hook::{Register, Type},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};

mod api;
Expand All @@ -30,23 +30,7 @@ mod types;
type ShutdownTX = oneshot::Sender<()>;
type PluginConfigType = Arc<RwLock<PluginConfig>>;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { HttpApiPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(HttpApiPlugin::new);

#[derive(Plugin)]
struct HttpApiPlugin {
Expand Down
22 changes: 3 additions & 19 deletions rmqtt-plugins/rmqtt-message-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use std::sync::Arc;

use rmqtt::{
broker::hook::Register,
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};
use rmqtt_storage::init_db;

Expand All @@ -30,23 +30,7 @@ mod config;
mod ram;
mod storage;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { StoragePlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(StoragePlugin::new);

#[derive(Plugin)]
struct StoragePlugin {
Expand Down
22 changes: 4 additions & 18 deletions rmqtt-plugins/rmqtt-plugin-template/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,11 @@ extern crate rmqtt_macros;
use async_trait::async_trait;
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(async move { Template::new(runtime).await.map(|p| -> DynPlugin { Box::new(p) }) })
})
.await?;
Ok(())
}
register!(Template::new);

#[derive(Plugin)]
struct Template {
Expand All @@ -32,7 +18,7 @@ struct Template {

impl Template {
#[inline]
async fn new(runtime: &'static Runtime) -> Result<Self> {
async fn new(runtime: &'static Runtime, _name: &'static str) -> Result<Self> {
let register = runtime.extends.hook_mgr().await.register();
Ok(Self { _runtime: runtime, register })
}
Expand Down
22 changes: 3 additions & 19 deletions rmqtt-plugins/rmqtt-retainer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,15 @@ use rmqtt::{async_trait::async_trait, log, serde_json, tokio::sync::RwLock, toki
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
grpc::{Message, MessageReply},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
Result, Runtime,
plugin::{PackageInfo, Plugin},
register, Result, Runtime,
};
use std::sync::Arc;

mod config;
mod retainer;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { RetainerPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(RetainerPlugin::new);

#[derive(Plugin)]
struct RetainerPlugin {
Expand Down
24 changes: 4 additions & 20 deletions rmqtt-plugins/rmqtt-session-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::inflight::InflightMessage,
broker::types::DisconnectInfo,
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
ClientId, From, MqttError, Publish, Result, Runtime, Session, SessionState, SessionSubMap, SessionSubs,
TimestampMillis,
plugin::{PackageInfo, Plugin},
register, ClientId, From, MqttError, Publish, Result, Runtime, Session, SessionState, SessionSubMap,
SessionSubs, TimestampMillis,
};

use rmqtt_storage::{init_db, DefaultStorageDB, List, Map, StorageType};
Expand All @@ -47,23 +47,7 @@ enum RebuildChanType {

type OfflineMessageOptionType = Option<(ClientId, From, Publish)>;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(
async move { StoragePlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) }) },
)
})
.await?;
Ok(())
}
register!(StoragePlugin::new);

#[derive(Plugin)]
struct StoragePlugin {
Expand Down
23 changes: 4 additions & 19 deletions rmqtt-plugins/rmqtt-sys-topic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use rmqtt::{
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::types::{From, Id, QoSEx},
plugin::{DynPlugin, DynPluginResult, PackageInfo, Plugin},
ClientId, NodeId, Publish, PublishProperties, QoS, Result, Runtime, SessionState, TopicName, UserName,
plugin::{PackageInfo, Plugin},
register, ClientId, NodeId, Publish, PublishProperties, QoS, Result, Runtime, SessionState, TopicName,
UserName,
};
use std::convert::From as _;
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -29,23 +30,7 @@ use std::time::Duration;

mod config;

#[inline]
pub async fn register(
runtime: &'static Runtime,
name: &'static str,
default_startup: bool,
immutable: bool,
) -> Result<()> {
runtime
.plugins
.register(name, default_startup, immutable, move || -> DynPluginResult {
Box::pin(async move {
SystemTopicPlugin::new(runtime, name).await.map(|p| -> DynPlugin { Box::new(p) })
})
})
.await?;
Ok(())
}
register!(SystemTopicPlugin::new);

#[derive(Plugin)]
struct SystemTopicPlugin {
Expand Down
Loading

0 comments on commit 2fca0fe

Please sign in to comment.