Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIx # 1616 #1617

Merged
merged 6 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 42 additions & 27 deletions nanomq/apps/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -855,13 +855,14 @@
int
broker(conf *nanomq_conf)
{
int rv;
int i, j, count;
int rv, i;
uint64_t num_work;
nng_socket sock;
nng_socket *bridge_sock;
nng_pipe pipe_id;
// add the num of other proto
uint64_t num_ctx = nanomq_conf->parallel;
nanomq_conf->total_ctx = nanomq_conf->parallel; // match with num of aio
num_work = nanomq_conf->parallel; // match with num of works


#if defined(SUPP_RULE_ENGINE)
Expand Down Expand Up @@ -943,13 +944,14 @@
}
// set 4 ctx for HTTP as default
if (nanomq_conf->http_server.enable) {
num_ctx += HTTP_CTX_NUM;
nanomq_conf->total_ctx += HTTP_CTX_NUM;
num_work += HTTP_CTX_NUM;
}
}
log_debug("HTTP init finished");

// Exchange service
for (int i = 0; i < nanomq_conf->exchange.count; i++) {

Check notice

Code scanning / CodeQL

Declaration hides variable Note

Variable i hides another variable of the same name (on
line 858
).
conf_exchange_node *node = nanomq_conf->exchange.nodes[i];
if (node == NULL) {
log_error("Wrong exchange %d configuration!", i);
Expand All @@ -969,12 +971,36 @@
log_debug("Hook service started");
}

// bridging client
// caculate total ctx first
if (nanomq_conf->bridge_mode) {
for (size_t t = 0; t < nanomq_conf->bridge.count; t++) {
conf_bridge_node *node = nanomq_conf->bridge.nodes[t];
if (node->enable) {
// each bridge ctx is init with a broker ctx
nanomq_conf->total_ctx += node->parallel * 2;
num_work += node->parallel;
}
}

#if defined(SUPP_AWS_BRIDGE)
for (size_t c = 0; c < nanomq_conf->aws_bridge.count; c++) {
log_debug("AWS bridgging service initialization");
conf_bridge_node *node =
nanomq_conf->aws_bridge.nodes[c];
if (node->enable) {
nanomq_conf->total_ctx += node->parallel * 2;
num_work += node->parallel;
}
}
#endif
log_trace("total ctx num: %ld", nanomq_conf->total_ctx);
}

// init bridging client
if (nanomq_conf->bridge_mode) {
for (size_t t = 0; t < nanomq_conf->bridge.count; t++) {
conf_bridge_node *node = nanomq_conf->bridge.nodes[t];
if (node->enable) {
num_ctx += node->parallel;
node->sock = (nng_socket *) nng_alloc(
sizeof(nng_socket));
#if defined(SUPP_QUIC)
Expand All @@ -988,21 +1014,10 @@
#endif
}
}

#if defined(SUPP_AWS_BRIDGE)
for (size_t c = 0; c < nanomq_conf->aws_bridge.count; c++) {
log_debug("AWS bridgging service initialization");
conf_bridge_node *node =
nanomq_conf->aws_bridge.nodes[c];
if (node->enable) {
num_ctx += node->parallel;
}
}
#endif
log_debug("bridge init finished");
}
// MQTT Broker service
struct work **works = nng_zalloc(num_ctx * sizeof(struct work *));
// CTX for MQTT Broker service
struct work **works = nng_zalloc(num_work * sizeof(struct work *));
// create broker ctx
for (i = 0; i < nanomq_conf->parallel; i++) {
works[i] = proto_work_init(sock, inproc_sock, sock,
Expand Down Expand Up @@ -1060,9 +1075,9 @@

// Init exchange part in hook
if (nanomq_conf->exchange.count > 0) {
hook_exchange_init(nanomq_conf, num_ctx);
hook_exchange_init(nanomq_conf, nanomq_conf->total_ctx);
// create exchange senders in hook
hook_exchange_sender_init(nanomq_conf, works, num_ctx);
hook_exchange_sender_init(nanomq_conf, works, nanomq_conf->total_ctx);
// TODO expose this
char url_zzz[128] = "tcp://127.0.0.1:10000";
nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock;
Expand Down Expand Up @@ -1123,7 +1138,7 @@
}
}

for (i = 0; i < num_ctx; i++) {
for (i = 0; i < num_work; i++) {
server_cb(works[i]); // this starts them going (INIT state)
}

Expand Down Expand Up @@ -1180,7 +1195,7 @@
bool is_testing = false;
#endif

#if (defined DEBUG) && (!defined ASAN)
#if (defined DEBUG) && (!defined ASAN)
#if !(defined NANO_PLATFORM_WINDOWS)
struct sigaction act;
i = 0;
Expand Down Expand Up @@ -1223,7 +1238,7 @@
}
for (size_t t = 0; t < conf->bridge.count; t++) {
conf_bridge_node *node = conf->bridge.nodes[t];
size_t aio_count = (conf->parallel + node->parallel * 2);
size_t aio_count = conf->total_ctx;
if (node->enable) {
for (size_t i = 0; i < aio_count; i++) {
nng_aio_finish_error(node->bridge_aio[i], 0);
Expand All @@ -1240,12 +1255,12 @@
// nng_free(
JaylinYu marked this conversation as resolved.
Show resolved Hide resolved
// conf->bridge.nodes, sizeof(conf_bridge_node **));

for (size_t i = 0; i < num_ctx; i++) {
for (size_t i = 0; i < num_work; i++) {
nng_free(works[i]->pipe_ct,
sizeof(struct pipe_content));
nng_free(works[i], sizeof(struct work));
}
nng_free(works, num_ctx * sizeof(struct work *));
nng_free(works, num_work * sizeof(struct work *));
break;
}
nng_msleep(6000);
Expand Down Expand Up @@ -1619,7 +1634,7 @@
int
broker_start(int argc, char **argv)
{
int i, url, temp, rc, num_ctx = 0;
int i, url, temp, rc;

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable i is not used.

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable url is not used.

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable temp is not used.
int pid = 0;

conf *nanomq_conf;
Expand Down
16 changes: 7 additions & 9 deletions nanomq/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,17 +608,16 @@ hybrid_cb(void *arg)
NANO_NNG_FATAL("nng_cv_alloc mem error", rv);
return;
}
uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel * 2;
// uint32_t aio_cnt = bridge_arg->conf->parallel + node->parallel;
// alloc an AIO for each ctx bridging use only
node->bridge_aio = nng_alloc(aio_cnt * sizeof(nng_aio *));
node->bridge_aio = nng_alloc(bridge_arg->conf->total_ctx * sizeof(nng_aio *));

for (uint32_t num = 0; num < aio_cnt; num++) {
for (uint32_t num = 0; num < bridge_arg->conf->total_ctx; num++) {
if ((rv = nng_aio_alloc(&node->bridge_aio[num], NULL, node)) !=
0) {
NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv);
}
}
log_debug("parallel %d aios", aio_cnt);

char addr_back[160] = {'\0'};
if (0 != gen_fallback_url(node->address, addr_back)) {
Expand Down Expand Up @@ -1173,20 +1172,19 @@ bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
}

// alloc an AIO for each ctx bridging use only
node->bridge_aio = nng_alloc(
(config->parallel + node->parallel * 2) * sizeof(nng_aio *));
node->bridge_aio = nng_alloc(config->total_ctx * sizeof(nng_aio *));
Dismissed Show dismissed Hide dismissed

node->sock = (void *) sock;
node->bridge_arg = (void *) bridge_arg;

for (uint32_t num = 0; num < (config->parallel + node->parallel * 2);
num++) {
uint32_t num;
for ( num = 0; num < config->total_ctx; num++ ) {
if ((rv = nng_aio_alloc(
&node->bridge_aio[num], bridge_send_cb, node)) != 0) {
NANO_NNG_FATAL("bridge_aio nng_aio_alloc", rv);
}
log_debug("parallel %d", num);
}
log_debug("parallel %d", num);
return 0;
}

Expand Down
9 changes: 3 additions & 6 deletions nanomq/include/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ struct work {
END, // Clear state and cache before disconnect
CLOSE // sending disconnect packet and err code
} state;
// 0x00 mqtt_broker
// 0x01 mqtt_bridge
uint8_t proto;
// MQTT version cache
uint8_t proto_ver;
uint8_t flag; // flag for webhook & rule_engine
uint8_t proto; // logic proto
uint8_t proto_ver; // MQTT version cache
uint8_t flag; // flag for webhook & rule_engine
nng_aio * aio;
nng_msg * msg;
nng_msg ** msg_ret;
Expand Down