Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* NEW [bridge] support multi-streaming of QUIC #1452

Merged
merged 4 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 11 additions & 8 deletions nanomq/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,7 @@ hybrid_quic_client(bridge_param *bridge_arg)

// keepalive here is for QUIC only
if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
log_error("MQTT V5 OVER QUIC is not supported yet.");
if ((rv = nng_mqtt_quic_client_open(new)) != 0) {
if ((rv = nng_mqttv5_quic_client_open(new)) != 0) {
nng_fatal("nng_mqttv5_quic_client_open", rv);
return rv;
}
Expand Down Expand Up @@ -743,7 +742,9 @@ bridge_quic_connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
nng_mqtt_topic_qos_array_create(1);
nng_mqtt_topic_qos_array_set(topic_qos, 0,
param->config->sub_list[i]->remote_topic,
param->config->sub_list[i]->qos, 1, 0, 0);
param->config->sub_list[i]->qos, 1,
param->config->sub_list[i]->retain_as_published,
param->config->sub_list[i]->retain_handling);
log_info("Quic bridge client subscribe to "
"topic (QoS %d)%s.",
param->config->sub_list[i]->qos,
Expand Down Expand Up @@ -844,8 +845,7 @@ bridge_quic_client(nng_socket *sock, conf *config, conf_bridge_node *node, bridg
log_debug("Quic bridge service start.\n");

if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
log_error("MQTT V5 OVER QUIC is not supported yet.");
if ((rv = nng_mqtt_quic_client_open(sock)) != 0) {
if ((rv = nng_mqttv5_quic_client_open(sock)) != 0) {
nng_fatal("nng_mqttv5_quic_client_open", rv);
return rv;
}
Expand All @@ -865,9 +865,12 @@ bridge_quic_client(nng_socket *sock, conf *config, conf_bridge_node *node, bridg
// set backoff param to 24s
nng_duration duration = 240000;
nng_dialer_set(dialer, NNG_OPT_MQTT_RECONNECT_BACKOFF_MAX, &duration, sizeof(nng_duration));
// nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_0RTT, true);
// nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true);

nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_0RTT, true);
if (node->multi_stream) {
//better remove the option from dialer
nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true);
nng_socket_set_bool(*sock, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true);
}
bridge_arg->client = nng_mqtt_client_alloc(*sock, &send_callback, true);

// create a CONNECT message
Expand Down
38 changes: 30 additions & 8 deletions nanomq_cli/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,24 @@
console("disconnected reason : %d\n", reason);
}

static int
quic_connect_cb(void *rmsg, void *arg)

Check notice

Code scanning / CodeQL

Unused static function Note

Static function quic_connect_cb is unreachable
{
struct connect_param *param = arg;
int reason = 0;

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable reason is not used.

console("%s: %s connect\n", __FUNCTION__, param->opts->url);

return 0;
}

static int
quic_disconnect_cb(void *rmsg, void *arg)

Check notice

Code scanning / CodeQL

Unused static function Note

Static function quic_disconnect_cb is unreachable
{
console("bridge client disconnected!\n");
return 0;
}

static void
create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork,
struct connect_param *param, bool isquic)
Expand All @@ -1462,14 +1480,19 @@

if (isquic) {
#if defined(SUPP_QUIC)
if (param->opts->version == MQTT_PROTOCOL_VERSION_v5) {
console("MQTT V5 OVER QUIC is not supported yet");
return;
}
rv = nng_mqtt_quic_client_open(sock);
rv = param->opts->version == MQTT_PROTOCOL_VERSION_v5
? nng_mqttv5_quic_client_open(sock)
: nng_mqtt_quic_client_open(sock);
if (rv != 0) {
nng_fatal("nng_socket", rv);
}
if (param->opts->version == MQTT_PROTOCOL_VERSION_v5) {
nng_mqttv5_quic_set_connect_cb(sock, quic_connect_cb, param);
nng_mqttv5_quic_set_disconnect_cb(sock, quic_disconnect_cb, param);
} else {
nng_mqtt_quic_set_connect_cb(sock, quic_connect_cb, param);
nng_mqtt_quic_set_disconnect_cb(sock, quic_disconnect_cb, param);
}
#endif
} else {
rv = param->opts->version == MQTT_PROTOCOL_VERSION_v5
Expand All @@ -1478,6 +1501,8 @@
if (rv != 0) {
nng_fatal("nng_socket", rv);
}
nng_mqtt_set_connect_cb(*sock, connect_cb, param);
nng_mqtt_set_disconnect_cb(*sock, disconnect_cb, conn_msg);
}

for (size_t i = 0; i < opts->parallel; i++) {
Expand Down Expand Up @@ -1508,9 +1533,6 @@
param->opts = opts;
param->id = id;

nng_mqtt_set_connect_cb(*sock, connect_cb, param);
nng_mqtt_set_disconnect_cb(*sock, disconnect_cb, conn_msg);

if ((rv = nng_dialer_start(dialer, NNG_FLAG_ALLOC)) != 0) {
nng_fatal("nng_dialer_start", rv);
}
Expand Down