diff --git a/include/nng/exchange/exchange_client.h b/include/nng/exchange/exchange_client.h index d1577fe07..cec77b912 100644 --- a/include/nng/exchange/exchange_client.h +++ b/include/nng/exchange/exchange_client.h @@ -13,6 +13,7 @@ typedef struct exchange_node_s exchange_node_t; struct exchange_node_s { exchange_t *ex; + exchange_sock_t *sock; nni_aio saio; nni_lmq send_messages; nni_list_node exnode; @@ -22,6 +23,7 @@ struct exchange_node_s { struct exchange_sock_s { nni_mtx mtx; + nni_atomic_bool closed; nni_list ex_queue; }; diff --git a/src/mqtt/protocol/exchange/exchange_client.c b/src/mqtt/protocol/exchange/exchange_client.c index 85661e2f8..eb4fc5836 100644 --- a/src/mqtt/protocol/exchange/exchange_client.c +++ b/src/mqtt/protocol/exchange/exchange_client.c @@ -33,8 +33,9 @@ exchange_add_ex(exchange_sock_t *s, exchange_t *ex) node->isBusy = false; node->ex = ex; + node->sock = s; - nni_aio_init(&node->saio, exchange_send_cb, ex); + nni_aio_init(&node->saio, exchange_send_cb, node); nni_lmq_init(&node->send_messages, 1024); NNI_LIST_NODE_INIT(&node->exnode); @@ -53,6 +54,8 @@ exchange_sock_init(void *arg, nni_sock *sock) NNI_ARG_UNUSED(sock); exchange_sock_t *s = arg; + nni_atomic_init_bool(&s->closed); + nni_atomic_set_bool(&s->closed, false); nni_mtx_init(&s->mtx); NNI_LIST_INIT(&s->ex_queue, exchange_node_t, exnode); @@ -63,6 +66,25 @@ static void exchange_sock_fini(void *arg) { exchange_sock_t *s = arg; + exchange_node_t *ex_node; + + NNI_LIST_FOREACH (&s->ex_queue, ex_node) { + nni_aio_fini(&ex_node->saio); + nni_lmq_fini(&ex_node->send_messages); + nni_mtx_fini(&ex_node->mtx); + exchange_release(ex_node->ex); + } + + ex_node = NULL; + while (!nni_list_empty(&s->ex_queue)) { + ex_node = nni_list_last(&s->ex_queue); + if (ex_node) { + nni_list_remove(&s->ex_queue, ex_node); + nng_free(ex_node, sizeof(*ex_node)); + ex_node = NULL; + } + } + nni_mtx_fini(&s->mtx); return; } @@ -80,18 +102,9 @@ exchange_sock_close(void *arg) exchange_sock_t *s = arg; exchange_node_t *ex_node; + nni_atomic_set_bool(&s->closed, true); NNI_LIST_FOREACH (&s->ex_queue, ex_node) { - exchange_release(ex_node->ex); - nni_lmq_fini(&ex_node->send_messages); - } - - ex_node = NULL; - while (!nni_list_empty(&s->ex_queue)) { - ex_node = nni_list_last(&s->ex_queue); - if (ex_node) { - nni_list_remove(&s->ex_queue, ex_node); - nng_free(ex_node, sizeof(*ex_node)); - } + nni_aio_close(&ex_node->saio); } return; @@ -157,7 +170,23 @@ exchange_send_cb(void *arg) exchange_node_t *ex_node = arg; nni_msg * msg = NULL; + if (ex_node == NULL) { + return; + } + + exchange_sock_t *s = ex_node->sock; + if (nni_atomic_get_bool(&s->closed)) { + // This occurs if the mqtt_pipe_close has been called. + // In that case we don't want any more processing. + return; + } + nni_mtx_lock(&ex_node->mtx); + if (nni_aio_result(&ex_node->saio) != 0) { + nni_mtx_unlock(&ex_node->mtx); + return; + } + if (nni_lmq_get(&ex_node->send_messages, &msg) == 0) { nni_mtx_unlock(&ex_node->mtx); (void)exchange_handle_msg(ex_node->ex, msg);