Skip to content

Commit

Permalink
Optimize the MqttError
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 1, 2023
1 parent 5caacb0 commit e9b6b22
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 83 deletions.
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl ClusterPlugin {
parse_addr(&raft_laddr).await?;

let raft = Raft::new(raft_laddr, router, logger, cfg.read().raft.to_raft_config())
.map_err(|e| MqttError::Error(Box::new(e)))?;
.map_err(|e| MqttError::StdError(Box::new(e)))?;
let mailbox = raft.mailbox();

let mut peer_addrs = Vec::new();
Expand All @@ -152,7 +152,7 @@ impl ClusterPlugin {
log::info!("peer_addrs: {:?}", peer_addrs);

let leader_info =
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::Error(Box::new(e)))?;
raft.find_leader_info(peer_addrs).await.map_err(|e| MqttError::StdError(Box::new(e)))?;

// let (status_tx, status_rx) = futures::channel::oneshot::channel::<Result<Status>>();
let _child = std::thread::Builder::new().name("cluster-raft".to_string()).spawn(move || {
Expand Down
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-web-hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Plugin for WebHookPlugin {
let new_tx = Self::start(self.runtime, new_cfg.clone(), self.writers.clone());
if let Err(e) = self.tx.write().try_send(Message::Exit) {
log::error!("restart web-hook failed, {:?}", e);
return Err(MqttError::Error(Box::new(e)));
return Err(MqttError::StdError(Box::new(e)));
}
self.cfg = new_cfg;
*self.tx.write() = new_tx;
Expand Down
102 changes: 22 additions & 80 deletions rmqtt/src/broker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,40 @@ pub enum MqttError {
Timeout(Duration),
#[error("send packet error, {0}")]
SendPacketError(SendPacketError),
#[error("send error, {0}")]
SendError(String),
#[error("{0}")]
Error(Box<dyn std::error::Error + Send + Sync>),
JoinError(#[from] JoinError),
#[error("{0}")]
IoError(std::io::Error),
StdError(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("{0}")]
Error(String),
#[error("{0}")]
IoError(#[from] std::io::Error),
#[error("{0}")]
WSError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("{0}")]
TokioTryLockError(#[from] tokio::sync::TryLockError),
#[error("{0}")]
Msg(String),
#[error("{0}")]
Anyhow(anyhow::Error),
Anyhow(#[from] anyhow::Error),
#[error("gprc error: `{0}`")]
Grpc(#[from] tonic::transport::Error),
#[error("{0}")]
Json(serde_json::Error),
Json(#[from] serde_json::Error),
#[error("topic error, {0}")]
TopicError(String),
#[error("utf8 error, {0}")]
Utf8Error(Utf8Error),
Utf8Error(#[from] Utf8Error),
#[error("too many subscriptions")]
TooManySubscriptions,
#[error("{0}")]
ConfigError(ConfigError),
ConfigError(#[from] ConfigError),
#[error("{0}")]
AddrParseError(AddrParseError),
AddrParseError(#[from] AddrParseError),
#[error("{0}")]
ParseIntError(ParseIntError),
ParseIntError(#[from] ParseIntError),
#[error("listener config is error")]
ListenerConfigError,
#[error("{0}")]
Expand Down Expand Up @@ -93,66 +105,17 @@ impl From<TopicError> for MqttError {
}
}

impl From<Utf8Error> for MqttError {
#[inline]
fn from(e: Utf8Error) -> Self {
MqttError::Utf8Error(e)
}
}

impl From<anyhow::Error> for MqttError {
#[inline]
fn from(e: anyhow::Error) -> Self {
MqttError::Anyhow(e)
}
}

impl From<serde_json::Error> for MqttError {
#[inline]
fn from(e: serde_json::Error) -> Self {
MqttError::Json(e)
}
}

impl From<std::io::Error> for MqttError {
#[inline]
fn from(e: std::io::Error) -> Self {
MqttError::IoError(e)
}
}

impl<T: Send + Sync + core::fmt::Debug> From<SendError<T>> for MqttError {
#[inline]
fn from(e: SendError<T>) -> Self {
MqttError::Msg(format!("{:?}", e))
}
}

impl From<JoinError> for MqttError {
#[inline]
fn from(e: JoinError) -> Self {
MqttError::Msg(format!("{:?}", e))
}
}

impl From<Box<dyn std::error::Error + Send + Sync>> for MqttError {
#[inline]
fn from(e: Box<dyn std::error::Error + Send + Sync>) -> Self {
MqttError::Error(e)
MqttError::SendError(format!("{:?}", e))
}
}

impl From<Box<dyn std::error::Error>> for MqttError {
#[inline]
fn from(e: Box<dyn std::error::Error>) -> Self {
MqttError::Msg(e.to_string())
}
}

impl From<tokio_tungstenite::tungstenite::Error> for MqttError {
#[inline]
fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
MqttError::Error(Box::new(e))
MqttError::Error(e.to_string())
}
}

Expand All @@ -172,30 +135,9 @@ impl std::convert::TryFrom<MqttError> for v5::PublishResult {
}
}

impl From<ConfigError> for MqttError {
#[inline]
fn from(e: ConfigError) -> Self {
MqttError::ConfigError(e)
}
}

impl From<AddrParseError> for MqttError {
#[inline]
fn from(e: AddrParseError) -> Self {
MqttError::AddrParseError(e)
}
}

impl From<MqttError> for tonic::Status {
#[inline]
fn from(e: MqttError) -> Self {
tonic::Status::new(tonic::Code::Unavailable, format!("{:?}", e))
}
}

impl From<tokio::sync::TryLockError> for MqttError {
#[inline]
fn from(e: tokio::sync::TryLockError) -> Self {
MqttError::Anyhow(anyhow::Error::new(e))
}
}

0 comments on commit e9b6b22

Please sign in to comment.