Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose exchange url. read it from env or conf #1662

Merged
merged 5 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en_US/config-description/parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 1000
cap = 1000,
# # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio
fullOp = 2
}
Expand Down
4 changes: 2 additions & 2 deletions docs/en_US/mqtt-stream/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 1000
cap = 1000,
fullOp = 3
}
}
Expand Down Expand Up @@ -71,7 +71,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 10000
cap = 10000,
fullOp = 3
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/zh_CN/config-description/parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 1000
cap = 1000,
# # 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio
fullOp = 2
}
Expand Down
4 changes: 2 additions & 2 deletions docs/zh_CN/mqtt-stream/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 1000
cap = 1000,
fullOp = 3
}
}
Expand Down Expand Up @@ -71,7 +71,7 @@ exchange_client.mq1 {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 10000
cap = 10000,
fullOp = 3
}
}
Expand Down
2 changes: 2 additions & 0 deletions etc/nanomq_example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,8 @@ rules.mysql.mysql_rule_db {
# #====================================================================
# # Initalize multiple MQ exchanger by giving them different name (mq1)
exchange_client.mq1 {
# # Currently NanoMQ only support one MQ object. URL shall be exactly same.
exchange_url = "tcp://127.0.0.1:10000"
# # exchanges contains multiple MQ exchanger
exchange {
# # MQTT Topic for filtering messages and saving to queue
Expand Down
9 changes: 6 additions & 3 deletions nanomq/apps/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,11 @@
continue;
}
node->sock = (nng_socket *) nng_alloc(sizeof(nng_socket));
// exchange sock is an embedded Req/Rep sock for MQTT Stream
if ((rv = nng_exchange_client_open(node->sock)) != 0) {
log_error("nng_exchange_client_open failed %d", rv);
} else {
// nng_socket_set_ms(*node->sock, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
nng_socket_set_ptr(*node->sock, NNG_OPT_EXCHANGE_BIND, (void *)node);
}
log_debug("exchange %d init finished!\n", i);
Expand Down Expand Up @@ -1085,12 +1087,13 @@
hook_exchange_init(nanomq_conf, num_work);
// create exchange senders in hook
hook_exchange_sender_init(nanomq_conf, works, num_work);
// TODO expose this
char url_zzz[128] = "tcp://127.0.0.1:10000";
// TODO support multiple MQ coexisitence
nng_socket *mq_sock = nanomq_conf->exchange.nodes[0]->sock;
if ((rv = nano_listen(*mq_sock, url_zzz, NULL, 0, nanomq_conf)) != 0) {
nng_listener mq_listener;
if ((rv = nano_listen(*mq_sock, nanomq_conf->exchange.exchange_url, &mq_listener, 0, nanomq_conf)) != 0) {
NANO_NNG_FATAL("broker nng_listen", rv);
}
nng_listener_set_size(mq_listener, NNG_OPT_RECVMAXSZ, 0xFFFFFFFFu);
}

if (nanomq_conf->enable) {
Expand Down