From a050d4807ad958b7100f220028de3e4bc4ed422e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 16:14:43 +0800 Subject: [PATCH 1/8] * NEW [bridge] Add quic_client_close API. --- include/nng/mqtt/mqtt_quic.h | 1 + src/mqtt/protocol/mqtt/mqtt_quic.c | 38 +++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/include/nng/mqtt/mqtt_quic.h b/include/nng/mqtt/mqtt_quic.h index 87ae75a2f..8de55ed2b 100644 --- a/include/nng/mqtt/mqtt_quic.h +++ b/include/nng/mqtt/mqtt_quic.h @@ -19,6 +19,7 @@ extern "C" { #endif #if defined(SUPP_QUIC) NNG_DECL int nng_mqtt_quic_client_open(nng_socket *, const char *url); +NNG_DECL int nng_mqtt_quic_client_close(nng_socket *); NNG_DECL int nng_mqtt_quic_open_conf( nng_socket *sock, const char *url, void *node); NNG_DECL int nng_mqtt_quic_set_connect_cb( diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index c9ae2b7b1..b051d28bd 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -89,6 +89,8 @@ struct mqtt_sock_s { nni_list recv_queue; // aio pending to receive nni_list send_queue; // aio pending to send + void *qsock; // The matrix of quic sock. Which only be allow to use when disconnect. + // Or lock first. nni_lmq send_messages; // send messages queue (only for major stream) nni_lmq *ack_lmq; nni_id_map *streams; // pipes, only effective in multi-stream mode @@ -1886,6 +1888,9 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) { int rv = 0; nni_sock *nsock = NULL; + void *qsock = NULL; + + mqtt_sock_t *msock = NULL; // Quic settings if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) { @@ -1896,7 +1901,11 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) quic_open(); quic_proto_open(&mqtt_msquic_proto); quic_proto_set_bridge_conf(node); - rv = quic_connect_ipv4(url, nsock, NULL); + rv = quic_connect_ipv4(url, nsock, NULL, &qsock); + if (rv == 0) { + msock = nni_sock_proto_data(nsock); + msock->qsock = qsock; + } } else { rv = -1; } @@ -1905,6 +1914,33 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) return rv; } +int +nng_mqtt_quic_client_close(nng_socket *sock) +{ + nni_sock *nsock = NULL; + mqtt_sock_t *s= NULL; + + nni_sock_find(&nsock, sock->id); + if (nsock) { + s= nni_sock_proto_data(nsock); + if (!s) + return -1; + if (s->pipe && s->pipe->qpipe) { + quic_disconnect(s->qsock, s->pipe->qpipe); + } else { + quic_disconnect(s->qsock, NULL); + } + + // nni_sock_close(nsock); + nni_sock_rele(nsock); + + return 0; + } + + + return -2; +} + /** * init an AIO for Acknoledgement message only, in order to make QoS/connect truly asychrounous * For QoS 0 message, we do not care the result of sending From 012995c01a77e95f96721980bd2d84c912e8a15f Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 16:16:08 +0800 Subject: [PATCH 2/8] * NEW [quic_api] Fix the error in quic_disconnect. --- src/supplemental/quic/quic_api.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index b5c2a924a..033396794 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -49,6 +49,7 @@ struct quic_sock_s { HQUIC qconn; // QUIC connection nni_sock *sock; void *pipe; //main mqtt_pipe + int closed; // True when actively close nni_mtx mtx; // for reconnect nni_aio close_aio; @@ -304,6 +305,7 @@ quic_sock_init(quic_sock_t *qsock) qsock->qconn = NULL; qsock->sock = NULL; qsock->pipe = NULL; + qsock->closed = 0; nni_mtx_init(&qsock->mtx); nni_aio_init(&qsock->close_aio, quic_sock_close_cb, qsock); @@ -318,7 +320,8 @@ quic_sock_init(quic_sock_t *qsock) static void quic_sock_fini(quic_sock_t *qsock) { - nni_mtx_fini(&qsock->mtx); + // nni_mtx_fini(&qsock->mtx); + qsock->closed = 1; if (qsock->url_s) nng_url_free(qsock->url_s); @@ -613,7 +616,7 @@ quic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, pipe_ops->pipe_fini(qsock->pipe); qsock->pipe = NULL; // No bridge_node if NOT bridge mode - if (bridge_node && bridge_node->hybrid) { + if (bridge_node && (bridge_node->hybrid || qsock->closed)) { nni_mtx_unlock(&qsock->mtx); break; } @@ -685,23 +688,25 @@ quic_disconnect(void *qsock, void *qpipe) if (!qsock) { return -1; } - if (qstrm->closed == true || qstrm->stream != NULL) { - return -1; + if (qstrm != NULL) { + if (qstrm->closed == true || qstrm->stream == NULL) + return -2; + MsQuic->StreamShutdown( + qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT); } - MsQuic->StreamClose(qstrm->stream); - MsQuic->StreamShutdown( - qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT); nni_mtx_lock(&qs->mtx); MsQuic->ConnectionShutdown( qs->qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, NNG_ECLOSED); nni_mtx_unlock(&qs->mtx); + quic_sock_fini(qs); + return 0; } int -quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index) +quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp) { // Load the client configuration if (!quic_load_config(bridge_node)) { @@ -778,6 +783,8 @@ quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index) // Here mutex should be unnecessary. qsock->qconn = conn; + *qsockp = qsock; + // // Start/ReStart the nng pipe // const nni_proto_pipe_ops *pipe_ops = g_quic_proto->proto_pipe_ops; // if ((qsock->pipe = nng_alloc(pipe_ops->pipe_size)) == NULL) { From c75a46961c5eca09abd4ffab5df1d9ad4647c71f Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 16:17:35 +0800 Subject: [PATCH 3/8] * FIX [quic_api] Fix the error of calling stack overflow when receive msg. --- src/supplemental/quic/quic_api.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index 033396794..711d38e96 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -1137,7 +1137,7 @@ quic_pipe_recv_cb(void *arg) nni_mtx_unlock(&qstrm->mtx); // Wait to be re-schedule if (!nni_list_empty(&qstrm->recvq)) { - nni_aio_finish_sync(&qstrm->rraio, 0, 0); + nni_aio_finish(&qstrm->rraio, 0, 0); } qdebug("3after rxlen %d rwlen %d.\n", qstrm->rxlen, qstrm->rwlen); return; @@ -1195,7 +1195,7 @@ quic_pipe_recv_cb(void *arg) if (qstrm->rrlen > 0) if (!nni_list_empty(&qstrm->recvq)) - nni_aio_finish_sync(&qstrm->rraio, 0, 0); + nni_aio_finish(&qstrm->rraio, 0, 0); memmove(qstrm->rrbuf, qstrm->rrbuf+qstrm->rrpos, qstrm->rrlen); qstrm->rrpos = 0; From a0e50999fbc2e9fb3917f73c57582b3eabbcbb19 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 16:18:46 +0800 Subject: [PATCH 4/8] * NEW [quic_api] Add quic_disconnect API to header file. --- src/supplemental/quic/quic_api.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/supplemental/quic/quic_api.h b/src/supplemental/quic/quic_api.h index 35026e679..f12c3a966 100644 --- a/src/supplemental/quic/quic_api.h +++ b/src/supplemental/quic/quic_api.h @@ -29,7 +29,7 @@ extern void quic_proto_set_bridge_conf(void *arg); // Establish a quic connection to target url. Return 0 if success. // And the handle of connection(qsock) would pass to callback .pipe_init(,qsock,) // Or the connection is failed in eastablishing. -extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index); +extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp); // Close connection extern int quic_disconnect(void *qsock, void *qpipe); From 79a2678d16d8cb1ad07f9edbcf1743fdcb5ed2ff Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 16:37:20 +0800 Subject: [PATCH 5/8] * FIX [mqtt_quic] Change the error code. --- src/mqtt/protocol/mqtt/mqtt_quic.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index b051d28bd..dd902dde8 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -1313,9 +1313,10 @@ mqtt_quic_sock_close(void *arg) // Pipe was closed. just push an error back to the // entire socket, because we only have one pipe nni_list_remove(&s->recv_queue, aio); - nni_aio_finish_error(aio, NNG_ECONNABORTED); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_lmq_flush(&s->send_messages); + nni_sock_rele(s->nsock); } From 3623e6e71e083902a5437c990fd1720358832e19 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 27 Apr 2023 18:03:48 +0800 Subject: [PATCH 6/8] * FIX [quic_api] Fix the wrong aio finish way in quic_api. --- src/supplemental/quic/quic_api.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index 711d38e96..d814c098f 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -1137,7 +1137,7 @@ quic_pipe_recv_cb(void *arg) nni_mtx_unlock(&qstrm->mtx); // Wait to be re-schedule if (!nni_list_empty(&qstrm->recvq)) { - nni_aio_finish(&qstrm->rraio, 0, 0); + nni_aio_finish_sync(&qstrm->rraio, 0, 0); } qdebug("3after rxlen %d rwlen %d.\n", qstrm->rxlen, qstrm->rwlen); return; From 248a398ad66bc5c50f5c83bbdb82dba5f30fa9a0 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Fri, 28 Apr 2023 17:59:28 +0800 Subject: [PATCH 7/8] * NEW [mqtt_quic] set s->closed to true when socket closed. --- src/mqtt/protocol/mqtt/mqtt_quic.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index dd902dde8..8911433d6 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -1306,6 +1306,8 @@ mqtt_quic_sock_close(void *arg) nni_aio *aio; mqtt_sock_t *s = arg; + nni_atomic_set_bool(&s->closed, true); + nni_aio_stop(&s->time_aio); nni_aio_close(&s->time_aio); @@ -1773,6 +1775,7 @@ mqtt_quic_ctx_recv(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + // TODO Should socket is closed be check first? if (p == NULL) { goto wait; } From 67a9628c84787542a13c24554926cf7c4982bfe5 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Sun, 30 Apr 2023 09:48:42 +0800 Subject: [PATCH 8/8] * FIX [mqtt_client] fix thread sanitizer complain --- src/mqtt/protocol/mqtt/mqtt_client.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 2aa61d4a9..1cac9390e 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -1001,9 +1001,9 @@ mqtt_ctx_send(void *arg, nni_aio *aio) static void mqtt_ctx_recv(void *arg, nni_aio *aio) { - mqtt_ctx_t * ctx = arg; + mqtt_ctx_t *ctx = arg; mqtt_sock_t *s = ctx->mqtt_sock; - mqtt_pipe_t *p = s->mqtt_pipe; + mqtt_pipe_t *p; nni_msg *msg = NULL; if (nni_aio_begin(aio) != 0) { @@ -1011,6 +1011,7 @@ mqtt_ctx_recv(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + p = s->mqtt_pipe; if (p == NULL) { goto wait; }