-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix some gap between nng reap thread and msquic. #947
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
56c4952
* FIX [quic] Fix the double free when cancel stream send.
wanghaEMQ 82e7111
* NEW [quic] Add refcnt for nni quic conn to avoid the data race in f…
wanghaEMQ b257f19
* FIX [trans/quic] Fix the max size error in setting.
wanghaEMQ 63d4b4d
* FIX [trans/quic] packmax should be applied to both mqttv311 and mqtv5.
wanghaEMQ File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,7 @@ struct nni_quic_conn { | |
nni_list readq; | ||
nni_list writeq; | ||
bool closed; | ||
nni_atomic_int ref; | ||
nni_mtx mtx; | ||
nni_aio * dial_aio; | ||
// nni_aio * qstrmaio; // Link to msquic_strm_cb | ||
|
@@ -103,6 +104,7 @@ static void quic_dialer_cb(void *arg); | |
static void quic_stream_error(void *arg, int err); | ||
static void quic_stream_close(void *arg); | ||
static void quic_stream_dowrite(nni_quic_conn *c); | ||
static void quic_stream_rele(nni_quic_conn *c); | ||
|
||
static QUIC_STATUS verify_peer_cert_tls(QUIC_CERTIFICATE* cert, QUIC_CERTIFICATE* chain, char *ca); | ||
|
||
|
@@ -531,6 +533,10 @@ quic_stream_cb(int events, void *arg, int rc) | |
nni_aio_list_remove(aio); | ||
QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); | ||
free(buf); | ||
// XXX #[FORCANCEL] | ||
nni_msg *m; | ||
if ((m = nni_aio_get_msg(aio)) != NULL) | ||
nni_msg_free(m); | ||
if (rc != 0) | ||
nni_aio_finish_error(aio, rc); | ||
else | ||
|
@@ -561,11 +567,10 @@ quic_stream_cb(int events, void *arg, int rc) | |
// case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: | ||
case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: | ||
// case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: | ||
quic_stream_error(arg, NNG_ECONNSHUT); | ||
// Marked it as closed, prevent explicit shutdown | ||
c->closed = true; | ||
// It's the only place to free msquic stream | ||
msquic_strm_fini(c->qstrm); | ||
quic_stream_error(arg, NNG_ECONNSHUT); | ||
quic_stream_rele(c); | ||
break; | ||
default: | ||
break; | ||
|
@@ -577,12 +582,20 @@ static void | |
quic_stream_fini(void *arg) | ||
{ | ||
nni_quic_conn *c = arg; | ||
quic_stream_close(c); | ||
NNI_FREE_STRUCT(c); | ||
} | ||
|
||
static void | ||
quic_stream_rele(nni_quic_conn *c) | ||
{ | ||
quic_stream_close(c); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wow a big change |
||
if (c->dialer) { | ||
nni_msquic_quic_dialer_rele(c->dialer); | ||
} | ||
NNI_FREE_STRUCT(c); | ||
if (nni_atomic_dec_nv(&c->ref) != 0) { | ||
return; | ||
} | ||
quic_stream_fini(c); | ||
} | ||
|
||
//static nni_reap_list quic_reap_list = { | ||
|
@@ -593,7 +606,7 @@ static void | |
quic_stream_free(void *arg) | ||
{ | ||
nni_quic_conn *c = arg; | ||
quic_stream_fini(c); | ||
quic_stream_rele(c); | ||
} | ||
|
||
// Notify upper layer that something happened. | ||
|
@@ -745,6 +758,14 @@ quic_stream_dowrite(nni_quic_conn *c) | |
} | ||
nni_aio_set_input(aio, 0, buf); | ||
|
||
// When streamsend is triggered. The msg is used by nng and msquic. | ||
// But if a cancelled operation happens right now. And nng cancel function | ||
// will free the msg immediately. But this msg is still be used in msquic. | ||
// So here we need a clone. And will free in #[FORCANCEL]. | ||
nni_msg *m; | ||
if ((m = nni_aio_get_msg(aio)) != NULL) | ||
nni_msg_clone(m); | ||
|
||
if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, | ||
naiov, QUIC_SEND_FLAG_NONE, NULL))) { | ||
log_error("Failed in StreamSend, 0x%x!", rv); | ||
|
@@ -841,6 +862,9 @@ nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d) | |
return (NNG_ENOMEM); | ||
} | ||
|
||
nni_atomic_init(&c->ref); | ||
nni_atomic_inc(&c->ref); | ||
|
||
c->closed = false; | ||
c->dialer = d; | ||
|
||
|
@@ -1010,7 +1034,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, | |
QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); | ||
free(buf); | ||
Event->SEND_COMPLETE.ClientContext = NULL; | ||
nni_msg *msg = nni_aio_get_msg(aio); | ||
//nni_msg *msg = nni_aio_get_msg(aio); | ||
// free SUBSCRIBE/UNSUBSCRIBE QoS 1/2 PUBLISH msg here | ||
// nni_mqtt_packet_type t = nni_mqtt_msg_get_packet_type(msg); | ||
//nni_msg_free(msg); | ||
|
@@ -1361,7 +1385,7 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) | |
} | ||
|
||
if (d->priority != -1) { | ||
printf("Ready to create a quic stream with priority: %d\n", d->priority); | ||
log_info("Ready to create a quic stream with priority: %d\n", d->priority); | ||
MsQuic->SetParam(strm, QUIC_PARAM_STREAM_PRIORITY, sizeof(int), &d->priority); | ||
} | ||
|
||
|
@@ -1371,6 +1395,8 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) | |
MsQuic->StreamClose(strm); | ||
goto error; | ||
} | ||
// Stream is opened and started | ||
nni_atomic_inc(&c->ref); | ||
|
||
// Not ready for receiving | ||
MsQuic->StreamReceiveSetEnabled(strm, FALSE); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an essential fix