Skip to content

Commit

Permalink
Add the "time" field to the hook message body
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 21, 2023
1 parent 44d547b commit 6e83d56
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions rmqtt-plugins/rmqtt-web-hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,15 +502,21 @@ impl JsonFrom for Id {
impl Handler for WebHookHandler {
async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
let typ = param.get_type();

let now = chrono::Local::now();
let now_time = now.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let bodys = match param {
Parameter::ClientConnect(conn_info) => {
vec![(None, conn_info.to_body())]
let mut body = conn_info.to_body();
if let Some(obj) = body.as_object_mut() {
obj.insert("time".into(), serde_json::Value::String(now_time));
}
vec![(None, body)]
}
Parameter::ClientConnack(conn_info, conn_ack) => {
let mut body = conn_info.to_body();
if let Some(obj) = body.as_object_mut() {
obj.insert("conn_ack".into(), serde_json::Value::String(conn_ack.reason().to_string()));
obj.insert("time".into(), serde_json::Value::String(now_time));
}
vec![(None, body)]
}
Expand All @@ -523,6 +529,7 @@ impl Handler for WebHookHandler {
serde_json::Value::Number(serde_json::Number::from(client.connected_at)),
);
obj.insert("session_present".into(), serde_json::Value::Bool(client.session_present));
obj.insert("time".into(), serde_json::Value::String(now_time));
}
vec![(None, body)]
}
Expand All @@ -534,7 +541,8 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username,
"disconnected_at": client.disconnected_at(),
"reason": reason
"reason": reason,
"time": now_time
});
vec![(None, body)]
}
Expand All @@ -549,6 +557,7 @@ impl Handler for WebHookHandler {
"opts": json!({
"qos": subscribe.qos.value()
}),
"time": now_time
});
vec![(Some(subscribe.topic_filter.clone()), body)]
}
Expand All @@ -560,6 +569,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username,
"topic": unsubscribe.topic_filter,
"time": now_time
});
vec![(Some(unsubscribe.topic_filter.clone()), body)]
}
Expand All @@ -574,6 +584,7 @@ impl Handler for WebHookHandler {
"opts": json!({
"qos": subscribe.qos.value()
}),
"time": now_time
});
vec![(Some(subscribe.topic_filter.clone()), body)]
}
Expand All @@ -586,6 +597,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username,
"topic": topic,
"time": now_time
});
vec![(Some(topic), body)]
}
Expand All @@ -597,6 +609,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username,
"created_at": session.created_at,
"time": now_time
});
vec![(None, body)]
}
Expand All @@ -607,7 +620,8 @@ impl Handler for WebHookHandler {
"ipaddress": client.id.remote_addr,
"clientid": client.id.client_id,
"username": client.id.username,
"reason": reason
"reason": reason,
"time": now_time
});
vec![(None, body)]
}
Expand All @@ -622,6 +636,7 @@ impl Handler for WebHookHandler {
"packet_id": publish.packet_id(),
"payload": base64::encode(publish.payload()),
"ts": publish.create_time(),
"time": now_time
});
let body = client.id.from(body);
vec![(Some(topic.clone()), body)]
Expand All @@ -637,7 +652,8 @@ impl Handler for WebHookHandler {
"packet_id": publish.packet_id(),
"payload": base64::encode(publish.payload()),
"pts": publish.create_time(),
"ts": chrono::Local::now().timestamp_millis(),
"ts": now.timestamp_millis(),
"time": now_time
});
let body = client.id.to(body);
let body = from.from(body);
Expand All @@ -654,7 +670,8 @@ impl Handler for WebHookHandler {
"packet_id": publish.packet_id(),
"payload": base64::encode(publish.payload()),
"pts": publish.create_time(),
"ts": chrono::Local::now().timestamp_millis(),
"ts": now.timestamp_millis(),
"time": now_time
});
let body = client.id.to(body);
let body = from.from(body);
Expand All @@ -671,7 +688,8 @@ impl Handler for WebHookHandler {
"payload": base64::encode(publish.payload()),
"reason": reason,
"pts": publish.create_time(),
"ts": chrono::Local::now().timestamp_millis(),
"ts": now.timestamp_millis(),
"time": now_time
});
let mut body = from.from(body);
if let Some(to) = to {
Expand All @@ -691,6 +709,7 @@ impl Handler for WebHookHandler {
if !bodys.is_empty() {
let mut tx = self.tx.read().clone();
for (topic, body) in bodys {
//chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f")
if let Err(e) = tx.try_send(Message::Body(typ, topic, body)) {
log::warn!("web-hook send error, typ: {:?}, {:?}", typ, e);
}
Expand Down

0 comments on commit 6e83d56

Please sign in to comment.