Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support handle msgs from external in HOOK_WAIT state. #1572

Merged
merged 15 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 273 additions & 15 deletions nanomq/webhook_inproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@
#include "nng/protocol/pipeline0/pull.h"
#include "nng/protocol/pipeline0/push.h"
#include "nng/supplemental/http/http.h"
#include "nng/supplemental/nanolib/cJSON.h"
#include "nng/supplemental/nanolib/conf.h"
#include "nng/supplemental/nanolib/log.h"
#include "nng/supplemental/nanolib/utils.h"
#include "nng/supplemental/util/platform.h"

#include "nng/mqtt/mqtt_client.h"

#ifdef SUPP_PARQUET
#include "nng/supplemental/nanolib/parquet.h"
#endif

#define NANO_LMQ_INIT_CAP 16
#define EXTERNAL2NANO_IPC "IPC://EXTERNAL2NANO:"

Expand All @@ -41,12 +48,131 @@
uint32_t id;
bool busy;
conf_exchange *exchange;
nng_socket *mqtt_sock;
};

static void webhook_cb(void *arg);

static nng_thread *inproc_thr;

static int
send_mqtt_msg_cat(nng_socket *sock, const char *topic, nng_msg **msgs, uint32_t len)
{
int rv;
nng_msg *pubmsg;
uint32_t sz = 0;
for (int i=0; i<len; ++i) {
uint32_t diff;
diff = nng_msg_len(msgs[i]) -
((uintptr_t)nng_msg_payload_ptr(msgs[i]) - (uintptr_t) nng_msg_body(msgs[i]));
sz += diff;
}
char *buf = nng_alloc(sizeof(char) * sz);
int pos = 0;
for (int i=0; i<len; ++i) {
uint32_t diff;
diff = nng_msg_len(msgs[i]) -
JaylinYu marked this conversation as resolved.
Show resolved Hide resolved
((uintptr_t)nng_msg_payload_ptr(msgs[i]) - (uintptr_t) nng_msg_body(msgs[i]));
if (sz >= pos + diff)
memcpy(buf + pos, nng_msg_payload_ptr(msgs[i]), diff);
else
log_error("buffer overflow!");
pos += diff;
}

nng_mqtt_msg_alloc(&pubmsg, 0);
nng_mqtt_msg_set_packet_type(pubmsg, NNG_MQTT_PUBLISH);
nng_mqtt_msg_set_publish_qos(pubmsg, 1);
nng_mqtt_msg_set_publish_retain(pubmsg, 0);
nng_mqtt_msg_set_publish_payload(pubmsg, (uint8_t *) buf, pos);
nng_mqtt_msg_set_publish_topic(pubmsg, topic);
// property *plist = mqtt_property_alloc();
// nng_mqtt_msg_set_publish_property(pubmsg, plist);
Dismissed Show dismissed Hide dismissed

// log_info("Publishing to '%s' '%s'...\n", topic, buf);
Dismissed Show dismissed Hide dismissed

if ((rv = nng_sendmsg(*sock, pubmsg, 0)) != 0) {
log_error("nng_sendmsg", rv);
}
nng_free(buf, pos);
return rv;
}

#ifdef SUPP_PARQUET

static char *
get_file_bname(char *fpath)
Fixed Show fixed Hide fixed
{
char * bname;
#ifdef _WIN32
if ((bname = malloc(strlen(fpath)+16)) == NULL) return NULL;
char ext[16];
_splitpath_s(fpath,
NULL, 0, // Don't need drive
NULL, 0, // Don't need directory
bname, strlen(fpath) + 15, // just the filename
ext , 15);
strncpy(bname+strlen(bname), ext, 15);
#else
#include <libgen.h>
// strcpy(bname, basename(fpath));

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
bname = basename(fpath);
#endif
return bname;
}

static int
send_mqtt_msg_file(nng_socket *sock, const char *topic, const char **fpaths, uint32_t len)
Fixed Show fixed Hide fixed
{
int rv;
const char ** filenames = malloc(sizeof(char *) * len);
for (int i=0; i<len; ++i) {
filenames[i] = get_file_bname((char *)fpaths[i]);
}

// Create a json as payload to trigger file transport
cJSON *obj = cJSON_CreateObject();
cJSON *files_obj = cJSON_CreateStringArray(fpaths, len);
cJSON_AddItemToObject(obj, "files", files_obj);
if (!files_obj)
return -1;

cJSON *filenames_obj = cJSON_CreateStringArray(filenames, len);
if (!filenames_obj)
return -1;
cJSON_AddItemToObject(obj, "filenames", filenames_obj);
cJSON * delete_obj = cJSON_AddNumberToObject(obj, "delete", -1);
Fixed Show fixed Hide fixed

char *buf = cJSON_PrintUnformatted(obj);
cJSON_Delete(obj);
for (int i=0; i<len; ++i)
filenames[i];
Fixed Show fixed Hide fixed
free(filenames);

// create a PUBLISH message
nng_msg *pubmsg;
nng_mqtt_msg_alloc(&pubmsg, 0);
nng_mqtt_msg_set_packet_type(pubmsg, NNG_MQTT_PUBLISH);
nng_mqtt_msg_set_publish_dup(pubmsg, 0);
nng_mqtt_msg_set_publish_qos(pubmsg, 0);
nng_mqtt_msg_set_publish_retain(pubmsg, 0);
nng_mqtt_msg_set_publish_payload(
pubmsg, (uint8_t *) buf, strlen(buf));
nng_mqtt_msg_set_publish_topic(pubmsg, topic);
// property *plist = mqtt_property_alloc();
// nng_mqtt_msg_set_publish_property(pubmsg, plist);
Dismissed Show dismissed Hide dismissed

log_info("Publishing to '%s' '%s'...\n", topic, buf);
if ((rv = nng_sendmsg(*sock, pubmsg, 0)) != 0) {
log_error("nng_sendmsg", rv);
}
free(buf);

return rv;
}

