From ac13e066d149d764b3724a21b8fa0f9416d169f2 Mon Sep 17 00:00:00 2001 From: jaylin Date: Fri, 19 Jan 2024 19:38:46 +0800 Subject: [PATCH 1/6] * MDF [conf] add total_ctx to conf struct Signed-off-by: jaylin --- nanomq/apps/broker.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index c7eca8049..0b74ba6c4 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -861,7 +861,7 @@ broker(conf *nanomq_conf) nng_socket *bridge_sock; nng_pipe pipe_id; // add the num of other proto - uint64_t num_ctx = nanomq_conf->parallel; + nanomq_conf->total_ctx = nanomq_conf->parallel; #if defined(SUPP_RULE_ENGINE) @@ -943,7 +943,7 @@ broker(conf *nanomq_conf) } // set 4 ctx for HTTP as default if (nanomq_conf->http_server.enable) { - num_ctx += HTTP_CTX_NUM; + nanomq_conf->total_ctx += HTTP_CTX_NUM; } } log_debug("HTTP init finished"); @@ -974,7 +974,7 @@ broker(conf *nanomq_conf) for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; if (node->enable) { - num_ctx += node->parallel; + nanomq_conf->total_ctx += node->parallel; node->sock = (nng_socket *) nng_alloc( sizeof(nng_socket)); #if defined(SUPP_QUIC) @@ -995,14 +995,14 @@ broker(conf *nanomq_conf) conf_bridge_node *node = nanomq_conf->aws_bridge.nodes[c]; if (node->enable) { - num_ctx += node->parallel; + nanomq_conf->total_ctx += node->parallel; } } #endif log_debug("bridge init finished"); } // MQTT Broker service - struct work **works = nng_zalloc(num_ctx * sizeof(struct work *)); + struct work **works = nng_zalloc(nanomq_conf->total_ctx * sizeof(struct work *)); // create broker ctx for (i = 0; i < nanomq_conf->parallel; i++) { works[i] = proto_work_init(sock, inproc_sock, sock, @@ -1060,9 +1060,9 @@ broker(conf *nanomq_conf) // Init exchange part in hook if (nanomq_conf->exchange.count > 0) { - hook_exchange_init(nanomq_conf, num_ctx); + hook_exchange_init(nanomq_conf, nanomq_conf->total_ctx); // create exchange senders in hook - hook_exchange_sender_init(nanomq_conf, works, num_ctx); + hook_exchange_sender_init(nanomq_conf, works, nanomq_conf->total_ctx); // TODO expose this char url_zzz[128] = "tcp://127.0.0.1:10000"; nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock; @@ -1123,7 +1123,7 @@ broker(conf *nanomq_conf) } } - for (i = 0; i < num_ctx; i++) { + for (i = 0; i < nanomq_conf->total_ctx; i++) { server_cb(works[i]); // this starts them going (INIT state) } @@ -1240,12 +1240,12 @@ broker(conf *nanomq_conf) // nng_free( // conf->bridge.nodes, sizeof(conf_bridge_node **)); - for (size_t i = 0; i < num_ctx; i++) { + for (size_t i = 0; i < nanomq_conf->total_ctx; i++) { nng_free(works[i]->pipe_ct, sizeof(struct pipe_content)); nng_free(works[i], sizeof(struct work)); } - nng_free(works, num_ctx * sizeof(struct work *)); + nng_free(works, nanomq_conf->total_ctx * sizeof(struct work *)); break; } nng_msleep(6000); @@ -1619,7 +1619,7 @@ broker_parse_opts(int argc, char **argv, conf *config) int broker_start(int argc, char **argv) { - int i, url, temp, rc, num_ctx = 0; + int i, url, temp, rc; int pid = 0; conf *nanomq_conf; From 2d03e9afe7cadd555a279a05c591fa1f8cac1e6d Mon Sep 17 00:00:00 2001 From: jaylin Date: Fri, 19 Jan 2024 19:41:59 +0800 Subject: [PATCH 2/6] * MDF [broker] caculate total ctx num before init bridging client Signed-off-by: jaylin --- nanomq/apps/broker.c | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index 0b74ba6c4..5ece9595c 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -969,23 +969,12 @@ broker(conf *nanomq_conf) log_debug("Hook service started"); } - // bridging client + // caculate total ctx first if (nanomq_conf->bridge_mode) { for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; if (node->enable) { nanomq_conf->total_ctx += node->parallel; - node->sock = (nng_socket *) nng_alloc( - sizeof(nng_socket)); -#if defined(SUPP_QUIC) - if (node->hybrid) { - hybrid_bridge_client(node->sock, nanomq_conf, node); - } else { - bridge_client(node->sock, nanomq_conf, node); - } -#else - bridge_client(node->sock, nanomq_conf, node); -#endif } } @@ -999,6 +988,27 @@ broker(conf *nanomq_conf) } } #endif + log_trace("total ctx num: %ld", nanomq_conf->total_ctx); + } + + // init bridging client + if (nanomq_conf->bridge_mode) { + for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { + conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; + if (node->enable) { + node->sock = (nng_socket *) nng_alloc( + sizeof(nng_socket)); +#if defined(SUPP_QUIC) + if (node->hybrid) { + hybrid_bridge_client(node->sock, nanomq_conf, node); + } else { + bridge_client(node->sock, nanomq_conf, node); + } +#else + bridge_client(node->sock, nanomq_conf, node); +#endif + } + } log_debug("bridge init finished"); } // MQTT Broker service From b26c42e9489c584a89c9d933980edb31c1240e11 Mon Sep 17 00:00:00 2001 From: jaylin Date: Fri, 19 Jan 2024 20:16:29 +0800 Subject: [PATCH 3/6] * FIX [broker] take correct number of bridge aio Signed-off-by: jaylin --- nanomq/apps/broker.c | 25 +++++++++++++++---------- nanomq/include/broker.h | 9 +++------ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index 5ece9595c..96cdcc0f2 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -855,13 +855,14 @@ get_broker_db(void) int broker(conf *nanomq_conf) { - int rv; - int i, j, count; + int rv, i; + uint64_t num_work; nng_socket sock; nng_socket *bridge_sock; nng_pipe pipe_id; // add the num of other proto - nanomq_conf->total_ctx = nanomq_conf->parallel; + nanomq_conf->total_ctx = nanomq_conf->parallel; // match with num of aio + num_work = nanomq_conf->parallel; // match with num of works #if defined(SUPP_RULE_ENGINE) @@ -944,6 +945,7 @@ broker(conf *nanomq_conf) // set 4 ctx for HTTP as default if (nanomq_conf->http_server.enable) { nanomq_conf->total_ctx += HTTP_CTX_NUM; + num_work += HTTP_CTX_NUM; } } log_debug("HTTP init finished"); @@ -974,7 +976,9 @@ broker(conf *nanomq_conf) for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; if (node->enable) { - nanomq_conf->total_ctx += node->parallel; + // each bridge ctx is init with a broker ctx + nanomq_conf->total_ctx += node->parallel * 2; + num_work += node->parallel; } } @@ -984,7 +988,8 @@ broker(conf *nanomq_conf) conf_bridge_node *node = nanomq_conf->aws_bridge.nodes[c]; if (node->enable) { - nanomq_conf->total_ctx += node->parallel; + nanomq_conf->total_ctx += node->parallel * 2; + num_work += node->parallel; } } #endif @@ -1011,8 +1016,8 @@ broker(conf *nanomq_conf) } log_debug("bridge init finished"); } - // MQTT Broker service - struct work **works = nng_zalloc(nanomq_conf->total_ctx * sizeof(struct work *)); + // CTX for MQTT Broker service + struct work **works = nng_zalloc(num_work * sizeof(struct work *)); // create broker ctx for (i = 0; i < nanomq_conf->parallel; i++) { works[i] = proto_work_init(sock, inproc_sock, sock, @@ -1133,7 +1138,7 @@ broker(conf *nanomq_conf) } } - for (i = 0; i < nanomq_conf->total_ctx; i++) { + for (i = 0; i < num_work; i++) { server_cb(works[i]); // this starts them going (INIT state) } @@ -1250,12 +1255,12 @@ broker(conf *nanomq_conf) // nng_free( // conf->bridge.nodes, sizeof(conf_bridge_node **)); - for (size_t i = 0; i < nanomq_conf->total_ctx; i++) { + for (size_t i = 0; i < num_work; i++) { nng_free(works[i]->pipe_ct, sizeof(struct pipe_content)); nng_free(works[i], sizeof(struct work)); } - nng_free(works, nanomq_conf->total_ctx * sizeof(struct work *)); + nng_free(works, num_work * sizeof(struct work *)); break; } nng_msleep(6000); diff --git a/nanomq/include/broker.h b/nanomq/include/broker.h index 9f91e21de..f56ce6471 100644 --- a/nanomq/include/broker.h +++ b/nanomq/include/broker.h @@ -33,12 +33,9 @@ struct work { END, // Clear state and cache before disconnect CLOSE // sending disconnect packet and err code } state; - // 0x00 mqtt_broker - // 0x01 mqtt_bridge - uint8_t proto; - // MQTT version cache - uint8_t proto_ver; - uint8_t flag; // flag for webhook & rule_engine + uint8_t proto; // logic proto + uint8_t proto_ver; // MQTT version cache + uint8_t flag; // flag for webhook & rule_engine nng_aio * aio; nng_msg * msg; nng_msg ** msg_ret; From c38382a24b3e1357be58cf0b810a7a5bab4dbd23 Mon Sep 17 00:00:00 2001 From: jaylin Date: Fri, 19 Jan 2024 20:22:14 +0800 Subject: [PATCH 4/6] * FIX [bridge] fix #1616 bridge aio overflow Signed-off-by: jaylin --- nanomq/bridge.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 934eed581..4d1d3aeb7 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -608,17 +608,16 @@ hybrid_cb(void *arg) NANO_NNG_FATAL("nng_cv_alloc mem error", rv); return; } - uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel * 2; + // uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel; // alloc an AIO for each ctx bridging use only - node->bridge_aio = nng_alloc(aio_cnt * sizeof(nng_aio *)); + node->bridge_aio = nng_alloc(bridge_arg->conf->total_ctx * sizeof(nng_aio *)); - for (uint32_t num = 0; num < aio_cnt; num++) { + for (uint32_t num = 0; num < bridge_arg->conf->total_ctx; num++) { if ((rv = nng_aio_alloc(&node->bridge_aio[num], NULL, node)) != 0) { NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv); } } - log_debug("parallel %d aios", aio_cnt); char addr_back[160] = {'\0'}; if (0 != gen_fallback_url(node->address, addr_back)) { @@ -1173,20 +1172,19 @@ bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node) } // alloc an AIO for each ctx bridging use only - node->bridge_aio = nng_alloc( - (config->parallel + node->parallel * 2) * sizeof(nng_aio *)); + node->bridge_aio = nng_alloc(config->total_ctx * sizeof(nng_aio *)); node->sock = (void *) sock; node->bridge_arg = (void *) bridge_arg; - for (uint32_t num = 0; num < (config->parallel + node->parallel * 2); - num++) { + uint32_t num; + for ( num = 0; num < config->total_ctx; num++ ) { if ((rv = nng_aio_alloc( &node->bridge_aio[num], bridge_send_cb, node)) != 0) { NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv); } - log_debug("parallel %d", num); } + log_debug("parallel %d", num); return 0; } From b52dc4e8b6b9c24c72d086f9c0ef2987e0fe5b37 Mon Sep 17 00:00:00 2001 From: jaylin Date: Sun, 21 Jan 2024 11:37:38 +0800 Subject: [PATCH 5/6] * MDF [nng] move nng head for CI Signed-off-by: jaylin --- nng | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nng b/nng index e51f38c6f..cf88701da 160000 --- a/nng +++ b/nng @@ -1 +1 @@ -Subproject commit e51f38c6fa6fd0370e5072123f86cdf49480cb59 +Subproject commit cf88701dad00dc8fff0c668103cdc7e1d1919e5d From 78233c814b260462200523df0f02ea63b3731ae1 Mon Sep 17 00:00:00 2001 From: wayne Date: Mon, 22 Jan 2024 12:35:21 +0800 Subject: [PATCH 6/6] * FIX [bridge] fix aio memleak Signed-off-by: wayne --- nanomq/apps/broker.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index 96cdcc0f2..bcb1f044f 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -1195,7 +1195,7 @@ broker(conf *nanomq_conf) bool is_testing = false; #endif -#if (defined DEBUG) && (!defined ASAN) +#if (defined DEBUG) && (!defined ASAN) #if !(defined NANO_PLATFORM_WINDOWS) struct sigaction act; i = 0; @@ -1238,7 +1238,7 @@ broker(conf *nanomq_conf) } for (size_t t = 0; t < conf->bridge.count; t++) { conf_bridge_node *node = conf->bridge.nodes[t]; - size_t aio_count = (conf->parallel + node->parallel * 2); + size_t aio_count = conf->total_ctx; if (node->enable) { for (size_t i = 0; i < aio_count; i++) { nng_aio_finish_error(node->bridge_aio[i], 0);