From 9549e023f3f00b0d57dca620441e65d34c21a6bd Mon Sep 17 00:00:00 2001 From: jaylin Date: Wed, 20 Sep 2023 19:37:13 +0800 Subject: [PATCH 1/3] * NEW [bridge] support multi-streaming of QUIC Signed-off-by: jaylin --- nanomq/bridge.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 15c2b2bc1..3f5b6d81b 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -840,7 +840,9 @@ bridge_quic_connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg) nng_mqtt_topic_qos_array_create(1); nng_mqtt_topic_qos_array_set(topic_qos, 0, param->config->sub_list[i]->remote_topic, - param->config->sub_list[i]->qos, 1, 0, 0); + param->config->sub_list[i]->qos, 1, + param->config->sub_list[i]->retain_as_published, + param->config->sub_list[i]->retain_handling); log_info("Quic bridge client subscribe to " "topic (QoS %d)%s.", param->config->sub_list[i]->qos, @@ -961,9 +963,12 @@ bridge_quic_client(nng_socket *sock, conf *config, conf_bridge_node *node, bridg // set backoff param to 24s nng_duration duration = 240000; nng_dialer_set(dialer, NNG_OPT_MQTT_RECONNECT_BACKOFF_MAX, &duration, sizeof(nng_duration)); - // nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_0RTT, true); - // nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true); - + nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_0RTT, true); + if (node->multi_stream) { + //better remove the option from dialer + nng_dialer_set_bool(dialer, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true); + nng_socket_set_bool(*sock, NNG_OPT_QUIC_ENABLE_MULTISTREAM, true); + } bridge_arg->client = nng_mqtt_client_alloc(*sock, &send_callback, true); // create a CONNECT message From 1a29055243e41f868c19628c0a4cc1e8323f7b4a Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Thu, 21 Sep 2023 12:32:35 +0800 Subject: [PATCH 2/3] * ADD [quic] Support mqttv5 over quic client Signed-off-by: Moi Ran --- nanomq/bridge.c | 9 +++------ nanomq_cli/client.c | 8 +++----- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 3f5b6d81b..0139738cb 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -514,13 +514,10 @@ hybrid_quic_client(bridge_param *bridge_arg) // keepalive here is for QUIC only if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) { - log_info("MQTT V5 OVER QUIC is not supported yet."); - /* - if ((rv = nng_mqtt_quic_client_open(new)) != 0) { - nng_fatal("nng_mqtt_quic_client_open", rv); + if ((rv = nng_mqttv5_quic_client_open(new)) != 0) { + nng_fatal("nng_mqttv5_quic_client_open", rv); return rv; } - */ } else { if ((rv = nng_mqtt_quic_client_open(new)) != 0) { nng_fatal("nng_mqtt_quic_client_open", rv); @@ -943,7 +940,7 @@ bridge_quic_client(nng_socket *sock, conf *config, conf_bridge_node *node, bridg log_debug("Quic bridge service start.\n"); if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) { - if ((rv = nng_mqtt_quic_client_open(sock)) != 0) { + if ((rv = nng_mqttv5_quic_client_open(sock)) != 0) { nng_fatal("nng_mqttv5_quic_client_open", rv); return rv; } diff --git a/nanomq_cli/client.c b/nanomq_cli/client.c index 984a11899..d0547fecf 100644 --- a/nanomq_cli/client.c +++ b/nanomq_cli/client.c @@ -1462,11 +1462,9 @@ create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork, if (isquic) { #if defined(SUPP_QUIC) - if (param->opts->version == MQTT_PROTOCOL_VERSION_v5) { - console("MQTT V5 OVER QUIC is not supported yet"); - return; - } - rv = nng_mqtt_quic_client_open(sock); + rv = param->opts->version == MQTT_PROTOCOL_VERSION_v5 + ? nng_mqttv5_quic_client_open(sock) + : nng_mqtt_quic_client_open(sock); if (rv != 0) { nng_fatal("nng_socket", rv); } From 8be6240e6ece5187f79db99bae27eceab22104b4 Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Thu, 21 Sep 2023 19:52:29 +0800 Subject: [PATCH 3/3] * MDF [client] Update quic client callback set function Signed-off-by: Moi Ran --- nanomq_cli/client.c | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/nanomq_cli/client.c b/nanomq_cli/client.c index d0547fecf..909448547 100644 --- a/nanomq_cli/client.c +++ b/nanomq_cli/client.c @@ -1453,6 +1453,24 @@ disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg) console("disconnected reason : %d\n", reason); } +static int +quic_connect_cb(void *rmsg, void *arg) +{ + struct connect_param *param = arg; + int reason = 0; + + console("%s: %s connect\n", __FUNCTION__, param->opts->url); + + return 0; +} + +static int +quic_disconnect_cb(void *rmsg, void *arg) +{ + console("bridge client disconnected!\n"); + return 0; +} + static void create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork, struct connect_param *param, bool isquic) @@ -1468,6 +1486,13 @@ create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork, if (rv != 0) { nng_fatal("nng_socket", rv); } + if (param->opts->version == MQTT_PROTOCOL_VERSION_v5) { + nng_mqttv5_quic_set_connect_cb(sock, quic_connect_cb, param); + nng_mqttv5_quic_set_disconnect_cb(sock, quic_disconnect_cb, param); + } else { + nng_mqtt_quic_set_connect_cb(sock, quic_connect_cb, param); + nng_mqtt_quic_set_disconnect_cb(sock, quic_disconnect_cb, param); + } #endif } else { rv = param->opts->version == MQTT_PROTOCOL_VERSION_v5 @@ -1476,6 +1501,8 @@ create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork, if (rv != 0) { nng_fatal("nng_socket", rv); } + nng_mqtt_set_connect_cb(*sock, connect_cb, param); + nng_mqtt_set_disconnect_cb(*sock, disconnect_cb, conn_msg); } for (size_t i = 0; i < opts->parallel; i++) { @@ -1506,9 +1533,6 @@ create_client(nng_socket *sock, struct work **works, size_t id, size_t nwork, param->opts = opts; param->id = id; - nng_mqtt_set_connect_cb(*sock, connect_cb, param); - nng_mqtt_set_disconnect_cb(*sock, disconnect_cb, conn_msg); - if ((rv = nng_dialer_start(dialer, NNG_FLAG_ALLOC)) != 0) { nng_fatal("nng_dialer_start", rv); }