#endif

static void
send_msg(conf_web_hook *conf, nng_msg *msg)
{
Expand All @@ -72,8 +198,8 @@
nng_http_client_connect(client, aio);

// Wait for it to finish.

nng_aio_wait(aio);

if ((rv = nng_aio_result(aio)) != 0) {
log_error("Connect failed: %s", nng_strerror(rv));
nng_aio_finish_sync(aio, rv);
Expand Down Expand Up @@ -188,19 +314,8 @@
body = (char *) nng_msg_body(msg);
if (nng_msg_len(msg) > strlen(EXTERNAL2NANO_IPC) &&
0 == strncmp(body, EXTERNAL2NANO_IPC, strlen(EXTERNAL2NANO_IPC))) {
log_warn("I got a msg from ekuiper!");
// Update the position
body += strlen(EXTERNAL2NANO_IPC);

cJSON *root = cJSON_Parse(body);
uint32_t key = cJSON_GetObjectItem(root,"key")->valueint;
uint32_t offset = cJSON_GetObjectItem(root,"offset")->valueint;
log_warn("key %ld offset %ld", key, offset);
// Get msgs from exchange then send in HOOK_WAIT

cJSON_Delete(root);
nng_msg_free(msg);
work->state = HOOK_WAIT;
nng_aio_finish(work->aio, 0);
break;
}

Expand All @@ -219,14 +334,134 @@
nng_recv_aio(work->sock, work->aio);
break;
case HOOK_WAIT:
//MQ
// Search on MQ and Parquet
work->msg = nng_aio_get_msg(work->aio);
msg = work->msg;
work->msg = NULL;

// TODO match exchange with IPC msg (by MQ name)
nng_socket *ex_sock = exconf->nodes[0]->sock;
if (exconf->count == 0) {
log_error("Exchange is not enabled");
nng_msg_free(msg);
goto skip;
}

body = (char *) nng_msg_body(msg);
// Update the position
body += strlen(EXTERNAL2NANO_IPC);

cJSON *root = cJSON_Parse(body);
uint32_t key = cJSON_GetObjectItem(root,"key")->valueint;
uint32_t offset = cJSON_GetObjectItem(root,"offset")->valueint;
log_warn("key %ld offset %ld", key, offset);

nng_aio *aio;
nng_aio_alloc(&aio, NULL, NULL);
nng_aio_set_prov_data(aio, (void *)(uintptr_t)key);
nng_aio_set_msg(aio, (nng_msg *)(uintptr_t)offset);
// search msgs from MQ
nng_recv_aio(*ex_sock, aio);

nng_aio_wait(aio);
if (nng_aio_result(aio) != 0) {
nng_aio_free(aio);
goto skip;
}

nng_msg **msgs_res = (nng_msg **)nng_aio_get_msg(aio);
uint32_t msgs_len = (uintptr_t)nng_aio_get_prov_data(aio);
nng_aio_free(aio);

// Get msgs and send to localhost:1883 to active handler
if (msgs_len > 0 && msgs_res != NULL) {
log_info("Publishing %ld msgs took from exchange...", msgs_len);

// TODO NEED Clone before took from exchange instead of here
for (int i=0; i<msgs_len; ++i)
nng_msg_clone(msgs_res[i]);

send_mqtt_msg_cat(work->mqtt_sock, "$file/upload/webhook", msgs_res, msgs_len);

for (int i=0; i<msgs_len; ++i)
nng_msg_free(msgs_res[i]);
nng_free(msgs_res, sizeof(nng_msg *) * msgs_len);
#ifdef SUPP_PARQUET
} else {
// TODO Ask Parquet
// Get file names and send to localhost:1883 to active handler
log_info("Ask parquet ...", msgs_len);
const char **fnames = NULL;
uint32_t sz;
if (offset == 0) {
sz = 1;
const char *fname = parquet_find(key);
if (fname) {
fnames = malloc(sizeof(char *) * sz);
fnames[0] = fname;
}
} else {
fnames = parquet_find_span(key, offset, &sz);
}
if (fnames) {
send_mqtt_msg_file(work->mqtt_sock, "$file/upload/webhook", fnames, sz);
for (int i=0; i<(int)sz; ++i)
nng_free((void *)fnames[i], 0);
nng_free(fnames, sz);
}
#endif
}

cJSON_Delete(root);
nng_msg_free(msg);
skip:
// Start next recv
work->state = HOOK_RECV;
nng_recv_aio(work->sock, work->aio);
break;
default:
NANO_NNG_FATAL("bad state!", NNG_ESTATE);
break;
}
}

