Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1719 #1721

Merged
merged 6 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions nanomq/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,11 +739,13 @@ hybrid_cb(void *arg)
int
hybrid_bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
{
bridge_param *bridge_arg;
bridge_param *bridge_arg = NULL;
if ((bridge_arg = nng_alloc(sizeof(bridge_param))) == NULL) {
log_error("memory error in allocating bridge client");
return NNG_ENOMEM;
}
bridge_arg->exec_mtx = NULL;
bridge_arg->exec_cv = NULL;

bridge_arg->config = node;
bridge_arg->sock = sock;
Expand All @@ -755,18 +757,18 @@ hybrid_bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
int rv = nng_mtx_alloc(&bridge_arg->exec_mtx);
if (rv != 0) {
NANO_NNG_FATAL("nng_mtx_alloc", rv);
return rv;
goto error;
}
rv = nng_cv_alloc(&bridge_arg->exec_cv, bridge_arg->exec_mtx);
if (rv != 0) {
NANO_NNG_FATAL("nng_cv_alloc", rv);
return rv;
goto error;
}

rv = nng_thread_create(&hybrid_thr, hybrid_cb, (void *)bridge_arg);
if (rv != 0) {
NANO_NNG_FATAL("nng_thread_create", rv);
return rv;
goto error;
}

nng_mtx_lock(bridge_arg->exec_mtx);
Expand All @@ -775,6 +777,17 @@ hybrid_bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
nng_cv_free(bridge_arg->exec_cv);
bridge_arg->exec_cv = NULL;

error:
if(bridge_arg->exec_cv != NULL) {
nng_cv_free(bridge_arg->exec_cv);
}
if(bridge_arg->exec_mtx != NULL) {
nng_mtx_free(bridge_arg->exec_mtx);
}
if(bridge_arg != NULL) {
nng_free(bridge_arg, sizeof(bridge_param));
}

return rv;
}

Expand Down
6 changes: 4 additions & 2 deletions nanomq/webhook_inproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ hook_cb(void *arg)
rv = nng_pull0_open(&sock);
if (rv != 0) {
log_error("nng_pull0_open %d", rv);
nng_free(works, works_num * sizeof(struct hook_work *));
return;
}

Expand All @@ -721,7 +722,7 @@ hook_cb(void *arg)
// NanoMQ core thread talks to others via INPROC
if ((rv = nng_listen(sock, HOOK_IPC_URL, NULL, 0)) != 0) {
log_error("hook nng_listen %d", rv);
return;
goto out;
}

if (hook_search_limit == NULL)
Expand All @@ -730,7 +731,7 @@ hook_cb(void *arg)
if (0 != (rv = nng_aio_alloc(&hook_search_reset_aio,
hook_search_reset, &conf->parquet))) {
log_error("hook hook_search reset aio init failed %d", rv);
return;
goto out;
}
nng_aio_finish(hook_search_reset_aio, 0); // Start
log_info("hook hook_search reset aio started");
Expand All @@ -744,6 +745,7 @@ hook_cb(void *arg)
nng_msleep(3600000); // neither pause() nor sleep() portable
}

out:
wanghaEMQ marked this conversation as resolved.
Show resolved Hide resolved
// Free hook search reset aio and limit atomic
if (hook_search_limit)
nng_atomic_free(hook_search_limit);
Expand Down
9 changes: 8 additions & 1 deletion nanomq/webhook_post.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,15 @@ flush_smsg_to_disk(nng_msg **smsg, size_t len, void *handle, nng_aio *aio)
keys = nng_alloc(sizeof(uint64_t)* len);
datas = nng_alloc(sizeof(void *) * len);
lens = nng_alloc(sizeof(uint32_t) * len);
if (!datas || !keys || !lens)
if (!datas || !keys || !lens) {
if (keys)
nng_free(keys, sizeof(uint64_t) * len);
if (datas)
nng_free(datas, sizeof(void *) * len);
if (len)
nng_free(lens, sizeof(uint32_t) * len);
return NNG_ENOMEM;
}

int len2 = 0;
for (int i=0; i<(int)len; ++i) {
Expand Down
Loading