Skip to content

Commit

Permalink
Remove dependency library 'parking_lot::RwLock'
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 4, 2023
1 parent 6262294 commit 8a95437
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 59 deletions.
10 changes: 5 additions & 5 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rmqtt::{
async_trait::async_trait,
log,
serde_json::{self, json},
RwLock,
tokio::sync::RwLock,
};
use rmqtt::{
broker::{
Expand Down Expand Up @@ -72,11 +72,11 @@ impl ClusterPlugin {
async fn new<S: Into<String>>(runtime: &'static Runtime, name: S, descr: S) -> Result<Self> {
let name = name.into();
let cfg = Arc::new(RwLock::new(runtime.settings.plugins.load_config::<PluginConfig>(&name)?));
log::debug!("{} ClusterPlugin cfg: {:?}", name, cfg.read());
log::debug!("{} ClusterPlugin cfg: {:?}", name, cfg.read().await);

let register = runtime.extends.hook_mgr().await.register();
let mut grpc_clients = HashMap::default();
let node_grpc_addrs = cfg.read().node_grpc_addrs.clone();
let node_grpc_addrs = cfg.read().await.node_grpc_addrs.clone();
for node_addr in &node_grpc_addrs {
if node_addr.id != runtime.node.id() {
grpc_clients.insert(
Expand All @@ -86,7 +86,7 @@ impl ClusterPlugin {
}
}
let grpc_clients = Arc::new(grpc_clients);
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let router = ClusterRouter::get_or_init(grpc_clients.clone(), message_type);
let shared = ClusterShared::get_or_init(grpc_clients.clone(), message_type);
let retainer = ClusterRetainer::get_or_init(grpc_clients.clone(), message_type);
Expand Down Expand Up @@ -115,7 +115,7 @@ impl Plugin for ClusterPlugin {

#[inline]
async fn get_config(&self) -> Result<serde_json::Value> {
self.cfg.read().to_json()
self.cfg.read().await.to_json()
}

#[inline]
Expand Down
9 changes: 5 additions & 4 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use rmqtt::{
async_trait::async_trait,
log, rand,
serde_json::{self, json},
tokio, RwLock,
tokio,
tokio::sync::RwLock,
};
use rmqtt::{
broker::{
Expand Down Expand Up @@ -125,7 +126,7 @@ impl ClusterPlugin {

//raft init ...
async fn start_raft(cfg: Arc<RwLock<PluginConfig>>, router: &'static ClusterRouter) -> Result<Mailbox> {
let raft_peer_addrs = cfg.read().raft_peer_addrs.clone();
let raft_peer_addrs = cfg.read().await.raft_peer_addrs.clone();

let id = Runtime::instance().node.id();
let raft_laddr = raft_peer_addrs
Expand All @@ -139,7 +140,7 @@ impl ClusterPlugin {
//verify the listening address
parse_addr(&raft_laddr).await?;

let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config())
let raft = Raft::new(raft_laddr, router, logger, cfg.read().await.raft.to_raft_config())
.map_err(|e| MqttError::StdError(Box::new(e)))?;
let mailbox = raft.mailbox();

Expand Down Expand Up @@ -251,7 +252,7 @@ impl Plugin for ClusterPlugin {

#[inline]
async fn get_config(&self) -> Result<serde_json::Value> {
self.cfg.read().to_json()
self.cfg.read().await.to_json()
}

#[inline]
Expand Down
51 changes: 26 additions & 25 deletions rmqtt-plugins/rmqtt-http-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use salvo::hyper::server::conn::AddrIncoming;
use salvo::prelude::*;

use rmqtt::{
anyhow, bytes, chrono, futures, log,
anyhow,
base64::{engine::general_purpose, Engine as _},
bytes, chrono, futures, log,
serde_json::{self, json},
tokio::sync::oneshot,
HashMap,
base64::{Engine as _, engine::general_purpose},
};
use rmqtt::{
broker::types::NodeId,
Expand Down Expand Up @@ -88,7 +89,7 @@ pub(crate) async fn listen_and_serve(
rx: oneshot::Receiver<()>,
) -> Result<()> {
let (reuseaddr, reuseport) = {
let cfg = cfg.read();
let cfg = cfg.read().await;
(cfg.http_reuseaddr, cfg.http_reuseport)
};
log::info!("HTTP API Listening on {}, reuseaddr: {}, reuseport: {}", laddr, reuseaddr, reuseport);
Expand Down Expand Up @@ -273,7 +274,7 @@ async fn list_apis(res: &mut Response) {
#[handler]
async fn api_logger(req: &mut Request, depot: &mut Depot) {
if let Some(cfg) = depot.obtain::<PluginConfigType>() {
if !cfg.read().http_request_log {
if !cfg.read().await.http_request_log {
return;
}
}
Expand Down Expand Up @@ -308,7 +309,7 @@ async fn api_logger(req: &mut Request, depot: &mut Depot) {
#[handler]
async fn get_brokers(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

let id = req.param::<NodeId>("id");
if let Some(id) = id {
Expand Down Expand Up @@ -383,7 +384,7 @@ async fn _get_brokers(message_type: MessageType) -> Result<Vec<serde_json::Value
#[handler]
async fn get_nodes(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

let id = req.param::<NodeId>("id");
if let Some(id) = id {
Expand Down Expand Up @@ -467,7 +468,7 @@ async fn check_health(_req: &mut Request, _depot: &mut Depot, res: &mut Response
#[handler]
async fn get_client(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let clientid = req.param::<String>("clientid");
if let Some(clientid) = clientid {
match _get_client(message_type, &clientid).await {
Expand Down Expand Up @@ -513,8 +514,8 @@ async fn _get_client(message_type: MessageType, clientid: &str) -> Result<Option
#[handler]
async fn search_clients(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let max_row_limit = cfg.read().max_row_limit;
let message_type = cfg.read().await.message_type;
let max_row_limit = cfg.read().await.max_row_limit;
let mut q = match req.parse_queries::<ClientSearchParams>() {
Ok(q) => q,
Err(e) => return res.set_status_error(StatusError::bad_request().with_detail(e.to_string())),
Expand Down Expand Up @@ -602,7 +603,7 @@ async fn check_online(req: &mut Request, res: &mut Response) {
#[handler]
async fn query_subscriptions(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let max_row_limit = cfg.read().max_row_limit;
let max_row_limit = cfg.read().await.max_row_limit;
let mut q = match req.parse_queries::<SubsSearchParams>() {
Ok(q) => q,
Err(e) => return res.set_status_error(StatusError::bad_request().with_detail(e.to_string())),
Expand Down Expand Up @@ -636,7 +637,7 @@ async fn get_client_subscriptions(req: &mut Request, res: &mut Response) {
#[handler]
async fn get_routes(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let max_row_limit = cfg.read().max_row_limit;
let max_row_limit = cfg.read().await.max_row_limit;
let limit = req.query::<usize>("_limit");
let limit = if let Some(limit) = limit {
if limit > max_row_limit {
Expand Down Expand Up @@ -667,7 +668,7 @@ async fn get_route(req: &mut Request, res: &mut Response) {
#[handler]
async fn publish(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let http_laddr = cfg.read().http_laddr;
let http_laddr = cfg.read().await.http_laddr;

let remote_addr = req.remote_addr().and_then(|addr| {
if let Some(ipv4) = addr.as_ipv4() {
Expand Down Expand Up @@ -823,7 +824,7 @@ async fn subscribe(req: &mut Request, depot: &mut Depot, res: &mut Response) {
}
} else {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
//The session is on another node
#[allow(clippy::mutable_key_type)]
match _subscribe_on_other_node(message_type, node_id, params).await {
Expand Down Expand Up @@ -892,7 +893,7 @@ async fn unsubscribe(req: &mut Request, depot: &mut Depot, res: &mut Response) {
}
} else {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
//The session is on another node
match _unsubscribe_on_other_node(message_type, node_id, params).await {
Ok(()) => res.render(Text::Plain("ok")),
Expand Down Expand Up @@ -922,7 +923,7 @@ async fn _unsubscribe_on_other_node(
#[handler]
async fn all_plugins(depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

match _all_plugins(message_type).await {
Ok(pluginss) => res.render(Json(pluginss)),
Expand Down Expand Up @@ -977,7 +978,7 @@ async fn _all_plugins(message_type: MessageType) -> Result<Vec<serde_json::Value
#[handler]
async fn node_plugins(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1010,7 +1011,7 @@ async fn _node_plugins(node_id: NodeId, message_type: MessageType) -> Result<Vec
#[handler]
async fn node_plugin_info(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1057,7 +1058,7 @@ async fn _node_plugin_info(
#[handler]
async fn node_plugin_config(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1100,7 +1101,7 @@ async fn _node_plugin_config(node_id: NodeId, name: &str, message_type: MessageT
#[handler]
async fn node_plugin_config_reload(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1138,7 +1139,7 @@ async fn _node_plugin_config_reload(node_id: NodeId, name: &str, message_type: M
#[handler]
async fn node_plugin_load(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1176,7 +1177,7 @@ async fn _node_plugin_load(node_id: NodeId, name: &str, message_type: MessageTyp
#[handler]
async fn node_plugin_unload(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;
let node_id = if let Some(node_id) = req.param::<NodeId>("node") {
node_id
} else {
Expand Down Expand Up @@ -1214,7 +1215,7 @@ async fn _node_plugin_unload(node_id: NodeId, name: &str, message_type: MessageT
#[handler]
async fn get_stats_sum(depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

match _get_stats_sum(message_type).await {
Ok(stats_sum) => res.render(Json(stats_sum)),
Expand Down Expand Up @@ -1274,7 +1275,7 @@ async fn _get_stats_sum(message_type: MessageType) -> Result<serde_json::Value>
#[handler]
async fn get_stats(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

let id = req.param::<NodeId>("id");
if let Some(id) = id {
Expand Down Expand Up @@ -1371,7 +1372,7 @@ async fn _build_stats(id: NodeId, node_status: NodeStatus, stats: serde_json::Va
#[handler]
async fn get_metrics(req: &mut Request, depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

let id = req.param::<NodeId>("id");
if let Some(id) = id {
Expand Down Expand Up @@ -1448,7 +1449,7 @@ async fn _get_metrics_all(message_type: MessageType) -> Result<Vec<serde_json::V
#[handler]
async fn get_metrics_sum(depot: &mut Depot, res: &mut Response) {
let cfg = depot.obtain::<PluginConfigType>().cloned().unwrap();
let message_type = cfg.read().message_type;
let message_type = cfg.read().await.message_type;

match _get_metrics_sum(message_type).await {
Ok(metrics_sum) => res.render(Json(metrics_sum)),
Expand Down
30 changes: 15 additions & 15 deletions rmqtt-plugins/rmqtt-http-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use config::PluginConfig;
use rmqtt::{
async_trait::async_trait,
log, serde_json,
tokio::{self, sync::oneshot},
RwLock,
tokio::{self, sync::oneshot, sync::RwLock},
};
use rmqtt::{
broker::hook::{Register, Type},
Expand Down Expand Up @@ -61,33 +60,34 @@ impl HttpApiPlugin {
async fn new<S: Into<String>>(runtime: &'static Runtime, name: S, descr: S) -> Result<Self> {
let name = name.into();
let cfg = Arc::new(RwLock::new(runtime.settings.plugins.load_config::<PluginConfig>(&name)?));
log::debug!("{} HttpApiPlugin cfg: {:?}", name, cfg.read());
log::debug!("{} HttpApiPlugin cfg: {:?}", name, cfg.read().await);
let register = runtime.extends.hook_mgr().await.register();
let shutdown_tx = Some(Self::start(runtime, cfg.clone()));
let shutdown_tx = Some(Self::start(runtime, cfg.clone()).await);
Ok(Self { runtime, name, descr: descr.into(), register, cfg, shutdown_tx })
}

fn start(_runtime: &'static Runtime, cfg: PluginConfigType) -> ShutdownTX {
async fn start(_runtime: &'static Runtime, cfg: PluginConfigType) -> ShutdownTX {
let (shutdown_tx, shutdown_rx): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();

let workers = cfg.read().await.workers;
let http_laddr = cfg.read().await.http_laddr;
let _child = std::thread::Builder::new().name("http-api".to_string()).spawn(move || {
let cfg1 = cfg.clone();
let runner = async move {
let laddr = cfg1.read().http_laddr;
let laddr = cfg1.read().await.http_laddr;
if let Err(e) = api::listen_and_serve(laddr, cfg1, shutdown_rx).await {
log::error!("{:?}", e);
}
};

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.worker_threads(cfg.read().workers)
.worker_threads(workers)
.thread_name("http-api-worker")
.thread_stack_size(4 * 1024 * 1024)
.build()
.unwrap();
rt.block_on(runner);
log::info!("Exit HTTP API Server, ..., http://{:?}", cfg.read().http_laddr);
log::info!("Exit HTTP API Server, ..., http://{:?}", http_laddr);
});
shutdown_tx
}
Expand All @@ -98,7 +98,7 @@ impl Plugin for HttpApiPlugin {
#[inline]
async fn init(&mut self) -> Result<()> {
log::info!("{} init", self.name);
let mgs_type = self.cfg.read().message_type;
let mgs_type = self.cfg.read().await.message_type;
self.register.add(Type::GrpcMessageReceived, Box::new(handler::HookHandler::new(mgs_type))).await;
Ok(())
}
Expand All @@ -110,27 +110,27 @@ impl Plugin for HttpApiPlugin {

#[inline]
async fn get_config(&self) -> Result<serde_json::Value> {
self.cfg.read().to_json()
self.cfg.read().await.to_json()
}

#[inline]
async fn load_config(&mut self) -> Result<()> {
let new_cfg = self.runtime.settings.plugins.load_config::<PluginConfig>(&self.name)?;
if !self.cfg.read().changed(&new_cfg) {
if !self.cfg.read().await.changed(&new_cfg) {
return Ok(());
}
let restart_enable = self.cfg.read().restart_enable(&new_cfg);
let restart_enable = self.cfg.read().await.restart_enable(&new_cfg);
if restart_enable {
let new_cfg = Arc::new(RwLock::new(new_cfg));
if let Some(tx) = self.shutdown_tx.take() {
if let Err(e) = tx.send(()) {
log::warn!("shutdown_tx send fail, {:?}", e);
}
}
self.shutdown_tx = Some(Self::start(self.runtime, new_cfg.clone()));
self.shutdown_tx = Some(Self::start(self.runtime, new_cfg.clone()).await);
self.cfg = new_cfg;
} else {
*self.cfg.write() = new_cfg;
*self.cfg.write().await = new_cfg;
}

log::debug!("load_config ok, {:?}", self.cfg);
Expand Down
1 change: 0 additions & 1 deletion rmqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ bytestring = { version = "1", features = ["serde"] }
thiserror = "1.0"
anyhow = "1.0"
async-trait = "0.1"
parking_lot = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand Down

0 comments on commit 8a95437

Please sign in to comment.