diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index c7eca8049..bcb1f044f 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -855,13 +855,14 @@ get_broker_db(void) int broker(conf *nanomq_conf) { - int rv; - int i, j, count; + int rv, i; + uint64_t num_work; nng_socket sock; nng_socket *bridge_sock; nng_pipe pipe_id; // add the num of other proto - uint64_t num_ctx = nanomq_conf->parallel; + nanomq_conf->total_ctx = nanomq_conf->parallel; // match with num of aio + num_work = nanomq_conf->parallel; // match with num of works #if defined(SUPP_RULE_ENGINE) @@ -943,7 +944,8 @@ broker(conf *nanomq_conf) } // set 4 ctx for HTTP as default if (nanomq_conf->http_server.enable) { - num_ctx += HTTP_CTX_NUM; + nanomq_conf->total_ctx += HTTP_CTX_NUM; + num_work += HTTP_CTX_NUM; } } log_debug("HTTP init finished"); @@ -969,12 +971,36 @@ broker(conf *nanomq_conf) log_debug("Hook service started"); } - // bridging client + // caculate total ctx first + if (nanomq_conf->bridge_mode) { + for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { + conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; + if (node->enable) { + // each bridge ctx is init with a broker ctx + nanomq_conf->total_ctx += node->parallel * 2; + num_work += node->parallel; + } + } + +#if defined(SUPP_AWS_BRIDGE) + for (size_t c = 0; c < nanomq_conf->aws_bridge.count; c++) { + log_debug("AWS bridgging service initialization"); + conf_bridge_node *node = + nanomq_conf->aws_bridge.nodes[c]; + if (node->enable) { + nanomq_conf->total_ctx += node->parallel * 2; + num_work += node->parallel; + } + } +#endif + log_trace("total ctx num: %ld", nanomq_conf->total_ctx); + } + + // init bridging client if (nanomq_conf->bridge_mode) { for (size_t t = 0; t < nanomq_conf->bridge.count; t++) { conf_bridge_node *node = nanomq_conf->bridge.nodes[t]; if (node->enable) { - num_ctx += node->parallel; node->sock = (nng_socket *) nng_alloc( sizeof(nng_socket)); #if defined(SUPP_QUIC) @@ -988,21 +1014,10 @@ broker(conf *nanomq_conf) #endif } } - -#if defined(SUPP_AWS_BRIDGE) - for (size_t c = 0; c < nanomq_conf->aws_bridge.count; c++) { - log_debug("AWS bridgging service initialization"); - conf_bridge_node *node = - nanomq_conf->aws_bridge.nodes[c]; - if (node->enable) { - num_ctx += node->parallel; - } - } -#endif log_debug("bridge init finished"); } - // MQTT Broker service - struct work **works = nng_zalloc(num_ctx * sizeof(struct work *)); + // CTX for MQTT Broker service + struct work **works = nng_zalloc(num_work * sizeof(struct work *)); // create broker ctx for (i = 0; i < nanomq_conf->parallel; i++) { works[i] = proto_work_init(sock, inproc_sock, sock, @@ -1060,9 +1075,9 @@ broker(conf *nanomq_conf) // Init exchange part in hook if (nanomq_conf->exchange.count > 0) { - hook_exchange_init(nanomq_conf, num_ctx); + hook_exchange_init(nanomq_conf, nanomq_conf->total_ctx); // create exchange senders in hook - hook_exchange_sender_init(nanomq_conf, works, num_ctx); + hook_exchange_sender_init(nanomq_conf, works, nanomq_conf->total_ctx); // TODO expose this char url_zzz[128] = "tcp://127.0.0.1:10000"; nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock; @@ -1123,7 +1138,7 @@ broker(conf *nanomq_conf) } } - for (i = 0; i < num_ctx; i++) { + for (i = 0; i < num_work; i++) { server_cb(works[i]); // this starts them going (INIT state) } @@ -1180,7 +1195,7 @@ broker(conf *nanomq_conf) bool is_testing = false; #endif -#if (defined DEBUG) && (!defined ASAN) +#if (defined DEBUG) && (!defined ASAN) #if !(defined NANO_PLATFORM_WINDOWS) struct sigaction act; i = 0; @@ -1223,7 +1238,7 @@ broker(conf *nanomq_conf) } for (size_t t = 0; t < conf->bridge.count; t++) { conf_bridge_node *node = conf->bridge.nodes[t]; - size_t aio_count = (conf->parallel + node->parallel * 2); + size_t aio_count = conf->total_ctx; if (node->enable) { for (size_t i = 0; i < aio_count; i++) { nng_aio_finish_error(node->bridge_aio[i], 0); @@ -1240,12 +1255,12 @@ broker(conf *nanomq_conf) // nng_free( // conf->bridge.nodes, sizeof(conf_bridge_node **)); - for (size_t i = 0; i < num_ctx; i++) { + for (size_t i = 0; i < num_work; i++) { nng_free(works[i]->pipe_ct, sizeof(struct pipe_content)); nng_free(works[i], sizeof(struct work)); } - nng_free(works, num_ctx * sizeof(struct work *)); + nng_free(works, num_work * sizeof(struct work *)); break; } nng_msleep(6000); @@ -1619,7 +1634,7 @@ broker_parse_opts(int argc, char **argv, conf *config) int broker_start(int argc, char **argv) { - int i, url, temp, rc, num_ctx = 0; + int i, url, temp, rc; int pid = 0; conf *nanomq_conf; diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 934eed581..4d1d3aeb7 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -608,17 +608,16 @@ hybrid_cb(void *arg) NANO_NNG_FATAL("nng_cv_alloc mem error", rv); return; } - uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel * 2; + // uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel; // alloc an AIO for each ctx bridging use only - node->bridge_aio = nng_alloc(aio_cnt * sizeof(nng_aio *)); + node->bridge_aio = nng_alloc(bridge_arg->conf->total_ctx * sizeof(nng_aio *)); - for (uint32_t num = 0; num < aio_cnt; num++) { + for (uint32_t num = 0; num < bridge_arg->conf->total_ctx; num++) { if ((rv = nng_aio_alloc(&node->bridge_aio[num], NULL, node)) != 0) { NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv); } } - log_debug("parallel %d aios", aio_cnt); char addr_back[160] = {'\0'}; if (0 != gen_fallback_url(node->address, addr_back)) { @@ -1173,20 +1172,19 @@ bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node) } // alloc an AIO for each ctx bridging use only - node->bridge_aio = nng_alloc( - (config->parallel + node->parallel * 2) * sizeof(nng_aio *)); + node->bridge_aio = nng_alloc(config->total_ctx * sizeof(nng_aio *)); node->sock = (void *) sock; node->bridge_arg = (void *) bridge_arg; - for (uint32_t num = 0; num < (config->parallel + node->parallel * 2); - num++) { + uint32_t num; + for ( num = 0; num < config->total_ctx; num++ ) { if ((rv = nng_aio_alloc( &node->bridge_aio[num], bridge_send_cb, node)) != 0) { NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv); } - log_debug("parallel %d", num); } + log_debug("parallel %d", num); return 0; } diff --git a/nanomq/include/broker.h b/nanomq/include/broker.h index 9f91e21de..f56ce6471 100644 --- a/nanomq/include/broker.h +++ b/nanomq/include/broker.h @@ -33,12 +33,9 @@ struct work { END, // Clear state and cache before disconnect CLOSE // sending disconnect packet and err code } state; - // 0x00 mqtt_broker - // 0x01 mqtt_bridge - uint8_t proto; - // MQTT version cache - uint8_t proto_ver; - uint8_t flag; // flag for webhook & rule_engine + uint8_t proto; // logic proto + uint8_t proto_ver; // MQTT version cache + uint8_t flag; // flag for webhook & rule_engine nng_aio * aio; nng_msg * msg; nng_msg ** msg_ret; diff --git a/nng b/nng index e51f38c6f..cf88701da 160000 --- a/nng +++ b/nng @@ -1 +1 @@ -Subproject commit e51f38c6fa6fd0370e5072123f86cdf49480cb59 +Subproject commit cf88701dad00dc8fff0c668103cdc7e1d1919e5d