Skip to content

Commit

Permalink
Add statistical mode, which can be specified by specific statistical …
Browse files Browse the repository at this point in the history
…items.
  • Loading branch information
rmqtt committed Mar 18, 2024
1 parent d765c91 commit 4b71993
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 24 deletions.
7 changes: 6 additions & 1 deletion rmqtt-plugins/rmqtt-retainer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use rmqtt::{
timestamp_millis, tokio,
tokio::sync::RwLock,
tokio::time::sleep,
NodeId, Retain, TimestampMillis, TopicName,
NodeId, Retain, StatsMergeMode, TimestampMillis, TopicName,
};

use rmqtt::{MqttError, Result, Topic, TopicFilter};
Expand Down Expand Up @@ -430,6 +430,11 @@ impl RetainStorage for &'static Retainer {
.map(|v| *v)
.unwrap_or(-1)
}

#[inline]
fn stats_merge_mode(&self) -> StatsMergeMode {
StatsMergeMode::Max
}
}

#[derive(Clone)]
Expand Down
5 changes: 5 additions & 0 deletions rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ pub trait RetainStorage: Sync + Send {

///
async fn max(&self) -> isize;

#[inline]
fn stats_merge_mode(&self) -> StatsMergeMode {
StatsMergeMode::None
}
}

#[async_trait]
Expand Down
78 changes: 55 additions & 23 deletions rmqtt/src/broker/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ use once_cell::sync::OnceCell;
use crate::broker::executor::{get_active_count, get_rate};
#[cfg(feature = "debug")]
use crate::runtime::TaskExecStats;
use crate::{HashMap, NodeId, Runtime};
use crate::{HashMap, NodeId, Runtime, StatsMergeMode};

type Current = AtomicIsize;
type Max = AtomicIsize;

#[derive(Serialize, Deserialize)]
pub struct Counter(Current, Max);
pub struct Counter(Current, Max, StatsMergeMode);

