Skip to content

Commit

Permalink
Add local_exec() and scheduler_init(), scheduled CPU load updates
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Oct 15, 2023
1 parent 54dbc94 commit 151fa8d
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 4 deletions.
5 changes: 4 additions & 1 deletion rmqtt-bin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rmqtt::ntex_mqtt::{
};
use rmqtt::settings::{listener::Listener, Options, Settings};
use rmqtt::{log, structopt::StructOpt, tokio};
use rmqtt::{logger::logger_init, MqttError, Result, Runtime, SessionState};
use rmqtt::{logger::logger_init, runtime, MqttError, Result, Runtime, SessionState};

mod ws;

Expand Down Expand Up @@ -56,6 +56,9 @@ async fn main() {

Settings::logs();

//init scheduler
runtime::scheduler_init().await.unwrap();

//register plugin
plugin::registers(plugin::default_startups()).await.unwrap();

Expand Down
159 changes: 156 additions & 3 deletions rmqtt/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use once_cell::sync::OnceCell;
use rust_box::task_exec_queue::{Builder, TaskExecQueue};
use std::fmt;
use std::iter::Sum;
use std::rc::Rc;
use std::thread::ThreadId;
use std::time::Duration;

use once_cell::sync::OnceCell;
use rust_box::stream_ext::LimiterExt;
use rust_box::task_exec_queue::{Builder, LocalBuilder, LocalSender, LocalTaskExecQueue, TaskExecQueue};
use systemstat::Platform;
use tokio::spawn;
use tokio::task::spawn_local;
use tokio_cron_scheduler::JobScheduler;

use crate::logger::{config_logger, Logger};
use crate::{
broker::{metrics::Metrics, stats::Stats},
broker::{metrics::Metrics, stats::Stats, types::DashMap},
extend,
node::Node,
plugin,
settings::Settings,
Result,
};

pub struct Runtime {
Expand Down Expand Up @@ -63,6 +72,16 @@ impl Runtime {
pub fn instance() -> &'static Self {
INSTANCE.get().unwrap()
}

#[inline]
pub fn local_exec() -> Rc<LocalTaskExecQueue> {
get_local_exec()
}

#[inline]
pub fn local_exec_stats() -> TaskExecStats {
get_local_stats()
}
}

impl fmt::Debug for Runtime {
Expand All @@ -71,3 +90,137 @@ impl fmt::Debug for Runtime {
Ok(())
}
}

pub async fn scheduler_init() -> Result<()> {
//Execute every 5 seconds
let async_job_5 = tokio_cron_scheduler::Job::new_async("*/5 * * * * *", move |_uuid, _l| {
Box::pin(async move {
Runtime::instance().node.update_cpuload().await;
})
})
.map_err(anyhow::Error::new)?;
Runtime::instance().sched.add(async_job_5).await.map_err(anyhow::Error::new)?;

Ok(())
}

#[inline]
fn get_local_exec() -> Rc<LocalTaskExecQueue> {
std::thread_local! {
pub static LOCAL_EXECUTORS: Rc<LocalTaskExecQueue> = {
let exec_workers = Runtime::instance().settings.task.local_exec_workers;
let exec_queue_max = Runtime::instance().settings.task.local_exec_queue_max;
let (tokens, period) = Runtime::instance().settings.task.local_exec_rate_limit;
let tokens = tokens.get() as usize;

let rate_limiter = leaky_bucket::RateLimiter::builder()
.initial(tokens)
.refill(tokens / 10)
.interval(period / 10)
.max(tokens)
.fair(true)
.build();

let (tx, rx) = futures::channel::mpsc::channel(exec_queue_max);

let (exec, task_runner) = LocalBuilder::default()
.workers(exec_workers)
.queue_max(exec_queue_max)
.with_channel(LocalSender::new(tx), rx.leaky_bucket_limiter(rate_limiter))
.build();

let exec1 = exec.clone();

spawn_local(async move {
futures::future::join(task_runner, async move {
loop {
set_local_stats(&exec1).await;
tokio::time::sleep(Duration::from_secs(3)).await;
}
})
.await;
});
Rc::new(exec)
};
}

LOCAL_EXECUTORS.with(|exec| exec.clone())
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct TaskExecStats {
active_count: isize,
completed_count: isize,
pending_wakers_count: usize,
waiting_wakers_count: usize,
waiting_count: isize,
rate: f64,
}

impl TaskExecStats {
#[inline]
pub async fn from_exec() -> Self {
let exec = &Runtime::instance().exec;
Self {
active_count: exec.active_count(),
completed_count: exec.completed_count().await,
pending_wakers_count: exec.pending_wakers_count(),
waiting_wakers_count: exec.waiting_wakers_count(),
waiting_count: exec.waiting_count(),
rate: exec.rate().await,
}
}

#[inline]
pub fn from_local_exec() -> Self {
get_local_stats()
}

#[inline]
fn add2(mut self, other: &Self) -> Self {
self.add(other);
self
}

#[inline]
pub fn add(&mut self, other: &Self) {
self.active_count += other.active_count;
self.completed_count += other.completed_count;
self.pending_wakers_count += other.pending_wakers_count;
self.waiting_wakers_count += other.waiting_wakers_count;
self.waiting_count += other.waiting_count;
self.rate += other.rate;
}
}

impl Sum for TaskExecStats {
fn sum<I: Iterator<Item = TaskExecStats>>(iter: I) -> Self {
iter.fold(TaskExecStats::default(), |acc, x| acc.add2(&x))
}
}

impl Sum<&'static TaskExecStats> for TaskExecStats {
fn sum<I: Iterator<Item = &'static TaskExecStats>>(iter: I) -> Self {
iter.fold(TaskExecStats::default(), |acc, x| acc.add2(x))
}
}

static LOCAL_ACTIVE_COUNTS: OnceCell<DashMap<ThreadId, TaskExecStats>> = OnceCell::new();

#[inline]
async fn set_local_stats(exec: &LocalTaskExecQueue) {
let active_counts = LOCAL_ACTIVE_COUNTS.get_or_init(DashMap::default);
let mut entry = active_counts.entry(std::thread::current().id()).or_default();
let stats = entry.value_mut();
stats.active_count = exec.active_count();
stats.completed_count = exec.completed_count().await;
stats.pending_wakers_count = exec.pending_wakers_count();
stats.waiting_wakers_count = exec.waiting_wakers_count();
stats.waiting_count = exec.waiting_count();
stats.rate = exec.rate().await;
}

#[inline]
fn get_local_stats() -> TaskExecStats {
LOCAL_ACTIVE_COUNTS.get().map(|m| m.iter().map(|item| item.value().clone()).sum()).unwrap_or_default()
}

0 comments on commit 151fa8d

Please sign in to comment.