From 56c4952508ad014fe3fd751b84323e8d0e316aa9 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 15 May 2024 02:41:42 -0400 Subject: [PATCH 1/4] * FIX [quic] Fix the double free when cancel stream send. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index d4755f100..b88bc6169 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -531,6 +531,10 @@ quic_stream_cb(int events, void *arg, int rc) nni_aio_list_remove(aio); QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); free(buf); + // XXX #[FORCANCEL] + nni_msg *m; + if ((m = nni_aio_get_msg(aio)) != NULL) + nni_msg_free(m); if (rc != 0) nni_aio_finish_error(aio, rc); else @@ -745,6 +749,14 @@ quic_stream_dowrite(nni_quic_conn *c) } nni_aio_set_input(aio, 0, buf); + // When streamsend is triggered. The msg is used by nng and msquic. + // But if a cancelled operation happens right now. And nng cancel function + // will free the msg immediately. But this msg is still be used in msquic. + // So here we need a clone. And will free in #[FORCANCEL]. + nni_msg *m; + if ((m = nni_aio_get_msg(aio)) != NULL) + nni_msg_clone(m); + if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, naiov, QUIC_SEND_FLAG_NONE, NULL))) { log_error("Failed in StreamSend, 0x%x!", rv); @@ -1010,7 +1022,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); free(buf); Event->SEND_COMPLETE.ClientContext = NULL; - nni_msg *msg = nni_aio_get_msg(aio); + //nni_msg *msg = nni_aio_get_msg(aio); // free SUBSCRIBE/UNSUBSCRIBE QoS 1/2 PUBLISH msg here // nni_mqtt_packet_type t = nni_mqtt_msg_get_packet_type(msg); //nni_msg_free(msg); From 82e71116cd0f9fa0e76fbf7113369874489a5386 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 15 May 2024 03:59:17 -0400 Subject: [PATCH 2/4] * NEW [quic] Add refcnt for nni quic conn to avoid the data race in free it. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index b88bc6169..995746b94 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -61,6 +61,7 @@ struct nni_quic_conn { nni_list readq; nni_list writeq; bool closed; + nni_atomic_int ref; nni_mtx mtx; nni_aio * dial_aio; // nni_aio * qstrmaio; // Link to msquic_strm_cb @@ -103,6 +104,7 @@ static void quic_dialer_cb(void *arg); static void quic_stream_error(void *arg, int err); static void quic_stream_close(void *arg); static void quic_stream_dowrite(nni_quic_conn *c); +static void quic_stream_rele(nni_quic_conn *c); static QUIC_STATUS verify_peer_cert_tls(QUIC_CERTIFICATE* cert, QUIC_CERTIFICATE* chain, char *ca); @@ -565,11 +567,10 @@ quic_stream_cb(int events, void *arg, int rc) // case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: // case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: + quic_stream_error(arg, NNG_ECONNSHUT); // Marked it as closed, prevent explicit shutdown c->closed = true; - // It's the only place to free msquic stream - msquic_strm_fini(c->qstrm); - quic_stream_error(arg, NNG_ECONNSHUT); + quic_stream_rele(c); break; default: break; @@ -581,12 +582,20 @@ static void quic_stream_fini(void *arg) { nni_quic_conn *c = arg; - quic_stream_close(c); + NNI_FREE_STRUCT(c); +} +static void +quic_stream_rele(nni_quic_conn *c) +{ + quic_stream_close(c); if (c->dialer) { nni_msquic_quic_dialer_rele(c->dialer); } - NNI_FREE_STRUCT(c); + if (nni_atomic_dec_nv(&c->ref) != 0) { + return; + } + quic_stream_fini(c); } //static nni_reap_list quic_reap_list = { @@ -597,7 +606,7 @@ static void quic_stream_free(void *arg) { nni_quic_conn *c = arg; - quic_stream_fini(c); + quic_stream_rele(c); } // Notify upper layer that something happened. @@ -853,6 +862,9 @@ nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d) return (NNG_ENOMEM); } + nni_atomic_init(&c->ref); + nni_atomic_inc(&c->ref); + c->closed = false; c->dialer = d; @@ -1383,6 +1395,8 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) MsQuic->StreamClose(strm); goto error; } + // Stream is opened and started + nni_atomic_inc(&c->ref); // Not ready for receiving MsQuic->StreamReceiveSetEnabled(strm, FALSE); From b257f193f4d5a551f325b156b68f7513c9712727 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 15 May 2024 05:54:46 -0400 Subject: [PATCH 3/4] * FIX [trans/quic] Fix the max size error in setting. Signed-off-by: wanghaemq --- src/mqtt/transport/quic/mqtt_quic.c | 38 +++++++++++++++++------------ src/supplemental/quic/msquic_dial.c | 2 +- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/mqtt/transport/quic/mqtt_quic.c b/src/mqtt/transport/quic/mqtt_quic.c index 3284fa5d4..6c4e91329 100644 --- a/src/mqtt/transport/quic/mqtt_quic.c +++ b/src/mqtt/transport/quic/mqtt_quic.c @@ -211,7 +211,8 @@ mqtt_quictran_pipe_init(void *arg, nni_pipe *npipe) // nni_lmq_init(&p->rslmq, 10240); p->busy = false; p->closed = false; - p->packmax = 0xFFFF; + // set max value by default + p->packmax == 0 ? p->packmax = (uint32_t)0xFFFFFFFF : p->packmax; p->qosmax = 2; p->pingcnt = 1; if (p->ismain == true) { @@ -436,6 +437,7 @@ mqtt_quictran_pipe_nego_cb(void *arg) goto mqtt_error; } else { p->packmax = data->p_value.u32; + log_info("Set max packet size as %ld", p->packmax); } } data = property_get_value(ep->property, PUBLISH_MAXIMUM_QOS); @@ -871,11 +873,12 @@ mqtt_quictran_pipe_send_prior(mqtt_quictran_pipe *p, nni_aio *aio) p->qosmax == 0? *header &= 0XF9: NNI_ARG_UNUSED(*header); } } - // check max packet size - if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { - nni_aio_finish_error(aio, UNSPECIFIED_ERROR); - return; - } + } + + // check max packet size for v311 and v5 + if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { + nni_aio_finish_error(aio, UNSPECIFIED_ERROR); + return; } niov = 0; @@ -954,12 +957,13 @@ mqtt_quictran_pipe_send_start(mqtt_quictran_pipe *p) p->qosmax == 0? *header &= 0XF9: NNI_ARG_UNUSED(*header); } } - // check max packet size - if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { - txaio = p->txaio; - nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); - return; - } + } + + // check max packet size for v311 and v5 + if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { + txaio = p->txaio; + nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); + return; } txaio = p->txaio; @@ -1127,10 +1131,12 @@ mqtt_quictran_data_pipe_start( ep->refcnt++; - p->conn = conn; - p->ep = ep; - p->rcvmax = 0; - p->sndmax = 65535; + p->conn = conn; + p->ep = ep; + p->packmax = 0; + p->rcvmax = 0; + p->sndmax = 65535; + #ifdef NNG_HAVE_MQTT_BROKER p->cparam = NULL; #endif diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 995746b94..e6a68b70d 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -1385,7 +1385,7 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) } if (d->priority != -1) { - printf("Ready to create a quic stream with priority: %d\n", d->priority); + log_info("Ready to create a quic stream with priority: %d\n", d->priority); MsQuic->SetParam(strm, QUIC_PARAM_STREAM_PRIORITY, sizeof(int), &d->priority); } From 63d4b4df96246760855c67429a25d055eac0ef32 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 15 May 2024 05:58:10 -0400 Subject: [PATCH 4/4] * FIX [trans/quic] packmax should be applied to both mqttv311 and mqtv5. Signed-off-by: wanghaemq --- src/mqtt/transport/tcp/mqtt_tcp.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/mqtt/transport/tcp/mqtt_tcp.c b/src/mqtt/transport/tcp/mqtt_tcp.c index bd8aef050..465094c3c 100644 --- a/src/mqtt/transport/tcp/mqtt_tcp.c +++ b/src/mqtt/transport/tcp/mqtt_tcp.c @@ -781,12 +781,13 @@ mqtt_tcptran_pipe_send_start(mqtt_tcptran_pipe *p) p->qosmax == 0 ? *header &= 0XF9 : NNI_ARG_UNUSED(*header); } } - // check max packet size - if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { - txaio = p->txaio; - nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); - return; - } + } + + // check max packet size + if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { + txaio = p->txaio; + nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); + return; } txaio = p->txaio;