diff --git a/include/nng/mqtt/mqtt_quic.h b/include/nng/mqtt/mqtt_quic.h index 87ae75a2f..8de55ed2b 100644 --- a/include/nng/mqtt/mqtt_quic.h +++ b/include/nng/mqtt/mqtt_quic.h @@ -19,6 +19,7 @@ extern "C" { #endif #if defined(SUPP_QUIC) NNG_DECL int nng_mqtt_quic_client_open(nng_socket *, const char *url); +NNG_DECL int nng_mqtt_quic_client_close(nng_socket *); NNG_DECL int nng_mqtt_quic_open_conf( nng_socket *sock, const char *url, void *node); NNG_DECL int nng_mqtt_quic_set_connect_cb( diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 2aa61d4a9..1cac9390e 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -1001,9 +1001,9 @@ mqtt_ctx_send(void *arg, nni_aio *aio) static void mqtt_ctx_recv(void *arg, nni_aio *aio) { - mqtt_ctx_t * ctx = arg; + mqtt_ctx_t *ctx = arg; mqtt_sock_t *s = ctx->mqtt_sock; - mqtt_pipe_t *p = s->mqtt_pipe; + mqtt_pipe_t *p; nni_msg *msg = NULL; if (nni_aio_begin(aio) != 0) { @@ -1011,6 +1011,7 @@ mqtt_ctx_recv(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + p = s->mqtt_pipe; if (p == NULL) { goto wait; } diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index c9ae2b7b1..8911433d6 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -89,6 +89,8 @@ struct mqtt_sock_s { nni_list recv_queue; // aio pending to receive nni_list send_queue; // aio pending to send + void *qsock; // The matrix of quic sock. Which only be allow to use when disconnect. + // Or lock first. nni_lmq send_messages; // send messages queue (only for major stream) nni_lmq *ack_lmq; nni_id_map *streams; // pipes, only effective in multi-stream mode @@ -1304,6 +1306,8 @@ mqtt_quic_sock_close(void *arg) nni_aio *aio; mqtt_sock_t *s = arg; + nni_atomic_set_bool(&s->closed, true); + nni_aio_stop(&s->time_aio); nni_aio_close(&s->time_aio); @@ -1311,9 +1315,10 @@ mqtt_quic_sock_close(void *arg) // Pipe was closed. just push an error back to the // entire socket, because we only have one pipe nni_list_remove(&s->recv_queue, aio); - nni_aio_finish_error(aio, NNG_ECONNABORTED); + nni_aio_finish_error(aio, NNG_ECLOSED); } nni_lmq_flush(&s->send_messages); + nni_sock_rele(s->nsock); } @@ -1770,6 +1775,7 @@ mqtt_quic_ctx_recv(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + // TODO Should socket is closed be check first? if (p == NULL) { goto wait; } @@ -1886,6 +1892,9 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) { int rv = 0; nni_sock *nsock = NULL; + void *qsock = NULL; + + mqtt_sock_t *msock = NULL; // Quic settings if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) { @@ -1896,7 +1905,11 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) quic_open(); quic_proto_open(&mqtt_msquic_proto); quic_proto_set_bridge_conf(node); - rv = quic_connect_ipv4(url, nsock, NULL); + rv = quic_connect_ipv4(url, nsock, NULL, &qsock); + if (rv == 0) { + msock = nni_sock_proto_data(nsock); + msock->qsock = qsock; + } } else { rv = -1; } @@ -1905,6 +1918,33 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node) return rv; } +int +nng_mqtt_quic_client_close(nng_socket *sock) +{ + nni_sock *nsock = NULL; + mqtt_sock_t *s= NULL; + + nni_sock_find(&nsock, sock->id); + if (nsock) { + s= nni_sock_proto_data(nsock); + if (!s) + return -1; + if (s->pipe && s->pipe->qpipe) { + quic_disconnect(s->qsock, s->pipe->qpipe); + } else { + quic_disconnect(s->qsock, NULL); + } + + // nni_sock_close(nsock); + nni_sock_rele(nsock); + + return 0; + } + + + return -2; +} + /** * init an AIO for Acknoledgement message only, in order to make QoS/connect truly asychrounous * For QoS 0 message, we do not care the result of sending diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index b5c2a924a..d814c098f 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -49,6 +49,7 @@ struct quic_sock_s { HQUIC qconn; // QUIC connection nni_sock *sock; void *pipe; //main mqtt_pipe + int closed; // True when actively close nni_mtx mtx; // for reconnect nni_aio close_aio; @@ -304,6 +305,7 @@ quic_sock_init(quic_sock_t *qsock) qsock->qconn = NULL; qsock->sock = NULL; qsock->pipe = NULL; + qsock->closed = 0; nni_mtx_init(&qsock->mtx); nni_aio_init(&qsock->close_aio, quic_sock_close_cb, qsock); @@ -318,7 +320,8 @@ quic_sock_init(quic_sock_t *qsock) static void quic_sock_fini(quic_sock_t *qsock) { - nni_mtx_fini(&qsock->mtx); + // nni_mtx_fini(&qsock->mtx); + qsock->closed = 1; if (qsock->url_s) nng_url_free(qsock->url_s); @@ -613,7 +616,7 @@ quic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, pipe_ops->pipe_fini(qsock->pipe); qsock->pipe = NULL; // No bridge_node if NOT bridge mode - if (bridge_node && bridge_node->hybrid) { + if (bridge_node && (bridge_node->hybrid || qsock->closed)) { nni_mtx_unlock(&qsock->mtx); break; } @@ -685,23 +688,25 @@ quic_disconnect(void *qsock, void *qpipe) if (!qsock) { return -1; } - if (qstrm->closed == true || qstrm->stream != NULL) { - return -1; + if (qstrm != NULL) { + if (qstrm->closed == true || qstrm->stream == NULL) + return -2; + MsQuic->StreamShutdown( + qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT); } - MsQuic->StreamClose(qstrm->stream); - MsQuic->StreamShutdown( - qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT); nni_mtx_lock(&qs->mtx); MsQuic->ConnectionShutdown( qs->qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, NNG_ECLOSED); nni_mtx_unlock(&qs->mtx); + quic_sock_fini(qs); + return 0; } int -quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index) +quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp) { // Load the client configuration if (!quic_load_config(bridge_node)) { @@ -778,6 +783,8 @@ quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index) // Here mutex should be unnecessary. qsock->qconn = conn; + *qsockp = qsock; + // // Start/ReStart the nng pipe // const nni_proto_pipe_ops *pipe_ops = g_quic_proto->proto_pipe_ops; // if ((qsock->pipe = nng_alloc(pipe_ops->pipe_size)) == NULL) { @@ -1188,7 +1195,7 @@ quic_pipe_recv_cb(void *arg) if (qstrm->rrlen > 0) if (!nni_list_empty(&qstrm->recvq)) - nni_aio_finish_sync(&qstrm->rraio, 0, 0); + nni_aio_finish(&qstrm->rraio, 0, 0); memmove(qstrm->rrbuf, qstrm->rrbuf+qstrm->rrpos, qstrm->rrlen); qstrm->rrpos = 0; diff --git a/src/supplemental/quic/quic_api.h b/src/supplemental/quic/quic_api.h index 35026e679..f12c3a966 100644 --- a/src/supplemental/quic/quic_api.h +++ b/src/supplemental/quic/quic_api.h @@ -29,7 +29,7 @@ extern void quic_proto_set_bridge_conf(void *arg); // Establish a quic connection to target url. Return 0 if success. // And the handle of connection(qsock) would pass to callback .pipe_init(,qsock,) // Or the connection is failed in eastablishing. -extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index); +extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp); // Close connection extern int quic_disconnect(void *qsock, void *qpipe);