static nng_msg *
create_connect_msg()
{
// create a CONNECT message
/* CONNECT */
nng_msg *connmsg;
nng_mqtt_msg_alloc(&connmsg, 0);
nng_mqtt_msg_set_packet_type(connmsg, NNG_MQTT_CONNECT);
nng_mqtt_msg_set_connect_proto_version(connmsg, 4);
nng_mqtt_msg_set_connect_client_id(connmsg, "hook-trigger");
nng_mqtt_msg_encode(connmsg);
return connmsg;
}

static void
trigger_tcp_disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
{
int reason = 0;
// get disconnect reason
nng_pipe_get_int(p, NNG_OPT_MQTT_DISCONNECT_REASON, &reason);
// property *prop;
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_DISCONNECT_PROPERTY, &prop);
Dismissed Show dismissed Hide dismissed
log_warn("bridge client disconnected! RC [%d] \n", reason);
}

static void
trigger_tcp_connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
{
int reason = 0;
// get connect reason
nng_pipe_get_int(p, NNG_OPT_MQTT_CONNECT_REASON, &reason);
// get property for MQTT V5
// property *prop;
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_CONNECT_PROPERTY, &prop);
Dismissed Show dismissed Hide dismissed
log_info("trigger connected! RC [%d]", reason);
}

static struct hook_work *
alloc_work(nng_socket sock, conf_web_hook *conf, conf_exchange *exconf)
{
Expand Down Expand Up @@ -263,6 +498,7 @@
{
conf *conf = arg;
nng_socket sock;
nng_socket mqtt_sock;
struct hook_work **works =
nng_zalloc(conf->web_hook.pool_size * sizeof(struct hook_work *));

Expand All @@ -272,13 +508,35 @@
/* Create the socket. */
rv = nng_pull0_open(&sock);
if (rv != 0) {
log_error("nng_rep0_open %d", rv);
log_error("nng_pull0_open %d", rv);
return;
}

/* Create a mqtt sock */
rv = nng_mqtt_client_open(&mqtt_sock);
if (rv != 0) {
log_error("nng_mqtt_client_open %d", rv);
return;
}
nng_dialer dialer;
// need to expose url
if ((rv = nng_dialer_create(&dialer, mqtt_sock, "mqtt-tcp://127.0.0.1:1883"))) {
log_error("nng_dialer_create failed %d", rv);
return;
}
nng_msg *connmsg = create_connect_msg();
if (0 != nng_dialer_set_ptr(dialer, NNG_OPT_MQTT_CONNMSG, connmsg)) {
log_warn("Error in updating connmsg");
}
nng_mqtt_set_connect_cb(mqtt_sock, trigger_tcp_connect_cb, NULL);
nng_mqtt_set_disconnect_cb(mqtt_sock, trigger_tcp_disconnect_cb, NULL);

nng_dialer_start(dialer, NNG_FLAG_NONBLOCK);

for (i = 0; i < conf->web_hook.pool_size; i++) {
works[i] = alloc_work(sock, &conf->web_hook, &conf->exchange);
works[i]->id = i;
works[i]->mqtt_sock = &mqtt_sock;

Check warning

Code scanning / CodeQL

Local variable address stored in non-local memory Warning

A stack address (
source
) may be assigned to a non-local variable.
}
// NanoMQ core thread talks to others via INPROC
if ((rv = nng_listen(sock, WEB_HOOK_INPROC_URL, NULL, 0)) != 0) {
Expand Down
5 changes: 3 additions & 2 deletions nanomq/webhook_post.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,17 @@ hook_entry(nano_work *work, uint8_t reason)
nng_aio_wait(aio);

nng_aio_set_prov_data(aio, (void *)(uintptr_t)nkey);
nng_msg_clone(msg);
nng_aio_set_msg(aio, msg);

ex_sock = ex_conf->nodes[i]->sock;
nng_send_aio(*ex_sock, aio);
if (g_msg_index % 2000 == 0)
printf("%d msgs in exchange\n", g_msg_index);
break;
}
}
nng_msg_free(msg); // Cloned for each exchange before
}

if (!hook_conf->enable)
return 0;
switch (work->flag) {
Expand Down
Loading