Skip to content

Commit

Permalink
Change tokio channel to futures channel
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 5, 2023
1 parent 662ed4a commit 16a9d7a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-http-api/src/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) async fn subscribe(params: SubscribeParams) -> Result<HashMap<TopicFi
for sub in subs {
let topic_filter = sub.topic_filter.clone();
let (reply_tx, reply_rx) = oneshot::channel();
let send_reply = tx.clone().send(MqttMessage::Subscribe(sub, reply_tx));
let send_reply = tx.unbounded_send(MqttMessage::Subscribe(sub, reply_tx));

let reply_fut = async move {
let reply = if let Err(send_err) = send_reply {
Expand Down Expand Up @@ -59,7 +59,7 @@ pub(crate) async fn unsubscribe(params: UnsubscribeParams) -> Result<()> {

let unsub = Unsubscribe::from(&topic_filter, shared_sub_supported)?;
let (reply_tx, reply_rx) = oneshot::channel();
tx.send(MqttMessage::Unsubscribe(unsub, reply_tx))?;
tx.unbounded_send(MqttMessage::Unsubscribe(unsub, reply_tx)).map_err(anyhow::Error::new)?;
reply_rx.await.map_err(anyhow::Error::new)??;
Ok(())
}
10 changes: 5 additions & 5 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl super::Entry for LockEntry {

if let Some(peer_tx) = self.tx().and_then(|tx| if tx.is_closed() { None } else { Some(tx) }) {
let (tx, rx) = oneshot::channel();
if let Ok(()) = peer_tx.send(Message::Kick(tx, self.id.clone(), is_admin)) {
if let Ok(()) = peer_tx.unbounded_send(Message::Kick(tx, self.id.clone(), is_admin)) {
match tokio::time::timeout(Duration::from_secs(5), rx).await {
Ok(Ok(())) => {
log::debug!("{:?} kicked, from {:?}", self.id, self.client().map(|c| c.id.clone()));
Expand Down Expand Up @@ -291,9 +291,9 @@ impl super::Entry for LockEntry {
log::warn!("{:?} forward, from:{:?}, error: Tx is None", self.id, from);
return Err((from, p, Reason::from_static("Tx is None")));
};
if let Err(e) = tx.send(Message::Forward(from, p)) {
if let Err(e) = tx.unbounded_send(Message::Forward(from, p)) {
log::warn!("{:?} forward, error: {:?}", self.id, e);
if let Message::Forward(from, p) = e.0 {
if let Message::Forward(from, p) = e.into_inner() {
return Err((from, p, Reason::from_static("Tx is closed")));
}
}
Expand Down Expand Up @@ -457,7 +457,7 @@ impl Shared for &'static DefaultShared {
continue;
};

if let Err(e) = tx.send(Message::Forward(from.clone(), p)) {
if let Err(e) = tx.unbounded_send(Message::Forward(from.clone(), p)) {
log::warn!(
"forwards, from:{:?}, to:{:?}, topic_filter:{:?}, topic:{:?}, error:{:?}",
from,
Expand All @@ -466,7 +466,7 @@ impl Shared for &'static DefaultShared {
publish.topic,
e
);
if let Message::Forward(from, p) = e.0 {
if let Message::Forward(from, p) = e.into_inner() {
errs.push((to, from, p, Reason::from_static("Connection Tx is closed")));
}
}
Expand Down
13 changes: 6 additions & 7 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::sync::Arc;

use futures::StreamExt;
use ntex_mqtt::types::MQTT_LEVEL_5;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time::{Duration, Instant};

Expand Down Expand Up @@ -62,7 +61,7 @@ impl SessionState {

#[inline]
pub(crate) async fn start(mut self, keep_alive: u16) -> (Self, Tx) {
let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
let (msg_tx, mut msg_rx) = futures::channel::mpsc::unbounded();
self.tx.replace(msg_tx.clone());
let mut state = self.clone();
ntex::rt::spawn(async move {
Expand Down Expand Up @@ -120,7 +119,7 @@ impl SessionState {
state.client.add_disconnected_reason(Reason::from_static("Timeout(Read/Write)")).await;
break
},
msg = msg_rx.recv() => {
msg = msg_rx.next() => {
log::debug!("{:?} recv msg: {:?}", id, msg);
if let Some(msg) = msg{
match msg{
Expand Down Expand Up @@ -288,7 +287,7 @@ impl SessionState {

loop {
tokio::select! {
msg = msg_rx.recv() => {
msg = msg_rx.next() => {
log::debug!("{:?} recv offline msg: {:?}", id, msg);
if let Some(msg) = msg{
match msg{
Expand Down Expand Up @@ -333,8 +332,8 @@ impl SessionState {
#[inline]
pub(crate) async fn forward(&self, from: From, p: Publish) {
let res = if let Some(ref tx) = self.tx {
if let Err(e) = tx.send(Message::Forward(from, p)) {
if let Message::Forward(from, p) = e.0 {
if let Err(e) = tx.unbounded_send(Message::Forward(from, p)) {
if let Message::Forward(from, p) = e.into_inner() {
Err((from, p, "Send Publish message error, Tx is closed"))
} else {
Ok(())
Expand All @@ -361,7 +360,7 @@ impl SessionState {
#[inline]
pub(crate) fn send(&self, msg: Message) -> Result<()> {
if let Some(ref tx) = self.tx {
tx.send(msg)?;
tx.unbounded_send(msg).map_err(anyhow::Error::new)?;
Ok(())
} else {
Err(MqttError::from("Message Sender is None"))
Expand Down
5 changes: 2 additions & 3 deletions rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub use ntex_mqtt::v5::{
};
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, SerializeStruct, Serializer};
use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::{MqttError, Result, Runtime};
Expand Down Expand Up @@ -58,8 +57,8 @@ pub type IsOnline = bool;
pub type IsAdmin = bool;
pub type LimiterName = u16;

pub type Tx = mpsc::UnboundedSender<Message>;
pub type Rx = mpsc::UnboundedReceiver<Message>;
pub type Tx = futures::channel::mpsc::UnboundedSender<Message>;
pub type Rx = futures::channel::mpsc::UnboundedReceiver<Message>;

pub type DashSet<V> = dashmap::DashSet<V, ahash::RandomState>;
pub type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
Expand Down

0 comments on commit 16a9d7a

Please sign in to comment.