Skip to content

Commit

Permalink
Add plugin: rmqtt-sys-topic
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 8, 2023
1 parent f9b6339 commit dd0015d
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"rmqtt-plugins/rmqtt-counter",
"rmqtt-plugins/rmqtt-http-api",
"rmqtt-plugins/rmqtt-retainer",
"rmqtt-plugins/rmqtt-sys-topic",
"rmqtt-bin",
"rmqtt-macros"
]
Expand All @@ -24,6 +25,7 @@ rmqtt-cluster-raft = { path = "rmqtt-plugins/rmqtt-cluster-raft" }
rmqtt-counter = { path = "rmqtt-plugins/rmqtt-counter" }
rmqtt-http-api = { path = "rmqtt-plugins/rmqtt-http-api" }
rmqtt-retainer = { path = "rmqtt-plugins/rmqtt-retainer" }
rmqtt-sys-topic = { path = "rmqtt-plugins/rmqtt-sys-topic" }

[workspace.package]
version = "0.2.14"
Expand Down
Empty file.
3 changes: 2 additions & 1 deletion examples/cluster-raft/1/rmqtt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ plugins.dir = "./plugins/"
plugins.default_startups = [
"rmqtt-retainer",
"rmqtt-cluster-raft",
# "rmqtt-auth-http",
#"rmqtt-auth-http",
"rmqtt-sys-topic",
"rmqtt-web-hook"
]

Expand Down
Empty file.
3 changes: 2 additions & 1 deletion examples/cluster-raft/2/rmqtt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ plugins.dir = "./plugins/"
plugins.default_startups = [
"rmqtt-retainer",
"rmqtt-cluster-raft",
# "rmqtt-auth-http",
#"rmqtt-auth-http",
"rmqtt-sys-topic",
"rmqtt-web-hook"
]

Expand Down
Empty file.
3 changes: 2 additions & 1 deletion examples/cluster-raft/3/rmqtt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ plugins.dir = "./plugins/"
plugins.default_startups = [
"rmqtt-retainer",
"rmqtt-cluster-raft",
# "rmqtt-auth-http",
#"rmqtt-auth-http",
"rmqtt-sys-topic",
"rmqtt-web-hook"
]

Expand Down
2 changes: 2 additions & 0 deletions rmqtt-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ rmqtt-cluster-raft = "0.1"
rmqtt-counter = "0.1"
rmqtt-http-api = "0.1"
rmqtt-retainer = "0.1"
rmqtt-sys-topic = "0.1"
#rmqtt-plugin-template = "0.1"

[package.metadata.plugins]
Expand All @@ -52,6 +53,7 @@ rmqtt-auth-http = { }
rmqtt-cluster-broadcast = { immutable = true }
rmqtt-cluster-raft = { immutable = true }
rmqtt-retainer = { }
rmqtt-sys-topic = { }
#rmqtt-plugin-template = { }

[build-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions rmqtt-plugins/rmqtt-sys-topic.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
##--------------------------------------------------------------------
## rmqtt-sys-topic
##--------------------------------------------------------------------

#$SYS system message publish QoS
publish_qos = 1

#$SYS system message publish period
publish_interval = "1m"
11 changes: 11 additions & 0 deletions rmqtt-plugins/rmqtt-sys-topic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "rmqtt-sys-topic"
version = "0.1.0"
authors = ["rmqtt <rmqttd@126.com>"]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rmqtt = "0.2"
serde = { version = "1.0", features = ["derive"] }
67 changes: 67 additions & 0 deletions rmqtt-plugins/rmqtt-sys-topic/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use serde::de::{self, Deserialize, Deserializer};
use std::time::Duration;

use rmqtt::broker::types::QoS;
use rmqtt::serde_json;
use rmqtt::settings::to_duration;
use rmqtt::Result;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PluginConfig {
#[serde(
default = "PluginConfig::publish_qos_default",
deserialize_with = "PluginConfig::deserialize_publish_qos"
)]
pub publish_qos: QoS,

#[serde(
default = "PluginConfig::publish_interval_default",
deserialize_with = "PluginConfig::deserialize_publish_interval"
)]
pub publish_interval: Duration,
}

impl PluginConfig {
#[inline]
fn publish_qos_default() -> QoS {
QoS::AtLeastOnce
}

#[inline]
fn publish_interval_default() -> Duration {
Duration::from_secs(60)
}

#[inline]
pub fn to_json(&self) -> Result<serde_json::Value> {
Ok(serde_json::to_value(self)?)
}

#[inline]
fn deserialize_publish_qos<'de, D>(deserializer: D) -> Result<QoS, D::Error>
where
D: Deserializer<'de>,
{
let qos = match u8::deserialize(deserializer)? {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => return Err(de::Error::custom("QoS configuration error, only values (0,1,2) are supported")),
};
Ok(qos)
}

#[inline]
pub fn deserialize_publish_interval<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let v = String::deserialize(deserializer)?;
let d = to_duration(&v);
if d < Duration::from_secs(1) {
Err(de::Error::custom("'publish_interval' must be greater than 1 second"))
} else {
Ok(d)
}
}
}
Loading

0 comments on commit dd0015d

Please sign in to comment.