Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some gap between nng reap thread and msquic. #947

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions src/mqtt/transport/quic/mqtt_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ mqtt_quictran_pipe_init(void *arg, nni_pipe *npipe)
// nni_lmq_init(&p->rslmq, 10240);
p->busy = false;
p->closed = false;
p->packmax = 0xFFFF;
// set max value by default
p->packmax == 0 ? p->packmax = (uint32_t)0xFFFFFFFF : p->packmax;
p->qosmax = 2;
p->pingcnt = 1;
if (p->ismain == true) {
Expand Down Expand Up @@ -436,6 +437,7 @@ mqtt_quictran_pipe_nego_cb(void *arg)
goto mqtt_error;
} else {
p->packmax = data->p_value.u32;
log_info("Set max packet size as %ld", p->packmax);
}
}
data = property_get_value(ep->property, PUBLISH_MAXIMUM_QOS);
Expand Down Expand Up @@ -871,11 +873,12 @@ mqtt_quictran_pipe_send_prior(mqtt_quictran_pipe *p, nni_aio *aio)
p->qosmax == 0? *header &= 0XF9: NNI_ARG_UNUSED(*header);
}
}
// check max packet size
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
nni_aio_finish_error(aio, UNSPECIFIED_ERROR);
return;
}
}

// check max packet size for v311 and v5
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
nni_aio_finish_error(aio, UNSPECIFIED_ERROR);
return;
}

niov = 0;
Expand Down Expand Up @@ -954,12 +957,13 @@ mqtt_quictran_pipe_send_start(mqtt_quictran_pipe *p)
p->qosmax == 0? *header &= 0XF9: NNI_ARG_UNUSED(*header);
}
}
// check max packet size
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
txaio = p->txaio;
nni_aio_finish_error(txaio, UNSPECIFIED_ERROR);
return;
}
}

// check max packet size for v311 and v5
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
txaio = p->txaio;
nni_aio_finish_error(txaio, UNSPECIFIED_ERROR);
return;
}

txaio = p->txaio;
Expand Down Expand Up @@ -1127,10 +1131,12 @@ mqtt_quictran_data_pipe_start(

ep->refcnt++;

p->conn = conn;
p->ep = ep;
p->rcvmax = 0;
p->sndmax = 65535;
p->conn = conn;
p->ep = ep;
p->packmax = 0;
p->rcvmax = 0;
p->sndmax = 65535;

#ifdef NNG_HAVE_MQTT_BROKER
p->cparam = NULL;
#endif
Expand Down
13 changes: 7 additions & 6 deletions src/mqtt/transport/tcp/mqtt_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,13 @@ mqtt_tcptran_pipe_send_start(mqtt_tcptran_pipe *p)
p->qosmax == 0 ? *header &= 0XF9 : NNI_ARG_UNUSED(*header);
}
}
// check max packet size
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
txaio = p->txaio;
nni_aio_finish_error(txaio, UNSPECIFIED_ERROR);
return;
}
}

// check max packet size
if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) {
txaio = p->txaio;
nni_aio_finish_error(txaio, UNSPECIFIED_ERROR);
return;
}

