From a1e3fc9c672c1cb02d5d3b34ee135cf351773b08 Mon Sep 17 00:00:00 2001 From: jaylin Date: Mon, 5 Feb 2024 12:48:28 +0800 Subject: [PATCH 1/5] * MDF [broker] set recvmax to exchange_sock Signed-off-by: jaylin --- nanomq/apps/broker.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index 5b2ccc158..efc829834 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -965,9 +965,11 @@ broker(conf *nanomq_conf) continue; } node->sock = (nng_socket *) nng_alloc(sizeof(nng_socket)); + // exchange sock is an embedded Req/Rep sock for MQTT Stream if ((rv = nng_exchange_client_open(node->sock)) != 0) { log_error("nng_exchange_client_open failed %d", rv); } else { + // nng_socket_set_ms(*node->sock, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); nng_socket_set_ptr(*node->sock, NNG_OPT_EXCHANGE_BIND, (void *)node); } log_debug("exchange %d init finished!\n", i); @@ -1088,9 +1090,11 @@ broker(conf *nanomq_conf) // TODO expose this char url_zzz[128] = "tcp://127.0.0.1:10000"; nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock; - if ((rv = nano_listen(*mq_sock, url_zzz, NULL, 0, nanomq_conf)) != 0) { + nng_listener *mq_listener; + if ((rv = nano_listen(*mq_sock, url_zzz, mq_listener, 0, nanomq_conf)) != 0) { NANO_NNG_FATAL("broker nng_listen", rv); } + nng_listener_set_size(*mq_listener, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); } if (nanomq_conf->enable) { From 7811539528b3bfdf9918fee12ac9df9dc4abcc55 Mon Sep 17 00:00:00 2001 From: jaylin Date: Mon, 5 Feb 2024 17:43:25 +0800 Subject: [PATCH 2/5] * FIX [Doc] fix missing "," in MQTT Stream Signed-off-by: jaylin --- docs/en_US/config-description/parquet.md | 2 +- docs/en_US/mqtt-stream/configuration.md | 4 ++-- docs/zh_CN/config-description/parquet.md | 2 +- docs/zh_CN/mqtt-stream/configuration.md | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en_US/config-description/parquet.md b/docs/en_US/config-description/parquet.md index 53c821fe5..aaa14cca8 100644 --- a/docs/en_US/config-description/parquet.md +++ b/docs/en_US/config-description/parquet.md @@ -23,7 +23,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, # # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio fullOp = 2 } diff --git a/docs/en_US/mqtt-stream/configuration.md b/docs/en_US/mqtt-stream/configuration.md index 3e191580a..4ce4182cd 100644 --- a/docs/en_US/mqtt-stream/configuration.md +++ b/docs/en_US/mqtt-stream/configuration.md @@ -16,7 +16,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, fullOp = 3 } } @@ -71,7 +71,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 10000 + cap = 10000, fullOp = 3 } } diff --git a/docs/zh_CN/config-description/parquet.md b/docs/zh_CN/config-description/parquet.md index dba5fc62d..af6ec10af 100644 --- a/docs/zh_CN/config-description/parquet.md +++ b/docs/zh_CN/config-description/parquet.md @@ -22,7 +22,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, # # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio fullOp = 2 } diff --git a/docs/zh_CN/mqtt-stream/configuration.md b/docs/zh_CN/mqtt-stream/configuration.md index 861a75e8b..e98250a75 100644 --- a/docs/zh_CN/mqtt-stream/configuration.md +++ b/docs/zh_CN/mqtt-stream/configuration.md @@ -16,7 +16,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, fullOp = 3 } } @@ -71,7 +71,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 10000 + cap = 10000, fullOp = 3 } } From 460a259c082c433fc4f030c49deb81793702c4e5 Mon Sep 17 00:00:00 2001 From: jaylin Date: Mon, 5 Feb 2024 17:44:24 +0800 Subject: [PATCH 3/5] * MDF [hocon/example] add exchange_conf url Signed-off-by: jaylin --- etc/nanomq_example.conf | 2 ++ 1 file changed, 2 insertions(+) diff --git a/etc/nanomq_example.conf b/etc/nanomq_example.conf index 9b1bed489..169635809 100644 --- a/etc/nanomq_example.conf +++ b/etc/nanomq_example.conf @@ -1144,6 +1144,8 @@ rules.mysql.mysql_rule_db { # #==================================================================== # # Initalize multiple MQ exchanger by giving them different name (mq1) exchange_client.mq1 { + # # Currently NanoMQ only support one MQ object. URL shall be exactly same. + exchange_url = "tcp://127.0.0.1:10000" # # exchanges contains multiple MQ exchanger exchange { # # MQTT Topic for filtering messages and saving to queue From 63afc2622e9e1888a6e65f6b891cd87c3fffd2f8 Mon Sep 17 00:00:00 2001 From: jaylin Date: Mon, 5 Feb 2024 17:45:24 +0800 Subject: [PATCH 4/5] * MDF [exchange/broker] use url in conf for exchange & set max recv value Signed-off-by: jaylin --- nanomq/apps/broker.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index efc829834..2b0d78a94 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -1087,14 +1087,13 @@ broker(conf *nanomq_conf) hook_exchange_init(nanomq_conf, num_work); // create exchange senders in hook hook_exchange_sender_init(nanomq_conf, works, num_work); - // TODO expose this - char url_zzz[128] = "tcp://127.0.0.1:10000"; + // TODO support multiple MQ coexisitence nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock; - nng_listener *mq_listener; - if ((rv = nano_listen(*mq_sock, url_zzz, mq_listener, 0, nanomq_conf)) != 0) { + nng_listener mq_listener; + if ((rv = nano_listen(*mq_sock, nanomq_conf->exchange.exchange_url, &mq_listener, 0, nanomq_conf)) != 0) { NANO_NNG_FATAL("broker nng_listen", rv); } - nng_listener_set_size(*mq_listener, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); + nng_listener_set_size(mq_listener, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); } if (nanomq_conf->enable) { From a944cd7f6297a68001f9c103a2be4317d69afb38 Mon Sep 17 00:00:00 2001 From: jaylin Date: Sat, 10 Feb 2024 17:18:23 +0800 Subject: [PATCH 5/5] * MDF [nng] move nng head for CI Signed-off-by: jaylin --- nng | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nng b/nng index 3aa8435fb..fd186e107 160000 --- a/nng +++ b/nng @@ -1 +1 @@ -Subproject commit 3aa8435fbdc4b1becc9ae0e3b22f08b5617ab4cb +Subproject commit fd186e107dac2c7134845df429c9a836ee9e4e9c