From b61725ee53d3fc3cafbba25bdfe8af97f42b8c8e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 18 Jan 2024 02:29:28 -0500 Subject: [PATCH] * NEW [hook] Update the key to exchange. key=hash(clientid+topic+id). Signed-off-by: wanghaemq --- nanomq/webhook_post.c | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/nanomq/webhook_post.c b/nanomq/webhook_post.c index 4e44cc276..5fdf07ce1 100644 --- a/nanomq/webhook_post.c +++ b/nanomq/webhook_post.c @@ -246,7 +246,24 @@ webhook_client_disconnect(nng_socket *sock, conf_web_hook *hook_conf, return rv; } -// uint64_t g_index = 0; +static inline uint64_t +gen_hash_nearby_key(char *clientid, char *topic, uint32_t pid) +{ + uint64_t res = 0; + char buf[10]; // should be enough put a u32 + sprintf(buf, "%ld", pid); + + char buf2[100]; // should be enough put the clienid and topic and pid + sprintf(buf2, "%s%s%s", clientid, topic, buf); + + uint32_t key32 = DJBHash(buf2); + res = key32; res = res << 32; res += key32; + // log_info("%s - %s - %s => %lx %llx", clientid, topic, buf, key32, res); + // nng_time ts = nng_timestamp(); + return res; +} + +static uint32_t g_inc_id = 0; inline int hook_entry(nano_work *work, uint8_t reason) @@ -267,13 +284,23 @@ hook_entry(nano_work *work, uint8_t reason) nng_msg_alloc(&msg, 0); nng_msg_header_append(msg, nng_msg_header(work->msg), nng_msg_header_len(work->msg)); nng_msg_append(msg, nng_msg_body(work->msg), nng_msg_len(work->msg)); + uint8_t *body_ptr = nng_msg_body(work->msg); ptrdiff_t offset = (ptrdiff_t)(nng_msg_payload_ptr(work->msg) - body_ptr); nng_msg_set_payload_ptr(msg, (uint8_t *)nng_msg_body(msg) + offset); - nng_time ts = nng_timestamp(); - // nng_mtx_lock(hook_conf->ex_mtx); - // nng_time ts = g_index ++; - // nng_mtx_unlock(hook_conf->ex_mtx); + + // cparam has cloned at outside of hook_entry + char *clientid = (char *)conn_param_get_clientid(work->cparam); + if (clientid == NULL) + goto done; + char *topic = work->pub_packet->var_header.publish.topic_name.body; + if (topic == NULL) + goto done; + nng_mtx_lock(hook_conf->ex_mtx); + uint32_t pid = g_inc_id ++; + nng_mtx_unlock(hook_conf->ex_mtx); + + nng_time ts = (nng_time)gen_hash_nearby_key(clientid, topic, pid); nng_msg_set_timestamp(msg, ts); for (size_t i = 0; i < ex_conf->count; i++) { @@ -325,6 +352,8 @@ hook_entry(nano_work *work, uint8_t reason) default: break; } + +done: // Do not let online event msg trigger webhook work->flag = 0; return rv;