Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quic_disconnect interface #542

Merged
merged 8 commits into from
Apr 30, 2023
1 change: 1 addition & 0 deletions include/nng/mqtt/mqtt_quic.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern "C" {
#endif
#if defined(SUPP_QUIC)
NNG_DECL int nng_mqtt_quic_client_open(nng_socket *, const char *url);
NNG_DECL int nng_mqtt_quic_client_close(nng_socket *);
NNG_DECL int nng_mqtt_quic_open_conf(
nng_socket *sock, const char *url, void *node);
NNG_DECL int nng_mqtt_quic_set_connect_cb(
Expand Down
5 changes: 3 additions & 2 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1001,16 +1001,17 @@ mqtt_ctx_send(void *arg, nni_aio *aio)
static void
mqtt_ctx_recv(void *arg, nni_aio *aio)
{
mqtt_ctx_t * ctx = arg;
mqtt_ctx_t *ctx = arg;
mqtt_sock_t *s = ctx->mqtt_sock;
mqtt_pipe_t *p = s->mqtt_pipe;
mqtt_pipe_t *p;
nni_msg *msg = NULL;

if (nni_aio_begin(aio) != 0) {
return;
}

nni_mtx_lock(&s->mtx);
p = s->mqtt_pipe;
if (p == NULL) {
goto wait;
}
Expand Down
44 changes: 42 additions & 2 deletions src/mqtt/protocol/mqtt/mqtt_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ struct mqtt_sock_s {
nni_list recv_queue; // aio pending to receive
nni_list send_queue; // aio pending to send

void *qsock; // The matrix of quic sock. Which only be allow to use when disconnect.
// Or lock first.
nni_lmq send_messages; // send messages queue (only for major stream)
nni_lmq *ack_lmq;
nni_id_map *streams; // pipes, only effective in multi-stream mode
Expand Down Expand Up @@ -1304,16 +1306,19 @@ mqtt_quic_sock_close(void *arg)
nni_aio *aio;
mqtt_sock_t *s = arg;

nni_atomic_set_bool(&s->closed, true);

nni_aio_stop(&s->time_aio);
nni_aio_close(&s->time_aio);

while ((aio = nni_list_first(&s->recv_queue)) != NULL) {
// Pipe was closed. just push an error back to the
// entire socket, because we only have one pipe
nni_list_remove(&s->recv_queue, aio);
nni_aio_finish_error(aio, NNG_ECONNABORTED);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_lmq_flush(&s->send_messages);

nni_sock_rele(s->nsock);
}

Expand Down Expand Up @@ -1770,6 +1775,7 @@ mqtt_quic_ctx_recv(void *arg, nni_aio *aio)
}

nni_mtx_lock(&s->mtx);
// TODO Should socket is closed be check first?
if (p == NULL) {
goto wait;
}
Expand Down Expand Up @@ -1886,6 +1892,9 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node)
{
int rv = 0;
nni_sock *nsock = NULL;
void *qsock = NULL;

mqtt_sock_t *msock = NULL;

// Quic settings
if ((rv = nni_proto_open(sock, &mqtt_msquic_proto)) == 0) {
Expand All @@ -1896,7 +1905,11 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node)
quic_open();
quic_proto_open(&mqtt_msquic_proto);
quic_proto_set_bridge_conf(node);
rv = quic_connect_ipv4(url, nsock, NULL);
rv = quic_connect_ipv4(url, nsock, NULL, &qsock);
if (rv == 0) {
msock = nni_sock_proto_data(nsock);
msock->qsock = qsock;
}
} else {
rv = -1;
}
Expand All @@ -1905,6 +1918,33 @@ nng_mqtt_quic_open_conf(nng_socket *sock, const char *url, void *node)
return rv;
}

int
nng_mqtt_quic_client_close(nng_socket *sock)
{
nni_sock *nsock = NULL;
mqtt_sock_t *s= NULL;

nni_sock_find(&nsock, sock->id);
if (nsock) {
s= nni_sock_proto_data(nsock);
if (!s)
return -1;
if (s->pipe && s->pipe->qpipe) {
quic_disconnect(s->qsock, s->pipe->qpipe);
} else {
quic_disconnect(s->qsock, NULL);
}

// nni_sock_close(nsock);
nni_sock_rele(nsock);

return 0;
}


return -2;
}

/**
* init an AIO for Acknoledgement message only, in order to make QoS/connect truly asychrounous
* For QoS 0 message, we do not care the result of sending
Expand Down
25 changes: 16 additions & 9 deletions src/supplemental/quic/quic_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct quic_sock_s {
HQUIC qconn; // QUIC connection
nni_sock *sock;
void *pipe; //main mqtt_pipe
int closed; // True when actively close

nni_mtx mtx; // for reconnect
nni_aio close_aio;
Expand Down Expand Up @@ -304,6 +305,7 @@ quic_sock_init(quic_sock_t *qsock)
qsock->qconn = NULL;
qsock->sock = NULL;
qsock->pipe = NULL;
qsock->closed = 0;

nni_mtx_init(&qsock->mtx);
nni_aio_init(&qsock->close_aio, quic_sock_close_cb, qsock);
Expand All @@ -318,7 +320,8 @@ quic_sock_init(quic_sock_t *qsock)
static void
quic_sock_fini(quic_sock_t *qsock)
{
nni_mtx_fini(&qsock->mtx);
// nni_mtx_fini(&qsock->mtx);
qsock->closed = 1;
if (qsock->url_s)
nng_url_free(qsock->url_s);

Expand Down Expand Up @@ -613,7 +616,7 @@ quic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context,
pipe_ops->pipe_fini(qsock->pipe);
qsock->pipe = NULL;
// No bridge_node if NOT bridge mode
if (bridge_node && bridge_node->hybrid) {
if (bridge_node && (bridge_node->hybrid || qsock->closed)) {
nni_mtx_unlock(&qsock->mtx);
break;
}
Expand Down Expand Up @@ -685,23 +688,25 @@ quic_disconnect(void *qsock, void *qpipe)
if (!qsock) {
return -1;
}
if (qstrm->closed == true || qstrm->stream != NULL) {
return -1;
if (qstrm != NULL) {
if (qstrm->closed == true || qstrm->stream == NULL)
return -2;
MsQuic->StreamShutdown(
qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to close connection alone

}
MsQuic->StreamClose(qstrm->stream);
MsQuic->StreamShutdown(
qstrm->stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, NNG_ECONNSHUT);

nni_mtx_lock(&qs->mtx);
MsQuic->ConnectionShutdown(
qs->qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, NNG_ECLOSED);
nni_mtx_unlock(&qs->mtx);

quic_sock_fini(qs);

return 0;
}

int
quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index)
quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp)
{
// Load the client configuration
if (!quic_load_config(bridge_node)) {
Expand Down Expand Up @@ -778,6 +783,8 @@ quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index)
// Here mutex should be unnecessary.
qsock->qconn = conn;

*qsockp = qsock;

// // Start/ReStart the nng pipe
// const nni_proto_pipe_ops *pipe_ops = g_quic_proto->proto_pipe_ops;
// if ((qsock->pipe = nng_alloc(pipe_ops->pipe_size)) == NULL) {
Expand Down Expand Up @@ -1188,7 +1195,7 @@ quic_pipe_recv_cb(void *arg)

if (qstrm->rrlen > 0)
if (!nni_list_empty(&qstrm->recvq))
nni_aio_finish_sync(&qstrm->rraio, 0, 0);
nni_aio_finish(&qstrm->rraio, 0, 0);

memmove(qstrm->rrbuf, qstrm->rrbuf+qstrm->rrpos, qstrm->rrlen);
qstrm->rrpos = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/supplemental/quic/quic_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extern void quic_proto_set_bridge_conf(void *arg);
// Establish a quic connection to target url. Return 0 if success.
// And the handle of connection(qsock) would pass to callback .pipe_init(,qsock,)
// Or the connection is failed in eastablishing.
extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index);
extern int quic_connect_ipv4(const char *url, nni_sock *sock, uint32_t *index, void **qsockp);
// Close connection
extern int quic_disconnect(void *qsock, void *qpipe);

Expand Down