diff --git a/docs/en_US/config-description/parquet.md b/docs/en_US/config-description/parquet.md index 53c821fe5..aaa14cca8 100644 --- a/docs/en_US/config-description/parquet.md +++ b/docs/en_US/config-description/parquet.md @@ -23,7 +23,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, # # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio fullOp = 2 } diff --git a/docs/en_US/mqtt-stream/configuration.md b/docs/en_US/mqtt-stream/configuration.md index 3e191580a..4ce4182cd 100644 --- a/docs/en_US/mqtt-stream/configuration.md +++ b/docs/en_US/mqtt-stream/configuration.md @@ -16,7 +16,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, fullOp = 3 } } @@ -71,7 +71,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 10000 + cap = 10000, fullOp = 3 } } diff --git a/docs/zh_CN/config-description/parquet.md b/docs/zh_CN/config-description/parquet.md index dba5fc62d..af6ec10af 100644 --- a/docs/zh_CN/config-description/parquet.md +++ b/docs/zh_CN/config-description/parquet.md @@ -22,7 +22,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, # # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio fullOp = 2 } diff --git a/docs/zh_CN/mqtt-stream/configuration.md b/docs/zh_CN/mqtt-stream/configuration.md index 861a75e8b..e98250a75 100644 --- a/docs/zh_CN/mqtt-stream/configuration.md +++ b/docs/zh_CN/mqtt-stream/configuration.md @@ -16,7 +16,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 1000 + cap = 1000, fullOp = 3 } } @@ -71,7 +71,7 @@ exchange_client.mq1 { # # ring buffer name name = "ringbus", # # max length of ring buffer (msg count) - cap = 10000 + cap = 10000, fullOp = 3 } } diff --git a/etc/nanomq_example.conf b/etc/nanomq_example.conf index 9b1bed489..169635809 100644 --- a/etc/nanomq_example.conf +++ b/etc/nanomq_example.conf @@ -1144,6 +1144,8 @@ rules.mysql.mysql_rule_db { # #==================================================================== # # Initalize multiple MQ exchanger by giving them different name (mq1) exchange_client.mq1 { + # # Currently NanoMQ only support one MQ object. URL shall be exactly same. + exchange_url = "tcp://127.0.0.1:10000" # # exchanges contains multiple MQ exchanger exchange { # # MQTT Topic for filtering messages and saving to queue diff --git a/nanomq/apps/broker.c b/nanomq/apps/broker.c index 5b2ccc158..2b0d78a94 100644 --- a/nanomq/apps/broker.c +++ b/nanomq/apps/broker.c @@ -965,9 +965,11 @@ broker(conf *nanomq_conf) continue; } node->sock = (nng_socket *) nng_alloc(sizeof(nng_socket)); + // exchange sock is an embedded Req/Rep sock for MQTT Stream if ((rv = nng_exchange_client_open(node->sock)) != 0) { log_error("nng_exchange_client_open failed %d", rv); } else { + // nng_socket_set_ms(*node->sock, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); nng_socket_set_ptr(*node->sock, NNG_OPT_EXCHANGE_BIND, (void *)node); } log_debug("exchange %d init finished!\n", i); @@ -1085,12 +1087,13 @@ broker(conf *nanomq_conf) hook_exchange_init(nanomq_conf, num_work); // create exchange senders in hook hook_exchange_sender_init(nanomq_conf, works, num_work); - // TODO expose this - char url_zzz[128] = "tcp://127.0.0.1:10000"; + // TODO support multiple MQ coexisitence nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock; - if ((rv = nano_listen(*mq_sock, url_zzz, NULL, 0, nanomq_conf)) != 0) { + nng_listener mq_listener; + if ((rv = nano_listen(*mq_sock, nanomq_conf->exchange.exchange_url, &mq_listener, 0, nanomq_conf)) != 0) { NANO_NNG_FATAL("broker nng_listen", rv); } + nng_listener_set_size(mq_listener, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu); } if (nanomq_conf->enable) { diff --git a/nng b/nng index 3aa8435fb..fd186e107 160000 --- a/nng +++ b/nng @@ -1 +1 @@ -Subproject commit 3aa8435fbdc4b1becc9ae0e3b22f08b5617ab4cb +Subproject commit fd186e107dac2c7134845df429c9a836ee9e4e9c