impl Clone for Counter {
fn clone(&self) -> Self {
Counter(
AtomicIsize::new(self.0.load(Ordering::SeqCst)),
AtomicIsize::new(self.1.load(Ordering::SeqCst)),
self.2.clone(),
)
}
}
Expand All @@ -39,7 +40,12 @@ impl Default for Counter {
impl Counter {
#[inline]
pub fn new() -> Self {
Counter(AtomicIsize::new(0), AtomicIsize::new(0))
Counter(AtomicIsize::new(0), AtomicIsize::new(0), StatsMergeMode::None)
}

#[inline]
pub fn new_with(c: isize, max: isize, m: StatsMergeMode) -> Self {
Counter(AtomicIsize::new(c), AtomicIsize::new(max), m)
}

#[inline]
Expand Down Expand Up @@ -89,11 +95,21 @@ impl Counter {
self.0.fetch_min(count, Ordering::SeqCst);
}

#[inline]
pub fn count_max(&self, count: isize) {
self.0.fetch_max(count, Ordering::SeqCst);
}

#[inline]
pub fn max_max(&self, max: isize) {
self.1.fetch_max(max, Ordering::SeqCst);
}

#[inline]
pub fn max_min(&self, max: isize) {
self.1.fetch_min(max, Ordering::SeqCst);
}

#[inline]
pub fn count(&self) -> isize {
self.0.load(Ordering::SeqCst)
Expand All @@ -116,6 +132,11 @@ impl Counter {
self.1.store(other.1.load(Ordering::SeqCst), Ordering::SeqCst);
}

#[inline]
pub fn merge(&self, other: &Self) {
stats_merge(&self.2, self, other);
}

#[inline]
pub fn to_json(&self) -> serde_json::Value {
json!({
Expand All @@ -139,8 +160,8 @@ pub struct Stats {
pub in_inflights: Counter,
pub forwards: Counter,
pub message_storages: Counter,
pub retaineds: Counter,

retaineds_map: HashMap<NodeId, Counter>,
topics_map: HashMap<NodeId, Counter>,
routes_map: HashMap<NodeId, Counter>,

Expand Down Expand Up @@ -177,8 +198,8 @@ impl Stats {
in_inflights: Counter::new(),
forwards: Counter::new(),
message_storages: Counter::new(),
retaineds: Counter::new(),

retaineds_map: HashMap::default(),
topics_map: HashMap::default(),
routes_map: HashMap::default(),

Expand Down Expand Up @@ -224,14 +245,10 @@ impl Stats {
self.message_storages.max_max(message_mgr.max().await);
}

let mut retaineds_map = HashMap::default();
{
let retaineds = {
let retain = Runtime::instance().extends.retain().await;
let c = Counter::new();
c.current_set(retain.count().await);
c.max_max(retain.max().await);
retaineds_map.insert(node_id, c);
}
Counter::new_with(retain.count().await, retain.max().await, retain.stats_merge_mode())
};

#[cfg(feature = "debug")]
let shared = Runtime::instance().extends.shared().await;
Expand Down Expand Up @@ -269,7 +286,7 @@ impl Stats {
forwards: self.forwards.clone(),
message_storages: self.message_storages.clone(),

retaineds_map,
retaineds,
topics_map,
routes_map,

Expand Down Expand Up @@ -304,8 +321,8 @@ impl Stats {
self.in_inflights.add(&other.in_inflights);
self.forwards.add(&other.forwards);
self.message_storages.add(&other.message_storages);
self.retaineds.merge(&other.retaineds);

self.retaineds_map.extend(other.retaineds_map);
self.topics_map.extend(other.topics_map);
self.routes_map.extend(other.routes_map);

Expand Down Expand Up @@ -342,13 +359,6 @@ impl Stats {
let topics = router.merge_topics(&self.topics_map);
let routes = router.merge_routes(&self.routes_map);

let retaineds = self
.retaineds_map
.iter()
.max_by(|(_, c1), (_, c2)| c1.count().cmp(&c2.count()))
.map(|(_, c)| c.clone())
.unwrap_or_default();

let mut json_val = json!({
"handshakings.count": self.handshakings.count(),
"handshakings.max": self.handshakings.max(),
Expand All @@ -363,8 +373,8 @@ impl Stats {
"subscriptions.max": self.subscriptions.max(),
"subscriptions_shared.count": self.subscriptions_shared.count(),
"subscriptions_shared.max": self.subscriptions_shared.max(),
"retaineds.count": retaineds.count(),
"retaineds.max": retaineds.max(),
"retaineds.count": self.retaineds.count(),
"retaineds.max": self.retaineds.max(),

"message_queues.count": self.message_queues.count(),
"message_queues.max": self.message_queues.max(),
Expand Down Expand Up @@ -401,3 +411,25 @@ impl Stats {
json_val
}
}

#[inline]
fn stats_merge<'a>(mode: &StatsMergeMode, c: &'a Counter, o: &Counter) -> &'a Counter {
match mode {
StatsMergeMode::None => {}
StatsMergeMode::Sum => {
c.add(o);
}
StatsMergeMode::Max => {
c.count_max(o.count());
c.max_max(o.max());
}
StatsMergeMode::Min => {
c.count_min(o.count());
c.max_min(o.max());
}
_ => {
log::info!("unimplemented!");
}
}
c
}
9 changes: 9 additions & 0 deletions rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,15 @@ pub fn format_timestamp_millis(t: TimestampMillis) -> String {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StatsMergeMode {
None, // Represents no merging;
Sum, // Represents summing the data;
Average, // Represents averaging the data;
Max, // Represents taking the maximum value of the data;
Min, // Represents taking the minimum value of the data;
}

#[test]
fn test_reason() {
assert_eq!(Reason::ConnectKicked(false).is_kicked(false), true);
Expand Down

0 comments on commit 4b71993

Please sign in to comment.