Skip to content

Commit

Permalink
Fix the issue with the HTTP API: /api/v1/subscriptions where the form…
Browse files Browse the repository at this point in the history
…at of the 'opts' field in the response is incorrect.
  • Loading branch information
rmqtt committed Oct 23, 2023
1 parent 443bc16 commit c9759c7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
11 changes: 10 additions & 1 deletion rmqtt-plugins/rmqtt-http-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,15 @@ async fn query_subscriptions(req: &mut Request, depot: &mut Depot, res: &mut Res
if q._limit == 0 || q._limit > max_row_limit {
q._limit = max_row_limit;
}
let replys = Runtime::instance().extends.shared().await.query_subscriptions(q).await;
let replys = Runtime::instance()
.extends
.shared()
.await
.query_subscriptions(q)
.await
.into_iter()
.map(|res| res.to_json())
.collect::<Vec<serde_json::Value>>();
res.render(Json(replys));
}

Expand All @@ -625,6 +633,7 @@ async fn get_client_subscriptions(req: &mut Request, res: &mut Response) {
.await
.entry(Id::from(Runtime::instance().node.id(), ClientId::from(clientid)));
if let Some(subs) = entry.subscriptions().await {
let subs = subs.into_iter().map(|res| res.to_json()).collect::<Vec<serde_json::Value>>();
res.render(Json(subs));
} else {
res.set_status_code(StatusCode::NOT_FOUND)
Expand Down
8 changes: 4 additions & 4 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl super::Entry for LockEntry {
clientid: s.id.client_id.clone(),
client_addr: s.id.remote_addr,
topic: TopicFilter::from(topic_filter.as_ref()),
opts: opts.to_json(),
opts: opts.clone(),
})
.collect::<Vec<_>>();
Some(subs)
Expand Down Expand Up @@ -756,7 +756,7 @@ impl DefaultRouter {
clientid: client_id.clone(),
client_addr: id.remote_addr,
topic: topic_filter.clone(),
opts: opts.to_json(),
opts: opts.clone(),
})
} else {
None
Expand Down Expand Up @@ -804,7 +804,7 @@ impl DefaultRouter {
clientid: client_id.clone(),
client_addr: id.remote_addr,
topic: topic_filter.clone(),
opts: opts.to_json(),
opts: opts.clone(),
})
} else {
None
Expand Down Expand Up @@ -839,7 +839,7 @@ impl DefaultRouter {
clientid: client_id.clone(),
client_addr: id.remote_addr,
topic: topic_filter.clone(),
opts: opts.to_json(),
opts: opts.clone(),
})
} else {
None
Expand Down
15 changes: 14 additions & 1 deletion rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,20 @@ pub struct SubsSearchResult {
pub clientid: ClientId,
pub client_addr: Option<SocketAddr>,
pub topic: TopicFilter,
pub opts: serde_json::Value, //SubscriptionOptions
pub opts: SubscriptionOptions,
}

impl SubsSearchResult {
#[inline]
pub fn to_json(self) -> serde_json::Value {
json!({
"node_id": self.node_id,
"clientid": self.clientid,
"client_addr": self.client_addr,
"topic": self.topic,
"opts": self.opts.to_json(),
})
}
}

#[derive(Deserialize, Serialize, Debug, Default, PartialEq, Eq, Hash, Clone)]
Expand Down

0 comments on commit c9759c7

Please sign in to comment.