txaio = p->txaio;
Expand Down
42 changes: 34 additions & 8 deletions src/supplemental/quic/msquic_dial.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct nni_quic_conn {
nni_list readq;
nni_list writeq;
bool closed;
nni_atomic_int ref;
nni_mtx mtx;
nni_aio * dial_aio;
// nni_aio * qstrmaio; // Link to msquic_strm_cb
Expand Down Expand Up @@ -103,6 +104,7 @@ static void quic_dialer_cb(void *arg);
static void quic_stream_error(void *arg, int err);
static void quic_stream_close(void *arg);
static void quic_stream_dowrite(nni_quic_conn *c);
static void quic_stream_rele(nni_quic_conn *c);

static QUIC_STATUS verify_peer_cert_tls(QUIC_CERTIFICATE* cert, QUIC_CERTIFICATE* chain, char *ca);

Expand Down Expand Up @@ -531,6 +533,10 @@ quic_stream_cb(int events, void *arg, int rc)
nni_aio_list_remove(aio);
QUIC_BUFFER *buf = nni_aio_get_input(aio, 0);
free(buf);
// XXX #[FORCANCEL]
nni_msg *m;
if ((m = nni_aio_get_msg(aio)) != NULL)
nni_msg_free(m);
if (rc != 0)
nni_aio_finish_error(aio, rc);
else
Expand Down Expand Up @@ -561,11 +567,10 @@ quic_stream_cb(int events, void *arg, int rc)
// case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE:
case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE:
// case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED:
quic_stream_error(arg, NNG_ECONNSHUT);
// Marked it as closed, prevent explicit shutdown
c->closed = true;
// It's the only place to free msquic stream
msquic_strm_fini(c->qstrm);
quic_stream_error(arg, NNG_ECONNSHUT);
quic_stream_rele(c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an essential fix

break;
default:
break;
Expand All @@ -577,12 +582,20 @@ static void
quic_stream_fini(void *arg)
{
nni_quic_conn *c = arg;
quic_stream_close(c);
NNI_FREE_STRUCT(c);
}

static void
quic_stream_rele(nni_quic_conn *c)
{
quic_stream_close(c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow a big change

if (c->dialer) {
nni_msquic_quic_dialer_rele(c->dialer);
}
NNI_FREE_STRUCT(c);
if (nni_atomic_dec_nv(&c->ref) != 0) {
return;
}
quic_stream_fini(c);
}

//static nni_reap_list quic_reap_list = {
Expand All @@ -593,7 +606,7 @@ static void
quic_stream_free(void *arg)
{
nni_quic_conn *c = arg;
quic_stream_fini(c);
quic_stream_rele(c);
}

// Notify upper layer that something happened.
Expand Down Expand Up @@ -745,6 +758,14 @@ quic_stream_dowrite(nni_quic_conn *c)
}
nni_aio_set_input(aio, 0, buf);

// When streamsend is triggered. The msg is used by nng and msquic.
// But if a cancelled operation happens right now. And nng cancel function
// will free the msg immediately. But this msg is still be used in msquic.
// So here we need a clone. And will free in #[FORCANCEL].
nni_msg *m;
if ((m = nni_aio_get_msg(aio)) != NULL)
nni_msg_clone(m);

if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf,
naiov, QUIC_SEND_FLAG_NONE, NULL))) {
log_error("Failed in StreamSend, 0x%x!", rv);
Expand Down Expand Up @@ -841,6 +862,9 @@ nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d)
return (NNG_ENOMEM);
}

nni_atomic_init(&c->ref);
nni_atomic_inc(&c->ref);

c->closed = false;
c->dialer = d;

Expand Down Expand Up @@ -1010,7 +1034,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,
QUIC_BUFFER *buf = nni_aio_get_input(aio, 0);
free(buf);
Event->SEND_COMPLETE.ClientContext = NULL;
nni_msg *msg = nni_aio_get_msg(aio);
//nni_msg *msg = nni_aio_get_msg(aio);
// free SUBSCRIBE/UNSUBSCRIBE QoS 1/2 PUBLISH msg here
// nni_mqtt_packet_type t = nni_mqtt_msg_get_packet_type(msg);
//nni_msg_free(msg);
Expand Down Expand Up @@ -1361,7 +1385,7 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d)
}

if (d->priority != -1) {
printf("Ready to create a quic stream with priority: %d\n", d->priority);
log_info("Ready to create a quic stream with priority: %d\n", d->priority);
MsQuic->SetParam(strm, QUIC_PARAM_STREAM_PRIORITY, sizeof(int), &d->priority);
}

Expand All @@ -1371,6 +1395,8 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d)
MsQuic->StreamClose(strm);
goto error;
}
// Stream is opened and started
nni_atomic_inc(&c->ref);

// Not ready for receiving
MsQuic->StreamReceiveSetEnabled(strm, FALSE);
Expand Down
Loading